65.9K
CodeProject 正在变化。 阅读更多。
Home

使用发布/订阅模式和命名管道作为传输实现进程间通信

starIconstarIconstarIconstarIconstarIcon

5.00/5 (2投票s)

2020 年 10 月 15 日

CPOL

7分钟阅读

viewsIcon

12486

downloadIcon

166

MetaPubSub 库简介 - 一个实现进程间通信层的发布/订阅模式

引言

在我当前的工作项目中,我们开始遇到软件模块之间关系数量的问题,并决定使用发布/订阅模式来传输事件和命令,以减少耦合。像往常一样,没有现有的实现适合我们,所以我开始开发自己的库。

该模式的实现并不是一项非常困难的任务。第一个版本在我们的项目中得到了应用,并且效果非常好。但由于我们的应用程序由多个进程和服务组成,我们立即希望根据相同的发布/订阅原则将它们链接成一个整体。因此,我们决定扩展该库以支持进程间通信。

在开发过程中,提出了以下要求:

  • 易用性 - 所有功能都应该与本地 PubSub 模块一样工作,并且订阅者原则上甚至可能不知道消息来自哪里——是来自他们自己的进程还是远程。
  • 必须保证投递 - 如果消息无法传输,发布者将立即收到异常。
  • 在处理消息期间发生的所有异常也必须被序列化并传回发送者。
  • 消息必须独立处理,不排队。
  • 必须支持异步调用模型 (async/await)。
  • 服务器和客户端集线器作为一个整体工作 - 无论发布者或订阅者位于哪个进程。
  • 进程之间的连接应自动恢复。

什么是 PubSub?

首先,值得了解发布/订阅模式是什么以及如何使用它。正如维基百科所说:

在软件架构中,发布/订阅是一种消息传递模式,其中消息的发送者(称为发布者)不直接将消息编程发送给特定的接收者(称为订阅者),而是将已发布的消息分类到不同的类别中,而不知道可能存在哪些订阅者。同样,订阅者表达对一个或多个类别的兴趣,并且只接收感兴趣的消息,而不知道可能存在哪些发布者。

在我的 MetaPubSub 模块中,实现了最常见的基本功能和一些高级功能:

  • 双向通信
  • 与消费者的近乎实时交互
  • SubscribePublish 方法的独立顺序
  • 可等待的方法,例如,您可以 await Publish 并等待所有订阅者完成消息处理
  • 至少一次投递检查 - 如果没有人订阅您的消息,您可以选择抛出异常
  • 消息过滤 - 您可以定义一个谓词来仅订阅您想要处理的消息
  • 等待订阅者的超时 - 您的消息可以排队并等待有人订阅并处理
  • 调度消息 - 您的消息可以排队并在延迟后发布
  • 通过单个方法调用异步等待指定消息,无需订阅/取消订阅该消息(When 方法)
  • 请求-响应模式 - 发送消息并等待响应,作为单个可等待方法,无需订阅/取消订阅响应消息(Process 方法)
  • 取消令牌支持 - 您可以取消调度或等待消息
  • 异常处理 - 订阅者处理消息时引发的所有异常都可以作为 AggregateException 被发布者捕获

如何使用示例

准备消息类

每个消息类都必须派生自 IPubSubMessage 接口

public interface IPubSubMessage
{
    bool DeliverAtLeastOnce { get; }
    int Timeout { get; }
    string RemoteConnectionId { get; set; }
}

此外,您还可以派生自 PubSubMessageBase,其声明如下:

public class PubSubMessageBase : IPubSubMessage
{
    public bool DeliverAtLeastOnce { get; set; }

    public int Timeout { get; set; }

    public string RemoteConnectionId { get; set; }
}

或者您可以定义自己的基类

public class MessageBase : IPubSubMessage
{
    // set this to be the default value for all derived classes
    public bool DeliverAtLeastOnce => true;

    // default timeout in milliseconds
    public int Timeout => 1000;

    // required for internal message processing, leave it as is
    public string RemoteConnectionId { get; set; }
}

因此,您的消息声明应该如下所示:

public class MyMessage : MessageBase // or PubSubMessageBase
{
    public string SomeData { get; }

    public MyMessage(string data)
    {
        SomeData = data;
    }
}

创建集线器

// hub creation
var hub = new MetaPubSub();

// subscribing to MyMessage
hub.Subscribe<MyMessage>(OnMyMessage);

// publishing a message
await hub.Publish(new MyMessage());

// unsubscribing
hub.Unsubscribe<MyMessage>(OnMyMessage);

异常处理

var hub = new MetaPubSub();

try
{
    var message = new MyMessage
    {
        DeliverAtLeastOnce = true,
    };

    // publishing a message when no one subscribed - NoSubscribersException
    //await hub.Publish(message);

    // publishing a message when no one subscribed and Timeout > 0 - TimeoutException
    //message.Timeout = 100;
    //await hub.Publish(message);

    hub.Subscribe<MyMessage>(OnMyMessageHandlerWithException);

    // publishing a message
    await hub.Publish(message);
}
catch (NoSubscribersException ex)
{
    // No one is subscribed to this message and 
    // (message.DeliverAtLeastOnce == true and message.Timeout == 0)
    Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
}
catch (TimeoutException ex)
{
    // No one is subscribed to this message and 
    // (message.DeliverAtLeastOnce == true and message.Timeout > 0)
    Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
}
catch (AggregateException ex)
{
    // All exceptions raised when a message processing by subscribers 
    // can be caught by the publisher as an AggregateException.
    // If some of the subscribers throw an exception, other subscribers 
    // continues to process the message.
    Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
    foreach (var innerEx in ex.InnerExceptions)
    {
        Console.WriteLine($"\tInner Exception {innerEx.GetType()}: {innerEx.Message}");
    }
}

