Crossbar.io 快速概览






4.95/5 (15投票s)
对 crossbar.io (Autobahn 消息平台) 的一瞥
引言
不久前,有人在 www.codeproject.com 上发布了另一篇关于 SignalR 的文章,我曾说过,你们应该看看这个东西:crossbar.io。作为一个不愿“言不由衷”的人(当然是在言行一致的基础上),我决定付诸行动,写一篇关于 crossbar.io 的小文章。
那么,crossbar.io 是什么?简单来说,它是一个消息代理,支持多种语言绑定,所有这些语言绑定应该能够无缝地相互通信。
以下是 crossbar.io 的开发者对他们自己产品的介绍:
Crossbar.io 是一个面向分布式和微服务应用程序的开源网络平台。它实现了开放的 Web 应用程序消息协议 (WAMP),功能丰富、可扩展、健壮且安全。让 Crossbar.io 来处理消息传递的难点,这样您就可以专注于应用程序的功能。
所以,能多告诉我一些关于 Crossbar.IO 的信息吗?
crossbar.io 声称拥有以下语言绑定:
- 浏览器中的 JavaScript *
- Node.js 中的 JavaScript *
- Python
- PHP
- Java
- C#
- Erlang
从表面上看,这似乎是一个非常不错的库,可以允许许多不同的、不相关的应用程序相互通信。
值得指出的是,没有星号 (*) 的语言绑定是由社区编写的,它们**不由** crossbar.io 的开发者维护。因此,您可能会发现官方维护的 Crossbar.io 语言绑定和这些社区编写的绑定之间存在差异。
不幸的是,生活就是这样。使用其他具有多语言绑定的消息传递框架(如 RabbitMQ 和 Kafka(我们稍后会详细介绍))时,情况也是如此。
代码在哪里?
本文所使用的演示应用程序的所有代码都可以在我的 GitHub 账户上找到: https://github.com/sachabarber/CrossbarioDemo
安装
本节将指导您完成安装过程
安装 Python
- 下载 python,将其安装到默认位置,而不是“Program Files”,确保“Add to path”设置为 YES。
- 在安装 python 的目录下打开一个**命令提示符**。
- 在 python 命令提示符中使用 **pip**:pip install pypiwin32
- 在 python 命令提示符中使用 **pip**:pip install crossbar
验证 Crossbar.io 安装
1. 验证安装的命令提示符:which crossbar
点击查看大图
2. 验证安装的命令提示符:crossbar version
点击查看大图
演示
本节将在我们深入演示代码之前,概述一些核心的 crossbar.io 概念。
核心概念
crossbar 节点
crossbar.io 有“节点”这个概念。“节点”基本上是一个正在运行的 crossbar.exe 进程实例,该实例正在运行一个活动的配置。通常每个机器/虚拟机上会有一个节点。
.crossbar 文件夹
为了配置正在运行的 crossbar 节点,有一个它会查找的特殊文件夹,称为“.crossbar”。在此文件夹中,有许多文件,其中最重要的文件是“config.json
”,它用于实际配置 crossbar.exe 进程启动时的行为。这个文件夹是神奇的/特殊的,将在您使用 crossbar start
命令时被检查(尽管我们实际上不想这样做,但稍后会详细介绍)。
当我们发出 crossbar start
命令时,crossbar.exe 将拾取它在 .crossbar 名称指定的文件夹中找到的配置文件。
那么,“config.json
”文件是什么样的呢?
这是 .NET 演示中附带的一个文件: https://github.com/crossbario/crossbar-examples,具体来说是 https://github.com/crossbario/crossbar-examples/tree/master/hello/csharp 演示(crossbar.io 网站说您应该能够使用命令行来为您选择的语言生成一个新项目,但实际上您不能,他们发现维护模板工作量太大,所以您需要从示例文件夹中获取它们)。
{
"version": 2,
"controller": {},
"workers": [
{
"type": "router",
"options": {
"pythonpath": [
".."
]
},
"realms": [
{
"name": "realm1",
"roles": [
{
"name": "anonymous",
"permissions": [
{
"uri": "",
"match": "prefix",
"allow": {
"call": true,
"register": true,
"publish": true,
"subscribe": true
},
"disclose": {
"caller": false,
"publisher": false
},
"cache": true
}
]
}
]
}
],
"transports": [
{
"type": "web",
"endpoint": {
"type": "tcp",
"port": 8080
},
"paths": {
"/": {
"type": "static",
"directory": "../src/Web"
},
"ws": {
"type": "websocket"
}
}
}
]
},
{
"type": "guest",
"executable": "Hello.exe",
"arguments": [
"ws://127.0.0.1:8080/ws",
"realm1"
],
"options": {
"workdir": "../src/DotNet/Hello/bin/Debug/"
}
}
]
}
这个配置文件中有两个实际的工作进程:
- 一个网站
- 一个 .NET 应用程序 (Hello.exe)
所以,让我们仔细想想,我们有这个配置文件,并且我们应该用这个配置文件来运行 crossbar.exe,它会启动 Hello.exe 和一个网站。嗯,如果 Crossbar.exe 运行 Hello.exe,我该如何调试 Hello.exe 呢?
在我看来,这与我们想要的效果正好相反。如何调试呢?有一些相关的帖子:
- http://stackoverflow.com/questions/28251494/how-to-log-debug-a-crossbar-guest-worker
- http://stackoverflow.com/questions/27476238/debugging-crossbar-io-app-in-intellij/27544912#27544912
我决定采取另一种方法。我认为使用 Web 开发工具(Chrome 中的 F12)来调试 JavaScript 没问题,但对于 .NET,我想使用 Visual Studio。所以我让我的 .NET 代码启动 Crossbar.io 作为一个额外的进程,我在代码中配置 .NET 工作进程,并向新的 Crossbar.exe 进程传递一个修改过的上述文件版本,从中删除了 .NET Hello.exe 的配置。
我必须说,如果我使用的是 RabbitMQ 或 Kafka,这本将是轻而易举的事。我更喜欢嵌入式 API,而不是将我的代码提交给其他东西来运行。当然,您必须能够自己配置事物,但为什么一开始不能就这样做呢?
RPC
crossbar.io 允许工作进程对不同语言的工作进程进行远程过程调用。
发布/订阅
crossbar.io 允许工作进程在不同语言的工作进程之间发布和接收消息。
演示代码
演示代码将展示一个单独的 .NET 工作进程和一个单独的 JavaScript 工作进程,它们将相互发布/订阅和进行 RPC 调用。
.NET 代码
以下是用于发布者/订阅者和 RPC 可调用 .NET 程序的全部 .NET 代码。
using System;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using WampSharp.Core.Listener;
using WampSharp.V2;
using WampSharp.V2.Client;
using WampSharp.V2.Core.Contracts;
using WampSharp.V2.Realm;
using WampSharp.V2.Rpc;
using System.Diagnostics;
namespace Hello
{
public class Program
{
static void Main(string[] args)
{
#if DEBUG
//use this for debugging
Task.Factory.StartNew(() =>
{
Process process = new Process();
process.StartInfo.FileName = @"c:\Users\sacha\AppData\Local\Programs\Python\Python36-32\Scripts\crossbar.exe";
process.StartInfo.Arguments = @"start --cbdir C:\Users\sacha\Desktop\CrossbarIOExample\CrossBarDotNetExample\.crossbar";
process.StartInfo.WindowStyle = ProcessWindowStyle.Maximized;
process.Start();
// Waits here for the process to exit, but since this is
// dedicated thread main thread is not blocked
process.WaitForExit();
}, TaskCreationOptions.LongRunning);
//give CrossBar.io process time to start
System.Threading.Thread.Sleep(1000 * 20);
#endif
Console.WriteLine("WampSharp Hello demo starting ...");
string wsuri = "ws://127.0.0.1:8080/ws";
string realm = "realm1";
if (args.Length > 0) {
wsuri = args[0];
if (args.Length > 1) {
realm = args[1];
}
}
Task runTask = Run(wsuri, realm);
Console.ReadLine();
}
private async static Task Run(string wsuri, string realm)
{
Console.WriteLine("Connecting to {0}, realm {1}", wsuri, realm);
DefaultWampChannelFactory factory = new DefaultWampChannelFactory();
IWampChannel channel =
factory.CreateJsonChannel(wsuri, realm);
IWampClientConnectionMonitor monitor = channel.RealmProxy.Monitor;
monitor.ConnectionBroken += OnClose;
monitor.ConnectionError += OnError;
await channel.Open().ConfigureAwait(false);
IWampRealmServiceProvider services = channel.RealmProxy.Services;
// SUBSCRIBE to a topic and receive events
ISubject<string> helloSubject =
services.GetSubject<string>("com.example.onhello");
IDisposable subscription =
helloSubject.Subscribe
(x => Console.WriteLine("event for 'onhello' received: {0}", x));
Console.WriteLine("subscribed to topic 'onhello'");
// REGISTER a procedure for remote calling
Add2Service callee = new Add2Service();
await services.RegisterCallee(callee)
.ConfigureAwait(false);
Console.WriteLine("procedure add2() registered");
// PUBLISH and CALL every second... forever
ISubject<int> onCounterSubject =
services.GetSubject<int>("com.example.oncounter");
ISubject<int> onDotNetCounterSubject =
services.GetSubject<int>("com.example.ondotnetcounter");
IMul2Service proxy =
services.GetCalleeProxy<IMul2Service>();
int counter = 0;
while (true)
{
// PUBLISH an event
onCounterSubject.OnNext(counter);
Console.WriteLine("published to 'oncounter' with counter {0}", counter);
onDotNetCounterSubject.OnNext(counter);
Console.WriteLine("published to 'ondotnetcounter' with counter {0}", counter);
counter++;
// CALL a remote procedure
try
{
int result = await proxy.Multiply(counter, 3)
.ConfigureAwait(false);
Console.WriteLine("mul2() called with result: {0}", result);
}
catch (WampException ex)
{
if (ex.ErrorUri != "wamp.error.no_such_procedure")
{
Console.WriteLine("call of mul2() failed: " + ex);
}
}
await Task.Delay(TimeSpan.FromSeconds(1))
.ConfigureAwait(false);
}
}
#region Callee
public interface IAdd2Service
{
[WampProcedure("com.example.add2")]
int Add(int x, int y);
}
public class Add2Service : IAdd2Service
{
public int Add(int x, int y)
{
Console.WriteLine("add2() called with {0} and {1}", x, y);
return x + y;
}
}
#endregion
#region Caller
public interface IMul2Service
{
[WampProcedure("com.example.mul2")]
Task<int> Multiply(int x, int y);
}
#endregion
private static void OnClose(object sender, WampSessionCloseEventArgs e)
{
Console.WriteLine("connection closed. reason: " + e.Reason);
}
private static void OnError(object sender, WampConnectionErrorEventArgs e)
{
Console.WriteLine("connection error. error: " + e.Exception);
}
}
}
这使用了唯一的 .NET 绑定 WampSharp
Nuget 包。该包由社区成员开发,因此更新可能不及时。这里充满未知(至少可能是这样)。
JavaScript 代码
以下是用于发布者/订阅者和 RPC 可调用 JavaScript 程序的全部 JavaScript 代码。
<!DOCTYPE html>
<html>
<body>
<h1>Hello WAMP</h1>
<p>Open JavaScript console to watch output.</p>
<script>AUTOBAHN_DEBUG = false;</script>
<script src="js/autobahn.min.js"></script>
<script>
// the URL of the WAMP Router (Crossbar.io)
//
var wsuri;
if (document.location.origin == "file://") {
wsuri = "ws://127.0.0.1:8080/ws";
} else {
wsuri = (document.location.protocol === "http:" ? "ws:" : "wss:") + "//" +
document.location.host + "/ws";
}
// the WAMP connection to the Router
//
var connection = new autobahn.Connection({
url: wsuri,
realm: "realm1"
});
// timers
//
var t1, t2;
// fired when connection is established and session attached
//
connection.onopen = function (session, details) {
console.log("Connected");
// SUBSCRIBE to a topic and receive events
//
function on_counter (args) {
var counter = args[0];
console.log("on_counter() event received with counter " + counter);
}
session.subscribe('com.example.oncounter', on_counter).then(
function (sub) {
console.log('subscribed to topic');
},
function (err) {
console.log('failed to subscribe to topic', err);
}
);
//SUBSCRIBE to a topic and receive events
function on_dotnetcounter (args) {
var counter = args[0];
console.log("DOTNET : on_dotnetcounter() event received with counter " + counter);
}
session.subscribe('com.example.ondotnetcounter', on_dotnetcounter).then(
function (sub) {
console.log('subscribed to topic');
},
function (err) {
console.log('failed to subscribe to topic', err);
}
);
// PUBLISH an event every second
//
t1 = setInterval(function () {
session.publish('com.example.onhello', ['Hello from JavaScript (browser)']);
console.log("published to topic 'com.example.onhello'");
}, 1000);
// REGISTER a procedure for remote calling
//
function mul2 (args) {
var x = args[0];
var y = args[1];
console.log("mul2() called with " + x + " and " + y);
return x * y;
}
session.register('com.example.mul2', mul2).then(
function (reg) {
console.log('procedure registered');
},
function (err) {
console.log('failed to register procedure', err);
}
);
// CALL a remote procedure every second
//
var x = 0;
t2 = setInterval(function () {
session.call('com.example.add2', [x, 18]).then(
function (res) {
console.log("add2() result:", res);
},
function (err) {
console.log("add2() error:", err);
}
);
x += 3;
}, 1000);
};
// fired when connection was lost (or could not be established)
//
connection.onclose = function (reason, details) {
console.log("Connection lost: " + reason);
if (t1) {
clearInterval(t1);
t1 = null;
}
if (t2) {
clearInterval(t2);
t2 = null;
}
}
// now actually open the connection
//
connection.open();
</script>
</body>
</html>
由于 JavaScript 是官方支持的绑定,它应该由 crossbar.io 的开发人员及时更新。
运行时截图
这就是我们在 Visual Studio 中运行解决方案时看到的情况。
点击查看大图
如何调试 Crossbar 并同时运行它?
- 对于 .NET 代码,您可以使用我的技巧,即启动一个额外的 crossbar.exe 进程,并告诉它使用哪个修改过的配置文件。
- 对于 Web 应用程序,只需使用浏览器的开发工具即可。
与其他技术的比较
公平地说,接下来我将将其与我喜欢/使用过的其他几个消息传递框架进行比较。值得注意的是,在进行任何比较之前,所有这些都需要安装一些额外的东西。
- Crossbar 需要 python。
- RabbitMQ 需要 Erlang。
- Kafka 需要 JDK。
多语言 | 持久化消息 | 集群 | 配置 | 调试体验 | 发布/订阅 | RPC | |
Crossbar.io | 是 | 否 (1) | 否 | 通过一些奇怪的 .crossbar 文件夹。可能可以在代码中实现相同的功能,但可能非常麻烦。 | 说实话,有点差。因为 crossbar 想使用 .crossbar 文件夹配置来启动您的“crossbar 节点”。这意味着它正在运行应用程序,而不是您的 IDE。所以您被迫从您的应用程序启动 crossbar.exe,并向其传递一个配置文件命令行参数,然后等待它运行。 嗯。 | 是的,轻而易举。 | 是的,轻而易举。 |
RabbitMQ | 是 | 是 | 是 | 通过标准代码/配置。 | 只需运行浏览器/IDE 并设置断点。 | 是的,轻而易举(队列)。 | 是的,通过 correlationId。 |
Kafka | 是 | 是 | 是 | 通过标准代码/配置。 | 只需运行浏览器/IDE 并设置断点。 | 是的,轻而易举(主题)。 | 是的,通过 correlationId + 额外主题,不如其他方法方便。 |
1 处理此问题的一种方法是在中间放置一个数据库,以便所有生产者先写入数据库,然后您只需使用事件溯源将事件提供给 crossbar.io 节点,它会将其广播出去。
因此,如您所见,存在更好的解决方案,但最终还是取决于您的需求。
结论
这是一个棘手的部分,因为一方面我喜欢它相对易于使用,但另一方面,有太多我不喜欢的因素,不足以使其成为我首选的消息传递解决方案。
我喜欢不同事物可以相对容易地相互通信(尽管每种语言绑定提供的 API 都非常不同)。我不喜欢调试的难度,以及我必须使用这个 .crossbar 文件夹来配置事物。
我也不喜欢消息的不可靠性。当然,您可以解决这个问题,如我所说,但有更好的解决方案。
对我来说,如果您真正想要的是“即发即忘”的非持久化消息传递,那还可以。对于其他任何事情,我会使用 RabbitMQ 或 Kafka。