65.9K
CodeProject 正在变化。 阅读更多。
Home

.NET / Scala 互操作使用 RabbitMQ

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.82/5 (11投票s)

2015 年 10 月 8 日

CPOL

8分钟阅读

viewsIcon

29447

使用 RabbitMQ 让 .NET 代码与 Scala 代码互相通信

引言

我最近开始了一份新工作,我们混合使用了多种技术,大量的 .NET,以及新的、基于 Scala 的技术。我们使用 Scala 是为了能够利用 JVM 提供的丰富工具集,例如 Apache Spark/Cassandra/ZooKeeper。然而,我们也有很多现有的 .NET 应用需要与之通信。

我们基本确定新代码将大致基于微服务,并使用某种 Actor 框架(可能是 Akka 和 Akka.NET)。但在我写作时,Akka 和 Akka.NET 之间并不直接通信,没有“线上”协议支持这一点。我读过一篇非常有趣的帖子(链接已丢失,抱歉),来自 Akka 的主要开发者之一,他表示他们希望做到这一点,但这将是一项非常大的工作,而且不会很快实现。

因此,如果你想让 Akka 代码与 Akka .NET 代码通信,你只能自己想办法。所以让我们来思考一下,让我们完全从图中移除 Actor 框架的需求。我们想实现什么?本质上,我们想找到一种方法,让 Scala 代码中的某个“事件/消息”触发 .NET 代码中的某个操作。

有一些很棒的库允许 .NET 使用 Java 类,例如 IKVM,还有一个很棒的项目 jnbridge,它在其主页上声称“可以在任何地方连接任何 Java 和任何 .NET”。

我见过 IKVM 的使用,虽然它是一个不错的库,但从 .NET 使用 Java 类并不是我们用例的需要。至于 jnbridge,我没有使用过它,但确实有一个区域看起来很酷,那就是能够将 Java 消息服务 (JMS) 用于 .NET,你可以在这里阅读:http://jnbridge.com/software/jms-adapter-for-net/overview

这看起来可能有效,但我想要一些更轻量级、更熟悉的东西。多年来我使用过多种不同的消息传递技术,但其中一种总是很容易上手,那就是 RabbitMQ

RabbitMQ 有许多客户端,包括 Java 和 .NET 的客户端。我们的用例是 Scala 到 .NET,但由于 Scala 是 JVM 目标语言,我们可以使用 Java 版的 RabbitMQ 客户端。

所以,本质上这篇文章是关于如何让 Scala 代码与 .NET 代码进行通信以及反之亦然。我选择使用 **JSON** 作为消息格式。我之所以这样做,是因为大多数语言都有不同的 JSON 库,而且它是一种轻量级格式。

 

 

通用先决条件

为了使演示应用程序正常工作,双方都需要一些东西,以及一些特定的东西(实际上只有一个,就是 JSON 序列化库)。所以本节概述了通用先决条件,而本文的 .NET / Scala 部分将概述特定语言所需的内容。

 

然而,正如我所说,双方都使用了 RabbitMQ,因此需要安装并运行 RabbitMQ。所以你需要安装以下组件才能使 RabbitMQ 工作:

  1. Erlang OTP:https://erlang.org.cn/download.html
  2. RabbitMQ:https://rabbitmq.cn/download.html

 

安装完这些组件后,最好确保执行以下操作:

  1. 确保 RabbitMQ 设置为自动启动服务。服务名称应为“RabbitMQ”。
  2. 确保 RabbitMQ Web 管理插件已安装并正常工作。你可以在这里找到一篇关于如何操作的文章:https://rabbitmq.cn/management.html

完成这些操作后,你应该可以通过以下 URL 检查 Web 管理插件:https://:15672/#/

这应该会显示类似下图的内容。

 

点击查看大图

 

 

代码在哪里

代码可以在我的 GitHub 账户找到:

https://github.com/sachabarber/ScalaToDotNetInteropUsingRabbitMq

 

 

 

演示应用程序

演示应用程序由以下部分组成:

  • .NET 端有一个单独的 VS2013 解决方案。
  • 2 个不同的 IntelliJ IDEA 项目
    • Scala 发布者
    • Scala 订阅者

 

重要提示

演示应用程序是让 .NET 与 Scala 通信以及反之亦然所需的最小组件。它完全不适合生产环境。没有任何异常处理,所以请将其视为演示。

 

下表说明了我亲自测试过的场景(如果箭头看起来有点粗糙,那是对的,我在 Word(强大的绘图工具)中忘记将它们设为单向,所以不得不使用 Paint .NET 进行修复,请原谅我)。

 


点击查看大图

.NET 发布者到 .NET 订阅者


点击查看大图

Scala 发布者到 Scala 订阅者


点击查看大图

.NET 发布者到 Scala 订阅者


点击查看大图

Scala 发布者到 .NET 订阅者

 

