统一消息总线框架 - 第一部分





5.00/5 (5投票s)
支持多种传输和消息总线技术,集成以提供统一的消息框架,从而在任何平台之间进行通信的架构。
警告
这并非一个简单的演示项目。它包含大量架构和高级概念,涵盖 12 个项目。如果您选择创建自己的 Azure Service Bus(我推荐这样做),演示将完全按描述运行,但即使没有它,如果您深入研究,仍然可以从中获得很多收益。大部分 UI 元素都是原生的,因为我试图将重点放在发布/订阅方面,而不是如何制作漂亮的应用。
引言
如今,大多数开发都朝着跨平台方向发展,因此我想扩展我之前的关于全局服务总线架构的文章,加入一个支持所有平台之间消息传递的基础功能框架。
我之前讨论过建立一个传输提供程序架构,使用外观模式和适配器模式来简化消息的统一消耗。在这种情况下,传输技术上是指移动消息的底层物理机制(例如 MSMQ、套接字等);然而,我使用这个术语更宽松一些。在这种情况下,我也将传输涵盖了任何服务总线或更高级别的消息传递实现。
对于这篇新文章和演示(可能还会有一个开源项目),我一开始就选择了五种传输方式:
- Azure Service Bus
- 注意:您必须创建自己的 Azure 帐户和 Azure Service Bus 才能使用此功能。
- SignalR
- WebApi
- ZeroMQ
- 注意:由于旧的 NuGet 包问题,这一个可能会很麻烦。
- Reactive Extensions
通过这些传输,您可以在任何平台之间发送消息。例如,一个 Web 客户端可以发送一条消息,该消息可以被 WPF 应用程序消费,反之亦然。
在即将进行的更新中,我将努力使其更简洁,但我希望在第一次接触 Google Cloud Pub/Sub 之前发布我已有的内容。此外,由于缺乏最近的 NuGet 更新,ZeroMQ 和 SignalR 出现了一些意料之外的怪癖,所以我不想投入太多精力使它们更简洁,希望更新能解决这些问题。
在未来的文章中,我将深入探讨实际的提供程序实现。现在,让我们先讨论基本概念、架构和演示项目……
背景
在阅读本文之前,您应该熟悉
- IOC 开发
- MEF
- 发布/订阅
- 异步编程
- Observables (可观察对象)
- 适配器模式
- 外观模式
- Azure Service Bus
请阅读之前的文章以获取架构概念的更多背景信息。
有三种不同的应用程序可以运行来发送消息
- 控制台
- Web
- WPF
如果您想处理 Web 项目,您需要 ASP.NET 和 WebApi 的背景知识。如果您想处理 WPF 项目,您需要熟悉 PRISM。
架构
我不会重复整篇前文,但我确实想就多平台考虑的架构进行更深入的探讨。
我已经将提供程序的 Scope 扩展到了以下范围:
- 全局
- 本地机器
- Application
全局消息用于发出系统范围的命令或发送域级对象的更新。虽然演示只是传递文本数据,但消息框架甚至可以被应用于物联网领域,用于高级控制器。
由于全局消息的发送和消费成本最高,因此在每个应用程序中拥有一个集中的服务来处理全局消息至关重要。消息服务负责通过 `Bus` 将消息广播到本地的 `IUnifiedMessageBusProvider`,或通过推送到本地的 Observable。
该 `Bus` 还确保每条消息只被拾取一次。如果您发出一个耗时或资源密集型的命令,您显然只希望执行一次。此设计的优点是允许有多个 `IUnifiedMessageBusProvider` 来实现冗余或专门的吞吐量,而无需任何消费者自己过滤重复项。
应用程序范围的消息仅与应用程序相关。如前所述,这包括在必要时将全局消息重新广播到每个应用程序的消费者。另一个例子是用户偏好设置。特定应用程序的用户偏好设置(例如,配置布局、控件大小等)通常只与该特定应用程序相关。显然,您不希望处理来自不同应用程序的布局更改。
应用程序范围消息的最大优点是无需进行序列化。由于对象已经在内存中,您只需传递它即可。应用程序范围消息的陷阱是可能导致内存泄漏。订阅通常针对静态或长生命周期的对象,因此您需要确保在不需要时显式处理所有订阅。
本地机器消息的范围最有限,但适用于分布式应用程序。标准的建议是通过服务和 API 进行通信,但可以通过发布/订阅来替代。与公开方法相比,其优点是可以将消息类型限制为特定应用程序,从而在应用程序之间创建独占的、隐藏的通信。
最终,您通过提供商网络连接应用程序。使用 Azure Service Bus 等全局服务总线作为骨干,然后推送到适当的本地 `IUnifiedMessageBusProvider`。只要两个应用程序共享至少一个 `IUnifiedMessageBusProvider`,它们就可以将任何消息重新广播到原始发送方没有意识到的(或无法支持的)其他 `IUnifiedMessageBusProvider`。
每个基础 `IUnifiedMessageBusProvider` 实现都是抽象的,应由共享的具体实例实现——您不希望两段独立的代码尝试从同一资源中拉取消息。您可以随意从同一个基类创建多个具体的 `IUnifiedMessageBusProvider` 实现,只需将其配置为使用不同的主题/订阅/资源。
此外,具体实现可能需要插入应用程序的不同部分,以用于启动和协调等事务,因此应用程序应了解正在导出什么。
保持简单……
大多数服务总线支持不同类型的消息。虽然提供商之间的语义可能不同,但有三个基本的消息概念:
- 主题
- 一条消息通过多个提供程序广播给所有订阅者,几乎没有选择。
- 队列
- 在所有订阅者中只能消费一次的消息。
- 工作流 (Saga)
- 具有指定工作流和/或与其他消息和响应协调的消息。
当前的 Bus 是基于主题的,因为它最通用、支持最广泛且最易于实现。可以使用数据库级别的锁定等各种机制来逻辑地实现队列消息,并且应用程序消息服务可以在基于主题的架构中协调工作流。
话虽如此,但将队列消息集成到此架构中并不困难,但目前此解决方案不打算处理此复杂性。受工作流限制的消息通常在提供程序内部进行协调,不适合用于通用消费的框架。
诀窍在于……
架构最大的诀窍在于如何处理序列化/反序列化。每个 `IUnifiedMessageBusProvider` 对消息在传输过程中必须如何构造都有其自身的要求。
- Azure Service Bus
- `BrokeredMessage` 包含一个二进制序列化的对象,该对象只能检索一次。
- WebApi
- JSON 友好序列化。
- ZeroMQ
- 每条消息都需要特定的序列化属性。
`MessageWrapper` 类包含消息元数据,以包含消息对象的类型。无论对象如何发送,这些消息元数据都必须通过网络传输,以确保接收端能够通用地处理对象以进行反序列化。如果您不这样做,问题就会变成必须创建特定类的消息消费者,这有点违背了基于 IOC 的总线(在我看来)的目的。
注意:这也是我不喜欢 MassTransit 的原因之一,因为它需要消费者实现。
侦听器被期望将 `MessageWrapper` 返回给总线,而不是反序列化的对象。每个 `IUnifiedMessageBusProvider` 可能需要重新构建 `MessageWrapper`,但 ID 特别重要。虽然您可能认为这是不必要的,但另一个因素是消息实际上可能发送的是完全不同的数据类型!
对于 Web,我们不是要发送二进制序列化的数据,或者对象本身,对吗?我们将希望将数据序列化为 JSON 字符串,通过任何 Web `IUnifiedMessageBusProvider`;然而,非 Web 提供程序可能需要发送实际的请求体。
为了支持这种情况,`MessageWrapper` 类有一个通用的、虚拟的 `Unwrap` 方法,因此任何专门的实现都可以提供自己的自定义逻辑。在 `MessageWrapper` 上提供该方法允许其他提供程序访问真实数据类型。
绕过此问题的另一种方法是使用两种不同的消息——一种是通用消费的,另一种是将相同的数据预先转换为 JSON。我对这种方法的问题在于,您必须深入了解每个提供程序以及它支持的类型,并且还要明确了解给定消息可能存在的每种格式。此设计的总体目标是让任何消费者只需了解其所需的消息类型。
由于订阅和序列化是基于实际消息的 `Type`,因此类需要在一个全局可用的库中可用。我建议 DTO 消息在同一库中从 POCO 模型创建。
特别是在处理 WPF 时,开发人员可能习惯于更复杂的模型对象;然而,我个人不建议将此类模型用于全局应用程序消费域级模型类型。尽管它最初的接收效果很差,但您可以在我的关于在 MVVM 中使用此类模型的文章这里中找到它。也许在当前上下文中它会更有意义……
Using the Code
有四个主要的演示项目:
- 一个控制台应用程序(TestClient)。
- Web 提供程序
- ZeroMQ
- 一个 MVC Web 应用程序(DemoWeb)。
- Azure
- SignalR
- 一个 WebApi,用于支持基于 Web 的提供程序(DemoWebApi)。
- Azure
- 一个使用 PRISM 的 WPF 应用程序(DemoWpf)。
- Azure
- Reactive
主要的演示项目实现了具体的提供程序和其他特定于应用程序的支持,这些将不包含在框架中。
代码是基于 MEF 的 IOC。这意味着您通常会将任何被导出的程序集输出或复制到主应用程序目录(或配置目录)中进行注入。如果您想更改应用程序可用的 `IUnifiedMessageBusProvider`,只需将它们添加到/从指定目录中删除即可。
为了演示的简洁性,主应用程序与控制台应用程序和 Web 应用程序中的提供程序之间有直接的项目引用。您手动创建和协调要与 IOC 结合使用的程序集的类,这只是您必须经历 IOC 旨在解决的所有痛苦。这通常不推荐,除了简单的概念/演示项目。
Web 应用程序和 WPF 使用 IOC。这两个项目都有一些很好的起点示例,说明 MEF 和 IOC 在各自环境中的工作方式,如果您对 IOC 和/或 MEF 的熟悉程度较低。WPF 演示显示了 MEF 和 IOC 的更完整功能,并使用了 PRISM。
如果您想使用 Azure Service Bus `IUnifiedMessageBusProvider`,您需要设置您自己的 Azure 帐户并托管您自己的服务总线。在具体实现中替换您的服务总线端点。接下来,取消注释 `Export` 属性,您就可以开始了。
//If using this provider, uncomment the line below then change to your Azure Service Bus endpoint.
//[Export(typeof(IUnifiedMessageBusProvider))]
public class WpfAzureProvider : TopicProvider, IDisposable
{
private static readonly string connectionString = "YOUR_ENDPOINT";
...
}
如果您使用的是控制台应用程序,您需要将 Web 提供程序的地址更改为 WebApi 项目的地址并删除异常。
public class ConsoleWebProvider : WebProvider { public ConsoleWebProvider() : base("https://:12345/", Guid.NewGuid(), TimeSpan.FromSeconds(10)) { throw new Exception("Change localhost port!"); } }
从逻辑角度来看,演示中的框架如下所示:
实际的发布者和订阅者与如何适当地发送数据没有具体的交互或委托。所有需要的只是 `Bus` 的实例,然后是它们感兴趣的消息的 `Type` 和 `Scope`。
在后台,有更多的复杂性。控制台应用程序可以通过其 ZeroMQ 提供程序直接与其他控制台应用程序通信。当与 WPF 应用程序通信时,它会连接到 WebApi,然后 WebApi 将消息中继到其 Azure 提供程序。由于 WPF 应用程序有一个 Azure 提供程序,因此它可以接收它。
即使演示应用程序可能看起来对一些内部工作有大量了解,这仅仅是因为这是演示的重点。通常,这将是即插即用的,并且会有完全独立的程序集供团队处理通信方面。此外,大型应用程序可能有 10 到 100 个不同的消息消费者。这些实例中的每一个应该只使用应用程序服务或 `Bus`,因此消费者面临的复杂性会更明显。
注意:为了澄清,如果我说的消费者是指需要特定订阅的应用程序特定代码,而不是客户端。活动客户端的数量可能达到 100,000+。
演示中的 Web 应用程序没有 Web 提供程序。它使用 Azure 提供程序进行全局通信。Web 提供程序仅在控制台应用程序中使用,以演示其工作原理以及如何连接不同的平台。如果所有东西都连接到 Azure 提供程序,那将不是一个好的演示。
即使您只有一个传输技术,`Bus` 仍然具有优势。例如,如果您希望对 Azure Service Bus 进行负载均衡,您可以创建多个具有不同端点的具体提供程序。有几种潜在的解决方案可以只将消息发送到选定的 Azure Service Bus 提供程序,从而减少每个提供程序的邮件负载。在最简单的实现中,您仍然可以通过总线监听所有实例。如果您想要减少特定传输的订阅者,也可以创建中间应用程序。
所以,目前为止,在演示中,请随便玩玩。要使其成为一个真正的框架,还有很多工作要做,但如果您愿意花点时间分享,我将很乐意听取您对一切的想法。
在接下来的文章中(如果感兴趣),我将深入探讨实现的细节,包括每个传输以及我使用它们的经验。祝您阅读愉快!
关注点
Azure Service Bus 的设置和使用非常方便。它是一个可靠且持久的服务总线,但这也带来了吞吐量和延迟的代价。大多数高吞吐量提供程序都不可靠或不持久。可以将其视为 TCP 和 UDP 之间的区别(有时这确实是原因)。除非您正在处理股票行情或有其他高吞吐量需求,否则我推荐它作为全局服务总线。有关改进 Azure Service Bus 性能的建议,请查看这篇文章。
在开发时,SignalR 尚不支持 ASP.NET Core,因此使用了最新支持的框架。WebApi 中的某些内容导致 SignalR 出错,所以我不得不将其提取到一个单独的项目,但这实际上更现实。为了演示的简洁性,我本会将其保留在同一个项目中。
SignalR 具有用于在负载均衡服务器上协调消息的后备计划的概念;然而,这个框架处理了该功能——只需在每台服务器上放置一个全局提供程序。我不知道 SignalR 在后备计划上的效率,但框架的巨大优势在于能够跨越任何可能位于任何机器上的 `IUnifiedMessageBusProvider` 发送消息。
基于 WebApi 的提供程序是基于轮询的,但我创建它是因为几乎任何平台都支持 RESTful API 调用(特别是移动设备)。
我确实打算将该框架发布到 NuGet 并将其变成一个开源项目。我还需要做一些清理工作,但如果您碰巧有兴趣贡献,请留言。如果您想实现一个特定的传输提供程序,我很乐意看看,或者如果您完成了,我很乐意将其添加到框架中(*MassTransit 除外*)。
我刚研究了 Google Cloud Pub/Sub,它看起来非常有前途。以前从没用过,但它似乎是专为移动设备设计的。我需要研究一下。它声称每秒可以发送高达 100 万条消息,而 Azure 只能发送 2000 条消息,所以这也可能是高吞吐量的选择。预计会有包含 Google 提供程序的更新……
历史
2016-09-05 初始发布。有点匆忙。急于处理 Google Cloud Pub/Sub。准备接受批评……