65.9K
CodeProject 正在变化。 阅读更多。
Home

Crossbar.io 快速概览

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.95/5 (15投票s)

2017年4月24日

CPOL

8分钟阅读

viewsIcon

19718

对 crossbar.io (Autobahn 消息平台) 的一瞥

引言

不久前,有人在 www.codeproject.com 上发布了另一篇关于 SignalR 的文章,我曾说过,你们应该看看这个东西:crossbar.io。作为一个不愿“言不由衷”的人(当然是在言行一致的基础上),我决定付诸行动,写一篇关于 crossbar.io 的小文章。

 

那么,crossbar.io 是什么?简单来说,它是一个消息代理,支持多种语言绑定,所有这些语言绑定应该能够无缝地相互通信。 

以下是 crossbar.io 的开发者对他们自己产品的介绍:

Crossbar.io 是一个面向分布式和微服务应用程序的开源网络平台。它实现了开放的 Web 应用程序消息协议 (WAMP),功能丰富、可扩展、健壮且安全。让 Crossbar.io 来处理消息传递的难点,这样您就可以专注于应用程序的功能。

 

 

所以,能多告诉我一些关于 Crossbar.IO 的信息吗?

crossbar.io 声称拥有以下语言绑定:

  • 浏览器中的 JavaScript *
  • Node.js 中的 JavaScript *
  • Python
  • PHP
  • Java
  • C#
  • Erlang

 

从表面上看,这似乎是一个非常不错的库,可以允许许多不同的、不相关的应用程序相互通信。

值得指出的是,没有星号 (*) 的语言绑定是由社区编写的,它们**不由** crossbar.io 的开发者维护。因此,您可能会发现官方维护的 Crossbar.io 语言绑定和这些社区编写的绑定之间存在差异。

不幸的是,生活就是这样。使用其他具有多语言绑定的消息传递框架(如 RabbitMQKafka(我们稍后会详细介绍))时,情况也是如此。

 

 

代码在哪里?

本文所使用的演示应用程序的所有代码都可以在我的 GitHub 账户上找到: https://github.com/sachabarber/CrossbarioDemo

 

安装

本节将指导您完成安装过程

 

安装 Python

  1. 下载 python,将其安装到默认位置,而不是“Program Files”,确保“Add to path”设置为 YES。
  2. 在安装 python 的目录下打开一个**命令提示符**。
  3. 在 python 命令提示符中使用 **pip**:pip install pypiwin32
  4. 在 python 命令提示符中使用 **pip**:pip install crossbar


验证 Crossbar.io 安装

1. 验证安装的命令提示符:which crossbar 

点击查看大图

 

2. 验证安装的命令提示符:crossbar version

点击查看大图

 

 

 

演示

本节将在我们深入演示代码之前,概述一些核心的 crossbar.io 概念。

核心概念

crossbar 节点

crossbar.io 有“节点”这个概念。“节点”基本上是一个正在运行的 crossbar.exe 进程实例,该实例正在运行一个活动的配置。通常每个机器/虚拟机上会有一个节点。

.crossbar 文件夹

为了配置正在运行的 crossbar 节点,有一个它会查找的特殊文件夹,称为“.crossbar”。在此文件夹中,有许多文件,其中最重要的文件是“config.json”,它用于实际配置 crossbar.exe 进程启动时的行为。这个文件夹是神奇的/特殊的,将在您使用 crossbar start 命令时被检查(尽管我们实际上不想这样做,但稍后会详细介绍)。

当我们发出 crossbar start 命令时,crossbar.exe 将拾取它在 .crossbar 名称指定的文件夹中找到的配置文件。

那么,“config.json”文件是什么样的呢?

这是 .NET 演示中附带的一个文件: https://github.com/crossbario/crossbar-examples,具体来说是 https://github.com/crossbario/crossbar-examples/tree/master/hello/csharp 演示(crossbar.io 网站说您应该能够使用命令行来为您选择的语言生成一个新项目,但实际上您不能,他们发现维护模板工作量太大,所以您需要从示例文件夹中获取它们)。

