65.9K
CodeProject 正在变化。 阅读更多。
Home

WCF: 双工 MSMQ

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.96/5 (29投票s)

2009年8月29日

Ms-PL

11分钟阅读

viewsIcon

155850

downloadIcon

2027

如何使用通过 MSMQ 绑定的双工通信。

引言

今天,我将讨论一种使用 WCF 中的 netMsmqBinding 实现双工通信的解决方案。

好消息是,我的解决方案可以与任何支持 IOutputChannel/IInputChannel 的绑定实现双工通信。

通俗地说,这意味着如果一个绑定可以单向发送和接收消息,那么它也可以用于双工通信,即从服务器通过回调向客户端发送消息。是的,您想要并且将能够通过 Twitter 使用双工通信。我将在另一篇文章中讨论我的项目 WCF over Twitter,以了解更多关于 WCF 的信息!

如果您只想使用双工 MSMQ,请访问此处下载该项目:http://duplexmsmq.codeplex.com/;其中包含一个示例。

SOAP 管道

首先,有些人会说:“您可以使用CompositeDuplexBindingElement”,我不会重复解释为什么它不起作用,这篇博客文章解释了这个问题(感谢 Mike Taulty)。

他找到了一个解决方案,使 MSMQ 能够进行双工通信,但这需要可靠会话。可靠会话的问题在于“WCF 支持处于活动状态且正在运行的终结点之间的可靠会话”。而当我们使用 MSMQ 时,我们希望能够即使在服务离线时也能发送消息给服务,并且我们希望服务即使客户端未连接也能发送消息给客户端。因此,这不是一个好的解决方案。

但您应该会觉得有趣的是我如何在没有可靠会话的情况下实现它的!也许,为什么它只在有可靠会话的情况下才有效。

首先,我将在 SOAP 级别解释双工通信是如何工作的。

我将使用 Visio 和这个Visio 模板“其中包含 51 个集成模式图标作为 Visio 形状”。

WCF 中的 Channel(通道)是能够接收或发送消息的东西。有几种类型的通道,但我只会谈论 IInputChannel/IOutputChannel。这些通道是 netMsmqBinding 使用的通道,这意味着 IOutputChannel 仅支持单向发送消息给另一个 IInputChannel。服务器侦听 IInputChannel,客户端使用 IOutputChannel 发送消息。显然,netMsmqBinding 的这些通道的具体实现使用 MSMQ。

如果我们希望能够在双工场景中将消息回发给客户端,客户端需要公开一个 IInputChannel,而服务需要使用一个 IOutputChannel

在 WCF 中,IInputChannel + IOutputChannel = IDuplexChannel,而 WCF 随附的CompositeDuplexBindingElement 负责将一对 IInputChannel/IOutputChannel 转换为 IDuplexChannel。我们稍后会回到这里。

首先,客户端需要告诉服务器将回复发送到哪里,就像您在下面的图中所看到的。这通过每个消息的 SOAP 标头中的一些附加元素来完成。

并且,当服务器响应时,它需要指定返回地址。

所以我的目标是

  • 客户端必须从 netMsmqBinding 创建一个 IInputChannel 来接收来自服务器的消息。
  • 将此 IInputChannel 的地址附加到发送到服务器的每条消息的返回地址。
  • 服务器必须从 netMsmqBinding 创建一个 IOutputChannel 来发送消息给客户端。
  • IInputChannel 的地址附加到发送到客户端的每条消息的返回地址。

您可以看到我的解决方案并未与 MSMQ 耦合,并且可以与任何支持 IInputChannel/IOutputChannel 的绑定一起使用。

好吧,首先,我将解释 Mike Taulty 的解决方案为什么在有可靠会话的情况下有效,以及为什么它是一个糟糕的解决方案。诀窍在于返回地址随客户端发送给服务器的第一条消息一起发送。服务器为客户端创建一个会话,并将返回地址存储在内存中。然后,客户端在每条 SOAP 标头中发送其会话 ID,而不带返回地址。显然,如果服务器崩溃,会话就会丢失,客户端的消息就会被丢弃,或者发送到死信队列。

那么,我如何扩展 WCF 来解决这个问题呢?我将创建自己的 BindingElement,但我们需要了解什么是绑定。

WCF 管道

