65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 WCF 和 MSMQ 进行队列通信的入门指南,展示双向客户端通信

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.92/5 (9投票s)

2013年1月2日

CPOL

24分钟阅读

viewsIcon

97546

downloadIcon

3323

这是一个客户端与服务进行排队通信的实际示例,客户端向服务发送消息,接收来自服务的主动消息,同时客户端和服务都可以在对方离线时向对方发送排队消息。

The finished client in use

目标

本文源于我的一次学习练习,目的是使用WCF来利用MSMQ构建一个系统,在该系统中,购买和消费数据需要在不同安装之间实时共享。这是一个每个客户端既是发布者又是订阅者的场景。在撰写本文时,大多数对技术细节不太精通的人在MSMQ/WCF示例中会迷失在如何对响应进行排队。这些示例要么过于复杂难以理解,要么使用非标准结构,要么根本没有在响应元素中正确应用排队。

我在这篇文章中的目标是提供一个易于理解和复制的工作示例——即使它不一定容易理解。

本文有三个主要部分:从客户端到服务的传统消息发送,将数据(在本例中是寻址)放在出站消息头中(以帮助将消息回送给客户端),最后是从服务到客户端发送消息。

示例来源 - 为什么选择MSMQ?

由于我系统的性质,我无法保证客户端和服务将始终能够相互连接,同样地,当客户端上线时,它需要能够接收在其离线期间发出的任何面向服务的广播。

在目标队列位于不可用远程机器的部署中,MSMQ将自动(无需您的任何操作)将消息本地保存并在目标机器可用时将其转发。[ROGERS]

我的原始示例是我建立知识库的关键垫脚石,但它依赖于客户端和服务同时在线,因此不适用于任何实际用途。此示例代表了计划开发和部署的模型。

在这里,每个客户端在运行时创建自己的入站消息队列,并将该队列的地址放在加入服务的请求中。服务创建一个字典,使用客户端名称作为键保存每个客户端的代理。客户端编写消息并在列表中勾选收件人,然后将消息发送到服务队列。服务读取此队列,并检查每条消息上的地址列表。使用存储的代理字典,它能够将这些消息排队发送给每个预期客户端。

请注意,如果服务关闭,您将需要重新注册用户以避免错误——这是因为我尚未实现代码来保存和重新加载字典。

但是,当服务启动时,您将能够让客户端离线,从其他客户端向它们发送消息,并在该客户端重新上线时看到这些消息。

这与发布和订阅非常相似,但不同之处在于没有专门的发布者或订阅者。在我的示例中,每个客户端既是发布者又是订阅者。而服务,只不过是一个中继站。

我以前的WCF产品源于两个优秀的灵感来源,它们对于实现其目标都至关重要

  • [TROELSEN] - Andrew Troelsen 撰写的“Pro C# 2010 and the .NET 4 Platform (第五版)”第25章
  • [BARNES] - Jeff Barnes 在CodeProject上撰写的“WCF: Duplex Operations and UI Threads”

这次我没有找到可以作为我努力基础的出色示例,因此这次您看到的很大程度上是我自己的工作,其中夹杂着来自一系列来源的少量贡献。我相信我已经在这篇文章的末尾列出了所有这些来源,但如果有什么遗漏,请提前致歉。

最初的计划是改编我的GPH_QuickMessageService的源代码,展示如何将其从连接转换为排队——但这在学习如何排队消息和响应的基础上,被证明是一个过于遥远的桥梁。在开始这项练习之前,我必须先整理一个简单的示例,其中客户端将硬编码的消息放在队列中,由服务读取并写入控制台。[LOWY] 在指导我完成这一垫脚石方面发挥了重要作用。它本身毫无意义,但在引导我走向有意义的双向排队解决方案方面至关重要。

我采用的队列方法是事务性的,我将在代码的各个阶段指出这一点。

如果这是您第一次接触WCF,那么我建议您阅读我的初始文章以及其中引用的那些开创性作品。

注意:为避免疑问,这是一个C#示例。

开始之前...

现在是时候去控制面板,程序,打开或关闭Windows功能,并确保您已激活MSMQ。

从客户端向服务发送消息

首先,我将详细介绍我用于将消息从客户端排队到服务的代码。这没什么不寻常的,而且有很多其他例子。完成后,我们将重新审视整个代码集,并编织进将消息从服务发送到客户端所需的逻辑。

服务

为控制台应用程序打开一个新解决方案,我将其命名为GPH_QueuedMessageService。将自动生成的Program.cs重命名为更相关的名称。我选择了GPH_QueuedMessageHost.cs。暂时将其搁置,并向解决方案添加一个新的类库项目——这将包含服务契约,并且名称应反映这一点。我选择了GPH_QueuedMessageContract。重命名自动生成的命名空间和类名,以更好地反映您正在进行的工作。我们首先检查此契约

