.NET / Scala 互操作使用 RabbitMQ






4.82/5 (11投票s)
使用 RabbitMQ 让 .NET 代码与 Scala 代码互相通信
引言
我最近开始了一份新工作,我们混合使用了多种技术,大量的 .NET,以及新的、基于 Scala 的技术。我们使用 Scala 是为了能够利用 JVM 提供的丰富工具集,例如 Apache Spark/Cassandra/ZooKeeper。然而,我们也有很多现有的 .NET 应用需要与之通信。
我们基本确定新代码将大致基于微服务,并使用某种 Actor 框架(可能是 Akka 和 Akka.NET)。但在我写作时,Akka 和 Akka.NET 之间并不直接通信,没有“线上”协议支持这一点。我读过一篇非常有趣的帖子(链接已丢失,抱歉),来自 Akka 的主要开发者之一,他表示他们希望做到这一点,但这将是一项非常大的工作,而且不会很快实现。
因此,如果你想让 Akka 代码与 Akka .NET 代码通信,你只能自己想办法。所以让我们来思考一下,让我们完全从图中移除 Actor 框架的需求。我们想实现什么?本质上,我们想找到一种方法,让 Scala 代码中的某个“事件/消息”触发 .NET 代码中的某个操作。
有一些很棒的库允许 .NET 使用 Java 类,例如 IKVM,还有一个很棒的项目 jnbridge,它在其主页上声称“可以在任何地方连接任何 Java 和任何 .NET”。
我见过 IKVM 的使用,虽然它是一个不错的库,但从 .NET 使用 Java 类并不是我们用例的需要。至于 jnbridge,我没有使用过它,但确实有一个区域看起来很酷,那就是能够将 Java 消息服务 (JMS) 用于 .NET,你可以在这里阅读:http://jnbridge.com/software/jms-adapter-for-net/overview
这看起来可能有效,但我想要一些更轻量级、更熟悉的东西。多年来我使用过多种不同的消息传递技术,但其中一种总是很容易上手,那就是 RabbitMQ。
RabbitMQ 有许多客户端,包括 Java 和 .NET 的客户端。我们的用例是 Scala 到 .NET,但由于 Scala 是 JVM 目标语言,我们可以使用 Java 版的 RabbitMQ 客户端。
所以,本质上这篇文章是关于如何让 Scala 代码与 .NET 代码进行通信以及反之亦然。我选择使用 **JSON** 作为消息格式。我之所以这样做,是因为大多数语言都有不同的 JSON 库,而且它是一种轻量级格式。
通用先决条件
为了使演示应用程序正常工作,双方都需要一些东西,以及一些特定的东西(实际上只有一个,就是 JSON 序列化库)。所以本节概述了通用先决条件,而本文的 .NET / Scala 部分将概述特定语言所需的内容。
然而,正如我所说,双方都使用了 RabbitMQ,因此需要安装并运行 RabbitMQ。所以你需要安装以下组件才能使 RabbitMQ 工作:
- Erlang OTP:https://erlang.org.cn/download.html
- RabbitMQ:https://rabbitmq.cn/download.html
安装完这些组件后,最好确保执行以下操作:
- 确保 RabbitMQ 设置为自动启动服务。服务名称应为“RabbitMQ”。
- 确保 RabbitMQ Web 管理插件已安装并正常工作。你可以在这里找到一篇关于如何操作的文章:https://rabbitmq.cn/management.html
代码在哪里
代码可以在我的 GitHub 账户找到:
https://github.com/sachabarber/ScalaToDotNetInteropUsingRabbitMq
演示应用程序
演示应用程序由以下部分组成:
- .NET 端有一个单独的 VS2013 解决方案。
- 2 个不同的 IntelliJ IDEA 项目
- Scala 发布者
- Scala 订阅者
重要提示
演示应用程序是让 .NET 与 Scala 通信以及反之亦然所需的最小组件。它完全不适合生产环境。没有任何异常处理,所以请将其视为演示。
下表说明了我亲自测试过的场景(如果箭头看起来有点粗糙,那是对的,我在 Word(强大的绘图工具)中忘记将它们设为单向,所以不得不使用 Paint .NET 进行修复,请原谅我)。
|
.NET 发布者到 .NET 订阅者 |
|
Scala 发布者到 Scala 订阅者 |
|
.NET 发布者到 Scala 订阅者 |
|
Scala 发布者到 .NET 订阅者 |
基本思路如下:
- 我们将使用 RabbitMQ 进行跨进程通信。
- 我们将使用 RabbitMQ 的头部(headers)传递一个 Int 值,该值指示发送消息的类型,以便订阅者知道如何反序列化接收到的消息数据。
- 消息体本身将是 JSON。
以下是一些工作截图:
Scala 发布者 -> .NET 订阅者
点击查看大图
.NET 发布者 -> Scala 订阅者
点击查看大图
.NET 端
本节概述了 .NET 代码的工作原理以及运行它所需的组件。
.NET 先决条件
- JSON .NET:这用于 JSON 序列化,可以通过 Nuget 安装。包名是
Newtonsoft.Json
。 - RabbitMQ 客户端:这仅仅是使用 Nuget 上的官方支持的 .NET 客户端。包名是
RabbitMQ.Client
。
通用消息数据
对于演示应用程序,使用了这个简单的数据传输对象。
public class Person
{
[JsonProperty(PropertyName="age")]
public int Age { get; set; }
[JsonProperty(PropertyName = "name")]
public string Name { get; set; }
public override string ToString()
{
return string.Format("Name : {0}, Age : {1}", Name, Age);
}
}
细心的读者会注意到我使用了 JsonProperty
属性来控制序列化属性的名称。我们需要这样做,因为在 Scala 中使用其属性包类(case class)的等价类时,有一个约定是属性名使用小写。
因此,我们需要在某个地方进行调整,我选择在 .NET 端进行调整,因为我比 Scala Play JSON 库更熟悉 JSON .NET。
如果没有对序列化的控制,Scala Play JSON 库将无法反序列化 .NET 对象,因为大小写不同。
.NET 发布者
这是 .NET 发布者代码:
using System;
using RabbitMQ.Client;
using System.Text;
using Common;
using Newtonsoft.Json;
using RabbitMQ.Client.Framing;
using System.Collections.Generic;
using System.Threading;
namespace Publisher
{
class Program
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
Random rand = new Random();
while (true)
{
var message = GetMessage(rand);
var body = Encoding.UTF8.GetBytes(message);
var properties = new BasicProperties();
properties.Headers = new Dictionary<string, object>();
properties.Headers.Add("type", "1");
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
Thread.Sleep(1000);
}
}
Console.ReadLine();
}
private static string GetMessage(Random rand)
{
Person p = new Person();
p.Age = rand.Next(100);
p.Name = string.Format("named from .NET {0}", p.Age);
return JsonConvert.SerializeObject(p);
}
}
}
这里有几点需要注意:
- 我们创建一个新的
Person
对象,并将其序列化为 JSON。 - 我们将一个头部值(Type = 1)添加到 RabbitMQ 的头部集合中。这使得消费者可以检查头部以了解如何处理消息数据。
- 我们使用标准的 RabbitMQ 代码发送 JSON 序列化后的
Person
对象。
.NET 订阅者
这是 .NET 订阅者代码:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Collections.Generic;
using Common;
using Newtonsoft.Json;
namespace Subscriber
{
class Program
{
public static void Main()
{
Dictionary<int, Type> typelookup = new Dictionary<int, Type>();
typelookup.Add(1, typeof(Person));
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
Console.WriteLine(" [*] Waiting for logs.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var typeToLookupBytes = (Byte[])ea.BasicProperties.Headers["type"];
var typeToLookup = int.Parse(Encoding.UTF8.GetString(typeToLookupBytes));
var messageType = typelookup[typeToLookup];
var message = Encoding.UTF8.GetString(body);
var person = JsonConvert.DeserializeObject(message, messageType);
Console.WriteLine("[Recieved] message : {0}", person);
};
channel.BasicConsume(queue: queueName,
noAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
这里有几点需要注意:
- 我们挂载一个标准的 RabbitMQ 消费者来监听传入的消息。
- 我们首先检查头部,以了解正在接收的消息类型。
- 通过反序列化传入的消息体(这是一个 JSON 序列化后的
Person
实例),我们创建一个新的Person
对象。
Scala 端
本节概述了 Scala 代码的工作原理以及运行它所需的组件。
Scala 先决条件
- Play JSON 库,可以通过 SBT 安装。
- 官方 Java RabbitMQ 库,可以通过 SBT 安装。
这是 Scala 订阅者 SBT 文件的示例:
name := "Subscriber" version := "1.0" scalaVersion := "2.11.7" libraryDependencies ++= Seq( "com.rabbitmq" % "amqp-client" % "3.5.5", "com.typesafe.play" % "play-json_2.11" % "2.4.3" )
通用消息数据
对于演示应用程序,使用了这个简单的数据传输对象。
import play.api.libs.json.{JsPath, Json, Writes, Reads}
import play.api.libs.functional.syntax._
trait RabbitJsonMessage
case class Person(name: String, age: Int) extends RabbitJsonMessage
{
override def toString : String = {
s"Name : $name, Age : $age"
}
}
object JsonImplicits {
implicit val personWrites = new Writes[Person] {
def writes(person: Person) = Json.obj(
"name" -> person.name,
"age" -> person.age
)
}
implicit val personReads : Reads[Person] = (
(JsPath \ "name").read[String] and
(JsPath \ "age").read[Int]
)(Person.apply _)
}
这比我们在 .NET 端看到的简单 JSON .NET 代码要复杂一些。这很大程度上归因于 Play JSON 库如何与 Scala 配合工作,它要求你使用 Reads/Writes
特征,如上所示,使用 Scala 的隐式值 (implicit val) 来提供。
Writes
特征(特征类似于带有部分实现的接口)将Person
case class 写入JsValue
。Reads
特征从JsValue
读取到Person
case class。
Scala 发布者
这是 Scala 发布者代码:
import com.rabbitmq.client._
import java.util.HashMap
import java.nio.charset.StandardCharsets
import play.api.libs.json._
import JsonImplicits ._
object PublisherDemo {
def main (args:Array[String]):Unit = {
val r = new PublisherDemo ()
r.Send
}
}
class PublisherDemo {
val EXCHANGE_NAME = "logs"
def Send () = {
val factory = new ConnectionFactory()
factory.setHost("localhost")
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
for (i <- (0 to 100)) {
val person = new Person("named from scala " + i.toString, i)
val message = Json.toJson(person)
val bytes = message.toString.getBytes(StandardCharsets.UTF_8)
val headers = new HashMap[String,AnyRef]()
headers.putIfAbsent("type","1")
val props = new AMQP.BasicProperties.Builder().headers(headers)
channel.basicPublish(EXCHANGE_NAME,"",props.build() , bytes)
System.out.println(" [x] Sent '" + message + "'")
Thread.sleep(1000)
}
channel.close()
connection.close()
}
}
这里有几点需要注意:
- 我们创建一个新的
Person
对象,并将其序列化为 JSON。 - 我们将一个头部值(Type = 1)添加到 RabbitMQ 的头部集合中。这使得消费者可以检查头部以了解如何处理消息数据。
- 我们使用标准的 RabbitMQ 代码发送 JSON 序列化后的
Person
对象。
Scala 订阅者
这是 Scala 订阅者代码:
import java.util.HashMap
import com.rabbitmq.client._
import scala.reflect.ClassTag
import scala.runtime.RichInt
import scala.reflect.runtime.universe._
import play.api.libs.json.{JsValue, Json, Writes}
import JsonImplicits ._
object SubscriberDemo {
def main (args:Array[String]): Unit = {
val r = new SubscriberDemo()
r.Receive()
}
}
class SubscriberDemo {
val EXCHANGE_NAME = "logs"
def Receive() = {
val factory = new ConnectionFactory()
factory.setHost("localhost")
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
val queueName = channel.queueDeclare().getQueue()
channel.queueBind(queueName, EXCHANGE_NAME, "")
val typelookup = new HashMap[Int, JsValue => RabbitJsonMessage]()
typelookup.putIfAbsent(1,value =>
{
val person = Json.fromJson[Person](value).get
person.asInstanceOf[RabbitJsonMessage]
})
System.out.println(" [*] Waiting for messages. To exit press CTRL+C")
val consumer = new DefaultConsumer(channel) {
override def handleDelivery(consumerTag: String, envelope: Envelope,
properties: AMQP.BasicProperties, body: scala.Array[scala.Byte] ) =
{
val typeToLookup = properties.getHeaders().get("type").toString().toInt
val jsonConverter = typelookup.get(typeToLookup)
val messageBody = new String(body, "UTF-8")
val jsonObject = Json.parse(messageBody)
val person = jsonConverter(jsonObject).asInstanceOf[Person]
System.out.println(" [x] Received '" + person + "'");
}
}
channel.basicConsume(queueName, true, consumer)
}
}
这里有几点需要注意:
- 我们挂载一个标准的 RabbitMQ 消费者来监听传入的消息。
- 我们首先检查头部,以了解正在接收的消息类型。
- 通过反序列化传入的消息体(这是一个 JSON 序列化后的
Person
实例),我们创建一个新的Person
对象。
就这些
这次就这些内容了,如果你喜欢这篇文章,并认为它很有用,欢迎投票和评论。