当我想到绑定时,我就会想到汉堡包。而实际上,有两种汉堡包:一种是你最喜欢的快餐店做的,另一种是你妈妈/妻子/你自己用爱和心意做的。绑定的组成部分是 BindingElement

先谈谈快餐。WCF 内置绑定的好处是它们确实易于理解和创建。而且,嗯……这是快餐,所以你不知道里面到底有什么,但它们有名字,你也能猜到里面有一些 BindingElement

例如,对于 WSHttpBinding,你可以猜到里面有一个 HttpTransportBindingElement,并且你也能猜到它通过 HTTP 发送 SOAP 消息。你猜测这段代码会启用可靠会话,但你不知道确切是哪个组件或“BindingElement”受到了影响。

wsHttpBinding.ReliableSession.Enabled = true;

好处是,在大多数情况下,您不必了解内部原理。微软做得很好,满足了大多数常见的用例。当您将绑定传递给服务主机或通道工厂时,它将调用

binding.CreateBindingElements()

来创建所有 BindingElement,这些 BindingElement 基于您在内置 Binding 上设置的属性。

但有时,您需要自己指定要添加到 Binding 中的组件或“BindingElement”。这称为自制绑定,有两种创建方式

  • 继承 Binding
  • 使用 CustomBinding

我将使用后者,因为这样最容易解释 Binding 的工作原理。

CustomBinding 只是一个您直接指定所有 BindingElement 的绑定。例如

binding = new CustomBinding(
    new ListenUriBindingElement()
    {
        ListenUriBaseAddress = baseClient
    },
    new CompositeDuplexBindingElement(),
    new ReplyToBindingElement(),
    new TextMessageEncodingBindingElement(),
    CreateMsmqBinding());

这会创建一个包含 ListenUriBindingElementCompositeDuplexBindingElementReplyToBindingElementTextMessageEncodingBindingElementMsmqTransportBindingElement(它是 CreateMsmqBinding() 的返回类型)的 Binding。这个枚举称为“BindingElement 堆栈”;最后一个 BindingElement 在堆栈中最低,它始终是 TransportBindingElement

大多数情况下,服务器端和客户端的 BindingElement 堆栈是相同的。每个 BindingElement 都可以决定修改、检查或验证消息,或者它们可以将参数传递给 BindingElement 堆栈中较低的组件。例如,当您为绑定选择消息安全时,WCF 会在内部创建绑定元素,这些元素使用证书或其他内容对消息进行编码/解码。服务器和客户端拥有相同的堆栈;BindingElement 会对在客户端发送的消息进行编码,并在服务器端进行解码。

另一个例子,当您选择将消息编码为纯文本 TextMessageEncodingBindingElement 时,它不会拦截任何消息,但它会将自身作为参数传递到堆栈中。

context.BindingParameters.Add(this);

这样,TransportBindingElement(堆栈中最低的元素)就可以检索 MessageEncodingBindingElement,以便知道如何将 Message 序列化和反序列化为字节流。每个 BindingElement 都清楚地说明了它是否能使用 CanBuildChannelListener<TChannel>CanBuildChannelFactory<TChannel> 构建通道类型。而且,每个 BindingElement 在创建其“通道管理器”时都有一个 BindingContext,用于将参数向下传递到堆栈,或为堆栈中直接下面的 BindingElement 构建“通道管理器”。

通道管理器是 IChannelFactoryIChannelListennerBuildChannelListener 在服务器端调用,BuildChannelFactory 在客户端调用。

IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)

大多数情况下,中间绑定不关心创建通道管理器,因此它们只是将调用委托给堆栈,就像 TextMessageEncodingBindingElement 所做的那样。

return context.BuildInnerChannelFactory<TChannel>();

实际上,这段代码只是调用当前 BindingElement 下方的 BindingElementBuildChannelFactory

但是,如果您想拦截和修改消息,则需要将内部通道管理器包装在一个自定义通道管理器中。我所说的“内部通道管理器”是由当前 BindingElement 下方的 BindingElement 创建的通道管理器。

这张图解释了通道管理器是如何创建的。它假设 BindingElement1BindingElement2 希望拦截消息,因此它们包装了内部通道管理器。TextMessageEncodingBindingElement 不需要拦截消息,因此它不会包装 TransportBindingElement1 返回的通道管理器。

这是一个想要拦截消息的绑定元素的示例;这是装饰器模式。