基本思路如下:

  • 我们将使用 RabbitMQ 进行跨进程通信。
  • 我们将使用 RabbitMQ 的头部(headers)传递一个 Int 值,该值指示发送消息的类型,以便订阅者知道如何反序列化接收到的消息数据。
  • 消息体本身将是 JSON。

 

以下是一些工作截图:

 

Scala 发布者 -> .NET 订阅者

点击查看大图

 

 

.NET 发布者 -> Scala 订阅者

点击查看大图

 

 

.NET 端

本节概述了 .NET 代码的工作原理以及运行它所需的组件。

 

.NET 先决条件

  • JSON .NET:这用于 JSON 序列化,可以通过 Nuget 安装。包名是 Newtonsoft.Json
  • RabbitMQ 客户端:这仅仅是使用 Nuget 上的官方支持的 .NET 客户端。包名是 RabbitMQ.Client
 

通用消息数据

对于演示应用程序,使用了这个简单的数据传输对象。

public class Person
{
    [JsonProperty(PropertyName="age")]
    public int Age { get; set; }

    [JsonProperty(PropertyName = "name")]
    public string Name { get; set; }

    public override string ToString()
    {
        return string.Format("Name : {0}, Age : {1}", Name, Age);
    }
}

细心的读者会注意到我使用了 JsonProperty 属性来控制序列化属性的名称。我们需要这样做,因为在 Scala 中使用其属性包类(case class)的等价类时,有一个约定是属性名使用小写。

因此,我们需要在某个地方进行调整,我选择在 .NET 端进行调整,因为我比 Scala Play JSON 库更熟悉 JSON .NET。

如果没有对序列化的控制,Scala Play JSON 库将无法反序列化 .NET 对象,因为大小写不同。

 

.NET 发布者

这是 .NET 发布者代码:

using System;
using RabbitMQ.Client;
using System.Text;
using Common;
using Newtonsoft.Json;
using RabbitMQ.Client.Framing;
using System.Collections.Generic;
using System.Threading;

namespace Publisher
{
    class Program
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");

                Random rand = new Random();


                while (true)
                {
                    var message = GetMessage(rand);
                    var body = Encoding.UTF8.GetBytes(message);

                    var properties = new BasicProperties();
                    properties.Headers = new Dictionary<string, object>();
                    properties.Headers.Add("type", "1");

                    channel.BasicPublish(exchange: "logs",
                                         routingKey: "",
                                         basicProperties: properties,
                                         body: body);
                    Console.WriteLine(" [x] Sent {0}", message);
                    Thread.Sleep(1000);
                }
            }

           
            Console.ReadLine();
        }

        private static string GetMessage(Random rand)
        {
            Person p = new Person();
            p.Age = rand.Next(100);
            p.Name = string.Format("named from .NET {0}", p.Age);
            return JsonConvert.SerializeObject(p);
        }
    }
}

 

这里有几点需要注意:

  1. 我们创建一个新的 Person 对象,并将其序列化为 JSON。
  2. 我们将一个头部值(Type = 1)添加到 RabbitMQ 的头部集合中。这使得消费者可以检查头部以了解如何处理消息数据。
  3. 我们使用标准的 RabbitMQ 代码发送 JSON 序列化后的 Person 对象。

 

.NET 订阅者

这是 .NET 订阅者代码:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Collections.Generic;
using Common;
using Newtonsoft.Json;

namespace Subscriber
{
    class Program
    {
        public static void Main()
        {
            Dictionary<int, Type> typelookup = new Dictionary<int, Type>();
            typelookup.Add(1, typeof(Person));

            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");

                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName,
                                  exchange: "logs",
                                  routingKey: "");

                Console.WriteLine(" [*] Waiting for logs.");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var typeToLookupBytes = (Byte[])ea.BasicProperties.Headers["type"];
                    var typeToLookup = int.Parse(Encoding.UTF8.GetString(typeToLookupBytes));
                    var messageType = typelookup[typeToLookup];
                    var message = Encoding.UTF8.GetString(body);
                    var person = JsonConvert.DeserializeObject(message, messageType);

                    Console.WriteLine("[Recieved] message : {0}", person);
                };
                channel.BasicConsume(queue: queueName,
                                     noAck: true,
                                     consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

这里有几点需要注意:

  1. 我们挂载一个标准的 RabbitMQ 消费者来监听传入的消息。
  2. 我们首先检查头部,以了解正在接收的消息类型。
  3. 通过反序列化传入的消息体(这是一个 JSON 序列化后的 Person 实例),我们创建一个新的 Person 对象。

 

 

 

 

 

Scala 端

本节概述了 Scala 代码的工作原理以及运行它所需的组件。

 

Scala 先决条件

 

这是 Scala 订阅者 SBT 文件的示例:


name := "Subscriber"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "com.rabbitmq" % "amqp-client" % "3.5.5",
  "com.typesafe.play" % "play-json_2.11" % "2.4.3"
)

 

通用消息数据

对于演示应用程序,使用了这个简单的数据传输对象。

import play.api.libs.json.{JsPath, Json, Writes, Reads}
import play.api.libs.functional.syntax._



trait RabbitJsonMessage
case class Person(name: String, age: Int) extends  RabbitJsonMessage
{
  override def toString : String = {
    s"Name : $name, Age : $age"
  }
}

object JsonImplicits {

