C# 中的全局服务总线架构






4.81/5 (15投票s)
用于企业软件的多传输/提供商的消息总线实现
引言
本文介绍了一种全局服务总线架构,旨在将各种传输和消息总线提供商统一到一个易于使用的接口中,该接口将传输实现与消息的发布和订阅分离。
该架构旨在应对常见的企业开发挑战,例如
- 不同团队的不同消息传递需求。
- 使用各种技术的遗留代码。
- 通过允许增量重构来减少时间限制。
该架构面向广播消息,但可以更改以适应其他发布/订阅策略。
背景
在阅读本文之前,您应该熟悉
- IOC 开发
- NuGet
- 发布/订阅
- 响应式扩展 (Rx)
提供商模式
代码使用一种我称为提供商模式的聚合模式。您创建一个提供特定功能的接口,然后有一个类来控制提供商的聚合以及它们之间的任何交互。这在模块化开发中非常有用,因为单个模块通常提供应用程序作为一个整体需要处理的功能和对象访问。
提供商接口如下所示
[Flags]
public enum MessageScope
{
Unknown = 0,
Local = 1,
Global = 2
}
public interface IMessageBusProvider
{
MessageScope Scope { get; }
IObservable<T> Listen<T>();
void Send<T>(T message);
}
提供的功能是监听或发送消息的能力。我包含了一个范围来帮助确定消息的处理。本地和全局非常重要,因为
- 某些消息总线只能在当前应用程序内发送(本地)。
- 全局消息的接收和发送成本可能很高,因此您不想不必要地执行此操作。
- 并非所有内容都适合广播到所有地方。
范围可以扩展以涵盖特定主题、部门等。您也可以在本地提供商内部进行其他过滤。
在这种特定情况下,提供商是消息总线或传输技术。全局消息总线技术的示例如下
- Windows 服务总线
MassTransit
NServiceBus
- TIBCO
本地消息总线技术的示例如下
EventAggregator
ReactiveUI.MessageBus
注意:一些全局消息总线技术也提供仅本地发送消息的选项。
在全球企业应用程序开发中,您可能在一个团队或一个模块中有各种消息传递需求,而在另一个团队则完全不同。例如,在我最后一个受到这篇文章启发的公司,一个团队在使用 Windows 服务总线时遇到了吞吐量问题;然而,我的团队面临时间和资源问题,这使得 Windows 服务总线成为最佳选择。我们也愿意稍后使用不同的技术,因此如果其他团队能够按计划完成所有实施,我们也希望能够无缝切换。
除了允许同时实现多个提供商之外,该架构还允许您热插拔任何提供商。如果您在一个模块中实现了提供商,您所要做的就是将其吸收到您的目录中,或者不吸。其他消费模块的依赖关系将放在聚合器类上,因此它们不会受到影响。
PollingProvider
如果您已经有了消息总线实现,更改为统一的消息总线架构可能不是一件大事,但反过来说,如果您目前使用的是轮询应用程序,那么您可能还有很多工作要做。好消息是,如果出现这种情况,您可以使用 PollingProvider
逐步集成到架构中。
public abstract class PollingProvider : IMessageBusProvider
{
public PollingProvider(MessageScope scope, TimeSpan pollingInterval)
{
Scope = scope;
PollingInterval = pollingInterval;
}
public MessageScope Scope { get; protected set; }
public TimeSpan PollingInterval { get; set; }
protected abstract T Poll<T>();
public IObservable<T> Listen<T>()
{
return Observable.Interval(PollingInterval).Select(_ => Poll<T>());
}
public abstract void Send<T>(T message);
}
在这种情况下,Poll<T>()
可能是一种数据库或服务调用。例如,我每分钟轮询一次数据库以检索用户更新。Poll<User>()
将调用客户端需要做的任何事情来获取那些更新的用户。如果客户端发送了一个 UpdateUsersMessage
,那么发送操作将提取更新的用户信息并进行适当的调用以持久化更改。
其他客户端也将轮询并在下次轮询时获取这些更新。当服务器端实现完整消息总线实现的时候,您只需删除 PollingProvider
,消费者就不会受到影响,因为它们连接到 MessageBus
而不是提供商。
此外,您可能有一个不需要将特定轮询函数切换为面向消息总线函数的用例。您仍然希望拥有统一的处理方法,而这正是它所做的。
MessageBus
所以这里有一个用于聚合的示例类
public class MessageBus
{
public MessageBus()
{
}
private List<IMessageBusProvider> _providers;
protected List<IMessageBusProvider> Providers
{
get
{
if (_providers == null)
{
_providers = new List<IMessageBusProvider>();
}
return _providers;
}
}
public IObservable<T> Listen<T>(MessageScope scope)
{
return Observable.Merge(
Providers.Where(p => (p.Scope & scope) == scope).Select(p => p.Listen<T>()));
}
public void Send<T>(T message, MessageScope scope)
{
Providers.Where(p => (p.Scope & scope) == scope)
.ToList()
.ForEach(p => p.Send(message));
}
}
根据您是否使用 IOC 以及您使用的技术,构建方式会略有不同。如果您有选择,我更喜欢 MEF,因为它完全抽象了提供商和聚合器。MEF 提供的很多功能您都可以在 Unity 中实现(或规避),反之亦然,但 MEF 包含导出相同接口的多个类的能力。
在 MEF 中,您将添加以下字段
[ImportMany]
private IEnumerable<Lazy<IMessageBusProvider>> _importedProviders;
然后将 Providers
属性修改为
private List<IMessageBusProvider> _providers;
protected List<IMessageBusProvider> Providers
{
get
{
if (_providers == null)
{
_providers = _importedProviders.Select(p => p.Value).ToList();
}
return _providers;
}
}
如果您想使用 Unity 或不支持同一接口的多个实现的 IOC 容器,或者根本不使用容器,您将添加一个注册方法
public void RegisterProvider(IMessageBusProvider provider)
{
Providers.Add(provider);
}
在这种情况下,在模块初始化期间,您只需注册模块实现的提供商。我不喜欢这种方法,因为它意味着我的提供商必须了解正在消费它的东西。虽然在 MessageBus
的情况下,这应该只有一个对象;然而,在其他场景中,您可能希望实现提供商,您可能有许多提供商的消费者。
IObservable
IObservable
接口随 .NET 4.0 一起引入,支持它的库位于 Reactive Extensions (Rx) 库中。您需要安装“Reactive Extensions - Main Library”NuGet 包。Observable
类位于 System.Reactive.Linq
命名空间中。
如果您不熟悉 Rx 库,我强烈建议您花些时间去了解。此实现使用它的原因在于这些库的丰富性和有用性。
由于监听全局消息比本地订阅消耗更多的资源,您希望尽量将全局订阅者限制为每个应用程序每个消息类型一个订阅。我经常在专用的业务逻辑中有 IObservables
来本地广播消息,如下所示
public class WidgetService
{
public WidgetService(MessageBus messageBus)
{
messageBus.Listen<WidgetMessage>(MessageScope.Global).Subscribe(m =>
{
List<Widget> updatedWidgets = ApplyWidgetChanges(m);
_widgetChanged.OnNext(updatedWidgets);
}
}
private Subject<List<Widget>> _widgetsChanged = new Subject<List<Widget>>();
public IObservable<List<Widget>> WidgetsChanged
{
get
{
return _widgetsChanged;
}
}
}
虽然我可以本地重新发布全局消息,但我认为将本地广播作为业务服务的一部分有助于整合业务职责。
Subjects 位于 System.Reactive.Subjects
命名空间中,允许您观察和推送数据。有几种有用的 Subject
实现和功能,但这里使用的基本 Subject
只是推送数据。
我通常使用 List<T>
来处理更改更新,因为取决于订阅方连接了什么,单独更新可能会非常低效。显然,如果您仍然想单独处理/发送项目,您可以发送一个包含一个项目的 List<T>
。
提供商实现
实现具有此接口的提供商时存在一些常见的挑战。它们主要围绕对传输资源和消息对象的访问。
注意:下面的代码旨在成为一个 abstract
的基提供商类的一部分。具体的实现可能需要针对消息类型、主题、主题等进行特定处理。最重要的是,具体的实现将需要确定如何处理错误。
第一个挑战是确保每个消息的单个应用程序访问。为了实现这一点,您可以使用与 WidgetService
类似的方法。
protected Subject<ProviderMessageType> _receiver = new Subject<ProviderMessageType>();
我真正使用过的两个提供商都在它们自己的自定义类中发送消息,所以您发送的消息实际上被包装了。在 Windows 服务总线中,它使用 BrokeredMessage
,而在 TIBCO 中,它只是一个 Message
。ProviderMessageType
仅仅是该消息的类型。
当消息使用者订阅提供商时,它实际上会订阅 _receiver
对象。以下是 TIBCO 提供商的示例实现
public IObservable<T> Listen<T>()
{
return _receiver
.ObserveOn(new NewThreadScheduler())
.Select(m => CloneMessage(m))
.Where(m => m.GetField("MessageType") == typeof(T).ToString())
.Select(m =>
{
try
{
if (m.GetField("MessageBody") != null)
{
try
{
var memoryStream = new MemoryStream((byte[])m.GetField("MessageBody"));
var formatter = new BinaryFormatter();
return (T)formatter.Deserialize(memoryStream);
}
catch (Exception ex)
{
HandleMessageProcessingException(m, ex);
}
}
else
{
var body = (T)(typeof(T).GetConstructor(new Type[0]).Invoke(new object[0]));
foreach (var propertyInfo in typeof(T).GetProperties())
{
try
{
propertyInfo.SetValue(body, m.GetField(propertyInfo.Name).Value);
}
catch (Exception ex)
{
HandleMessageProcessingException(m, ex);
}
}
return body;
}
}
catch (Exception ex)
{
HandleReceiveException(m, ex);
}
return default(T);
});
}
我将逐行讨论细节。
.ObserveOn(new NewThreadScheduler())
正如您可能假设(或已经知道)的那样,这确保了任何订户都将在新线程上。您最不希望看到的是 100 个订户全部阻塞,因为它们最终在同一个线程上。
.Select(m => CloneMessage(m))
TIBCO 消息不是线程安全的,所以我克隆消息以确保任何订阅都使用自己的消息实例。Windows 服务总线也有类似的问题,即消息正文只能检索一次;然而,您可能有多个订阅者。为了解决这个问题,我通过 MessageId
缓存了消息正文,以便后续的接收者不必再次尝试提取它。
重点是您可能需要围绕这个问题进行一些自定义工作。
.Where(m => m.GetField("MessageType") == typeof(T).ToString())
处理消息时,您不希望在反序列化时出现各种异常。诀窍是将类型包含在元数据中。只需在继续之前将元数据信息与泛型类型进行比较。
if (m.GetField("MessageBody") != null)
{
try
{
var memoryStream = new MemoryStream((byte[])m.GetField("MessageBody"));
var formatter = new BinaryFormatter();
return (T)formatter.Deserialize(memoryStream);
}
catch (Exception ex)
{
HandleMessageProcessingException(m, ex);
}
}
在 TIBCO 中,您可以开箱即用地添加原始字段,但如果您需要自定义序列化,这是一个简单的方法。显然,如果您想使用此方法,则必须为消息类型添加序列化支持。
else
{
var body = (T)(typeof(T).GetConstructor(new Type[0]).Invoke(new object[0]));
foreach (var propertyInfo in typeof(T).GetProperties())
{
try
{
propertyInfo.SetValue(body, m.GetField(propertyInfo.Name).Value);
}
catch (Exception ex)
{
HandleMessageProcessingException(m, ex);
}
}
return body;
}
如果未包含 MessageBody
字段,它会尝试构造一个默认对象,然后以一对一的关系填充该字段。此方法的优点是您不需要为对象实现序列化,消息数据是可读的,并且可以接收 TIBCO 消息的其他平台可以处理此问题。限制是所有字段都必须是 TIBCO 可以开箱即用处理的格式。
您发送消息的方式显然取决于谁将接收它们。这是提供通用功能的两种选择。
catch (Exception ex)
{
HandleReceiveException(m, ex);
}
这是一个关键部分。如果您不将整个订阅逻辑包装在 try catch
中,您将冒着静默取消订阅的风险。您的应用程序可能对消息无响应,而没有任何指示。
希望主要的类和 abstract
提供商是类库或框架的一部分,可以被通用地消费,因此您不希望指定模块或应用程序如何处理消息传递错误。这可能是一些可以忽略的事情,但您可能需要关闭应用程序以避免数据损坏。
关注点
这里的最终目标只是奠定实现您自己的 MessageBus
的架构和框架。根据各个应用程序和消息传递的要求,有很多可以更改和调整的地方。如果您面临处理重复消息的风险,您可能需要添加一个 MessageId
字段并相应地处理。很多也取决于您选择的提供商的个体挑战。
历史
- 2015-09-03:准备接受批评