Microsoft Message Queuing – 使用 Microsoft SQL Server 记录交易信息






4.60/5 (7投票s)
本文介绍了如何使用 Microsoft Message Queuing 和 Microsoft SQL Server 创建一个简单的交易日志服务器
引言
我之前的文章,Microsoft 消息队列 – 一个简单的多线程客户端和服务器,展示了如何轻松创建一个基于 Microsoft 消息队列的简单解决方案。
我写这篇文章的动机是为了展示这件事可以多么容易地完成,以回应 CodeProject 问答区发布的一个问题。我承认,当发现发布者仍然无法让他的解决方案工作时,我有点惊讶。
事实证明,他正在开发一个将交易记录到数据库的解决方案,所以我想我必须再尝试描述一下它实际上可以多么容易地完成。
显然,这不是一个功能完备的交易系统,但它仍然是如何使用 Microsoft 消息队列和 Microsoft SQL Server 处理市场数据的原理的一个例子。
服务器的屏幕截图

数据库
我希望我们的小系统能够处理交易系统通常需要的数据的一个合理子集 - 因此我们只记录买盘、卖盘和实际交易。
这是我们用于存储买盘的表
CREATE TABLE Bid
(
/* Each bid is identified by an id */
Id uniqueidentifier not null primary key,
/* When was the bid posted */
TimeStamp datetime not null,
/* Who posted the bid */
BidderId uniqueidentifier not null,
/* What are we bidding for */
InstrumentId uniqueidentifier not null,
/* Under what condition will the bidder accept an offer */
/* Usually there is a limited set of standard conditions */
BidConditionsId uniqueidentifier not null,
/* Unit price */
Value float not null,
/* Number of units */
Volume float not null
)
go
我们用于存储卖盘的表
CREATE TABLE Offer
(
/* Each offer is identified by an id */
Id uniqueidentifier not null primary key,
/* When was the offer posted */
TimeStamp datetime not null,
/* Who posted the offer */
SellerId uniqueidentifier not null,
/* Under what condition will the seller accept a bid */
/* Usually there is a limited set of standard conditions */
SaleConditionsId uniqueidentifier not null,
/* What's being offered */
InstrumentId uniqueidentifier not null,
/* Unit price */
Value float not null,
/* Number of units */
Volume float not null
)
go
最后是我们的交易表
CREATE TABLE Trade
(
/* Each trade is identified by an id */
Id uniqueidentifier not null primary key,
/* When was the trade done */
TimeStamp datetime not null,
/* Who bought */
SellerId uniqueidentifier not null,
/* Who sold */
BuyerId uniqueidentifier not null,
/* Under what agreement was the trade made */
TradeAgreementId uniqueidentifier not null,
/* The instrument, goods, ... */
InstrumentId uniqueidentifier not null,
/* Unit price */
Value float not null,
/* Number of units */
Volume float not null
)
go
这些表可以使用 Harlinn.Messaging.Server2
项目中的 SQL\CreateDataTables.sql 脚本创建 - 它包含在解决方案源代码中。
插入数据
以下小查询是我在编写代码来执行针对数据库的操作时发现很有用的一个查询 - 这次它列出了 Offer
表的列名。
select cols.name from sys.all_columns cols
join sys.tables t ON (t.object_id = cols.object_id)
where t.name = 'Offer'
由于通常需要这种解决方案具有一定的性能水平,我们将直接使用 SqlConnection
和 SqlCommand
- 上面的脚本可以防止我在创建代码时犯太多错误。
public class Offer
{
public const string INSERT_STATEMENT =
"INSERT INTO OFFER(Id,TimeStamp,SellerId,SaleConditionsId,InstrumentId,Value,Volume)"+
" VALUES(@id,@timeStamp,@sellerId,@saleConditionsId,@instrumentId,@value,@volume)";
public static void Insert(SqlConnection connection, PayloadOffer offer)
{
SqlCommand command = connection.CreateCommand();
using (command)
{
command.CommandText = INSERT_STATEMENT;
command.Parameters.Add("@id", SqlDbType.UniqueIdentifier).Value = offer.Id;
command.Parameters.Add("@timeStamp", SqlDbType.DateTime).Value = offer.TimeStamp;
command.Parameters.Add("@sellerId", SqlDbType.UniqueIdentifier).Value = offer.SellerId;
command.Parameters.Add("@instrumentId",
SqlDbType.UniqueIdentifier).Value = offer.InstrumentId;
command.Parameters.Add("@saleConditionsId",
SqlDbType.UniqueIdentifier).Value = offer.SalesConditionsId;
command.Parameters.Add("@value", SqlDbType.Float).Value = offer.Value;
command.Parameters.Add("@volume", SqlDbType.Float).Value = offer.Volume;
command.ExecuteNonQuery();
}
}
}
如您所见,直接使用 SqlConnection
和 SqlCommand
并没有那么糟糕,下次您想尝试实体框架时可以考虑一下。
由于买盘和交易表的代码非常相似,因此我将跳过这些表。
有效载荷
我们将要发送到服务器的数据基于一个简单的类层次结构,包括 PayloadBase
、PayloadBid
、PayloadOffer
和 PayloadTrade
。PayloadBid
、PayloadOffer
和 PayloadTrade
派生自 PayloadBase
。
[Serializable]
public abstract class PayloadBase
{
private Guid id;
private DateTime timeStamp;
public PayloadBase()
{
}
public abstract PayloadType PayloadType
{
get;
}
// code removed
}
PayloadType
用于区分 PayloadTrade
、PayloadBid
和 PayloadOffer
- 虽然我们可以使用 is 运算符,但我倾向于认为使用鉴别器使代码更具可读性。
处理消息
就像在上一篇文章中一样,我们将使用 OnReceiveCompleted
方法异步地一次处理一条传入消息
private void OnReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult)
{
try
{
MessageQueue mq = (MessageQueue)source;
if (mq != null)
{
try
{
System.Messaging.Message message = null;
try
{
message = mq.EndReceive(asyncResult.AsyncResult);
}
catch (Exception ex)
{
LogMessage(ex.Message);
}
if (message != null)
{
PayloadBase payload = message.Body as PayloadBase;
if (payload != null)
{
if (receivedCounter == 0)
{
firstMessageReceived = DateTime.Now;
}
receivedCounter++;
if ((receivedCounter % 10000) == 0)
{
TimeSpan ts = DateTime.Now - firstMessageReceived;
string messageText = string.Format
("Received {0} messages in {1}", receivedCounter, ts);
LogMessage(messageText);
}
try
{
switch (payload.PayloadType)
{
case PayloadType.Bid:
{
PayloadBid bid = (PayloadBid)payload;
DB.Bid.Insert(sqlConnection, bid);
}
break;
case PayloadType.Offer:
{
PayloadOffer offer = (PayloadOffer)payload;
DB.Offer.Insert(sqlConnection, offer);
}
break;
case PayloadType.Trade:
{
PayloadTrade trade = (PayloadTrade)payload;
DB.Trade.Insert(sqlConnection, trade);
}
break;
}
}
catch (Exception e)
{
if (isRunning)
{
LogMessage(e.Message);
}
}
}
}
}
finally
{
if (isRunning)
{
mq.BeginReceive();
}
}
}
return;
}
catch (Exception exc)
{
if (isRunning)
{
LogMessage(exc.Message);
}
}
}
如果您不熟悉二进制序列化是如何工作的,您可能会惊讶于访问队列上传递的数据是多么简单 - 正如您所见,我只是使用以下方法将检索到的有效负载强制转换为基类
PayloadBase payload = message.Body as PayloadBase;
如果我理解了促使我写这篇文章的帖子 - 这并不像我最初认为的那么明显。无论如何 - 我们现在准备好使用我们简单但相当高效的 DB.<type>.Insert 方法存储我们的数据 - 如上面的 switch 所示。
消息生成器

