65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 SQL Server 2005 Service Broker 模拟独白对话

starIconstarIconstarIconstarIconemptyStarIcon

4.00/5 (4投票s)

2007年10月12日

6分钟阅读

viewsIcon

50927

downloadIcon

412

如何使用 SQL Server 2005 Service Broker 技术模拟独白和传统队列

引言

SQL Server 2005 附带了一项新服务,对于创建强大的企业级应用程序非常有用:Service Broker。Service Broker 可用于实现两个服务之间消息的保证按序送达。这被建模为两个服务之间的**对话(dialog)**。目前,Service Broker 缺乏对**独白(monolog)**或简单消息队列的支持。

本文展示了如何在 SQL Server 2005 Service Broker 中实现独白。

背景

您可以阅读 Akram Hussein 在《在 SQL Server 2005 中使用 Service Broker 实现分布式消息传递入门》中对 Service Broker 的精彩介绍。这里给出一个**非常简要**的概述:Service Broker 处理的是服务之间使用消息进行通信。每个服务都有一个与之关联的队列。应用程序可以从队列中读取(接收)消息,并将消息发送到其他服务或响应始发服务。

Service Broker Dialog

消息编排的方式是通过**会话(conversation)**的概念来建模的。每条消息都是会话的一部分,并且消息在会话内是有序的。这就是实现按序送达的方式。

SQL Server 2005 只有一种类型的会话:**对话(dialog)**。对话是两个服务之间的会话,每个服务都有自己的队列。如果您想在两个应用程序之间实现可靠的通信,这种类型的会话非常有用。这两个服务甚至可以位于两个不同的数据库上。

SQL Server 2005 不包含其他会话类型。这就是为什么在文档中,“会话”和“对话”这两个术语似乎可以互换使用,这会带来一些困惑。这是因为对话是会话的唯一实现。

独白的缺失

另一种会话类型曾被建模但从未在 SQL Server 2005 中实现:**独白(monolog)**。独白基本上是一种发布者/订阅者模型,其中一个服务发布消息,任意数量的服务都可以接收这些消息。正如 SQL Server 产品团队的一位成员在 MSDN 论坛上提到的那样,SQL Server 2008 也不会包含独白功能。在本文中,我们建议探讨一种使用当前可用功能(即对话)来实现独白的方法。

用对话模拟独白

我们的方法是只定义一个服务和一个队列。然后,我们将定义从该服务到其自身的对话。不同的“消费者”将由不同的对话来表示。每个对话的行为都像一个子队列:一个事务会锁定消息的接收,并且消息会按顺序存储。

Monolog

为了将一条消息广播给一组消费者,我们只需通过与这些消费者关联的所有对话发送该消息即可。

该方法的挑战

我们遇到的最大困难之一是如何管理这些对话。Service Broker 对话旨在松散地耦合进行对话的两个服务。松散耦合的后果之一是,发起方服务的对话 ID 与另一方服务的对话 ID 是不相同的。

这使得事情变得困难,因为当你创建一个对话时,你得到的对话 ID 并不是在接收端标识该对话的 ID。幸运的是,这两个对话 ID 在系统视图 sys.conversation_endpoints 中是相关的。这里有一个数据库脚本可以说明这一点。它创建了一个 TestDialog 数据库、一个协定、一个队列和一个服务。然后它在该服务上启动一个对话并发送一条消息。

CREATE DATABASE TestDialogID
GO

USE TestDialogID
GO

CREATE SCHEMA mlg
GO

CREATE CONTRACT [mlg.MonologContract]
(
    [DEFAULT] SENT BY INITIATOR
)

CREATE QUEUE mlg.MonologQueue

CREATE SERVICE [mlg.MonologService]
    ON QUEUE mlg.MonologQueue ([mlg.MonologContract])
GO

DECLARE @dialog_handle AS UNIQUEIDENTIFIER

BEGIN DIALOG CONVERSATION @dialog_handle
FROM SERVICE [mlg.MonologService]
TO SERVICE 'mlg.MonologService'
ON CONTRACT [mlg.MonologContract]
WITH
ENCRYPTION = OFF

