DotNetMQ:.NET 的完整消息队列系统






4.94/5 (190投票s)
一个全新的、独立的开源消息队列系统,完全使用 C# 和 .NET Framework 3.5 构建。
文章大纲
- 引言
- 什么是**消息传递**?
- 什么是**DotNetMQ**?
- 为什么要新建一个**消息中间件**?
- **安装**和运行 DotNetMQ
- 使用 DotNetMQ 的**第一个应用程序**
- 将应用程序**注册**到 DotNetMQ
- 开发**Application1**
- 开发**Application2**
- 消息的**传输规则**属性
- MDSClient 的**通信方式**属性
- MDSClient 的**出错重连**属性
- MDSClient 的**自动确认消息**属性
- **配置** DotNetMQ
- 网络**消息传递**
- **请求/响应**式消息传递
- DotNetMQ 上的**面向服务**架构
- DotNetMQ 的**性能**
- 历史
- 参考文献
引言
在本文中,我将介绍一个全新的、独立的**开源消息队列系统**,它完全使用 C# 和 .NET Framework 3.5 构建。**DotNetMQ** 是一个消息中间件,具有保证传递、路由、负载均衡、服务器图等多种特性。我将从解释消息传递概念和消息中间件的需求开始。然后,我将探讨 DotNetMQ 是什么以及如何使用它。
什么是消息传递?
**消息传递**是一种**异步**通信方式,允许运行在相同或不同计算机上的应用程序进行可靠的传递。程序通过发送称为消息的数据包进行通信 [1]。
消息可以是字符串、字节数组、对象等。通常,**发送方(生产者)**程序创建一个消息并将其推送到消息队列,而**接收方(消费者)**程序从队列中获取消息并进行处理。发送方和接收方程序不必同时运行,因为消息传递是一个异步过程。这被称为**松耦合**通信。
另一方面,Web 服务方法调用(远程方法调用)是一种**紧耦合**和**同步**通信(两个应用程序必须在整个通信过程中都运行并可用;如果 Web 服务离线或在方法调用期间发生错误,客户端应用程序将收到一个异常)。
在上图中,两个应用程序以松耦合的方式通过消息队列进行通信。如果接收方消耗消息的速度慢于发送方生成消息的速度,队列中的消息数量将会增加。此外,接收方在发送方发送消息时可能离线。在这种情况下,接收方将在其上线时(启动并加入队列时)从队列中获取消息。
**消息队列**通常由消息中间件提供。**消息中间件**是一个独立的应用程序(服务),其他应用程序可以连接到它并发送/接收消息。消息中间件负责存储消息,直到接收方接收它们。消息中间件可以将消息路由到不同计算机,以将消息传递给目标应用程序,并且可以尝试传递消息直到接收方正确处理它。消息中间件有时被称为**面向消息的中间件**(**MOM**)或简称为**消息队列**(**MQ**)。
什么是 DotNetMQ?
**DotNetMQ** 是一个开源的消息中间件,具有多种特性:
- **持久化**或**非持久化**的消息传递。
- 即使在系统崩溃的情况下,也能**保证持久化消息的传递**。
- 在自定义的**机器图**中**自动**和**手动****路由**消息。
- 支持**多种数据库**(目前支持**MS SQL Server、MySQL**、**SQLite**,以及内存存储)。
- 支持**不存储、直接发送**式消息传递。
- 支持**请求/响应**式消息传递。
- 易于使用的客户端库,用于与 DotNetMQ 消息中间件通信。
- 内置框架,可轻松在消息队列之上构建**RMI 服务**。
- 支持将**消息传递给 ASP.NET Web 服务**。
- 基于 GUI 的**管理**和**监控**工具。
- 易于安装、管理和使用。
- 完全用**C#**编写(使用 .NET Framework 3.5)。
在最初创建 DotNetMQ 时,我倾向于将其命名为 MDS(消息传递系统)。因为它不仅被设计为消息队列,而且是一个直接将消息传递给应用程序的系统,并提供构建应用程序服务的框架环境。我称之为**DotNetMQ**,因为它完全使用 .NET 开发,并且 DotNetMQ 这个名字更容易记住。所以,它的原始名称(和内部项目名称)是 MDS,并且应用程序中有许多以 **MDS** 为前缀的类。
为什么要新建消息中间件?
消息中间件的需求
首先,我将演示一个需要消息中间件的简单场景。
在我多年的商业实践中,我观察到过非常糟糕且不常见的异步企业应用程序集成解决方案。通常有一个应用程序运行在服务器上,执行一些任务并生成数据,然后将结果数据发送到另一台服务器上的另一个应用程序。第二个应用程序对数据执行其他任务或评估结果(服务器在同一网络或通过互联网连接)。此外,消息数据必须是持久化的。即使远程应用程序不工作或网络不可用,**消息也必须在第一次机会时传递**。
让我们看看下图中的设计。
**应用程序 - 1** 和 **应用程序 - 2** 是可执行的应用程序(或 Windows 服务),而**发送方服务**是一个 Windows 服务。应用程序 - 1 执行某项任务,生成数据,然后调用**服务器 - B** 上的**远程 Web 服务**方法来传输数据。该 Web 服务将数据插入**数据库表**。应用程序 - 2 定期检查该表以获取新传入的数据行并处理它们(并从表中删除它们或将其标记为已处理,以免再次处理相同的数据)。
如果在 Web 服务调用期间或在 Web 服务中处理数据时发生**错误**,数据不应丢失,而应稍后发送。但是,应用程序 - 1 还有其他任务要做,所以它不能一遍又一遍地尝试发送数据。它只是将数据插入数据库表。另一个 Windows 服务(或应用程序 - 1 中的一个线程,如果应用程序始终运行)会定期检查此表,并尝试将数据发送到 Web 服务,直到数据成功发送为止。
这种场景确实非常可靠(消息保证送达),但并不是应用程序之间通信的有效方式。此解决方案存在一些非常严重的问题:
- **开发(编码)时间长**。
- 为所有**消息类型**(或远程方法调用)单独编码。对于新的 Web 服务方法调用,您必须更改所有服务、应用程序和数据库表。
- 几乎相同的软件和结构必须为每个类似的服务开发(或复制和修改)。
- 编码后,**过多的服务/应用程序/数据库**的测试和维护。
- 一些应用程序和服务会定期检查数据库,即使没有新消息(如果数据库索引和优化不佳,这可能会消耗大量系统资源)。
消息中间件可以完成所有这些工作,并承担将消息以最高效的方式传递给远程应用程序的所有责任。下图显示了使用 DotNetMQ 的相同应用程序集成。
DotNetMQ 是一个独立的 Windows 服务,可在**服务器 - A** 和**服务器 - B** 上运行。因此,您只需编写代码与 DotNetMQ 通信即可。使用 DotNetMQ 客户端库,连接和发送/接收消息到/从 DotNetMQ 服务非常容易和快捷。**应用程序 - 1** 准备消息,设置目标,然后将消息传递给 DotNetMQ 中间件。DotNetMQ 中间件将以最高效和最快的方式将消息传递给**应用程序 - 2**。
关于现有的消息中间件
很明显,集成应用程序需要消息中间件。我在网上搜索,并阅读了书籍,以查找一个可以轻松与 .NET 一起使用的免费(如果可能,开源)消息中间件。让我们谈谈我找到的:
- **Apache ActiveMQ**(https://activemq.apache.ac.cn):它是开源的,并实现了**JMS**(Java 消息服务是 Java 领域消息传递的标准 API)。它还有一个 .NET 客户端库。我阅读了整本书《ActiveMQ in Action》以了解更多信息,并开发了一些简单的应用程序。尽管我读了这本书,但我没有看到一种简单可靠的方法来构建一个协同工作并路由消息的 ActiveMQ **服务器图**。我也没看到一种方法来设置消息的目标服务器。它会自动路由消息,但我无法有效地控制路由。我了解到它通常与**Apache Camel**(http://camel.apache.org)一起使用,以实现常见的应用程序集成模式。Apache Camel 也是另一个需要探索的世界,更糟糕的是,它只适用于 Java。最后,我认为它不够简单易用,特别是配置、监控和管理方面。所以我放弃了在 ActiveMQ 上工作。
- **MSMQ**(http://msdn.microsoft.com/en-us/library/ms711472(VS.85).aspx):这是**Microsoft** 的解决方案,也是与 .NET 应用程序最适合使用的框架。它易于使用和学习,并且提供了监控队列和消息的工具。它尤其适合运行在同一台计算机上或可以直接连接到同一台计算机的应用程序之间的异步通信。但是我找不到一个内置的解决方案来构建一个路由消息的 MSMQ 服务器图。由于路由是我首先要解决的问题,所以我排除了这个中间件。
- **RabbitMQ**(https://rabbitmq.cn):它使用**Erlang**编程平台(由**Ericsson**开发)开发。您需要先安装 Erlang。我花费了大量时间来安装、配置并编写了一个示例应用程序。它有一个 .NET 客户端,但在尝试开发和运行一个简单的应用程序时遇到了许多错误。在两台不同服务器上安装和运行两个 RabbitMQ 中间件非常困难。几天后,我放弃了,因为我认为学习和开始开发应用程序不应该这么难。
- **OpenAMQ**(http://www.openamq.org)、**ZeroMQ**(http://www.zeromq.org):**我总体上研究了这些中间件**,但我发现无法轻松地用 .NET 实现我想要的功能。
- **其他**:我还发现了一些其他项目,但它们缺少重要的功能,如路由、持久化消息、请求/响应消息等。
您可以看到,上面的列表中没有一个完全用 .NET 开发的消息中间件。
从用户的角度来看,我只想将“**消息数据、目标服务器和应用程序名称**”传递给我的**本地**中间件。我不在乎其他细节。它将通过网络多次路由消息,并将其传递给我目标服务器上的目标应用程序。我的消息传递系统必须为我提供这种简单性。这是我的出发点,我根据这一点评估了消息中间件。下图显示了我想要的功能。
**应用程序 - 1** 将消息传递给**本地服务器**(**服务器 - A**)上的**消息中间件**
- 目标服务器:**服务器 - D**
- 目标应用程序:**应用程序 - 2**
- 消息数据:**应用程序特定的数据**
**服务器 - A** 没有直接连接到**服务器 - D**。因此,消息中间件将消息通过服务器进行转发(消息依次通过服务器 - A、服务器 - B、服务器 - C 和服务器 - D 传输),消息最终到达服务器 - D 上的消息中间件,以将消息传递给**应用程序 - 2**。请注意,服务器 - E 上还运行着另一个应用程序 - 2 的实例,但它不会收到此消息,因为消息的目标服务器是*服务器 - D*。
DotNetMQ 提供了此功能和简单性。它在图上找到从源服务器到目标服务器的**最佳(最短)路径**并转发消息。
经过全面的介绍后,让我们看看如何在实践中使用 DotNetMQ。
安装和运行 DotNetMQ
目前没有自动安装程序,但安装 DotNetMQ 非常简单。从文章顶部**下载**并**解压****二进制文件**下载文件。只需将所有文件从那里复制到*C:\Program Files\DotNetMQ\*,然后运行* **INSTALL_x86.bat**(如果您使用的是 64 位操作系统,则为 **INSTALL_x64.bat**)。
您可以检查 Windows 服务,看看 DotNetMQ 是否已安装并正在运行。
使用 DotNetMQ 的第一个应用程序
让我们看看 DotNetMQ 的实际应用。为了使第一个应用程序最简单,我假设有两个控制台应用程序运行在同一台计算机上(事实上,(正如我们稍后在本文件中将看到的)如果应用程序在不同的计算机上,也没有显着差异;唯一的区别是正确设置消息中的目标服务器名称)。
- **应用程序 1**:从用户那里获取一个字符串消息并发送到应用程序 2。
- **应用程序 2**:将传入的消息写入控制台屏幕。
将应用程序注册到 DotNetMQ
我们需要一次性注册应用程序才能使用它们与 DotNetMQ。这是一个非常简单的过程。运行**DotNetMQ Manager**(DotNetMQ 程序文件夹中的*MDSManager.exe*(默认:*C:\Program Files\DotNetMQ\*)),然后从**应用程序**菜单打开**应用程序列表**。单击**添加新应用程序**按钮并为应用程序输入一个名称。
按照上述步骤将**Application1** 和**Application2** 应用程序添加到 DotNetMQ。最后,您的应用程序列表应如下所示:
此屏幕显示已注册到 DotNetMQ 的应用程序。已连接客户端列显示当前连接到 DotNetMQ 的应用程序实例数量。**无需重新启动** DotNetMQ,因为此屏幕上的更改。
开发 Application1
在 Visual Studio 中创建一个名为 **Application1** 的新控制台应用程序,并添加对 **MDSCommonLib.dll** 的引用,该库提供连接到 DotNetMQ 所需的类。然后,在 Program.cs 文件中写入以下代码:
using System;
using System.Text;
using MDS.Client;
namespace Application1
{
class Program
{
static void Main(string[] args)
{
//Create MDSClient object to connect to DotNetMQ
//Name of this application: Application1
var mdsClient = new MDSClient("Application1");
//Connect to DotNetMQ server
mdsClient.Connect();
Console.WriteLine("Write a text and press enter to send " +
"to Application2. Write 'exit' to stop application.");
while (true)
{
//Get a message from user
var messageText = Console.ReadLine();
if (string.IsNullOrEmpty(messageText) || messageText == "exit")
{
break;
}
//Create a DotNetMQ Message to send to Application2
var message = mdsClient.CreateMessage();
//Set destination application name
message.DestinationApplicationName = "Application2";
//Set message data
message.MessageData = Encoding.UTF8.GetBytes(messageText);
//Send message
message.Send();
}
//Disconnect from DotNetMQ server
mdsClient.Disconnect();
}
}
}
创建 MDSClient
对象时,我们传递连接到 DotNetMQ 的应用程序名称。使用此构造函数,我们连接到本地服务器 (127.0.0.1) 上的 DotNetMQ,端口号为默认端口 (10905)。可以使用重载构造函数连接到其他服务器和端口。
MDSClient
的 CreateMessage
方法返回一个 IOutgoingMessage
类型的对象。MessageData
属性是要发送到目标应用程序的实际数据。它是一个字节数组。我们使用 UTF8 编码将用户输入的文本转换为字节数组。DestinationApplicationName
和 DestinationServerName
属性用于设置消息的目标地址。如果我们不指定目标服务器,则假定为本地服务器。最后,我们发送消息。
开发 Application2
在 Visual Studio 中创建一个名为 **Application2** 的新控制台应用程序,添加对 **MDSCommonLib.dll** 的引用,并写入以下代码:
using System;
using System.Text;
using MDS.Client;
namespace Application2
{
class Program
{
static void Main(string[] args)
{
//Create MDSClient object to connect to DotNetMQ
//Name of this application: Application2
var mdsClient = new MDSClient("Application2");
//Register to MessageReceived event to get messages.
mdsClient.MessageReceived += MDSClient_MessageReceived;
//Connect to DotNetMQ server
mdsClient.Connect();
//Wait user to press enter to terminate application
Console.WriteLine("Press enter to exit...");
Console.ReadLine();
//Disconnect from DotNetMQ server
mdsClient.Disconnect();
}
/// <summary>
/// This method handles received messages from other applications via DotNetMQ.
/// </summary>
/// <param name="sender"></param>
/// <param name="e">Message parameters</param>
static void MDSClient_MessageReceived(object sender, MessageReceivedEventArgs e)
{
//Get message
var messageText = Encoding.UTF8.GetString(e.Message.MessageData);
//Process message
Console.WriteLine();
Console.WriteLine("Text message received : " + messageText);
Console.WriteLine("Source application : " + e.Message.SourceApplicationName);
//Acknowledge that message is properly handled
//and processed. So, it will be deleted from queue.
e.Message.Acknowledge();
}
}
}
创建 MDSClient
对象与 Application1 中的类似,但应用程序名称为 Application2。为了接收应用程序的消息,它需要注册到 MDSClient
的 MessageReceived
事件。然后我们连接到 DotNetMQ 并保持连接,直到用户按下 Enter 键。
当消息发送到 Application2 时,MDSClient_MessageReceived
方法会处理该事件。我们通过 MessageReceivedEventArgs
的 Message
属性获取消息。消息的类型是 IIncomingMessage
。IIncomingMessage
的 MessageData
属性包含 Application1 发送的实际消息数据。由于它是一个字节数组,我们使用 UTF8 编码将其转换为字符串。我们将 Application1 发送的消息文本写入控制台屏幕。
处理完传入消息后,需要**确认**消息。这意味着消息已正确接收并正确处理。DotNetMQ 然后从消息队列中删除消息。我们也可以使用 Reject
方法拒绝消息(如果我们无法处理该消息)。在这种情况下,消息会返回到消息队列,稍后将发送给目标应用程序(如果同一服务器上存在 Application2 的另一个实例,则会发送到该实例)。这是 DotNetMQ 系统的一个强大机制。因此,消息不可能丢失,并且绝对可以得到处理。如果您不确认或拒绝消息,它将被视为已拒绝。因此,即使您的应用程序崩溃,您的消息也会稍后重新发送给您的应用程序。
如果您运行 **Application2 的多个实例**,哪个实例会接收消息?在这种情况下,DotNetMQ 会按顺序将消息传递给应用程序。因此,您可以创建多发送方/接收方系统。一条消息只由一个应用程序实例接收(应用程序接收不同的消息)。DotNetMQ 提供所有功能和同步。
消息的传输规则属性
在发送消息之前,您可以设置消息的**传输规则**,如下所示:
message.TransmitRule = MessageTransmitRules.NonPersistent;
有三种传输规则:
StoreAndForward
:这是默认的传输规则。消息是持久化的,不会丢失,并且保证送达。如果Send
方法未抛出异常,则消息已正确由 DotNetMQ 接收并存储在数据库中。它将存储在数据库中,直到目标应用程序接收并确认它。NonPersistent
**:**消息不存储在数据库中。这是**最快**的消息发送方式。只有当 DotNetMQ 服务器停止时,消息才会丢失。DirectlySend
**:**这是 DotNetMQ 的独有功能。这类消息直接发送给应用程序。发送应用程序将一直阻塞,直到接收方确认消息。因此,如果发送方在调用Send
方法时未收到任何异常,则表示消息已由接收方应用程序正确接收并确认。如果在传输消息时发生错误,接收方离线,或者接收方拒绝了消息,发送方将在Send
方法中收到异常。此规则即使应用程序在不同的服务器上(即使应用程序之间有很多服务器)也能正确工作。
由于默认传输规则是 StoreAndForward
,让我们试试:
- 运行 **Application1**(同时 Application2 未运行),输入一些消息,然后关闭应用程序。
- 运行 **Application2**,您将看到您的消息已被 Application2 接收并且没有丢失。
即使您在 Application1 发送消息后从 Windows 服务中停止 DotNetMQ 服务,您的消息也不会丢失。这称为**持久化**。
MDSClient 的通信方式属性
默认情况下,应用程序可以使用 MDSClient 发送和接收消息(CommunicationWays.SendAndReceive
)。如果应用程序不想接收消息,必须将 CommunicationWay
属性设置为 CommunicationWays.Send
。在连接到 DotNetMQ 之前或期间,可以更改此属性。
MDSClient 的出错重连服务器属性
默认情况下,如果 MDSClient 断开连接,它会**自动重新连接**到 DotNetMQ。因此,即使您重新启动 DotNetMQ,也无需重新启动已连接到 DotNetMQ 的应用程序。您可以将 ReConnectServerOnError
属性设置为 false
以禁用自动重连。
MDSClient 的自动确认消息属性
默认情况下,您必须在 **MessageReceived** 事件中**显式确认**消息。否则,它将被视为**已拒绝**。如果您想要这种方法的相反行为,则必须将 **AutoAcknowledgeMessages** 属性设置为**true**。在这种情况下,如果您的 MessageReceived **事件处理程序**未抛出**异常**,或者您没有显式确认/拒绝消息,它将被自动**确认**(如果抛出异常,消息将被拒绝)。
配置 DotNetMQ
您可以通过两种方式配置 DotNetMQ:使用**XML 设置文件**或**DotNetMQ Manager**(Windows Forms 应用程序)。在这里,我将展示这两种方法。一些配置需要您**重新启动** DotNetMQ,而另一些则不需要。
服务器
您可能只在**一台服务器**上运行 DotNetMQ。在这种情况下,无需为服务器配置任何内容。但是,如果您想在**多台服务器**上运行 DotNetMQ 并使它们相互通信,则必须定义您的**服务器图**。
服务器图由两个或多个**节点**组成。每个节点是一台服务器,具有**IP 地址**和**TCP 端口**(由 DotNetMQ 使用)。您可以使用 DotNetMQ Manager 配置/**设计**服务器图。
在上图中,您看到一个包含五个节点的服务器图。红色节点表示此服务器(此服务器是指您使用 DotNetMQ Manager 连接的服务器)。一条线表示两个节点之间存在连接(并且它们可以发送/接收消息)(它们被称为相邻节点)。服务器/节点在图中的名称很重要,用于发送消息到服务器。
您可以双击图中的服务器来更改其属性。要连接两个服务器,请按住 Ctrl 键,单击第一个服务器,然后单击第二个服务器(要断开连接,请执行相同的操作)。您可以通过右键单击并选择**设置为此服务器**来将服务器设置为当前服务器。您还可以通过右键单击菜单删除服务器或添加新服务器。最后,您可以拖动服务器来移动它们。
设计完服务器图后,必须单击**保存并更新图**按钮来保存更改。更改将保存到 DotNetMQ 安装文件夹中的*MDSSettings.xml*文件。您**必须重新启动** DotNetMQ 才能应用更改。
对于上图的服务器图,对应的*MDSSettings.xml*设置如下所示:
<?xml version="1.0" encoding="utf-8"?>
<MDSConfiguration>
<Settings>
...
</Settings>
<Servers>
<Server Name="halil_pc" IpAddress="192.168.10.105"
Port="10099" Adjacents="emre_pc" />
<Server Name="emre_pc" IpAddress="192.168.10.244" Port="10099"
Adjacents="halil_pc,out_server,webserver1,webserver2" />
<Server Name="out_server" IpAddress="85.19.100.185"
Port="10099" Adjacents="emre_pc" />
<Server Name="webserver1" IpAddress="192.168.10.263"
Port="10099" Adjacents="emre_pc,webserver2" />
<Server Name="webserver2" IpAddress="192.168.10.44"
Port="10099" Adjacents="emre_pc,webserver1" />
</Servers>
<Applications>
...
</Applications>
<Routes>
...
</Routes>
</MDSConfiguration>
当然,此配置是根据您的实际网络进行的。您必须在图中的所有服务器上安装 DotNetMQ。此外,您必须在所有服务器上配置相同的图(您可以轻松地将 XML 中的服务器节点复制到其他服务器)。
DotNetMQ 使用**最短路径**算法来发送消息(如果在设置文件中未定义手动路由)。考虑运行在**halil_pc**上的**应用程序 A**,它正在向**webserver2**上的**应用程序 B**发送消息。路径很简单:应用程序 A -> **halil_pc -> emre_pc -> webserver2** -> 应用程序 B。halil_pc 通过使用服务器图定义知道下一个转发服务器(emre_pc)。
最后,*MDSSettings.design.xml*文件包含服务器设计信息(屏幕上节点的位置)。这仅在 DotNetMQ Manager 的服务器图窗口中需要,而 DotNetMQ 的运行时不需要。
应用程序
如图 - 5 所示,您可以**添加/删除**使用 DotNetMQ 作为消息中间件的应用程序。**无需重新启动** DotNetMQ 以进行这些更改。应用程序设置也保存在*MDSSettings.xml*文件中,如下所示:
<?xml version="1.0" encoding="utf-8"?>
<MDSConfiguration>
...
<Applications>
<Application Name="Application1" />
<Application Name="Application2" />
</Applications>
...
</MDSConfiguration>
一个应用程序必须在此列表中才能连接到 DotNetMQ。如果您直接修改 XML 文件,则必须重新启动 DotNetMQ 服务器。
路由 / 负载均衡
DotNetMQ 的一个可用功能是路由。路由设置(目前)仅在 XML 设置文件(*MDSSettings.xml*)中配置。您可以在下面的设置文件中看到两种类型的路由:
<?xml version="1.0" encoding="utf-8" ?>
<MDSConfiguration>
...
<Routes>
<Route Name="Route-App2" DistributionType="Sequential" >
<Filters>
<Filter DestinationServer="this" DestinationApplication="Application1" />
</Filters>
<Destinations>
<Destination Server="Server-A" Application="Application1" RouteFactor="1" />
<Destination Server="Server-B" Application="Application1" RouteFactor="1" />
<Destination Server="Server-C" Application="Application1" RouteFactor="1" />
</Destinations>
</Route>
<Route Name="Route-App2" DistributionType="Random" >
<Filters>
<Filter DestinationServer="this" DestinationApplication="Application2" />
<Filter SourceApplication="Application2" TransmitRule="StoreAndForward" />
</Filters>
<Destinations>
<Destination Server="Server-A" Application="Application2" RouteFactor="1" />
<Destination Server="Server-B" Application="Application2" RouteFactor="3" />
</Destinations>
</Route>
</Routes>
...
</MDSConfiguration>
一个 Route
节点有两个属性:Name
是路由条目的用户友好名称(不影响路由),DistributionType
是路由策略。有两种路由策略:
Sequential
:消息按顺序路由到目标服务器。在分发时会考虑目标的RouteFactor
。Random
:消息随机路由到目标服务器。选择服务器 A 的概率为:(A 的 RouteFactor / 所有目标路由定义中所有 RouteFactor 值之和)。
**过滤器**用于决定将消息路由到哪个路由。如果消息的属性符合其中一个过滤器,则消息将被路由。有**五种条件**(XML 属性)用于定义过滤器:
SourceServer
:消息的第一个源服务器。可以是this
来表示此服务器。SourceApplication
:消息的发送方应用程序。DestinationServer
:消息的最后一个目标服务器。可以是this
来表示此服务器。DestinationApplication
:将接收消息的应用程序。TransmitRule
:以下传输规则之一:StoreAndForward
、DirectlySend
或NonPersistent
。
如果未声明一个或多个条件,则在过滤消息时不会考虑它。因此,如果所有条件都为空(或未声明),则所有消息都符合此过滤器。只有当所有条件都符合消息时,才会为消息选择一个过滤器。如果消息符合(至少)一个路由的过滤器,则选择并使用该路由。
**目标**用于将消息路由到其他服务器。根据 Route
条目的 DistributionType
属性(如前所述)选择其中一个目标。一个目标**必须定义三个属性**:
Server
:目标服务器。可以是this
来表示此服务器。Application
:目标应用程序。目标应用程序通常定义与其原始目标相同,但您可以将消息**重定向**到其原始目标应用程序以外的应用程序。RouteFactor
:此属性用于指示目标的相对选择比例。RouteFactor
属性可用于**负载均衡**。如果您想平均分配消息到所有服务器,可以将此值设置为 1。但如果您有两个服务器,其中一个比另一个更强大,您可以通过定义适当的路由因子来选择第一个服务器的频率是第二个服务器的两倍。
更改路由后,您**必须重新启动** DotNetMQ。
其他设置
DotNetMQ 目前支持三种**存储**类型:**SQLite**(默认)、**MySQL** 和**内存**。您可以在*MDSSettings.xml*文件中更改存储类型。
<?xml version="1.0" encoding="utf-8"?>
<MDSConfiguration>
...
<Settings>
<Setting Key="ThisServerName" Value="halil_pc" />
<Setting Key="StorageType" Value="SQLite" />
</Settings>
...
</MDSConfiguration>
存储类型必须是以下值之一:
- **SQLite**:使用 SQLite 数据库系统。这是默认存储类型。将* {DotNetMQ-Install-Directory}\SqliteDB\MDS.s3db* 文件用作数据库。
- **MSSQL**:使用 Microsoft SQL Server 数据库。您必须提供
ConnectionString
设置作为连接字符串(稍后解释)。 - **MySQL-ODBC**:使用 ODBC 连接 MySQL 数据库。您必须提供
ConnectionString
设置作为连接字符串。 - **MySQL-Net**:使用**.NET 适配器**连接 MySQL 数据库。您必须提供
ConnectionString
设置作为连接字符串。 - **Memory**:使用内存作为存储设备。在这种情况下,如果 DotNetMQ 停止,持久化消息将丢失。
以下是使用**MySQL-ODBC**存储类型的一个示例配置:
<Settings>
<Setting Key="ThisServerName" Value="halil_pc" />
<Setting Key="StorageType" Value="MySQL-ODBC" />
<Setting Key="ConnectionString"
Value="uid=root;server=localhost;driver={MySQL ODBC 3.51 Driver};database=mds" />
</Settings>
您可以在* **Setup\Databases*** 文件夹(在 DotNetMQ 安装文件夹中)中找到创建 DotNetMQ 使用的数据库和表所需的*. **文件。如果您有问题,请随时问我。
还有一个设置用于定义当前/此服务器的名称(ThisServerName
)。它必须是 Servers
部分中的一个服务器。如果您使用 DotNetMQ Manager 编辑您的服务器图,它会自动设置。
网络消息传递
将消息发送到远程服务器上的应用程序,就像发送消息到当前服务器上的应用程序一样简单。
一个简单的应用程序
让我们考虑下面的网络。
在 ServerA 上运行了一个应用程序(Application1),它想将消息发送到 ServerC 上的另一个应用程序(Application2),并且由于防火墙规则,ServerA 和 ServerC 之间没有直接连接。让我们修改我们在**第一个应用程序**部分开发的应用程序。
Application2 没有任何变化。只需在 ServerC 上运行 Application2 并等待传入的消息。
Application1 在发送消息的方式上有一个小的改动。它必须将消息的 DestinationServerName
设置为**ServerC**。
var message = mdsClient.CreateMessage();
message.DestinationServerName = "ServerC"; //Set destination server name here!
message.DestinationApplicationName = "Application2";
message.MessageData = Encoding.UTF8.GetBytes(messageText);
message.Send();
就是这样。您不必知道 ServerC 的位置,就可以直接连接到 ServerC……它们都定义在 DotNetMQ 设置中。请注意,如果您不设置消息的 DestinationServerName
,它将被假定为**当前/此**服务器,DotNetMQ 会将消息发送到同一台服务器上的应用程序。此外,如果您定义了必要的路由,则无需设置目标服务器:它将由 DotNetMQ 自动路由。
当然,DotNetMQ 设置必须根据服务器连接(服务器图)正确设置,并且 Application1 和 Application2 必须**注册**到 DotNetMQ 服务器,如**配置 DotNetMQ** 部分所述。
真实案例:**分布式短信处理器**
正如您已经看到的,DotNetMQ 可以用于构建**分布式**、**负载均衡**的应用程序系统。在本节中,我将讨论一个真实场景:一个分布式短信处理系统。
假设有一个短信服务,用于轮询音乐比赛。在所有参赛者演唱完歌曲后,观众发送“VOTE 103”之类的消息到我们的短信服务,为他们最喜欢的参赛者投票(103 是为特定参赛者投票的示例代码)。并且假设此轮询仅在 30 分钟内完成,大约有五百万人将发送短信到我们的服务。
我们将接收每条消息,处理它(解析短信文本,更新数据库以增加参赛者的投票计数),并向 SMS 发送者发送一条确认消息。我们需要从两个服务器接收消息,在四个服务器上处理消息,并在两个服务器上发送确认消息。我们总共有八台服务器。让我们看看完整的系统图:
有三种类型的应用程序:**接收器**、**处理器**和**发送器**。在这种场景下,您可以使用 DotNetMQ 作为消息队列和负载均衡器,通过配置**配置 DotNetMQ** 部分所述的服务器图和路由来构建分布式、可扩展的消息处理系统。
使用 DotNetMQ 的请求/响应式消息传递
在大多数情况下,一个应用程序将消息发送给另一个应用程序并获得响应消息。DotNetMQ 对这种类型的消息传递有**内置支持**。考虑一个用于查询股票状态的服务。有两种消息:
[Serializable]
public class StockQueryMessage
{
public string StockCode { get; set; }
}
[Serializable]
public class StockQueryResultMessage
{
public string StockCode { get; set; }
public int ReservedStockCount { get; set; }
public int TotalStockCount { get; set; }
}
简单的**股票服务器**代码如下所示:
using System;
using MDS;
using MDS.Client;
using StockCommonLib;
namespace StockServer
{
class Program
{
static void Main(string[] args)
{
var mdsClient = new MDSClient("StockServer");
mdsClient.MessageReceived += MDSClient_MessageReceived;
mdsClient.Connect();
Console.WriteLine("Press enter to exit...");
Console.ReadLine();
mdsClient.Disconnect();
}
static void MDSClient_MessageReceived(object sender,
MessageReceivedEventArgs e)
{
//Get message
var stockQueryMessage =
GeneralHelper.DeserializeObject(e.Message.MessageData)
as StockQueryMessage;
if (stockQueryMessage == null)
{
return;
}
//Write message content
Console.WriteLine("Stock Query Message for: " +
stockQueryMessage.StockCode);
//Get stock counts from a database...
int reservedStockCount;
int totalStockCount;
switch (stockQueryMessage.StockCode)
{
case "S01":
reservedStockCount = 14;
totalStockCount = 80;
break;
case "S02":
reservedStockCount = 0;
totalStockCount = 25;
break;
default: //Stock does not exists!
reservedStockCount = -1;
totalStockCount = -1;
break;
}
//Create a reply message for stock query
var stockQueryResult = new StockQueryResultMessage
{
StockCode = stockQueryMessage.StockCode,
ReservedStockCount = reservedStockCount,
TotalStockCount = totalStockCount
};
//Create a MDS response message to send to client
var responseMessage = e.Message.CreateResponseMessage();
responseMessage.MessageData =
GeneralHelper.SerializeObject(stockQueryResult);
//Send message
responseMessage.Send();
//Acknowledge the original request message.
//So, it will be deleted from queue.
e.Message.Acknowledge();
}
}
}
股票服务器监听传入的 StockQueryMessage
对象,并向发送方发送 StockQueryResultMessage
。为简单起见,我没有从数据库中选择股票。响应消息是由传入消息的 CreateResponseMessage()
方法创建的。最后,在发送响应后,消息被确认。现在我将展示一个简单的股票客户端代码来从服务器获取股票信息:
using System;
using MDS;
using MDS.Client;
using MDS.Communication.Messages;
using StockCommonLib;
namespace StockApplication
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Press enter to query a stock status");
Console.ReadLine();
//Connect to DotNetMQ
var mdsClient = new MDSClient("StockClient");
mdsClient.MessageReceived += mdsClient_MessageReceived;
mdsClient.Connect();
//Create a stock request message
var stockQueryMessage = new StockQueryMessage { StockCode = "S01" };
//Create a MDS message
var requestMessage = mdsClient.CreateMessage();
requestMessage.DestinationApplicationName = "StockServer";
requestMessage.TransmitRule = MessageTransmitRules.NonPersistent;
requestMessage.MessageData = GeneralHelper.SerializeObject(stockQueryMessage);
//Send message and get response
var responseMessage = requestMessage.SendAndGetResponse();
//Get stock query result message from response message
var stockResult = (StockQueryResultMessage)
GeneralHelper.DeserializeObject(responseMessage.MessageData);
//Write stock query result
Console.WriteLine("StockCode = " + stockResult.StockCode);
Console.WriteLine("ReservedStockCount = " + stockResult.ReservedStockCount);
Console.WriteLine("TotalStockCount = " + stockResult.TotalStockCount);
//Acknowledge received message
responseMessage.Acknowledge();
Console.ReadLine();
//Disconnect from DotNetMQ server.
mdsClient.Disconnect();
}
static void mdsClient_MessageReceived(object sender,
MessageReceivedEventArgs e)
{
//Simply acknowledge other received messages
e.Message.Acknowledge();
}
}
}
在上面的示例中,TransmitRule
被选为 NonPersistent
以展示示例用法。当然,您可以发送 StoreAndForward
(持久化)消息。这是运行应用程序的一个示例屏幕截图:
DotNetMQ 中的**面向服务**架构
**SOA**(面向服务架构)多年来一直是一个流行的概念。Web 服务和 WCF 是 SOA 的两个主要解决方案。通常,不期望消息队列系统支持 SOA。此外,消息传递是一个异步的、松耦合的过程,而 Web 服务方法调用通常是同步的、紧耦合的。即使(正如您在之前的示例应用程序中所见)消息传递不像调用远程方法那样简单。但是,当消息数量增加时,您的应用程序会变得复杂且难以维护。
**DotNetMQ 支持在持久化或非持久化消息之上的远程方法调用机制**。因此,您可以异步调用一个远程方法,该方法保证被调用且保证成功!
示例应用程序:**短信/邮件发送器**
在这里,我们将开发一个可以用来发送短信和电子邮件的简单服务。也许不需要编写服务来发送电子邮件/短信,所有应用程序都可以自己完成。但是,想象一下您有很多应用程序在发送电子邮件。如果邮件服务器在发送电子邮件时遇到问题怎么办?应用程序必须尝试直到成功发送电子邮件。因此,您必须在应用程序中构建一个队列机制来一次又一次地尝试发送电子邮件。最坏的情况下,您的应用程序可能是一个短时间运行的应用程序(例如 Web 服务),或者必须在发送电子邮件之前关闭。但是,您必须在邮件服务器上线时发送电子邮件,并且不能丢失邮件。
在这种情况下,您可以开发一个单独的邮件/短信服务,该服务将尝试发送短信/邮件直到成功发送。您可以开发一个邮件服务,该服务通过 DotNetMQ 接收邮件请求,并且仅在电子邮件成功发送后才确认请求(消息)。如果发送失败,只需不确认(或拒绝)消息,稍后将再次尝试发送(**可靠性**)。您可以使用 MDSService
类的 IncomingMessage
属性访问原始消息。此外,您可以使用 RemoteApplication
属性获取远程应用程序(调用服务方法的应用程序)的信息。
服务
我们将首先开发邮件/短信服务。为此,我们必须定义一个继承自 MDSService
基类的类:
using System;
using MDS.Client.MDSServices;
namespace SmsMailServer
{
[MDSService(Description = "This service is a " +
"sample mail/sms service.", Version = "1.0.0.0")]
public class MyMailSmsService : MDSService
{
//All parameters and return values can be defined.
[MDSServiceMethod(Description = "This method is used send an SMS.")]
public void SendSms(
[MDSServiceMethodParameter("Phone number to send SMS.")] string phone,
[MDSServiceMethodParameter("SMS text to be sent.")] string smsText)
{
//Process SMS
Console.WriteLine("Sending SMS to phone: " + phone);
Console.WriteLine("Sms Text: " + smsText);
//Acknowledge the message
IncomingMessage.Acknowledge();
}
//You do not have to define any parameters
[MDSServiceMethod]
public void SendEmail(string emailAddress, string header, string body)
{
//Process email
Console.WriteLine("Sending an email to " + emailAddress);
Console.WriteLine("Header: " + header);
Console.WriteLine("Body : " + body);
//Acknowledge the message
IncomingMessage.Acknowledge();
}
// A simple method just to show return values.
[MDSServiceMethod]
[return: MDSServiceMethodParameter("True, if phone number is valid.")]
public bool IsValidPhone([MDSServiceMethodParameter(
"Phone number to send SMS.")] string phone)
{
//Acknowledge the message
IncomingMessage.Acknowledge();
//Return result
return (phone.Length == 10);
}
}
}
如您所见,它只是一个普通的 C# 类,带有属性。必须定义 MDSService
和 MDSServiceMethod
属性,所有其他属性都是可选的(但最好写上)。我们稍后会看到它们为什么被使用)。您的服务方法必须具有 MDSServiceMethod
属性。如果您不想公开某些方法,只需不添加 MDSServiceMethod
属性即可。
我们还必须在服务方法中**确认**消息。否则,(导致此方法调用的)消息将不会从消息队列中删除,并且我们的方法将再次被调用。如果无法处理消息(例如,如果邮件服务器未运行并且我们无法发送电子邮件),我们也可以**拒绝**该消息。如果我们拒绝该消息,它将稍后发送给我们(**可靠性**)。您可以使用 MDSService
类的 IncomingMessage
属性访问原始消息。此外,您可以使用 RemoteApplication
属性获取远程应用程序(调用服务方法的应用程序)的信息。
创建合适的类后,我们必须创建一个应用程序来运行它。这是一个运行我们的 MyMailSmsService
的简单控制台应用程序:
using System;
using MDS.Client.MDSServices;
namespace SmsMailServer
{
class Program
{
static void Main(string[] args)
{
using (var service =
new MDSServiceApplication("MyMailSmsService"))
{
service.AddService(new MyMailSmsService());
service.Connect();
Console.WriteLine("Press any key to stop service");
Console.ReadLine();
}
}
}
}
如您所见,仅用三行代码即可创建和运行服务。由于 MDSService
是可 disposable 的,您可以使用 using
语句。此外,您可以使用 MDSServiceApplication
的 Disconnect
方法手动关闭服务。您可以使用 AddService
方法在单个 MDSServiceApplication
上运行多个服务。
**客户端**
要开发使用 DotNetMQ 服务的应用程序,您必须创建一个**服务代理**(类似于 Web 服务和 WCF)。为此,您可以使用**MDSServiceProxyGenerator**工具。首先,编译您的服务项目,然后运行*MDSServiceProxyGenerator.exe*(在 DotNetMQ 安装文件夹中)。
选择您的服务**程序集**文件(在本示例项目中为*SmsMailServer.exe*)。您可以选择**服务类**或为给定程序集中的所有服务生成代理。输入**命名空间**和**目标文件夹**以**生成代理类**。生成代理类后,您可以将其**添加**到您的项目中。
我不会展示这个代理类的内部结构,您也不必了解它(您可以在源代码中看到,它是一个非常简单的类)。您的方法/参数**属性**用于在该代理文件中生成**代码注释**。
将生成的代理类添加到我们的项目后,我们可以像**简单方法调用**一样轻松地向服务发送消息:
using System;
using MDS.Client;
using MDS.Client.MDSServices;
using SampleService;
namespace SmsMailClient
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Press enter to test SendSms method");
Console.ReadLine();
//Application3 is name of an application that sends sms/email.
using (var serviceConsumer = new MDSServiceConsumer("Application3"))
{
//Connect to DotNetMQ server
serviceConsumer.Connect();
//Create service proxy to call remote methods
var service = new MyMailSmsServiceProxy(serviceConsumer,
new MDSRemoteAppEndPoint("MyMailSmsService"));
//Call SendSms method
service.SendSms("3221234567", "Hello service!");
}
}
}
}
您还可以调用服务的其他方法,并像常规方法调用一样获取返回值。实际上,您的方法调用被转换为可靠的消息。例如,即使在调用 SendSms
时远程应用程序(MyMailSmsService
)未运行,当服务启动运行时也会调用它,因此您的方法调用也保证会被调用。
您可以使用服务代理的 TransmitRule
属性来更改消息传递的传输规则。如果服务方法返回 void
,则其传输规则默认为 StoreAndForward
。如果服务方法返回值,则无法进行方法调用(因为方法调用是同步的且等待结果),其规则为 DirectlySend
。
您可以选择任何类型作为方法参数。如果它是基本类型(string
、int
、byte
...),则无需额外设置,但如果您想使用自己的类作为方法参数,则该类必须标记为 Serializable
,因为 DotNetMQ 对参数使用二进制序列化。
**请注意**,在运行此示例之前,您必须将 **MyMailSmsService** 和 **Application3** **注册**到 DotNetMQ。
Web 服务支持
当然,您可以在 Web 服务中连接到 DotNetMQ,因为它也是一个 .NET 应用程序。但是,如果您想编写一个 ASP.NET Web 方法来处理应用程序的消息(并在同一上下文中回复消息)怎么办?Web 服务适用于此类请求/响应式方法调用。
DotNetMQ 支持 ASP.NET Web 服务,并可以向 Web 服务传递消息。示例中有一个模板 Web 服务(在下载文件中)可以完成此任务。它定义如下:
using System;
using System.Web.Services;
using MDS.Client.WebServices;
[WebService(Namespace = "http://www.dotnetmq.com/mds")]
[WebServiceBinding(ConformsTo = WsiProfiles.BasicProfile1_1)]
public class MDSAppService : WebService
{
/// <summary>
/// MDS server sends messages to this method.
/// </summary>
/// <param name="bytesOfMessage">Byte array form of message</param>
/// <returns>Response message to incoming message</returns>
[WebMethod(Description = "Receives incoming messages to this web service.")]
public byte[] ReceiveMDSMessage(byte[] bytesOfMessage)
{
var message = WebServiceHelper.DeserializeMessage(bytesOfMessage);
try
{
var response = ProcessMDSMessage(message);
return WebServiceHelper.SerializeMessage(response);
}
catch (Exception ex)
{
var response = message.CreateResponseMessage();
response.Result.Success = false;
response.Result.ResultText =
"Error in ProcessMDSMessage method: " + ex.Message;
return WebServiceHelper.SerializeMessage(response);
}
}
/// <summary>
/// Processes incoming messages to this web service.
/// </summary>
/// <param name="message">Message to process</param>
/// <returns>Response Message</returns>
private IWebServiceResponseMessage
ProcessMDSMessage(IWebServiceIncomingMessage message)
{
//Process message
//Send response/result
var response = message.CreateResponseMessage();
response.Result.Success = true;
return response;
}
}
您不应更改 ReceiveMDSMessage
方法,而应在 ProcessMDSMessage
方法中处理消息,如上所示。此外,您必须在 MDSSettings.xml 中定义您的 Web 服务的**地址**,如下所示。您也可以使用 DotNetMQ 管理工具添加 Web 服务。
...
<Applications>
<Application Name="SampleWebServiceApp">
<Communication Type="WebService"
Url="https:///SampleWebApplication/SampleService.asmx" />
</Application>
</Applications>
...
DotNetMQ 的性能
以下是 DotNetMQ 中消息传递的一些测试结果:
消息传递
- **10,000** 条消息,**持久化**方式,耗时约 **25** 秒(约 **400** 条消息/秒)。
- **10,000** 条消息,**非持久化**方式,耗时约 **3.5** 秒(约 **2,850** 条消息/秒)。
方法调用(在 DotNetMQ 服务中)
- **10,000** 次方法调用,**持久化**方式,耗时约 **25** 秒(约 **400** 次调用/秒)。
- **10,000** 次方法调用,**非持久化**方式,耗时约 **8.7** 秒(约 **1,150** 次调用/秒)。
测试平台:Intel Core 2 Duo 3.00 GHz CPU。2 GB RAM PC。消息/调用是在运行在同一台计算机上的两个应用程序之间进行的。
参考
历史
- **2011.05.23**(DotNetMQ v0.9.1.0)
- 添加了 Microsoft SQL Server 数据库支持。
- 将
MySQLConnectionString
设置更改为ConnectionString
。 - 源代码已更新。
- 文章已根据更改进行更新。
- **2011.05.16**(DotNetMQ v0.9.0.0)
- 向下载文件中添加了示例 Web 服务模板。
- 文章中的一些修复和补充。
- **2011.05.09**(DotNetMQ v0.9.0.0)
- 首次发布。