{
    "version": 2,
    "controller": {},
    "workers": [
        {
            "type": "router",
            "options": {
                "pythonpath": [
                    ".."
                ]
            },
            "realms": [
                {
                    "name": "realm1",
                    "roles": [
                        {
                            "name": "anonymous",
                            "permissions": [
                                {
                                    "uri": "",
                                    "match": "prefix",
                                    "allow": {
                                        "call": true,
                                        "register": true,
                                        "publish": true,
                                        "subscribe": true
                                    },
                                    "disclose": {
                                        "caller": false,
                                        "publisher": false
                                    },
                                    "cache": true
                                }
                            ]
                        }
                    ]
                }
            ],
            "transports": [
                {
                    "type": "web",
                    "endpoint": {
                        "type": "tcp",
                        "port": 8080
                    },
                    "paths": {
                        "/": {
                            "type": "static",
                            "directory": "../src/Web"
                        },
                        "ws": {
                            "type": "websocket"
                        }
                    }
                }
            ]
        },
        {
            "type": "guest",
            "executable": "Hello.exe",
            "arguments": [
                "ws://127.0.0.1:8080/ws",
                "realm1"
            ],
            "options": {
                "workdir": "../src/DotNet/Hello/bin/Debug/"
            }
        }
    ]
}

 

这个配置文件中有两个实际的工作进程:

  • 一个网站
  • 一个 .NET 应用程序 (Hello.exe)

 

所以,让我们仔细想想,我们有这个配置文件,并且我们应该用这个配置文件来运行 crossbar.exe,它会启动 Hello.exe 和一个网站。嗯,如果 Crossbar.exe 运行 Hello.exe,我该如何调试 Hello.exe 呢?

在我看来,这与我们想要的效果正好相反。如何调试呢?有一些相关的帖子:

我决定采取另一种方法。我认为使用 Web 开发工具(Chrome 中的 F12)来调试 JavaScript 没问题,但对于 .NET,我想使用 Visual Studio。所以我让我的 .NET 代码启动 Crossbar.io 作为一个额外的进程,我在代码中配置 .NET 工作进程,并向新的 Crossbar.exe 进程传递一个修改过的上述文件版本,从中删除了 .NET Hello.exe 的配置。

我必须说,如果我使用的是 RabbitMQKafka,这本将是轻而易举的事。我更喜欢嵌入式 API,而不是将我的代码提交给其他东西来运行。当然,您必须能够自己配置事物,但为什么一开始不能就这样做呢?

 

RPC

crossbar.io 允许工作进程对不同语言的工作进程进行远程过程调用。

 

发布/订阅

crossbar.io 允许工作进程在不同语言的工作进程之间发布和接收消息。

 

演示代码

演示代码将展示一个单独的 .NET 工作进程和一个单独的 JavaScript 工作进程,它们将相互发布/订阅和进行 RPC 调用。

 

.NET 代码

以下是用于发布者/订阅者和 RPC 可调用 .NET 程序的全部 .NET 代码。

using System;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using WampSharp.Core.Listener;
using WampSharp.V2;
using WampSharp.V2.Client;
using WampSharp.V2.Core.Contracts;
using WampSharp.V2.Realm;
using WampSharp.V2.Rpc;
using System.Diagnostics;

namespace Hello
{
    public class Program
    {
        static void Main(string[] args)
        {

#if DEBUG
            //use this for debugging
            Task.Factory.StartNew(() =>
            {
                Process process = new Process();
                process.StartInfo.FileName = @"c:\Users\sacha\AppData\Local\Programs\Python\Python36-32\Scripts\crossbar.exe";
                process.StartInfo.Arguments = @"start --cbdir C:\Users\sacha\Desktop\CrossbarIOExample\CrossBarDotNetExample\.crossbar";
                process.StartInfo.WindowStyle = ProcessWindowStyle.Maximized;
                process.Start();
                // Waits here for the process to exit, but since this is 
                // dedicated thread main thread is not blocked
                process.WaitForExit();

            }, TaskCreationOptions.LongRunning);

            //give CrossBar.io process time to start
            System.Threading.Thread.Sleep(1000 * 20);
#endif


            Console.WriteLine("WampSharp Hello demo starting ...");

            string wsuri = "ws://127.0.0.1:8080/ws";
            string realm = "realm1";
            if (args.Length > 0) {
               wsuri = args[0];
               if (args.Length > 1) {
                  realm = args[1];
               }
            }
            
            Task runTask = Run(wsuri, realm);

            Console.ReadLine();
        }

