使用发布/订阅模式和命名管道作为传输实现进程间通信





5.00/5 (2投票s)
MetaPubSub 库简介 - 一个实现进程间通信层的发布/订阅模式
引言
在我当前的工作项目中,我们开始遇到软件模块之间关系数量的问题,并决定使用发布/订阅模式来传输事件和命令,以减少耦合。像往常一样,没有现有的实现适合我们,所以我开始开发自己的库。
该模式的实现并不是一项非常困难的任务。第一个版本在我们的项目中得到了应用,并且效果非常好。但由于我们的应用程序由多个进程和服务组成,我们立即希望根据相同的发布/订阅原则将它们链接成一个整体。因此,我们决定扩展该库以支持进程间通信。
在开发过程中,提出了以下要求:
- 易用性 - 所有功能都应该与本地 PubSub 模块一样工作,并且订阅者原则上甚至可能不知道消息来自哪里——是来自他们自己的进程还是远程。
- 必须保证投递 - 如果消息无法传输,发布者将立即收到异常。
- 在处理消息期间发生的所有异常也必须被序列化并传回发送者。
- 消息必须独立处理,不排队。
- 必须支持异步调用模型 (async/await)。
- 服务器和客户端集线器作为一个整体工作 - 无论发布者或订阅者位于哪个进程。
- 进程之间的连接应自动恢复。
什么是 PubSub?
首先,值得了解发布/订阅模式是什么以及如何使用它。正如维基百科所说:
在软件架构中,发布/订阅是一种消息传递模式,其中消息的发送者(称为发布者)不直接将消息编程发送给特定的接收者(称为订阅者),而是将已发布的消息分类到不同的类别中,而不知道可能存在哪些订阅者。同样,订阅者表达对一个或多个类别的兴趣,并且只接收感兴趣的消息,而不知道可能存在哪些发布者。
在我的 MetaPubSub
模块中,实现了最常见的基本功能和一些高级功能:
- 双向通信
- 与消费者的近乎实时交互
Subscribe
和Publish
方法的独立顺序- 可等待的方法,例如,您可以
await Publish
并等待所有订阅者完成消息处理 - 至少一次投递检查 - 如果没有人订阅您的消息,您可以选择抛出异常
- 消息过滤 - 您可以定义一个谓词来仅订阅您想要处理的消息
- 等待订阅者的超时 - 您的消息可以排队并等待有人订阅并处理
- 调度消息 - 您的消息可以排队并在延迟后发布
- 通过单个方法调用异步等待指定消息,无需订阅/取消订阅该消息(
When
方法) - 请求-响应模式 - 发送消息并等待响应,作为单个可等待方法,无需订阅/取消订阅响应消息(
Process
方法) - 取消令牌支持 - 您可以取消调度或等待消息
- 异常处理 - 订阅者处理消息时引发的所有异常都可以作为
AggregateException
被发布者捕获
如何使用示例
准备消息类
每个消息类都必须派生自 IPubSubMessage
接口
public interface IPubSubMessage
{
bool DeliverAtLeastOnce { get; }
int Timeout { get; }
string RemoteConnectionId { get; set; }
}
此外,您还可以派生自 PubSubMessageBase
,其声明如下:
public class PubSubMessageBase : IPubSubMessage
{
public bool DeliverAtLeastOnce { get; set; }
public int Timeout { get; set; }
public string RemoteConnectionId { get; set; }
}
或者您可以定义自己的基类
public class MessageBase : IPubSubMessage
{
// set this to be the default value for all derived classes
public bool DeliverAtLeastOnce => true;
// default timeout in milliseconds
public int Timeout => 1000;
// required for internal message processing, leave it as is
public string RemoteConnectionId { get; set; }
}
因此,您的消息声明应该如下所示:
public class MyMessage : MessageBase // or PubSubMessageBase
{
public string SomeData { get; }
public MyMessage(string data)
{
SomeData = data;
}
}
创建集线器
// hub creation
var hub = new MetaPubSub();
// subscribing to MyMessage
hub.Subscribe<MyMessage>(OnMyMessage);
// publishing a message
await hub.Publish(new MyMessage());
// unsubscribing
hub.Unsubscribe<MyMessage>(OnMyMessage);
异常处理
var hub = new MetaPubSub();
try
{
var message = new MyMessage
{
DeliverAtLeastOnce = true,
};
// publishing a message when no one subscribed - NoSubscribersException
//await hub.Publish(message);
// publishing a message when no one subscribed and Timeout > 0 - TimeoutException
//message.Timeout = 100;
//await hub.Publish(message);
hub.Subscribe<MyMessage>(OnMyMessageHandlerWithException);
// publishing a message
await hub.Publish(message);
}
catch (NoSubscribersException ex)
{
// No one is subscribed to this message and
// (message.DeliverAtLeastOnce == true and message.Timeout == 0)
Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
}
catch (TimeoutException ex)
{
// No one is subscribed to this message and
// (message.DeliverAtLeastOnce == true and message.Timeout > 0)
Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
}
catch (AggregateException ex)
{
// All exceptions raised when a message processing by subscribers
// can be caught by the publisher as an AggregateException.
// If some of the subscribers throw an exception, other subscribers
// continues to process the message.
Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
foreach (var innerEx in ex.InnerExceptions)
{
Console.WriteLine($"\tInner Exception {innerEx.GetType()}: {innerEx.Message}");
}
}
await hub.Unsubscribe<MyMessage>(OnMyMessageHandlerWithException);
至少一次投递检查
var hub = new MetaPubSub();
var message = new MyMessage
{
// if this not set, NoSubscribersException will not be thrown
DeliverAtLeastOnce = true
};
try
{
// publishing a message when no one is subscribed
await hub.Publish(message);
}
catch (NoSubscribersException ex)
{
// no one is listening
Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
}
hub.Subscribe<MyMessage>(OnMyMessage);
await hub.Publish(message);
hub.Unsubscribe<MyMessage>(OnMyMessage);
您可以在项目的 GitHub 页面 上查看示例的完整列表。
添加进程间通信层
为了让我们的集线器能够传输进程之间的消息,您需要通过某种数据传输机制将它们连接起来。可以是 Sockets、COM、共享文件、DDE、RPC 或任何其他方式。但我最终选择了命名管道 - 它相当简单、方便,并且其性能和可靠性对我来说都足够了。
因此,一个集线器应该成为服务器,所有其他集线器都成为客户端。要做到这一点,在服务器端,您需要调用 StartServer
方法。
var serverHub = new MetaPubSub();
// Starting the hub as a server named 'Meta'.
serverHub.StartServer("Meta");
在客户端,您需要连接到服务器。
var clientHub = new MetaPubSub();
// Connecting to the remote server.
await clientHub.ConnectToServer("Meta");
现在,如果客户端想从服务器接收事件,它必须订阅它。
await clientHub.SubscribeOnServer<MyMessage>(Handler);
如果客户端想将事件发送到服务器,它会这样做:
await clientHub.PublishOnServer(new MyMessage());
如果服务器想发布消息,它会像处理本地消息一样进行。
await serverHub.Publish(new MyMessage());
如果服务器想接收消息,它会像处理本地消息一样订阅。
serverHub.Subscribe<MyMessage>(Handler);
在这种情况下,Subscribe
调用不会被等待,因为它没有耗时的操作,例如网络传输或消息处理。
它是如何工作的?
每条消息都会获得一个 GUID,被序列化为 JSON,并通过命名管道进行传递。为了等待结果,会创建一个 TaskCompletionSource
并由调用代码等待。接收方反序列化消息并将其发送给订阅者进行处理。处理完成后,会生成一个包含相同 GUID 和所有异常(如果有)的结果,并将其发送回发送方。结果会再次被反序列化并传递给 TaskCompletionSource
。最终,调用结束或抛出异常。
通过命名管道发送和接收是独立进行的,在不同的线程上。消息响应可能会以任何顺序到达,无论请求发送的顺序如何。所有 MetaPubSub
方法都是线程安全的。
连接处理
客户端会自动重新建立与服务器丢失的连接。但如果客户端启动时与服务器的连接尚未建立怎么办?客户端在尝试调用 SubscribeOnServer
方法时会收到异常。接下来,我们可以实现等待连接的逻辑并订阅所有必要的事件。但这对每个订阅服务器事件的类都必须这样做,这会导致源代码增长和重复。在这种情况下,TryConnect
和 TrySubscribeOnServer
方法派上了用场。
TryConnect
- 连接到服务器将在其可用时立即建立。如果立即建立连接,则返回true
,否则返回false
。TrySubscribeOnServer
- 本地订阅消息,然后尝试在服务器上订阅。如果服务器未连接,则返回false
。连接建立后会自动进行订阅。
使用这些方法,您不再需要担心调用方法时是否存在与服务器的连接。
使用限制
发布/订阅模式是一个非常方便且有用的架构模式,但它也有其狭窄的适用范围。例如,在以下一些情况下,您可能更适合使用消息队列模式:
- 当消息处理的顺序很重要时
- 如果必须保证每条消息都能被处理,即使客户端和服务器之间的通信中断
- 当您需要并行化消息在多个处理程序之间的流时
- 当您需要在服务器关闭时将所有未处理的消息持久化到磁盘时
- 当需要消息优先级(QoS)时
结论
MetaPubSub
模块已在我的工作项目中进行了测试,虽然我不能保证它 100% 没有错误,但我相信它已经可以用于生产代码。但应该记住,在客户端连接到服务器时尚未实现授权系统。在开发中,我试图特别注意请求的执行速度,并使所有代码无锁。最好进行测试并与其他类似系统进行性能比较,也许用户中的某个人能够做到这一点并分享结果。
您可以在 GitHub 仓库 中查看源代码、示例、留下问题或提交拉取请求。
历史
- 2020年10月15日:初始版本