契约:GPH_QueuedMessageContract

首先向GPH_QueuedMessageContract.cs添加以下引用

using System.ServiceModel;
using System.Transactions;  // For Transaction Scope
using System.Messaging;     //Access Messaging
using System.Runtime.Serialization; //DataContract

我已采用以“N”为前缀的源文件名作为我的命名空间名称 (NGPH_QueuedMessageContract)。

在该命名空间中,您将找到一个标准服务契约

[ServiceContract(
Name = "GPH_QueuedService"
)]

public interface IGPH_QueuedService
{
    [OperationContract(IsOneWay = true)]
    void RegisterUser(string arg_Username);
    [OperationContract(IsOneWay = true)]
    void ReceiveMessage(string userName, List addressList, string userMessage);
    [OperationContract(IsOneWay = true)]
    void RemoveUser(string arg_Username);
} 

对于那些检查过我早期连接WCF示例的人来说,这会非常熟悉,所有三个方法都有相似的签名,其中两个方法被赋予了新名称以反映它们在队列世界中行为的差异。因为这是一个断开连接的示例,所以不能通过方法类型向客户端返回任何内容——当消息被处理时,客户端可能已离线——因此返回类型必须是void,并且所有方法现在都带有OneWay属性。

这次将没有回调接口。在我描述服务如何接收排队消息后,我们将回顾代码并编写将这些消息分发到指定客户端所需的逻辑。

与任何其他WCF实现一样,契约模块中的类使用接口来拾取契约

public class CGPH_QueuedMessageContract : IGPH_QueuedService

与我之前的示例一样,我保留了一份订阅者姓名列表,以及一个包含订阅者姓名及其代理的字典(严格来说,按照我的介绍,他们现在更像是用户而不是订阅者——这就是遗产)。

private static List m_SubscriberList = new List();
private static Dictionary<string, GPH_QueuedMessageContract.ServiceReference1.GPH_InboundMessageHandlerClient> m_NotifyList =
   new Dictionary<string, GPH_QueuedMessageContract.ServiceReference1.GPH_InboundMessageHandlerClient>();// Default Constructor
注册用户。

这里所示方法最显著的特点是OperationBehaviour属性。如果没有这个属性,该方法就不能用于基于事务的队列。

目前,当用户注册时,我只通过输出控制台消息来显示已发出“加入请求”