public override IChannelFactory<TChannel> 
       BuildChannelFactory<TChannel>(BindingContext context)
{
    if(!CanBuildChannelFactory<TChannel>(context))
    {
        throw new ArgumentException("Impossible to build ReplyToBindingElement");
    }
    var innerChannelFactory = (IChannelFactory<IOutputChannel>)
             context.BuildInnerChannelFactory<TChannel>();
    return (IChannelFactory<TChannel>)new ReplyToChannelFactory(
                    context.ListenUriBaseAddress, innerChannelFactory);
}

我的 ReplyToChannelFactory 实现将拦截发送到 innerChannel 的调用。大多数调用都只是委托给 innerChannelFactory。我们稍后将看到实现。ServiceHostChannelFactory<ServiceType> 类将使用最顶层绑定元素的通道管理器来发送或接收消息。请不要将 ChannelFactory<ServiceType>IChannelFactory<TChannel> 混淆;ChannelFactory<ServiceType> 是该接口的实现,但它主要用于创建客户端代理,而不是由 BindingElement 返回。

ChannelFactory<ServiceType> 只是一个类,它根据您在客户端代理上调用的方法和参数序列化和反序列化 SOAP 消息的主体,然后将消息传递给由最顶层 BindingElement(由您的最顶层 BindingElement 创建)创建的通道。ServiceHost 做的事情相同,只是将 IChannelFactory 替换为 IChannelListener

这是通道的创建;请注意,IChannelFactories 也使用装饰器模式来创建通道。

然后,当您调用代理上的方法时,会创建一个消息,并调用最顶层通道的 Send 方法

双工 MSMQ 实现

好的,现在,如果您理解了我上面说的话,您会认为实现将是一件轻而易举的事,事实也确实如此!要理解它如何工作有一个很深的学习曲线,但一旦理解了,您就会认为 WCF 非常容易扩展且设计精良。您会发现它并不像看起来那么难。我越了解 WCF,就越多产生项目想法(很棒的,也很糟糕的……)

首先,我创建了名为 ProxyXXXX : XXXX 的辅助类,其中 XXXX 是我想拦截调用的接口。ProxyXXXX 将接收一个内部 XXXX 并将所有调用转发给它。

ProxyXXXX 的每个方法、属性和事件都是 virtual 的,因此如果我想拦截调用,我只需继承 ProxyXXXX 并重写一个类成员。

例如,在这里,我所做的是在 ProxyOutputChannel 内部,我的通道将重写 Send 方法来拦截消息

public virtual void Send(System.ServiceModel.Channels.Message message, 
                         System.TimeSpan timeout)
{
    _InnerOutputChannel.Send(message, timeout);
}

ReplyToBindingElement 负责在出站消息中添加标头以实现双工通信。因此,它包装了每个内部 ChannelFactory

public override IChannelFactory<TChannel> 
       BuildChannelFactory<TChannel>(BindingContext context)
{
    if(!CanBuildChannelFactory<TChannel>(context))
    {
        throw new ArgumentException("Impossible to build ReplyToBindingElement");
    }
    var innerChannel = (IChannelFactory<IOutputChannel>)
                          context.BuildInnerChannelFactory<TChannel>();
    return (IChannelFactory<TChannel>)
       new ReplyToChannelFactory(context.ListenUriBaseAddress, innerChannel);
}

public override IChannelListener<TChannel> 
          BuildChannelListener<TChannel>(BindingContext context)
{
    return base.BuildChannelListener<TChannel>(context);
}

public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
{
    return typeof(TChannel) == typeof(IOutputChannel);
}

然后 ReplyToChannelFactory 包装了每个内部通道

public class ReplyToChannelFactory : ProxyChannelFactory<IOutputChannel>
{
    readonly Uri _ReplyAddress;
    public ReplyToChannelFactory(Uri replyAddress, IChannelFactory<IOutputChannel> inner)
        : base(inner)
    {
        _ReplyAddress = replyAddress;
    }
    public override IOutputChannel CreateChannel(System.ServiceModel.EndpointAddress to)
    {
        return new ReplyToChannel(_ReplyAddress, base.CreateChannel(to));
    }
    public override IOutputChannel 
           CreateChannel(System.ServiceModel.EndpointAddress to, Uri via)
    {
        return new ReplyToChannel(_ReplyAddress, base.CreateChannel(to, via));
    }
}