为了运行我们的系统,我们需要一个消息生成实用程序,它与上一篇文章中介绍的那个几乎相同,除了 SendMessages
方法
private void SendMessages(int count)
{
Random random = new Random(count);
string message = string.Format("Sending {0} messages", count);
LogMessage(message);
DateTime start = DateTime.Now;
for (int i = 0; i < count; i++)
{
PayloadType payloadType = (PayloadType)(i % 3);
PayloadBase payload = null;
switch (payloadType)
{
case PayloadType.Bid:
{
PayloadBid bid = new PayloadBid();
bid.Initialize();
payload = bid;
}
break;
case PayloadType.Offer:
{
PayloadOffer offer = new PayloadOffer();
offer.Initialize();
payload = offer;
}
break;
case PayloadType.Trade:
{
PayloadTrade trade = new PayloadTrade();
trade.Initialize();
payload = trade;
}
break;
}
messageQueue.Send(payload);
}
DateTime end = DateTime.Now;
TimeSpan ts = end - start;
message = string.Format("{0} messages sent in {1}", count, ts);
LogMessage(message);
}
我们只是让二进制格式化程序完成它的工作 - 即使 messageQueue.Send(payload);
获取了对声明为 PayloadBase
的对象的引用,它也会发送正确的信息。只要该对象的类声明为 [Serializable]
,该对象将被正确序列化。
结论
通过一个更全面的示例,我希望我已成功说明如何创建一个基于 Microsoft 消息队列的简单而高效的解决方案,该解决方案能够处理“真实”数据负载。
它仍然有很大的优化空间 - 显而易见的是一次处理多条消息,另一种是使用多个配对的 MessageQueue
组件和 SqlConnections
来并行地从队列中删除消息。
主要目的是保持解决方案简单 - 同时仍然做一些有用的事情。
此致,
Espen Harlinn
历史
- 2011 年 3 月 15 日:首次发布