        private async static Task Run(string wsuri, string realm)
        {
            Console.WriteLine("Connecting to {0}, realm {1}", wsuri, realm);

            DefaultWampChannelFactory factory = new DefaultWampChannelFactory();

            IWampChannel channel =
                factory.CreateJsonChannel(wsuri, realm);

            IWampClientConnectionMonitor monitor = channel.RealmProxy.Monitor;
            
            monitor.ConnectionBroken += OnClose;
            monitor.ConnectionError += OnError;

            await channel.Open().ConfigureAwait(false);

            IWampRealmServiceProvider services = channel.RealmProxy.Services;

            // SUBSCRIBE to a topic and receive events
            ISubject<string> helloSubject = 
                services.GetSubject<string>("com.example.onhello");

            IDisposable subscription =
                helloSubject.Subscribe
                    (x => Console.WriteLine("event for 'onhello' received: {0}", x));

            Console.WriteLine("subscribed to topic 'onhello'");


            // REGISTER a procedure for remote calling
            Add2Service callee = new Add2Service();

            await services.RegisterCallee(callee)
                .ConfigureAwait(false);
            
            Console.WriteLine("procedure add2() registered");


            // PUBLISH and CALL every second... forever
            ISubject<int> onCounterSubject =
                services.GetSubject<int>("com.example.oncounter");

            ISubject<int> onDotNetCounterSubject =
              services.GetSubject<int>("com.example.ondotnetcounter");


            IMul2Service proxy =
                services.GetCalleeProxy<IMul2Service>();

            int counter = 0;

            while (true)
            {
                // PUBLISH an event
                onCounterSubject.OnNext(counter);
                Console.WriteLine("published to 'oncounter' with counter {0}", counter);

                onDotNetCounterSubject.OnNext(counter);
                Console.WriteLine("published to 'ondotnetcounter' with counter {0}", counter);
                counter++;


                // CALL a remote procedure
                try
                {
                    int result = await proxy.Multiply(counter, 3)
                        .ConfigureAwait(false);

                    Console.WriteLine("mul2() called with result: {0}", result);
                }
                catch (WampException ex)
                {
                    if (ex.ErrorUri != "wamp.error.no_such_procedure")
                    {
                        Console.WriteLine("call of mul2() failed: " + ex);
                    }
                }


                await Task.Delay(TimeSpan.FromSeconds(1))
                    .ConfigureAwait(false);
            }
        }

        #region Callee

        public interface IAdd2Service
        {
            [WampProcedure("com.example.add2")]
            int Add(int x, int y);
        }

        public class Add2Service : IAdd2Service
        {
            public int Add(int x, int y)
            {
                Console.WriteLine("add2() called with {0} and {1}", x, y);
                return x + y;
            }
        }

        #endregion

        #region Caller

        public interface IMul2Service
        {
            [WampProcedure("com.example.mul2")]
            Task<int> Multiply(int x, int y);             
        }

        #endregion

        private static void OnClose(object sender, WampSessionCloseEventArgs e)
        {
            Console.WriteLine("connection closed. reason: " + e.Reason);
        }

        private static void OnError(object sender, WampConnectionErrorEventArgs e)
        {
            Console.WriteLine("connection error. error: " + e.Exception);
        }
    }
}

这使用了唯一的 .NET 绑定 WampSharp Nuget 包。该包由社区成员开发,因此更新可能不及时。这里充满未知(至少可能是这样)。

 

JavaScript 代码

以下是用于发布者/订阅者和 RPC 可调用 JavaScript 程序的全部 JavaScript 代码。

