使用 SQL Server 2005 Service Broker 模拟独白对话





4.00/5 (4投票s)
2007年10月12日
6分钟阅读

50927

412
如何使用 SQL Server 2005 Service Broker 技术模拟独白和传统队列
引言
SQL Server 2005 附带了一项新服务,对于创建强大的企业级应用程序非常有用:Service Broker。Service Broker 可用于实现两个服务之间消息的保证按序送达。这被建模为两个服务之间的**对话(dialog)**。目前,Service Broker 缺乏对**独白(monolog)**或简单消息队列的支持。
本文展示了如何在 SQL Server 2005 Service Broker 中实现独白。
背景
您可以阅读 Akram Hussein 在《在 SQL Server 2005 中使用 Service Broker 实现分布式消息传递入门》中对 Service Broker 的精彩介绍。这里给出一个**非常简要**的概述:Service Broker 处理的是服务之间使用消息进行通信。每个服务都有一个与之关联的队列。应用程序可以从队列中读取(接收)消息,并将消息发送到其他服务或响应始发服务。

消息编排的方式是通过**会话(conversation)**的概念来建模的。每条消息都是会话的一部分,并且消息在会话内是有序的。这就是实现按序送达的方式。
SQL Server 2005 只有一种类型的会话:**对话(dialog)**。对话是两个服务之间的会话,每个服务都有自己的队列。如果您想在两个应用程序之间实现可靠的通信,这种类型的会话非常有用。这两个服务甚至可以位于两个不同的数据库上。
SQL Server 2005 不包含其他会话类型。这就是为什么在文档中,“会话”和“对话”这两个术语似乎可以互换使用,这会带来一些困惑。这是因为对话是会话的唯一实现。
独白的缺失
另一种会话类型曾被建模但从未在 SQL Server 2005 中实现:**独白(monolog)**。独白基本上是一种发布者/订阅者模型,其中一个服务发布消息,任意数量的服务都可以接收这些消息。正如 SQL Server 产品团队的一位成员在 MSDN 论坛上提到的那样,SQL Server 2008 也不会包含独白功能。在本文中,我们建议探讨一种使用当前可用功能(即对话)来实现独白的方法。
用对话模拟独白
我们的方法是只定义一个服务和一个队列。然后,我们将定义从该服务到其自身的对话。不同的“消费者”将由不同的对话来表示。每个对话的行为都像一个子队列:一个事务会锁定消息的接收,并且消息会按顺序存储。

为了将一条消息广播给一组消费者,我们只需通过与这些消费者关联的所有对话发送该消息即可。
该方法的挑战
我们遇到的最大困难之一是如何管理这些对话。Service Broker 对话旨在松散地耦合进行对话的两个服务。松散耦合的后果之一是,发起方服务的对话 ID 与另一方服务的对话 ID 是不相同的。
这使得事情变得困难,因为当你创建一个对话时,你得到的对话 ID 并不是在接收端标识该对话的 ID。幸运的是,这两个对话 ID 在系统视图 sys.conversation_endpoints
中是相关的。这里有一个数据库脚本可以说明这一点。它创建了一个 TestDialog
数据库、一个协定、一个队列和一个服务。然后它在该服务上启动一个对话并发送一条消息。
CREATE DATABASE TestDialogID
GO
USE TestDialogID
GO
CREATE SCHEMA mlg
GO
CREATE CONTRACT [mlg.MonologContract]
(
[DEFAULT] SENT BY INITIATOR
)
CREATE QUEUE mlg.MonologQueue
CREATE SERVICE [mlg.MonologService]
ON QUEUE mlg.MonologQueue ([mlg.MonologContract])
GO
DECLARE @dialog_handle AS UNIQUEIDENTIFIER
BEGIN DIALOG CONVERSATION @dialog_handle
FROM SERVICE [mlg.MonologService]
TO SERVICE 'mlg.MonologService'
ON CONTRACT [mlg.MonologContract]
WITH
ENCRYPTION = OFF
SELECT @dialog_handle AS dialog_handle;
SEND ON CONVERSATION @dialog_handle
MESSAGE TYPE [DEFAULT] ('Test message');
SELECT * FROM mlg.MonologQueue
SELECT * FROM sys.conversation_endpoints
GO
以下是此脚本可能输出的示例。所有 ID 都是生成的 GUID;因此它们在您的机器上会有所不同。