await hub.Unsubscribe<MyMessage>(OnMyMessageHandlerWithException);

至少一次投递检查

var hub = new MetaPubSub();

var message = new MyMessage
{
    // if this not set, NoSubscribersException will not be thrown
    DeliverAtLeastOnce = true
};

try
{
    // publishing a message when no one is subscribed
    await hub.Publish(message);
}
catch (NoSubscribersException ex)
{
    // no one is listening
    Console.WriteLine($"Exception {ex.GetType()}: {ex.Message}");
}

hub.Subscribe<MyMessage>(OnMyMessage);
await hub.Publish(message);
hub.Unsubscribe<MyMessage>(OnMyMessage);

您可以在项目的 GitHub 页面 上查看示例的完整列表。

添加进程间通信层

为了让我们的集线器能够传输进程之间的消息,您需要通过某种数据传输机制将它们连接起来。可以是 Sockets、COM、共享文件、DDE、RPC 或任何其他方式。但我最终选择了命名管道 - 它相当简单、方便,并且其性能和可靠性对我来说都足够了。

因此,一个集线器应该成为服务器,所有其他集线器都成为客户端。要做到这一点,在服务器端,您需要调用 StartServer 方法。

var serverHub = new MetaPubSub();

// Starting the hub as a server named 'Meta'.
serverHub.StartServer("Meta");

在客户端,您需要连接到服务器。

var clientHub = new MetaPubSub();

// Connecting to the remote server.
await clientHub.ConnectToServer("Meta");

现在,如果客户端想从服务器接收事件,它必须订阅它。

await clientHub.SubscribeOnServer<MyMessage>(Handler);

如果客户端想将事件发送到服务器,它会这样做:

await clientHub.PublishOnServer(new MyMessage());

如果服务器想发布消息,它会像处理本地消息一样进行。

await serverHub.Publish(new MyMessage());

如果服务器想接收消息,它会像处理本地消息一样订阅。

serverHub.Subscribe<MyMessage>(Handler);

在这种情况下,Subscribe 调用不会被等待,因为它没有耗时的操作,例如网络传输或消息处理。

它是如何工作的?

每条消息都会获得一个 GUID,被序列化为 JSON,并通过命名管道进行传递。为了等待结果,会创建一个 TaskCompletionSource 并由调用代码等待。接收方反序列化消息并将其发送给订阅者进行处理。处理完成后,会生成一个包含相同 GUID 和所有异常(如果有)的结果,并将其发送回发送方。结果会再次被反序列化并传递给 TaskCompletionSource。最终,调用结束或抛出异常。

通过命名管道发送和接收是独立进行的,在不同的线程上。消息响应可能会以任何顺序到达,无论请求发送的顺序如何。所有 MetaPubSub 方法都是线程安全的。

连接处理

客户端会自动重新建立与服务器丢失的连接。但如果客户端启动时与服务器的连接尚未建立怎么办?客户端在尝试调用 SubscribeOnServer 方法时会收到异常。接下来,我们可以实现等待连接的逻辑并订阅所有必要的事件。但这对每个订阅服务器事件的类都必须这样做,这会导致源代码增长和重复。在这种情况下,TryConnectTrySubscribeOnServer 方法派上了用场。

  • TryConnect - 连接到服务器将在其可用时立即建立。如果立即建立连接,则返回 true,否则返回 false
  • TrySubscribeOnServer - 本地订阅消息,然后尝试在服务器上订阅。如果服务器未连接,则返回 false。连接建立后会自动进行订阅。

使用这些方法,您不再需要担心调用方法时是否存在与服务器的连接。

使用限制

发布/订阅模式是一个非常方便且有用的架构模式,但它也有其狭窄的适用范围。例如,在以下一些情况下,您可能更适合使用消息队列模式:

  • 当消息处理的顺序很重要时
  • 如果必须保证每条消息都能被处理,即使客户端和服务器之间的通信中断
  • 当您需要并行化消息在多个处理程序之间的流时
  • 当您需要在服务器关闭时将所有未处理的消息持久化到磁盘时
  • 当需要消息优先级(QoS)时

结论

MetaPubSub 模块已在我的工作项目中进行了测试,虽然我不能保证它 100% 没有错误,但我相信它已经可以用于生产代码。但应该记住,在客户端连接到服务器时尚未实现授权系统。在开发中,我试图特别注意请求的执行速度,并使所有代码无锁。最好进行测试并与其他类似系统进行性能比较,也许用户中的某个人能够做到这一点并分享结果。

您可以在 GitHub 仓库 中查看源代码、示例、留下问题或提交拉取请求。

历史

  • 2020年10月15日:初始版本
© . All rights reserved.