<!DOCTYPE html>
<html>
   <body>
      <h1>Hello WAMP</h1>
      <p>Open JavaScript console to watch output.</p>
      <script>AUTOBAHN_DEBUG = false;</script>
       <script src="js/autobahn.min.js"></script>

      <script>
         // the URL of the WAMP Router (Crossbar.io)
         //
         var wsuri;
         if (document.location.origin == "file://") {
            wsuri = "ws://127.0.0.1:8080/ws";

         } else {
            wsuri = (document.location.protocol === "http:" ? "ws:" : "wss:") + "//" +
                        document.location.host + "/ws";
         }


         // the WAMP connection to the Router
         //
         var connection = new autobahn.Connection({
            url: wsuri,
            realm: "realm1"
         });


         // timers
         //
         var t1, t2;


         // fired when connection is established and session attached
         //
         connection.onopen = function (session, details) {

            console.log("Connected");

            // SUBSCRIBE to a topic and receive events
            //
            function on_counter (args) {
               var counter = args[0];
               console.log("on_counter() event received with counter " + counter);
            }
            session.subscribe('com.example.oncounter', on_counter).then(
               function (sub) {
                  console.log('subscribed to topic');
               },
               function (err) {
                  console.log('failed to subscribe to topic', err);
               }
            );
			
			
	//SUBSCRIBE to a topic and receive events
            
            function on_dotnetcounter (args) {
               var counter = args[0];
               console.log("DOTNET : on_dotnetcounter() event received with counter " + counter);
            }
            session.subscribe('com.example.ondotnetcounter', on_dotnetcounter).then(
               function (sub) {
                  console.log('subscribed to topic');
               },
               function (err) {
                  console.log('failed to subscribe to topic', err);
               }
            );


            // PUBLISH an event every second
            //
            t1 = setInterval(function () {

               session.publish('com.example.onhello', ['Hello from JavaScript (browser)']);
               console.log("published to topic 'com.example.onhello'");
            }, 1000);


            // REGISTER a procedure for remote calling
            //
            function mul2 (args) {
               var x = args[0];
               var y = args[1];
               console.log("mul2() called with " + x + " and " + y);
               return x * y;
            }
            session.register('com.example.mul2', mul2).then(
               function (reg) {
                  console.log('procedure registered');
               },
               function (err) {
                  console.log('failed to register procedure', err);
               }
            );


            // CALL a remote procedure every second
            //
            var x = 0;

            t2 = setInterval(function () {

               session.call('com.example.add2', [x, 18]).then(
                  function (res) {
                     console.log("add2() result:", res);
                  },
                  function (err) {
                     console.log("add2() error:", err);
                  }
               );

               x += 3;
            }, 1000);
         };


         // fired when connection was lost (or could not be established)
         //
         connection.onclose = function (reason, details) {
            console.log("Connection lost: " + reason);
            if (t1) {
               clearInterval(t1);
               t1 = null;
            }
            if (t2) {
               clearInterval(t2);
               t2 = null;
            }
         }


         // now actually open the connection
         //
         connection.open();

      </script>
   </body>
</html>

由于 JavaScript 是官方支持的绑定,它应该由 crossbar.io 的开发人员及时更新。

 

运行时截图

这就是我们在 Visual Studio 中运行解决方案时看到的情况。

点击查看大图

 

如何调试 Crossbar 并同时运行它?

  • 对于 .NET 代码,您可以使用我的技巧,即启动一个额外的 crossbar.exe 进程,并告诉它使用哪个修改过的配置文件。
  • 对于 Web 应用程序,只需使用浏览器的开发工具即可。

 

 

与其他技术的比较

公平地说,接下来我将将其与我喜欢/使用过的其他几个消息传递框架进行比较。值得注意的是,在进行任何比较之前,所有这些都需要安装一些额外的东西。

  • Crossbar 需要 python。
  • RabbitMQ 需要 Erlang。
  • Kafka 需要 JDK。

 

 多语言持久化消息集群配置调试体验发布/订阅RPC
Crossbar.io(1)通过一些奇怪的 .crossbar 文件夹。可能可以在代码中实现相同的功能,但可能非常麻烦。说实话,有点差。因为 crossbar 想使用 .crossbar 文件夹配置来启动您的“crossbar 节点”。这意味着它正在运行应用程序,而不是您的 IDE。所以您被迫从您的应用程序启动 crossbar.exe,并向其传递一个配置文件命令行参数,然后等待它运行。

嗯。
是的,轻而易举。是的,轻而易举。
RabbitMQ通过标准代码/配置。只需运行浏览器/IDE 并设置断点。是的,轻而易举(队列)。是的,通过 correlationId。
Kafka通过标准代码/配置。只需运行浏览器/IDE 并设置断点。是的,轻而易举(主题)。是的,通过 correlationId + 额外主题,不如其他方法方便。

 

1 处理此问题的一种方法是在中间放置一个数据库,以便所有生产者先写入数据库,然后您只需使用事件溯源将事件提供给 crossbar.io 节点,它会将其广播出去。

因此,如您所见,存在更好的解决方案,但最终还是取决于您的需求。

 

结论

这是一个棘手的部分,因为一方面我喜欢它相对易于使用,但另一方面,有太多我不喜欢的因素,不足以使其成为我首选的消息传递解决方案。

我喜欢不同事物可以相对容易地相互通信(尽管每种语言绑定提供的 API 都非常不同)。我不喜欢调试的难度,以及我必须使用这个 .crossbar 文件夹来配置事物。

我也不喜欢消息的不可靠性。当然,您可以解决这个问题,如我所说,但有更好的解决方案。

对我来说,如果您真正想要的是“即发即忘”的非持久化消息传递,那还可以。对于其他任何事情,我会使用 RabbitMQKafka

 

 

© . All rights reserved.