[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
public void RegisterUser(string arg_Username)
{
    Console.WriteLine("Got Message");
}

我做出了这个决定,因为这里出现的逻辑用于记录将用于向客户端分发消息的地址,所以当我们查看从服务向它们发送消息时,我将向您介绍它。

接收消息。

除了添加的行为属性之外,接收消息不需要任何特殊操作。我再次使用控制台writeline来显示它已到达。

[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
public void ReceiveMessage(string arg_userName, List arg_addressList, string arg_userMessage)
{
    Console.WriteLine("Message [{0}] from [{1}]", arg_userMessage, arg_userName);
}
注销用户。

当收到注销消息时,此方法用于从订阅者列表和字典中删除该用户,因此当我们查看发送给客户端的消息时,我们将更多地看到它。目前,它只是通过writeline简单地表示已收到注销消息

[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
public void RemoveUser(string arg_Username)
{
    Console.WriteLine("Got Removal Message from [{0}]", arg_Username);
}

主机:GPH_QueuedMessageHost

如果您首先创建了GPH_QueuedMessageHost并添加了契约库,则跳过下一段。

但是,如果主机项目中唯一的模块是契约库,那么现在使用控制台应用程序模板向解决方案添加一个新项目 (Ctrl-Shift-N)。我将其命名为GPH_QueuedMessageHost。确保“创建解决方案目录”未选中,如果您想跟随我的做法并将其包含在与契约相同的解决方案中,请在下拉菜单中选择“添加到解决方案”。

Add a Project to the solution

一旦您的新模块生成,我建议将类名从默认的program重命名为更相关的名称。我选择了GPH_QueuedMessageHost

无论您如何或何时创建主机模块,它都需要这些引用才能执行其任务

using System.ServiceModel;
using System.Configuration; //Access app.config
using System.Messaging;     //Access Messaging
using NGPH_QueuedMessageContract;

Main方法开始显示 [LOWY] 的影响 - 如果队列不存在,则以编程方式创建它。

// Get MSMQ queue name from app settings in configuration
string queueName = ConfigurationManager.AppSettings["queueName"];
// Create the transacted MSMQ queue if necessary
if (!MessageQueue.Exists(queueName))
    MessageQueue.Create(queueName, true);//Creates a transactional queue.

然后我从 App.Config 中引入了基地址

string baseAddress = ConfigurationManager.AppSettings["baseAddress"]; 

此地址用于侦听WS-MetaDataExchange请求,并将在客户端生成服务代理时使用。

剩下完成服务的就是使其启动并运行的代码

using (ServiceHost serviceHost = new ServiceHost(typeof(CGPH_QueuedMessageContract), new Uri(baseAddress)))
{
    // Open the host and start listening for incoming messages.
    serviceHost.Open();
    // Keep the service running until the Enter key is pressed.
    Console.WriteLine("The service is ready.");
    Console.WriteLine("Press the Enter key to terminate service.");
    Console.ReadLine();
}

请注意,当 using 为服务创建上下文时,契约和基地址都用作参数。

配置:App.Config

在我们尝试编译服务之前,它需要一个配置文件 (App.Config)。

右键单击主机项目名称,选择“添加,新项”,然后突出显示“应用程序配置文件”,如图所示。

Add Application Configuration Fileto the Project

接受默认名称并单击“确定”。

appSettings将提供队列的名称和地址

<appSettings>
    <!-- use appSetting to configure MSMQ queue name -->
    <add key="queueName" value=".\private$\GPH_QueuedServiceQueue"/>
    <add key="baseAddress" value="https://:8080/GPH_QueuedService"/>
</appSettings>

<system.serviceModel>标签中,我们有服务、绑定和行为

<services> 引入了使服务工作所需的所有组件。它由行为配置、服务名称、端点地址、绑定、绑定配置、契约和端点契约限定。

请特别注意服务名称的组成方式 (Namespace.Class) 以及契约属性的组成 (NGPH_QueuedMessageContract.IGPH_QueuedService)。如果这些设置错误,将导致难以发现的运行时错误。

<services>
      <service
      behaviorConfiguration="GPH_QueuedServiceBehaviors"
      name="NGPH_QueuedMessageContract.CGPH_QueuedMessageContract">
        <!--Namespace.Class-->
        <endpoint address="net.msmq:///private/GPH_QueuedServiceQueue"
        binding="netMsmqBinding"
        bindingConfiguration="DomainlessMsmqBinding"
        contract="NGPH_QueuedMessageContract.IGPH_QueuedService"/>
        <!--Namespace.Interface-->
        <!-- Add the following endpoint. -->
        <!-- Note: your service must have an http base address to add this endpoint. -->
        <endpoint contract="IMetadataExchange" binding="mexHttpBinding" address="mex" />
      </service>
</services>

使用的绑定是netMsmqBinding。这告诉服务它将查看队列。它的绑定名称DomainlessMsmqBinding也是服务绑定配置,是根据[PATHAK]中的一个示例选择的。此配置文件部分的全部作用是我指示服务不期望任何消息的身份验证或保护。[LOWY]用一整章来讲解安全性。

<bindings>
      <netMsmqBinding>
        <binding name="DomainlessMsmqBinding">
          <security>
            <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
          </security>
        </binding>
      </netMsmqBinding>
</bindings>

在服务行为中几乎没有任何事情发生。这是我自[TROELSEN]以来一直使用的默认设置,但在[LOWY]和[PATHAK]等书中也有提及。

<behaviors>
      <serviceBehaviors>
        <behavior name="GPH_QueuedServiceBehaviors">
          <!-- Add the following element to your service behavior configuration. -->
          <serviceMetadata httpGetEnabled="true"/>
        </behavior>
      </serviceBehaviors>
</behaviors>

服务现在已准备好从其队列中读取消息。在伴随本文的代码开发中,我能够编译并启动服务,从而开始客户端的工作。

客户端

与服务一样,客户端既发送也接收消息,但在此部分中,我将重点放在传统地将消息发送到服务,稍后处理相反的方向。

在一个新解决方案中创建一个新的Windows窗体应用程序。我选择GPH_QueuedMessageClient作为我的应用程序,并将Form1重命名为MessageForm。该窗体应按照我之前的WCF示例进行布局,并如本文开头图片中的三个实例所示。

添加一个窗体加载事件、一个窗体关闭事件、每个按钮的单击事件以及Username文本框的文本更改事件。然后将以下引用添加到窗体后面的代码中。在我的示例中,它是GPH_QueueMessageClientForm.cs:

using System.ServiceModel;
using System.Messaging;
using System.Transactions;

您还需要添加一个服务引用,以便在客户端创建服务的代理。右键单击项目名称,然后选择“添加服务引用”。您将需要服务App.Config中的服务地址。它位于appSettings中包含baseAddress的行上:https://:8080/GPH_QueuedService。将服务地址插入地址框中,如图所示。

Adding a Service Reference

在您看到消息告知您在该地址找到了 1 个服务后,您可以单击“确定”。如果您没有收到此消息,则服务可能未运行,您的地址可能存在错误,或者端点定义存在错误。例如,如果契约名称有拼写错误,它可能看起来完美无缺,但您将在此步骤收到一条消息,告知您没有公开的端点!

一条新的 using 语句将加入您的引用列表

using GPH_QueuedMessageClient.ServiceReference1;

在消息窗体类中声明一个代理变量

private static ServiceReference1.GPH_QueuedServiceClient m_Proxy;

初始化

在类构造函数中初始化代理

public MessageForm()
{
    InitializeComponent();
    m_Proxy = new ServiceReference1.GPH_QueuedServiceClient();
}

加载窗体

MessageForm_Load 没有 WCF 特定的代码,只是一些旨在改善应用程序可用性的指令

// Initialize the fields / buttons
this.btnJoin.Enabled = false;
this.btnSend.Enabled = false;
this.btnLeave.Enabled = false;
this.btnExit.Enabled = true;
this.txtMessageOutbound.Enabled = false;

// Initial eventhandlers
this.txtUsername.TextChanged += new EventHandler(txtUsername_TextChanged);
this.FormClosing += new FormClosingEventHandler(MessageForm_FormClosing);

Join

btnJoin_Click 包含了大量用于提供接收消息数据的代码。稍后将详细介绍。要向服务发送加入消息,只需要这些代码

m_Proxy.RegisterUser(txtUsername.Text);

还有一些表单处理代码,用于改善应用程序的行为,与WCF无关

// change the button states
this.btnJoin.Enabled = false;
this.btnSend.Enabled = true;
this.btnLeave.Enabled = true;
this.txtMessageOutbound.Enabled = true;

退出

btnExit_Click 事件只是触发关闭事件

this.Close();

关闭表单。

MessageForm_FormClosing确保代理与表单一起关闭

m_Proxy.Close();

输入用户名

txtUsername_TextChanged事件仅用于在用户名框有值时启用Join按钮

if (this.txtUsername.Text != String.Empty)
{
    this.btnJoin.Enabled = true;
}

发送消息

在我们第一次查看btnSend_Click时,我们将专注于向服务发送消息的行为,忽略寻址功能,因为在我们可以从服务接收订阅者列表之前,此功能无法使用。

这是代码

txtMessageOutbound.Enabled = false;
m_Proxy.ReceiveMessage(this.txtUsername.Text, addressList, this.txtMessageOutbound.Text);
txtMessageOutbound.Clear();
txtMessageOutbound.Enabled = true;

此示例中唯一必需的行是m_Proxy....开始的行,其他行仅用于禁用文本框、清除它并重新启用它。我选择禁用它的原因是,在我的机器上,排队很慢,我不想在早期阶段通过快速连续地向服务发送多条消息来混淆局面。

离开事件

btnLeave_Click事件用于向服务发出信号,表示此用户将从订阅者列表中删除,并删除其用于入站消息的队列,假设用户已完成使用应用程序且不希望离线时接收任何消息。注意:对于未单击Leave而是使用Exit关闭应用程序的用户,消息将继续排队。

m_Proxy.RemoveUser(this.txtUsername.Text);
this.btnJoin.Enabled = false;
this.btnSend.Enabled = false;
this.btnLeave.Enabled = false;
this.btnExit.Enabled = true;
this.txtMessageOutbound.Enabled = false;
if (MessageQueue.Exists(m_queueName))
    MessageQueue.Delete(m_queueName);

使用消息头

我遵循 [LOWY] 的建议,将消息头用于交换技术数据,例如入站(响应)地址或错误代码。这使得消息体专门用于应用程序交换的业务数据。除了提供他的智慧之外,[LOWY] 的进一步用途有限,因为他使用与其通用 ServiceModelEx 产品紧密交织的上下文来实现它,我无法从中提炼出我专用示例所需的所有内容。

服务

我从 [LOWY] 那里学到的是,我可以将一个专用类作为数据契约传递到消息头中。这应该属于包含服务契约的模块,所以我去了 GPH_QueuedMessageContract 添加了一个名为 PublishToDataContract 的新类

[DataContract(Name = "PublishToDataContract")]
public class PublishToDataContract
{
    [DataMember]
    public string PublishToAddress { get; set; }
    [DataMember]
    public readonly string FaultAddress; // Future use
    [DataMember]
    public readonly string MethodId; // Future use
}

这次它被称为DataContract而不是ServiceContract。我打算在客户端看到的所有成员都必须用DataMember属性标记。

于是我编译了它,详细咨询了[LOWY]、[PATHAK]和其他一些人,但在更新服务引用后,我无法在客户端访问数据契约类成员。

暴露“空”方法

这个解决方案,一个巧妙的变通方法,是作为我在CP!上一个问题的答案给我的。我把它看作是“暴露‘空’方法”。这就是为什么。在IGPH_QueuedService接口中添加一个新条目

[OperationContract(IsOneWay = true)]
void ExposeContract(PublishToDataContract arg_publish_details);

CGPH_QueuedMessageContract类中,使用新的数据契约类的类型声明一个新变量

PublishToDataContract m_PublishToDetails = new PublishToDataContract();

同样在此类中,实现最近添加到接口的ExposeContract方法

public void ExposeContract(PublishToDataContract arg_publish_details)
{
    ;
}

正如你所看到的,它本身什么也不做。但是,如果你重新启动服务,进入客户端并重新导入服务引用,那么PublishToDataContract现在将可用——而且我们在客户端不会提及ExposeContract

在前往客户端之前,我们将添加逻辑以从标头中提取地址,并在控制台窗口中显示它。

这是从消息中获取标头并将其写入的代码

try
{
    m_PublishToDetails = OperationContext.Current.IncomingMessageHeaders.GetHeader<PublishToDataContract>(
        "PublishToDataContract", "NGPH_QueuedMessageContract");
}
catch (Exception e)
{
    Console.WriteLine("Exception [{0}] reading the header", e.Message);
    Console.WriteLine("Header 0: {0}",OperationContext.Current.IncomingMessageHeaders[0].ToString());
    return;
}
try
{
    Console.WriteLine("Join Request from Queue: {0} via hdr passing addr: {1}", 
                      arg_Username, m_PublishToDetails.PublishToAddress);
    m_message = "Join Request from Queue: " 
                     + arg_Username + " via address " + m_PublishToDetails.PublishToAddress;
}
catch (Exception e)
{
    Console.WriteLine("Exception [{0}] displaying the header details", e.Message);
    return;
}

客户端

正如您刚刚看到的,您需要更新服务引用才能访问我们将用于将客户端地址传递给服务的数据契约。右键单击ServiceReference1,然后从弹出菜单中选择“更新服务引用”。服务需要正在运行才能成功。

客户端现在需要引用配置

using System.Configuration; //Access app.config

btnJoin_Click事件中,从App.Config文件中获取端点地址的根,并向其添加当前用户名以确保唯一性

string endpointAddressRoot = ConfigurationManager.AppSettings["endpointAddressRoot"];
string strEndpointAddress = endpointAddressRoot + txtUsername.Text;

我之所以选择这种方法,是因为它允许我在同一台机器上拥有多个动态寻址的客户端。

定义新的数据契约类的一个实例

PublishToDataContract PublishTo;

我按照 [LOWY] 的方法来填充此实例并将其放到消息头中

PublishTo = new PublishToDataContract();
PublishTo.PublishToAddress = strEndpointAddress;
MessageHeader<PublishToDataContract> numberHeader = 
         new MessageHeader<PublishToDataContract>(PublishTo);

同样,遵循[LOWY]的说法,对RegisterUser的代理调用现在必须进入一个内部通道,以便可以包含消息头。这就是它现在被封装的方式

using (OperationContextScope contextScope = new OperationContextScope(m_Proxy.InnerChannel))
{
    try
    {
        OperationContext.Current.OutgoingMessageHeaders.Add(
            numberHeader.GetUntypedHeader("PublishToDataContract", "NGPH_QueuedMessageContract"));
    }
    catch (Exception Ex)
    {
        MessageBox.Show("Exception: {0}", Ex.Message);
    }
    m_Proxy.RegisterUser(txtUsername.Text);
}

我们还需要修改客户端的app.config,添加一个appSettings标签集并创建一个指向端点地址根的键

<appSettings>
    <add key="endpointAddressRoot" value="net.msmq:///private/GPH_InboundClientQueue_"/>
</appSettings>

现在我们准备开始编写代码,以便我们可以从服务接收客户端上的消息。

在客户端表单上从服务接收消息

目前可用的更好示例通过将服务代码添加到客户端和客户端代码添加到服务来实现此目的。我将遵循这种模式。

作为伪服务的客户端

创建契约

客户端要实现“服务”般的行为,首先需要实现一个契约。向客户端解决方案添加一个新的类库项目。我将它命名为GPH_InboundMessageContract,并将自动生成的class1重命名为CGPH_InboundMessageHandler。我还替换了自动生成的命名空间名称,选择将其命名为NGPH_InboundMessageContract

契约模块将需要两个引用

using System.ServiceModel;
using System.Messaging;     //Access Messaging

这是一个简单的客户端,它只会接收两种类型的消息,这反映在服务契约中

[ServiceContract]
public interface IGPH_InboundMessageHandler
{
    [OperationContract(IsOneWay = true)]
    void OnRegistration(List arg_SubscriberList);
    [OperationContract(IsOneWay = true)]
    void OnInboundMessage(string arg_User, string arg_Message);
    //More operations
}

与任何WCF服务定义一样,该类实现契约接口

public class CGPH_InboundMessageHandler : IGPH_InboundMessageHandler
{
	.
	.
	.
}

插曲——触发事件

接收消息固然很好,如果我只是想根据这些消息进行数据库更改或写入磁盘,那么我可以直接从这里调用。但不行,在这个示例中,我想在创建此服务的窗体中显示入站数据。为此,我将触发两个事件,每个调用类型一个。它们是基本事件,超出了本文的范围,因此我将只对其进行简单的指出。

创建了两个公共静态成员变量,以帮助将信息中继回表单

public static string m_MessageRecieved;
public static string m_FromUser;
注册

我创建了注册事件来接受将从队列中到达的用户列表,并为它创建了一个处理程序。然后我在将触发事件的方法中使用该处理程序

public class RegistrationEventArgs : EventArgs
{
    public List<string> evUserList;
    public string CallingMethod;
}
public static event EventHandler<RegistrationEventArgs> RegistrationEvent;

public void SendData(List<string> arg_senduserList)
{
    if (RegistrationEvent != null)
        RegistrationEvent(null, new RegistrationEventArgs());
}
入站消息

ShowMessage事件的创建方式与注册事件相同——我本可以将其参数化以拥有一个通用事件,但那是另一天的练习了...

public class ShowMessageEventArgs : EventArgs
{
    public string evMessage;
    public string CallingMethod;
}
public static event EventHandler<ShowMessageEventArgs> ShowMessageEvent;

public void SendMessage(string arg_User, string arg_Message)
{
    if (ShowMessageEvent != null)
        ShowMessageEvent(null, new ShowMessageEventArgs());
}

回到契约

接收注册消息

请注意,OnRegistration方法具有为实现事务而定义的OperationBehavior属性。实际上,它所做的只是接收订阅者列表并触发SendData事件以将该列表传递给表单

[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
public void OnRegistration(List<string> arg_SubscriberList)
{
    //MessageBox.Show("Result = " + arg_InboundText, "InboundMessageHandler");
    m_SubscriberList = arg_SubscriberList;
    SendData(arg_SubscriberList);
}
接收常规消息

OnRegistration类似,此方法接收一个字符串和用户列表,将其值放在将由窗体获取的公共变量上,然后触发SendMessage将该字符串传递给窗体

public void OnInboundMessage(string arg_User, string arg_Message)
{
    m_MessageRecieved = arg_Message;
    m_FromUser = arg_User;
    SendMessage(arg_User,arg_Message);
}

客户端表单的更改

客户端窗体必须更改以创建将使用此契约的服务。所有这些更改都将发生在处理客户端窗体事件的代码中——在我的示例中是GPH_QueueMessageClientForm

我的第一个更改是引用新的契约和ServiceModel.Description用于端点操作

using System.ServiceModel.Description; // For ServiceEndpoint
using NGPH_InboundMessageContract;
“加入”点击事件

在“加入”点击事件 (btnJoin_Click) 中,创建一个入站(响应)队列,其中包含加入用户的名称作为从App.config中读取的根名称,如果该队列尚不存在

m_queueName = ConfigurationManager.AppSettings["m_queueName"] + txtUsername.Text;
if (!MessageQueue.Exists(m_queueName))
    MessageQueue.Create(m_queueName, true);//Creates a transactional queue.

接下来我正在创建绑定。因为我想使用涉及用户名的动态寻址,所以我正在运行时创建所有内容。如果这限制太多,您可以自由地从App.Config中读取设置。

NetMsmqBinding Binding;
Binding = new NetMsmqBinding();
Binding.Security.Transport.MsmqAuthenticationMode = MsmqAuthenticationMode.None;
Binding.Security.Transport.MsmqProtectionLevel = System.Net.Security.ProtectionLevel.None;

使用 App.Config 条目和表单中的用户名组合创建端点

string endpointAddressRoot = ConfigurationManager.AppSettings["endpointAddressRoot"];
string strEndpointAddress = endpointAddressRoot + txtUsername.Text;
EndpointAddress address = new EndpointAddress(strEndpointAddress);

然后我再次设置基地址,其中包含加入的用户名和 App.Config 中的根,并在为客户端主机定义变量时使用它

string baseAddress = ConfigurationManager.AppSettings["baseAddress"];
baseAddress += txtUsername.Text;
ServiceHost host = new ServiceHost(typeof(CGPH_InboundMessageHandler), new Uri(baseAddress));

为契约中触发的两个事件添加到入站消息处理程序的事件处理程序

CGPH_InboundMessageHandler.RegistrationEvent
   += new EventHandler<CGPH_InboundMessageHandler.RegistrationEventArgs>(
      CGPH_InboundMessageHandler_RegistrationEvent);
CGPH_InboundMessageHandler.ShowMessageEvent
   += new EventHandler<CGPH_InboundMessageHandler.ShowMessageEventArgs>(
      CGPH_InboundMessageHandler_ShowMessageEvent);

在客户端打开一个主机,使用此处定义的入站消息处理程序、绑定和端点。

host.AddServiceEndpoint(typeof(IGPH_InboundMessageHandler), Binding, strEndpointAddress);
host.Open();
处理注册事件

当契约触发注册事件时,CGPH_InboundMessageHandler_RegistrationEvent将在客户端处理它。它所做的只是刷新订阅者核对列表

clstSubscriber.Items.Clear();
clstSubscriber.Items.Clear();

foreach (string subscriber in NGPH_InboundMessageContract.CGPH_InboundMessageHandler.m_SubscriberList)
    clstSubscriber.Items.Add(subscriber);
入站消息

入站消息事件由CGPH_InboundMessageHandler_ShowMessageEvent捕获。它使用契约变量m_FromUserm_MessageRecieved,对其进行重新格式化并将其添加到窗体上的收件箱文本字段。

离开

当点击btnLeave_Click时,用户响应队列将被删除

if (MessageQueue.Exists(m_queueName))
    MessageQueue.Delete(m_queueName);

客户端App.Config更改

当客户端创建其服务引用时,我们自动获得了一个App.Config。现在是时候进一步修饰它以处理传入客户端的消息了。

首先我们需要动态寻址的根。这些将是AppSettings中的键/值对

<appSettings>
    <!-- use appSetting to configure MSMQ queue name -->
    <add key="m_queueName" value=".\private$\GPH_InboundClientQueue_"/>
    <add key="endpointAddressRoot" value="net.msmq:///private/GPH_InboundClientQueue_"/>
    <add key="baseAddress" value="https://:8080/GPH_QueuedInbound"/>
</appSettings>

还有一个新的服务条目

<service
          behaviorConfiguration="GPH_QueuedClientBehaviors"
          name = "NGPH_InboundMessageContract.CGPH_InboundMessageHandler">
</service>

以及一种新行为

<behavior name="GPH_QueuedClientBehaviors">
    <!-- Add the following element to your service behavior configuration. -->
    <serviceMetadata httpGetEnabled="true"/>
</behavior>

现在编译客户端,以管理员权限运行它,输入用户名并点击“加入”。我使用“J1”(表示第一个加入者)作为我的初始用户,请记住您的——我们将很快依赖它,因为我们将服务加速到向客户端发送消息。

作为伪客户端的服务

在这里,我们将研究服务如何承担一些客户端行为,为每个客户端服务创建一个代理,并使用它向客户端分发消息。

程序代码更改

主要更改发生在GPH_QueuedMessageContractCGPH_QueuedMessageContract中,涉及其实现契约的方式。但首先它需要一个服务引用。以与向客户端添加服务引用相同的方式添加它,这次使用您设置运行的客户端的地址。要添加的引用地址将是https://:8080/GPH_QueuedInboundj1。这与动态寻址背道而驰,给人一种只有设计时已知用户才能连接以从服务获取响应的印象。然而,当我们将响应能力添加到CGPH_QueuedMessageContract时,通过以编程方式覆盖任何客户端特定地址来克服了这一点。

CGPH_QueuedMessageContract需要一些新的成员变量。

string m_message;
EndpointAddress m_address;
NetMsmqBinding m_Binding;
private static List<string> m_SubscriberList = new List<string>();


// current plan is for a dictionary of users and proxies where a proxy is created for each user
// on registration and then used for any communication to the user.
// The alternative is to hold the endpoint in the dictionary and create the proxy before each message
// and destroy it afterwards
private static Dictionary<string, 
  GPH_QueuedMessageContract.ServiceReference1.GPH_InboundMessageHandlerClient> m_NotifyList =
  new Dictionary<string, GPH_QueuedMessageContract.ServiceReference1.GPH_InboundMessageHandlerClient>();
  // Default Constructor

根据我的双工WCF示例,我有一个标准订阅者列表和订阅者字典,这次引用了代理。

注册用户。

RegisterUser方法进行这些更改,以构建与正在注册的用户联系所需的代理。

创建绑定

m_Binding = new NetMsmqBinding();
m_Binding.Security.Transport.MsmqAuthenticationMode = MsmqAuthenticationMode.None;
m_Binding.Security.Transport.MsmqProtectionLevel = System.Net.Security.ProtectionLevel.None;

从消息头中提取地址——这将覆盖App.Config<Client>标签中硬编码的地址

m_address = new EndpointAddress(m_PublishToDetails.PublishToAddress);

使用新的绑定和地址为正在注册的客户端创建一个新代理

GPH_QueuedMessageContract.ServiceReference1.GPH_InboundMessageHandlerClient proxy;
//Use an overload to supply the new response address. Store in a key / value table per client
proxy = new GPH_QueuedMessageContract.ServiceReference1.GPH_InboundMessageHandlerClient(m_Binding, m_address);

将代理存储在以注册用户名索引的字典中,并将该用户名添加到订阅者列表

// Store the username and proxy in a lookup table
m_NotifyList.Add(arg_Username, proxy);
m_SubscriberList.Add(arg_Username);

触发消息分发器方法以通知所有客户端此用户现已注册,并提供最新的订阅者列表

MessageDistributor(arg_Username, m_SubscriberList, " has joined the converstation",1);
注销并移除用户。

当用户点击客户端上的“离开”按钮时,这将触发服务上的RemoveUser

首先从字典中检索代理

GPH_QueuedMessageContract.ServiceReference1.GPH_InboundMessageHandlerClient proxy = m_NotifyList[arg_Username];

从订阅者列表中移除用户名,并同时移除其在字典中的条目。

// Remove the username and proxy from the lookup table and subscriber list.
m_NotifyList.Remove(arg_Username);
m_SubscriberList.Remove(arg_Username);

然后我关闭用户的代理

proxy.Close();

最后,我使用MessageDistributor通知其余用户此用户已离开对话,并附上更新的订阅者列表。

MessageDistributor(arg_Username, m_SubscriberList, " has left the conversation",1);
接收消息。

考虑到这项工作主要是关于客户端之间的消息分发,这个方法几乎没有做什么,而是选择将任务交给MessageDistributor方法

MessageDistributor(arg_userName, arg_addressList, arg_userMessage, 0);
消息分发

MessageDistributor方法负责将消息发送到不同的客户端。它将触发消息的用户名、当前地址列表、消息本身和一个指示器作为参数。该指示器为0时仅处理来自用户的消息,为1时则需要带有预设服务消息的新分发列表。

逻辑被封装在一个 foreach 循环中,该循环将处理地址列表参数(对于指示器 0 是整个列表,对于指示器 1 是消息中提供的列表)

foreach (string tmpAddr in arg_addressList)

每次循环迭代都在一个 TransactionScope 内进行处理

using (TransactionScope scope = new TransactionScope())

从字典中获取代理

GPH_QueuedMessageContract.ServiceReference1.GPH_InboundMessageHandlerClient proxy = m_NotifyList[tmpAddr];

将消息写入队列,请注意当指示器为1时,有两个消息要排队。如果第二个消息失败,我可以使用事务范围来回滚——这是另一天的练习

if (arg_ind == 1)
{
    string[] tmpSubscriberList = (string[])m_SubscriberList.ToArray();
    proxy.OnRegistration(tmpSubscriberList); //Send back a list of current subscribers
}
Console.WriteLine("Dispatching message to: [{0}]", proxy.Endpoint.Address);
proxy.OnInboundMessage(arg_userName, arg_userMessage);

最后一个任务是关闭作用域

scope.Complete();

配置更改

添加服务引用的过程将在契约库上创建一个app.config——应用程序无法看到它。<Client><netMsmqBinding>应复制到主机项目(在此示例中为GPH_QueuedMessageHost)上的App.config中。硬编码的端点地址是使用服务引用向导的副产品,但未在代码中使用。

查看您的队列。

您可以通过单击开始按钮并右键单击计算机来查看机器上的队列

Right Clicking Computer

单击上面显示的弹出窗口中的“管理”。这将打开计算机管理对话框,您需要展开“服务和应用程序”,然后是“消息队列”,接着是“私有队列”,并将左侧窗格加宽,直到您看到类似这样的内容

Computer Management

这是一个已排队但未处理的消息示例

Queued Message

双重构建

当您在一个解决方案中使用多个项目时,可能会出现链接错误,因为一个项目找不到另一个项目,如果您快速连续编译两次,这些错误就会消失——当这种情况发生时,请使用“项目”菜单中的“项目依赖项”来纠正它。

参考文献

  • [TROELSEN] - Andrew Troelsen 撰写的“Pro C# 2010 and the .NET 4 Platform (第五版)”第25章
  • [LOWY] - Juval Lowy 著《编程WCF服务》
  • [PATHAK] - Nishith Pathak 著《Pro WCF 4 实用微软SOA实现》
  • [NARAYAN] - SheoNarayan 著《使用C#在MSMQ中发送和接收消息》
  • [HOLLANDER] - Tom Hollander 著《使用 WCF 和 MSMQ 构建发布/订阅消息总线》
  • [VANDIEST] - Geoffrey Vandiest 著《以编程方式创建使用Msmq的WCF服务和客户端》
  • [KUMAR] - Saravankumar 著《数据契约》
  • [MSDN] - MSDN 著《如何:在代码中创建服务终结点》
  • [VDSTELT] - Dennis van der Stelt 著《WCF 和 MSMQ》
  • [HALABAI] - Mohamad Halabai 著《WCF 队列消息传递》
  • [DORIER] - Nicholas Dorier 著《WCF:双工 MSMQ》
  • [PAUL] - Razan Paul 著《基于主题的发布/订阅设计模式在 C# 中的实现 - 第二部分 (使用 WCF)》
  • [ROGERS] - Will Rogers 著《MSMQ 和 WCF(不同机器上的客户端和服务)》

历史

2013-01-01 - V1.0 - 首次提交。

© . All rights reserved.