第一个结果集是由 BEGIN DIALOG CONVERSATION
命令创建的对话句柄。第二个结果集是消息发送后队列的内容。第三个结果集是消息发送后 sys.conversation_endpoints
的内容。您会注意到第二个结果集中的 conversation_handle
与第一个结果集中的 dialog_handle
是不一样的。这是因为第一个对话句柄是发起方的句柄,而第二个是接收方的句柄。第三个结果集实际上解决了这个困惑:您可以在第一行的 conversation_handle
列中看到第一个结果集中的 dialog_handle
,在第二行中看到第二个。两者共同的是 conversation_id
。
Using the Code
本文附带的小型 SQL 库基本上将前面几节的概念封装到 6 个存储过程中。
存储过程名称 | 描述 |
mlg.CreateConversation |
创建一个会话并返回其 conversation_id 。 |
mlg.ReceiveMessage |
使用 conversation_id 从一个会话中接收一条消息,并可选择设置超时。 |
mlg.ReceiveMessages |
使用 conversation_id 从一个会话中接收所有消息,并可选择设置超时。 |
mlg.SendMessage |
使用 conversation_id 向一个会话发送一条消息。 |
mlg.EndConversation |
结束一个会话并清理待处理的消息。 |
mlg.CleanUpAllConversations |
清理队列中的所有会话。此存储过程不应由应用程序使用,仅用于管理目的。它使用一个效率不高的 while 循环来遍历会话。 |
我们使用会话 ID 而不是会话句柄,因为会话 ID 在通信两端都是唯一的。我们还包含了封装了对这些 SQL 存储过程调用的 C# API。以下是对象模型。

一个基本的测试用例会是这样:
using (Queue queue =
Queue.FromConnectionString(
"Data Source=.;Initial Catalog=MyDataBase;Integrated Security=True"))
{
Conversation conversation = queue.CreateConversation();
try
{
byte[] data = new byte[] { 1, 2, 3, 4 };
conversation.PushMessage(data);
ConversationMessage message =
conversation.PopMessage(TimeSpan.FromSeconds(1));
Console.WriteLine("ID: {0}", message.MessageID);
Console.WriteLine("Sequence Number: {0}",
message.MessageSequenceNumber);
Console.WriteLine("Message body length: {0}",
message.MessageBody.Length);
}
finally
{
conversation.EndConversation();
}
}
最后,我们提供了一个 Windows Forms 应用程序来演示该 API 的用法。

这个演示展示了(左侧)一次发送/接收一条消息和(右侧)发送/接收所有消息的存储过程。
广播
这个库中有意缺失的是广播消息的能力。原因在于这取决于具体的应用程序。在您的应用程序中,您只需添加一个层来识别接收者组以及该接收者组的条件。当您想要广播消息时,您需要查看所有组的条件,选择符合条件的组,然后使用此处提供的 API 将消息发送给每个组的每个成员。
后续步骤
当前发送消息的实现是一次发送一条消息。对于大规模发送来说,这可能会很慢。例如,在一个笔记本电脑上运行的虚拟机上快速测试发送 10000 条 10 KB 的消息,耗时可达 2 分钟。在您希望将所有这些发送操作都包装在一个 SQL 事务中的广播场景中,这就太慢了。我们需要实现批量发送消息的功能。这可以通过将一批消息打包到一个 XML 参数中来轻松实现。
结论
SQL Server 2005 中尚不存在独白会话,但如本文所示,有一种方法可以使用对话会话来模拟它。因此,SQL Server 2005 Service Broker 可以用来实现一个可靠的发布/订阅模式。
历史
- 2007年10月12日 -- 原始版本发布