使用 C# 和 MongoDB 的高速消息总线






4.67/5 (4投票s)
使用 C# 和 MongoDB 的高速消息总线。
引言
现代企业在其数据中心拥有许多系统、应用程序和服务。许多这些应用程序和服务都需要相互通信以满足业务需求。在很大程度上,Web 服务已将各种系统、应用程序和服务之间的集成标准化。由于 Web 服务基于 Http 运行,因此它可以透明地与端点或其他服务通信。换句话说,我们可以使用单个接口进行服务到服务的通信,以及服务到端点的通信。直接的服务到服务集成会导致紧耦合的架构;其中一项服务中的任何更改都可能对其他服务产生连锁反应。
对于 N 个服务,我们需要 N(N-1)/2 个连接才能实现一个完全互联的系统。每个服务都了解其他服务的接口;一项服务中的更改可能会影响多个服务。如果服务数量很少,那么这种设计是可以的。随着服务数量的增加(这在企业中很常见),服务之间的互连数量将呈指数级增长;使任何此类系统的维护成为开发人员的噩梦。
面向消息的中间件 (MOM) 已经存在多年。微服务在全球范围内的采用重新激发了对高速消息传递系统的关注,因为它是服务内通信的首选机制。消息传递系统是分布式系统的组成部分,允许应用程序之间进行消息传递。消息传递中间件基于存储和转发的设计原则运行,以提供保证的消息传递和对运行时中断的弹性。消息传递系统使企业能够实现服务到服务的集成;而无需多个服务之间的互连。
当今,处理“大数据”和基于“数据分析”的决策能力是企业的关键战略优势。数据密集型系统需要高速、高吞吐量的数据库来以闪电般的速度处理大量数据。MongoDB 在处理大量结构化/非结构化数据、近乎零的对象到表阻抗以及内存存储能力方面的优势,使其成为大数据解决方案的理想选择,远超关系数据库管理系统 (RDBMS)。
大多数传统的消息传递系统使用专有技术,并且无法提供实时消息传递功能。RabbitMQ 是高速消息传递的流行选择;然而,RabbitMQ 的设置、配置和操作是一个挑战。
利用 MongoDB 的固定连接和 C# 泛型功能,可以创建一个轻量级、可扩展、可投入生产使用的消息系统。在本文中,我解释了使用 MongoDB 和 C# 开发的消息总线的设计、架构和用法。所有源代码均已发布在 GITHub 上。
背景
消息总线是一种逻辑概念,旨在围绕消息系统的运行创建概念框架。消息总线的概念类似于 TCP/IP 网络的网络架构。在 IP 网络中,数据包在共享介质上传输;共享介质充当 IP 数据包从源到目的地传输的数据总线。IP 网络中的“共享介质”是一个逻辑概念,通过许多网络组件、物理介质和连接来实现。与 IP 网络类似,消息总线提供了一个逻辑共享介质,其中“应用程序消息”以非常高的速度从一个系统传输到另一个系统。数据库和消息缓冲区等不同组件协同工作,以实现连接到消息总线的系统之间的共享介质概念。
与 TCP/IP 不同,消息中不提供目标地址。消息总线中的消息传递遵循“发送后即忘”的原则。连接到消息总线的每个系统都会声明其接收特定类型或特定来源消息的兴趣。每当一个系统想要发布一条消息时,它就使用消息总线提供的标准接口来发布消息。消息总线存储消息,然后将其传递给所有订阅了该消息类型和来源的各方。所有消息发布都是异步的,即发布者在发送消息后不会被阻塞。消息总线提供保证的消息传递和优雅处理运行时错误的能力。这种架构允许消息发布与消息使用者完全独立,并使消息的发布者和使用者能够独立于对方进行演进。
在消息总线的上下文中,消息是应用程序级别的事件或领域对象状态的变化,需要与其他系统或服务共享。例如,在银行系统中,一些消息类型可以是:
- 开设新账户
- 将资金从一个账户转账到另一个账户
- 信用卡激活
- 交易授权
图 1 展示了在拥有大量通过消息总线互连的应用程序和服务的假设银行系统中,消息总线实现的概念设计。当客户开设新银行账户时,核心银行系统可以发送一个账户开户消息。该消息可以被 CRM 系统获取,用于向客户发送电子邮件和短信通知,被反洗钱 (AML) 系统获取,用于应用治理策略,被中央日志记录系统获取,用于记录交易详细信息,以及被分析系统获取,用于更新仪表板和报告。核心银行系统无需了解消费者的详细信息。消费者可以轻松添加/删除/修改,而无需消息发布系统知晓。
消息总线架构
消息总线是通过利用 MongoDB 的固定集合 (capped collections) 的强大功能,以及 C# 的游标 (cursors) 和反射 API 来开发的。消息总线将支持的每种消息类型在 Web 应用程序的“app.config”或“web.config”文件中进行配置。初始化后,消息总线将为每种消息类型创建一个新的固定集合(如果尚不存在)。每种消息类型都有独立的集合,允许根据需要存储的消息大小和数量定制每个集合。它还可以将一种消息类型的访问与另一种消息类型隔离开来,高频消息不会“饿死”低频消息。MongoDB 的固定集合是循环队列,即当一个集合已满时,插入下一条消息将自动删除集合中最旧的消息。从固定集合读取的顺序是自然的插入顺序。
public void initQueues(SupportedMessages mappings)
{
foreach(var mapping in mappings)
{
string collectionName = DBUtil.GetCollectionName(mapping);
if (!IsCollectionExists(collectionName))
{
MongoDB.CreateCollection(collectionName, new CreateCollectionOptions
{ Capped = true,
MaxDocuments = ConfigManager.GetIntParameter
(mapping.GetType().FullName, Constants.Max_NUMBER_OF_DOCUMENTS, 1000),
MaxSize = ConfigManager.GetIntParameter
(mapping.GetType().FullName,
Constants.MAX_COLLECTION_SIZE_INBYTES, 1024 * 1024) });
}
IMongoCollection<BsonDocument> collection =
MongoDB.GetCollection<BsonDocument>(collectionName);
collections.Add(collectionName, collection);
}
}
消息总线将每种消息类型转换为通用的“BusMessage
”类型。消息总线会跟踪特定应用程序已传递的消息,该应用程序由 Application info 标识。如果与消息总线断开连接,在重新连接后,消息订阅者将从上次读取消息的点继续接收消息。新订阅者将从订阅点开始接收新消息。不会传递新订阅者的历史消息。这种安排将优化 MongoDB 的性能和内存使用。Bus Manager 的 SendMessage
方法将每条消息转换为 BusMessage
并将其存储到相应的集合中。发送消息后会返回一个 Response
对象,其中包含有关服务请求完成的信息。
// internal structure of BusMessage
public class BusMessage<t>
{
public ObjectId _id { get; set; }
public t Message { get; set; }
public string SenderApplicationName { get; set; }
public bool DeleteOnRead { get; set; }
public DateTime CreationTime { get; set; }
public DateTime? TTL { get; set; }
public BusMessage(string sender, t message, TimeSpan messageTTL)
{
this.Message = message;
this.CreationTime = DateTime.Now;
this.SenderApplicationName = sender;
this.DeleteOnRead = false;
if(messageTTL != default(TimeSpan))
{
TTL = DateTime.Now.Add(messageTTL);
}
}
public BusMessage()
{
}
}
public class Response
{
public bool Success { get; set; }
public int ErrorCode { get; set; }
public string ErrorMessage { get; set; }
public object ResponseObject { get; set; }
}
//Send message implementation
public Response SendMessage
(t message, ApplicationInfo senderApplication, TimeSpan messageTTL = default(TimeSpan))
{
Logger.Debug(string.Format("{0} published a message on {1}. Message =
{2}",senderApplication.ApplicationName,DateTime.Now,message.ToJSON()));
Response defaultResonse = DefaultResponse();
if (!SupportedMessages.GetInstance().IsTypeSupported(message.GetType()))
{
Logger.Debug("default response returned. response = "+defaultResonse.ToJSON());
return defaultResonse;
}
if (storeManager.SaveMessage<t>
(new BusMessage<t>(senderApplication.ApplicationName, message,messageTTL)))
{
defaultResonse.Success = true;
defaultResonse.ErrorMessage = "Message published successfully";
defaultResonse.ErrorCode = 0;
}
else
{
defaultResonse.ErrorMessage = "Failed to publish message";
}
Logger.Debug(defaultResonse.ToJSON());
return defaultResonse;
}
为每种消息类型设置单独的集合还可以解耦和隔离不同消息类型的访问。为了监控固定集合中的修改,监控系统会在被监控的集合上打开一个永不过期的可追溯游标 (tailable cursor)。来自多个源对一个集合的监控不会有性能问题。这种设计使得在不影响现有集合的情况下添加新集合变得非常容易。
每个固定集合都像一个循环队列,在添加新项时会自动替换最旧的项。固定集合还维护项添加到队列中的自然顺序。
StorageManager
类负责管理所有 MongoDB 交互。为了监控消息队列,BusManager
会创建一个 MessageMonitor
对象。每个 MessageMonitor
会创建一个子线程,打开与相应 MongoDB 集合的游标。固定集合中的任何插入都会被游标自动捕获,并且消息监视器会根据接收到的消息数量调用 OnEvent
或 OnEvents
方法。
public void Subscribe(Subscriber<t> subscriber, ApplicationInfo listenerApplication)
{
if(subscriber.IsNull() || listenerApplication.IsNull())
{
throw new ArgumentNullException();
}
if(!SupportedMessages.GetInstance().IsTypeSupported(typeof(t)))
{
throw new ArgumentException("Type not supported by MessageBus");
}
/**Here, assumption is that only subscriber will exist
one app context for one type of message.
if double entry is tried that system will ignore the subscription attempt. It is the
responsibility of the caller to ensure that double subscription doesn't happen
*/
if (!this.subscribers.ContainsKey(typeof(t).FullName))
{
MessageMonitor<t> messageMonitors =
new MessageMonitor<t>(subscriber, listenerApplication);
this.subscribers.Add(typeof(t).FullName, messageMonitors);
}
}
用法
使用消息总线需要三个独立的步骤:
- 配置
- 消息发布
- 订阅消息类型以接收消息
消息总线配置
步骤 1
第一步是配置 MongoDB 的连接字符串。参数“MONGO_CONNECTION
”代表 MongoDB 的连接 URL,不包含模式名。参数 MONGODB_NAME
应包含 MongoDB 数据库的名称。
<appSettings>
<add key="MONGO_CONNECTION" value="DEV"/>
<add key="MONGODB_NAME" value="MessageBus"/>
</appSettings>
.
.
.
<connectionStrings>
<add name="DEV" connectionString="mongodb://127.0.0.1:27017"/>
</connectionStrings>
第二步
第二步是通过在 <configurationsections>
下添加一个节来配置 MessageBus
的配置处理程序,如下列表所示。接下来是配置消息类型及其存储详细信息。
在 <messagebus>
下,您可以配置适用于所有消息集合的默认参数;如果未配置集合特定的参数,则将使用默认参数创建集合。有两个默认参数:
MaxDocuments
= 集合中可以存储的最大文档数MaxSize
= 集合的大小(以字节为单位)
这些参数也可以为每种消息类型指定,以微调每个集合的存储。在 <supportedMessageTypes>
下,应配置所有消息类型。未在 <supportedMessageTypes>
下配置的任何消息类型都将不可用于消息发布或订阅。消息类型可以是任何可序列化的类型。基本形式下,可以使用领域对象作为消息类型,或者开发一种基于上下文的消息传递标准,该标准可以封装领域对象和所有相关数据到一个消息类型中。作为标准化消息传递一部分配置的元数据可供订阅者用于构建灵活的消息处理机制。
<configSections>
<section name="messageBus" type="MessageBus.Config.MessageBusSection,MessageBus"/>
</configSections>
<messageBus>
<defaultParams>
<param name="MaxDocuments" value="1000000"/>
<param name="MaxSize" value="1070596096"/>
</defaultParams>
<supportedMessageTypes>
<supportedType value="MessageBus.Message.Revenue">
<param name="MaxDocuments" value="200000"/>
</supportedType>
<supportedType value="MessageBus.Message.Booking"/>
<supportedType value="MessagePublishingTest.Events.FundsTransfer"/>
</supportedMessageTypes>
</messageBus>
发布消息
为了演示消息发布,我们使用 Microsoft 的标准 Web 应用程序模板开发了一个示例 Web 应用程序。在该 Web 应用程序中,我添加了一个新菜单“资金转账”来模拟一个假设的资金转账场景。
使用资金转账菜单,用户可以请求将资金从一个账户转账到另一个账户。
为了捕获资金转账详细信息,创建了一个简单的领域对象“FundsTransfer
”。
public class FundsTransfer
{
[Display(Name = "From Customer Name")]
public string FromCustomerName { get; set; }
[Display(Name ="Transfer to Customer Name")]
public string ToCustomerName { get; set; }
[Display(Name ="Amount")]
public double Amount { get; set; }
public DateTime TransactionTime { get; set; }
public FundsTransfer()
{
FromCustomerName = ToCustomerName = string.Empty;
TransactionTime = DateTime.Now;
}
}
为使此示例简单化,我使用了领域对象作为消息传递类型。消息可以通过 BusManager
接口发布。BusManager
的引用可以通过 BusManagerFactory
获取。Bus Manager 是用户将与之合作的最重要的对象。它允许发布消息和订阅消息类型。
namespace MessageBus.Interfaces
{
public interface BusManager<t>
{
Response SendMessage(t message, ApplicationInfo senderApplication,
TimeSpan messageTTL = default(TimeSpan));
void Subscribe(Subscriber<t> subscriber, ApplicationInfo listenerApplication);
}
}
将使用 SendMessage
方法发布消息。方法签名的第一个参数是要发布的消息。第二个参数是“ApplicationInfo
”对象,它代表每个应用程序或服务的身份。连接到 MessageBus
的每个应用程序/服务都必须提供一个唯一的 MessageInfo
。消息总会将此信息用于两种方式:
首先,跟踪已发送到应用程序的消息,并发送自上次成功传递消息以来的消息。第二,通过消息来源过滤消息订阅。消息总线还允许订阅者订阅特定来源(由 sources applicationinfo
对象标识)的特定类型消息。
第三个参数是可选的。如果正在发布的消息对时间敏感,并且在一定时间后会失去意义,则可以提供一个持续时间来标记消息的有效性。消息将在 messageTTL
中提供的持续时间过后自动过期。
namespace MessageBus
{
public class ApplicationInfo
{
public string ApplicationName { get; set; }
public string ApplicationIP { get; set; }
public int ApplicationPort { get; set; }
}
}
以下代码演示了通过 BusManager
发布“FundsTransfer
”消息。使用 BusManager
,发送消息是一项简单的任务。如前所述,每个连接到消息总线的组件都必须有一个唯一的应用程序名称。消息总线不强制应用程序名称的唯一性。使用重复的应用程序名称会导致两个应用程序之间出现竞态条件。ApplicationInfo
还有助于订阅者从特定应用程序订阅特定类型消息。
[HttpPost]
public ActionResult PublishMessage(FundsTransfer fundsTransfer)
{
var factory = BusManagerFactory.GetBusManager<FundsTransfer>();
factory.SendMessage(fundsTransfer, new ApplicationInfo{ApplicationName="WebApp" });
return View(Index_VIEW, fundsTransfer);
}
订阅消息
要监控消息,需要实现“Subscriber
”接口。Subscriber
是一个通用接口,允许对各种消息类型进行单一实现。
namespace MessageBus.Interfaces
{
public interface Subscriber<t>
{
List<ApplicationInfo> InterestedInSources { get; set; }
void OnEvent(t message);
void OnEvents(List<t> messages);
void OnError(Exception exception);
}
}
InterestedInSources
是一个“ApplicationInfo
”对象的列表,该订阅者有兴趣监控这些对象。如果未提供值,则订阅者将监听给定类型 t
的所有消息。
当应用程序发布特定类型“t
”的消息时,MessageBus
会调用订阅者的 OnEvent
方法。如果同时触发了多个事件,则会调用 OnEvents
方法,并将所有发布者发布的所有消息列表作为参数。所有消息将按其自然的插入顺序排列。对于任何运行时错误条件,都会调用 OnError
方法,并将底层异常作为参数提供。OnError
方法将使订阅者能够处理运行时错误,并可用于通知或日志记录目的。
创建订阅者实例后,必须使用 BusManager
接口将其注册到消息总线。要注册订阅者,请在类型为“t
”的 BusManager
上调用“Subscribe
”方法。如果定义的类型不受 MessageBus
支持(即未在配置文件中配置),则订阅将因异常而失败。调用 subscribe
方法将自动创建一个新的子线程来监控该特定类型的消息。
与发布类似,Subscribe 方法还需要 ApplicationInfo
对象来唯一标识消息侦听器。如前所述,MessageBus
会跟踪已发送到特定订阅者的消息,该订阅者由其 ApplicationInfo
标识。如果出于任何原因,订阅者与 MessageBus
断开连接,在重新连接时,将通过调用 OnEvents
方法将所有积压的消息传递给订阅者。已指定生存时间 (TTL) 且其 TTL 持续时间已过期的消息将不会被传递。
出于演示目的,我们创建了一个简单的订阅者,它除了在控制台上显示收到的消息外,不做任何其他操作。在“MessageMonitorTest
”项目中,创建了一个小型应用程序来演示订阅者的工作。
namespace MessageBus.Message
{
public class DummySubscriber<t> : Subscriber<t>
{
public List<string> InterestedInSources { get; set; }
public void OnError(Exception exception)
{
Logger.Error(this,exception);
}
public void OnEvent(t message)
{
Console.WriteLine(message.ToJSON());
}
public void OnEvents(List<t> messages)
{
foreach(var message in messages)
{
OnEvent(message);
}
}
}
}
namespace MessageMonitorTest
{
public class MessageMonitor
{
private static ApplicationInfo AppInfo =
new ApplicationInfo { ApplicationName = "ConsoleMoniter" };
public static void Main(string[] args)
{
BusManager<FundsTransfer> FundsTransferBusManager =
ObjectFactory.GetBusManager<FundsTransfer>();
FundsTransferBusManager.Subscribe
(new DummySubscriber<FundsTransfer>(), AppInfo);
WaitForExit();
}
private static void WaitForExit()
{
while(true)
{
Console.WriteLine("Do you want to exit?");
string input = Console.ReadLine();
if (input.IsNull()) continue;
if ("exit".Equals(input.ToLower()))
{
return;
}else
{
Console.WriteLine("Invalid command");
}
}
}
}
}
结论
本文介绍了使用 MessageBus
API 所需的必要步骤。该项目是构建松散耦合的企业应用程序的大型计划的一部分。由于 MongoDB 是我们应用程序架构的一部分,我们决定通过将其塑造成消息传递系统来利用其功能。考虑到其简单性和为我们的应用程序设计带来的价值,我们决定将其作为一个开源项目发布。我们希望其他人能够为此进一步增强其功能做出贡献。
所有未来的更改和错误修复将在 GITHub.com 的项目存储库中发布:https://github.com/wahmed36/MessageBus