并且,通道将所有必要的标头添加到出站消息中

public class ReplyToChannel : ProxyOutputChannel
{
    readonly Uri _ReplyAddress;
    public ReplyToChannel(Uri replyAddress, IOutputChannel inner)
        : base(inner)
    {
        _ReplyAddress = replyAddress;
    }
    public override void Send(Message message)
    {
        ApplyReplyTo(message);
        base.Send(message);
    }
    public override void Send(Message message, TimeSpan timeout)
    {
        ApplyReplyTo(message);

        base.Send(message, timeout);
    }

    void ApplyReplyTo(Message message)
    {
        if(message.Headers.MessageId == null)
        {
            message.Headers.MessageId = new System.Xml.UniqueId();
        }
        if(message.Headers.From == null)
        {
            message.Headers.From = 
              new System.ServiceModel.EndpointAddress(_ReplyAddress);
        }
        if(message.Headers.ReplyTo == null)
        {
            message.Headers.ReplyTo = 
              new System.ServiceModel.EndpointAddress(_ReplyAddress);
        }
    }
}

这是我的服务实现和契约

[ServiceContract(CallbackContract = typeof(IConversation))]
public interface IConversation
{
    [OperationContract(IsOneWay = true)]
    void Say(String something);
}

public class Conversation : IConversation
{
    #region IConversation Members

    public void Say(string something)
    {
        Console.WriteLine("Someone says \"{0}\" to you, what is your response ?", 
                          something);
        String response = Console.ReadLine();
        OperationContext.Current.GetCallbackChannel<IConversation>().Say(response);
    }

    #endregion
}

但在服务器端,在尝试向回调通道发送消息时会抛出异常。我不知道为什么,但似乎 WCF 没有自动将出站消息的 To 标头设置为入站消息的 ReplyTo 标头的地址……我不知道为什么我只需要在服务器端这样做,而在客户端却不需要,如果有人能向我解释,我将不胜感激……所以,我需要创建一个带有消息检查器的行为来修复这个问题。

public class ReplyToBehavior : IEndpointBehavior
{
    public class ReplyToInspector : IDispatchMessageInspector
    {
        #region IDispatchMessageInspector Members

        public object AfterReceiveRequest(ref Message request, 
                      IClientChannel channel, InstanceContext instanceContext)
        {
            var reply = request.Headers.ReplyTo;
            OperationContext.Current.OutgoingMessageHeaders.To = reply.Uri;
            OperationContext.Current.OutgoingMessageHeaders.RelatesTo = 
                                     request.Headers.MessageId;
            return null;
        }

        public void BeforeSendReply(ref Message reply, object correlationState)
        {
        }

        #endregion
    }


    #region IEndpointBehavior Members

    public void AddBindingParameters(ServiceEndpoint endpoint, 
                BindingParameterCollection bindingParameters)
    {
    }

    public void ApplyClientBehavior(ServiceEndpoint endpoint, 
           System.ServiceModel.Dispatcher.ClientRuntime clientRuntime)
    {
    }

    public void ApplyDispatchBehavior(ServiceEndpoint endpoint, 
                System.ServiceModel.Dispatcher.EndpointDispatcher endpointDispatcher)
    {
        endpointDispatcher.DispatchRuntime.MessageInspectors.Add(new ReplyToInspector());
    }

    public void Validate(ServiceEndpoint endpoint)
    {
    }

    #endregion
}

我的实现完成了!

最后两件事

在创建 BindingElement 堆栈期间,我需要两样东西:首先,一个 BindingElement 让我的客户端指定消息的回复地址以及要侦听的 MSMQ 队列。这是 ListenUriBindingElement 的职责。

我从这篇博客文章中获取了它。实现并不复杂,它只是设置 context.ListenUriBaseAddress。堆栈中较低的 BindingElements 需要此值来知道在哪里侦听输入消息。

public override IChannelListener<TChannel> 
       BuildChannelListener<TChannel>(BindingContext context)
{
    if(listenUriBaseAddress != null)
        context.ListenUriBaseAddress = listenUriBaseAddress;
    return base.BuildChannelListener<TChannel>(context);
}