SELECT @dialog_handle AS dialog_handle;

SEND ON CONVERSATION @dialog_handle
    MESSAGE TYPE [DEFAULT] ('Test message');

SELECT * FROM mlg.MonologQueue

SELECT * FROM sys.conversation_endpoints
GO

以下是此脚本可能输出的示例。所有 ID 都是生成的 GUID;因此它们在您的机器上会有所不同。

Dialog Results

第一个结果集是由 BEGIN DIALOG CONVERSATION 命令创建的对话句柄。第二个结果集是消息发送后队列的内容。第三个结果集是消息发送后 sys.conversation_endpoints 的内容。您会注意到第二个结果集中的 conversation_handle 与第一个结果集中的 dialog_handle 是不一样的。这是因为第一个对话句柄是发起方的句柄,而第二个是接收方的句柄。第三个结果集实际上解决了这个困惑:您可以在第一行的 conversation_handle 列中看到第一个结果集中的 dialog_handle,在第二行中看到第二个。两者共同的是 conversation_id

Using the Code

本文附带的小型 SQL 库基本上将前面几节的概念封装到 6 个存储过程中。

存储过程名称 描述
mlg.CreateConversation 创建一个会话并返回其 conversation_id
mlg.ReceiveMessage 使用 conversation_id 从一个会话中接收一条消息,并可选择设置超时。
mlg.ReceiveMessages 使用 conversation_id 从一个会话中接收所有消息,并可选择设置超时。
mlg.SendMessage 使用 conversation_id 向一个会话发送一条消息。
mlg.EndConversation 结束一个会话并清理待处理的消息。
mlg.CleanUpAllConversations 清理队列中的所有会话。此存储过程不应由应用程序使用,仅用于管理目的。它使用一个效率不高的 while 循环来遍历会话。

我们使用会话 ID 而不是会话句柄,因为会话 ID 在通信两端都是唯一的。我们还包含了封装了对这些 SQL 存储过程调用的 C# API。以下是对象模型。

API

一个基本的测试用例会是这样:

using (Queue queue =
    Queue.FromConnectionString(
    "Data Source=.;Initial Catalog=MyDataBase;Integrated Security=True"))
{
    Conversation conversation = queue.CreateConversation();

    try
    {
        byte[] data = new byte[] { 1, 2, 3, 4 };

        conversation.PushMessage(data);

        ConversationMessage message = 
            conversation.PopMessage(TimeSpan.FromSeconds(1));

        Console.WriteLine("ID:  {0}", message.MessageID);
        Console.WriteLine("Sequence Number:  {0}", 
            message.MessageSequenceNumber);
        Console.WriteLine("Message body length:  {0}", 
            message.MessageBody.Length);
    }
    finally
    {
        conversation.EndConversation();
    }
}

最后,我们提供了一个 Windows Forms 应用程序来演示该 API 的用法。

Demo

这个演示展示了(左侧)一次发送/接收一条消息和(右侧)发送/接收所有消息的存储过程。

广播

这个库中有意缺失的是广播消息的能力。原因在于这取决于具体的应用程序。在您的应用程序中,您只需添加一个层来识别接收者组以及该接收者组的条件。当您想要广播消息时,您需要查看所有组的条件,选择符合条件的组,然后使用此处提供的 API 将消息发送给每个组的每个成员。

后续步骤

当前发送消息的实现是一次发送一条消息。对于大规模发送来说,这可能会很慢。例如,在一个笔记本电脑上运行的虚拟机上快速测试发送 10000 条 10 KB 的消息,耗时可达 2 分钟。在您希望将所有这些发送操作都包装在一个 SQL 事务中的广播场景中,这就太慢了。我们需要实现批量发送消息的功能。这可以通过将一批消息打包到一个 XML 参数中来轻松实现。

结论

SQL Server 2005 中尚不存在独白会话,但如本文所示,有一种方法可以使用对话会话来模拟它。因此,SQL Server 2005 Service Broker 可以用来实现一个可靠的发布/订阅模式。

历史

  • 2007年10月12日 -- 原始版本发布
© . All rights reserved.