  implicit val personWrites = new Writes[Person] {
    def writes(person: Person) = Json.obj(
      "name" -> person.name,
      "age" -> person.age
    )
  }

  implicit val personReads : Reads[Person] = (
    (JsPath \ "name").read[String] and
      (JsPath \ "age").read[Int]
    )(Person.apply _)
}

 

这比我们在 .NET 端看到的简单 JSON .NET 代码要复杂一些。这很大程度上归因于 Play JSON 库如何与 Scala 配合工作,它要求你使用 Reads/Writes 特征,如上所示,使用 Scala 的隐式值 (implicit val) 来提供。

 

  • Writes 特征(特征类似于带有部分实现的接口)将 Person case class 写入 JsValue
  • Reads 特征从 JsValue 读取到 Person case class。

 

 

 

Scala 发布者

这是 Scala 发布者代码:

import com.rabbitmq.client._

import java.util.HashMap
import java.nio.charset.StandardCharsets

import play.api.libs.json._

import JsonImplicits ._



object PublisherDemo {

  def main (args:Array[String]):Unit = {
    val r = new PublisherDemo ()
    r.Send
  }
}




class PublisherDemo {
  val EXCHANGE_NAME = "logs"

  def Send () = {

    val factory = new ConnectionFactory()
    factory.setHost("localhost")
    val connection = factory.newConnection()
    val channel = connection.createChannel()

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout")

     for (i <- (0 to 100)) {

       val person = new Person("named from scala " + i.toString, i)
       val message = Json.toJson(person)
       val bytes =  message.toString.getBytes(StandardCharsets.UTF_8)
       val headers = new HashMap[String,AnyRef]()
       headers.putIfAbsent("type","1")


       val props = new AMQP.BasicProperties.Builder().headers(headers)
       channel.basicPublish(EXCHANGE_NAME,"",props.build() , bytes)
       System.out.println(" [x] Sent '" + message + "'")
       Thread.sleep(1000)
    }
    channel.close()
    connection.close()
  }
}

 

这里有几点需要注意:

  1. 我们创建一个新的 Person 对象,并将其序列化为 JSON。
  2. 我们将一个头部值(Type = 1)添加到 RabbitMQ 的头部集合中。这使得消费者可以检查头部以了解如何处理消息数据。
  3. 我们使用标准的 RabbitMQ 代码发送 JSON 序列化后的 Person 对象。

 

 

Scala 订阅者

这是 Scala 订阅者代码:

import java.util.HashMap
import com.rabbitmq.client._
import scala.reflect.ClassTag
import scala.runtime.RichInt
import scala.reflect.runtime.universe._
import play.api.libs.json.{JsValue, Json, Writes}
import JsonImplicits ._


object SubscriberDemo {

  def main (args:Array[String]): Unit = {
    val r = new SubscriberDemo()
    r.Receive()
  }
}


class SubscriberDemo {
  val EXCHANGE_NAME = "logs"

  def Receive() = {

      val factory = new ConnectionFactory()
      factory.setHost("localhost")
      val connection = factory.newConnection()
      val channel = connection.createChannel()

      channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
      val queueName = channel.queueDeclare().getQueue()
      channel.queueBind(queueName, EXCHANGE_NAME, "")

      val typelookup = new HashMap[Int, JsValue => RabbitJsonMessage]()
      typelookup.putIfAbsent(1,value =>
      {
          val person = Json.fromJson[Person](value).get
          person.asInstanceOf[RabbitJsonMessage]
      })

      System.out.println(" [*] Waiting for messages. To exit press CTRL+C")
      val consumer = new DefaultConsumer(channel) {


      override def handleDelivery(consumerTag: String, envelope: Envelope, 
		properties: AMQP.BasicProperties, body: scala.Array[scala.Byte] ) =
      {
          val typeToLookup = properties.getHeaders().get("type").toString().toInt
          val jsonConverter = typelookup.get(typeToLookup)
          val messageBody = new String(body, "UTF-8")
          val jsonObject = Json.parse(messageBody)
          val person = jsonConverter(jsonObject).asInstanceOf[Person]
          System.out.println(" [x] Received '" + person + "'");
      }

    }
    channel.basicConsume(queueName, true, consumer)

  }
}

这里有几点需要注意:

  1. 我们挂载一个标准的 RabbitMQ 消费者来监听传入的消息。
  2. 我们首先检查头部,以了解正在接收的消息类型。
  3. 通过反序列化传入的消息体(这是一个 JSON 序列化后的 Person 实例),我们创建一个新的 Person 对象。

 

 

 

 

就这些

这次就这些内容了,如果你喜欢这篇文章,并认为它很有用,欢迎投票和评论。

© . All rights reserved.