我还必须指定我的服务器可以发送消息,并且我的客户端必须侦听 MSMQ 通道(此通道通过之前的 BindingElement ListenUriBindingElement 设置的 context.ListenUriBaseAddress 指定)。ServiceHost 将启用服务通过回调发送消息,前提是 Binding 可以构建 IDuplexChannel。唉,事实并非如此,因为堆栈中的一个 BindingElement 不支持它:MsmqTransportBindingElement.CanBuildChannelListener<IDuplexChannel> 返回 false。所以,我需要一个 BindingElement,它在接受创建 IInputChannelIOuputChannel 的通道之上创建一个 IDuplexChannel。这很容易,您只需使用 WCF 随附的CompositeDuplexBindingElement

这是 ChannelFactory<ServiceType> 创建我的 BindingElement 堆栈中的最顶层 IChannelFactory 的方式;如果您想知道它在服务端的实现方式,只需将 IChannelFactory 替换为 IChannelListener,将 ChannelFactory<ServiceType> 替换为 ServiceHost

我们说 CompositeDuplexBindingElement 改变了通道的形状。有关更多信息,ReliableSessionBindingElement 也改变了通道的形状;它将 XXXChannel 转换为 IXXXChannel 再转换为 IXXXSessionChannel 来启用有会话的通信。

在客户端,我将使用 DuplexChannelFactory 而不是 ChannelFactory 来指定回调实现。与 ServiceHost 相同的问题通过相同的解决方案得到解决。

客户端和服务器创建

这是我为双方创建的 BindingElement 堆栈

public class BindingFactory
{
    public static Binding Create(Uri baseClient)
    {
        CustomBinding binding = null;

        binding = new CustomBinding(
        new ListenUriBindingElement()
        {
            ListenUriBaseAddress = baseClient
        },
        new CompositeDuplexBindingElement(),
        new ReplyToBindingElement(),
        new TextMessageEncodingBindingElement(),
        CreateMsmqBinding());

        return binding;
    }

    private static MsmqTransportBindingElement CreateMsmqBinding()
    {
        var binding = new MsmqTransportBindingElement();
        binding.MsmqTransportSecurity.MsmqAuthenticationMode = 
                                      MsmqAuthenticationMode.None;
        binding.MsmqTransportSecurity.MsmqProtectionLevel = 
                         System.Net.Security.ProtectionLevel.None;
        binding.UseActiveDirectory = false;
        binding.ExactlyOnce = false;
        return binding;
    }
}

显然,服务器端不需要 ListenUriBindingElement,因为 ServiceHost 将设置 context.ListenUriBaseAddress

public override IChannelListener<TChannel> 
       BuildChannelListener<TChannel>(BindingContext context)
{
    if(listenUriBaseAddress != null)
        context.ListenUriBaseAddress = listenUriBaseAddress;
    return base.BuildChannelListener<TChannel>(context);
}

为了不覆盖 context.ListenUriBaseAddress,我将调用

BindingFactory.Create(null)

在服务器端。

客户端创建

static void Main(string[] args)
{
    Console.WriteLine("Hit a key");
    Console.ReadLine();
    Binding bind = BindingFactory.Create(new Uri("net.msmq://SHIELDP/private/client"));
    DuplexChannelFactory<IConversation> channel = new DuplexChannelFactory<IConversation>
        (new Conversation(), 
        bind, 
        new EndpointAddress("net.msmq://SHIELDP/private/server"));
    channel.CreateChannel().Say("Hello I'm Nico");
    while(true)
    {
    }
}

服务器创建

class Program
{
    static void Main(string[] args)
    {
        Binding bind = BindingFactory.Create(null);
        ServiceHost host = new ServiceHost(typeof(Conversation), 
                               new Uri("net.msmq://SHIELDP/private/server"));
        var endpoint = host.AddServiceEndpoint(typeof(IConversation), bind, "");
        endpoint.Behaviors.Add(new ReplyToBehavior());
        host.Open();
        Console.WriteLine("Open");
        while(true)
        {
        }
    }
}

现在,您可以通过 MSMQ 在客户端和服务器端使用回调。

结论

我确信这个项目对于有高可用性要求的人来说会很有用。微软没有实现这一点真是太遗憾了;MSMQ 非常适合双工通信……我希望您喜欢这篇文章。下一篇将讨论WCF over Twitter,并将展示如何创建自己的 TransportBindingElement。如果您喜欢这篇文章,请告诉我!:)

© . All rights reserved.