WCF: 双工 MSMQ






4.96/5 (29投票s)
如何使用通过 MSMQ 绑定的双工通信。
引言
今天,我将讨论一种使用 WCF 中的 netMsmqBinding 实现双工通信的解决方案。
好消息是,我的解决方案可以与任何支持 IOutputChannel
/IInputChannel
的绑定实现双工通信。
通俗地说,这意味着如果一个绑定可以单向发送和接收消息,那么它也可以用于双工通信,即从服务器通过回调向客户端发送消息。是的,您想要并且将能够通过 Twitter 使用双工通信。我将在另一篇文章中讨论我的项目 WCF over Twitter,以了解更多关于 WCF 的信息!
如果您只想使用双工 MSMQ,请访问此处下载该项目:http://duplexmsmq.codeplex.com/;其中包含一个示例。
SOAP 管道
首先,有些人会说:“您可以使用CompositeDuplexBindingElement
”,我不会重复解释为什么它不起作用,这篇博客文章解释了这个问题(感谢 Mike Taulty)。
他找到了一个解决方案,使 MSMQ 能够进行双工通信,但这需要可靠会话。可靠会话的问题在于“WCF 支持处于活动状态且正在运行的终结点之间的可靠会话”。而当我们使用 MSMQ 时,我们希望能够即使在服务离线时也能发送消息给服务,并且我们希望服务即使客户端未连接也能发送消息给客户端。因此,这不是一个好的解决方案。
但您应该会觉得有趣的是我如何在没有可靠会话的情况下实现它的!也许,为什么它只在有可靠会话的情况下才有效。
首先,我将在 SOAP 级别解释双工通信是如何工作的。
我将使用 Visio 和这个Visio 模板“其中包含 51 个集成模式图标作为 Visio 形状”。
WCF 中的 Channel(通道)是能够接收或发送消息的东西。有几种类型的通道,但我只会谈论 IInputChannel
/IOutputChannel
。这些通道是 netMsmqBinding 使用的通道,这意味着 IOutputChannel
仅支持单向发送消息给另一个 IInputChannel
。服务器侦听 IInputChannel
,客户端使用 IOutputChannel
发送消息。显然,netMsmqBinding 的这些通道的具体实现使用 MSMQ。
如果我们希望能够在双工场景中将消息回发给客户端,客户端需要公开一个 IInputChannel
,而服务需要使用一个 IOutputChannel
。
在 WCF 中,IInputChannel
+ IOutputChannel
= IDuplexChannel
,而 WCF 随附的CompositeDuplexBindingElement
负责将一对 IInputChannel
/IOutputChannel
转换为 IDuplexChannel
。我们稍后会回到这里。
首先,客户端需要告诉服务器将回复发送到哪里,就像您在下面的图中所看到的。这通过每个消息的 SOAP 标头中的一些附加元素来完成。
并且,当服务器响应时,它需要指定返回地址。
所以我的目标是
- 客户端必须从 netMsmqBinding 创建一个
IInputChannel
来接收来自服务器的消息。 - 将此
IInputChannel
的地址附加到发送到服务器的每条消息的返回地址。 - 服务器必须从 netMsmqBinding 创建一个
IOutputChannel
来发送消息给客户端。 - 将
IInputChannel
的地址附加到发送到客户端的每条消息的返回地址。
您可以看到我的解决方案并未与 MSMQ 耦合,并且可以与任何支持 IInputChannel
/IOutputChannel
的绑定一起使用。
好吧,首先,我将解释 Mike Taulty 的解决方案为什么在有可靠会话的情况下有效,以及为什么它是一个糟糕的解决方案。诀窍在于返回地址随客户端发送给服务器的第一条消息一起发送。服务器为客户端创建一个会话,并将返回地址存储在内存中。然后,客户端在每条 SOAP 标头中发送其会话 ID,而不带返回地址。显然,如果服务器崩溃,会话就会丢失,客户端的消息就会被丢弃,或者发送到死信队列。
那么,我如何扩展 WCF 来解决这个问题呢?我将创建自己的 BindingElement
,但我们需要了解什么是绑定。
WCF 管道
当我想到绑定时,我就会想到汉堡包。而实际上,有两种汉堡包:一种是你最喜欢的快餐店做的,另一种是你妈妈/妻子/你自己用爱和心意做的。绑定的组成部分是 BindingElement
。
先谈谈快餐。WCF 内置绑定的好处是它们确实易于理解和创建。而且,嗯……这是快餐,所以你不知道里面到底有什么,但它们有名字,你也能猜到里面有一些 BindingElement
。
例如,对于 WSHttpBinding
,你可以猜到里面有一个 HttpTransportBindingElement
,并且你也能猜到它通过 HTTP 发送 SOAP 消息。你猜测这段代码会启用可靠会话,但你不知道确切是哪个组件或“BindingElement
”受到了影响。
wsHttpBinding.ReliableSession.Enabled = true;
好处是,在大多数情况下,您不必了解内部原理。微软做得很好,满足了大多数常见的用例。当您将绑定传递给服务主机或通道工厂时,它将调用
binding.CreateBindingElements()
来创建所有 BindingElement
,这些 BindingElement
基于您在内置 Binding
上设置的属性。
但有时,您需要自己指定要添加到 Binding
中的组件或“BindingElement
”。这称为自制绑定,有两种创建方式
- 继承
Binding
类 - 使用
CustomBinding
我将使用后者,因为这样最容易解释 Binding
的工作原理。
CustomBinding
只是一个您直接指定所有 BindingElement
的绑定。例如
binding = new CustomBinding(
new ListenUriBindingElement()
{
ListenUriBaseAddress = baseClient
},
new CompositeDuplexBindingElement(),
new ReplyToBindingElement(),
new TextMessageEncodingBindingElement(),
CreateMsmqBinding());
这会创建一个包含 ListenUriBindingElement
、CompositeDuplexBindingElement
、ReplyToBindingElement
、TextMessageEncodingBindingElement
和 MsmqTransportBindingElement
(它是 CreateMsmqBinding()
的返回类型)的 Binding
。这个枚举称为“BindingElement 堆栈”;最后一个 BindingElement
在堆栈中最低,它始终是 TransportBindingElement
。
大多数情况下,服务器端和客户端的 BindingElement
堆栈是相同的。每个 BindingElement
都可以决定修改、检查或验证消息,或者它们可以将参数传递给 BindingElement
堆栈中较低的组件。例如,当您为绑定选择消息安全时,WCF 会在内部创建绑定元素,这些元素使用证书或其他内容对消息进行编码/解码。服务器和客户端拥有相同的堆栈;BindingElement
会对在客户端发送的消息进行编码,并在服务器端进行解码。
另一个例子,当您选择将消息编码为纯文本 TextMessageEncodingBindingElement
时,它不会拦截任何消息,但它会将自身作为参数传递到堆栈中。
context.BindingParameters.Add(this);
这样,TransportBindingElement
(堆栈中最低的元素)就可以检索 MessageEncodingBindingElement
,以便知道如何将 Message
序列化和反序列化为字节流。每个 BindingElement
都清楚地说明了它是否能使用 CanBuildChannelListener<TChannel>
和 CanBuildChannelFactory<TChannel>
构建通道类型。而且,每个 BindingElement
在创建其“通道管理器”时都有一个 BindingContext
,用于将参数向下传递到堆栈,或为堆栈中直接下面的 BindingElement
构建“通道管理器”。
通道管理器是 IChannelFactory
或 IChannelListenner
;BuildChannelListener
在服务器端调用,BuildChannelFactory
在客户端调用。
IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
大多数情况下,中间绑定不关心创建通道管理器,因此它们只是将调用委托给堆栈,就像 TextMessageEncodingBindingElement
所做的那样。
return context.BuildInnerChannelFactory<TChannel>();
实际上,这段代码只是调用当前 BindingElement
下方的 BindingElement
的 BuildChannelFactory
。
但是,如果您想拦截和修改消息,则需要将内部通道管理器包装在一个自定义通道管理器中。我所说的“内部通道管理器”是由当前 BindingElement
下方的 BindingElement
创建的通道管理器。
这张图解释了通道管理器是如何创建的。它假设 BindingElement1
和 BindingElement2
希望拦截消息,因此它们包装了内部通道管理器。TextMessageEncodingBindingElement
不需要拦截消息,因此它不会包装 TransportBindingElement1
返回的通道管理器。
这是一个想要拦截消息的绑定元素的示例;这是装饰器模式。
public override IChannelFactory<TChannel>
BuildChannelFactory<TChannel>(BindingContext context)
{
if(!CanBuildChannelFactory<TChannel>(context))
{
throw new ArgumentException("Impossible to build ReplyToBindingElement");
}
var innerChannelFactory = (IChannelFactory<IOutputChannel>)
context.BuildInnerChannelFactory<TChannel>();
return (IChannelFactory<TChannel>)new ReplyToChannelFactory(
context.ListenUriBaseAddress, innerChannelFactory);
}
我的 ReplyToChannelFactory
实现将拦截发送到 innerChannel
的调用。大多数调用都只是委托给 innerChannelFactory
。我们稍后将看到实现。ServiceHost
或 ChannelFactory<ServiceType>
类将使用最顶层绑定元素的通道管理器来发送或接收消息。请不要将 ChannelFactory<ServiceType>
与 IChannelFactory<TChannel>
混淆;ChannelFactory<ServiceType>
是该接口的实现,但它主要用于创建客户端代理,而不是由 BindingElement
返回。
ChannelFactory<ServiceType>
只是一个类,它根据您在客户端代理上调用的方法和参数序列化和反序列化 SOAP 消息的主体,然后将消息传递给由最顶层 BindingElement
(由您的最顶层 BindingElement
创建)创建的通道。ServiceHost
做的事情相同,只是将 IChannelFactory
替换为 IChannelListener
。
这是通道的创建;请注意,IChannelFactories
也使用装饰器模式来创建通道。
然后,当您调用代理上的方法时,会创建一个消息,并调用最顶层通道的 Send
方法
双工 MSMQ 实现
好的,现在,如果您理解了我上面说的话,您会认为实现将是一件轻而易举的事,事实也确实如此!要理解它如何工作有一个很深的学习曲线,但一旦理解了,您就会认为 WCF 非常容易扩展且设计精良。您会发现它并不像看起来那么难。我越了解 WCF,就越多产生项目想法(很棒的,也很糟糕的……)
首先,我创建了名为 ProxyXXXX : XXXX
的辅助类,其中 XXXX 是我想拦截调用的接口。ProxyXXXX
将接收一个内部 XXXX 并将所有调用转发给它。
ProxyXXXX
的每个方法、属性和事件都是 virtual
的,因此如果我想拦截调用,我只需继承 ProxyXXXX
并重写一个类成员。
例如,在这里,我所做的是在 ProxyOutputChannel
内部,我的通道将重写 Send
方法来拦截消息
public virtual void Send(System.ServiceModel.Channels.Message message,
System.TimeSpan timeout)
{
_InnerOutputChannel.Send(message, timeout);
}
ReplyToBindingElement
负责在出站消息中添加标头以实现双工通信。因此,它包装了每个内部 ChannelFactory
。
public override IChannelFactory<TChannel>
BuildChannelFactory<TChannel>(BindingContext context)
{
if(!CanBuildChannelFactory<TChannel>(context))
{
throw new ArgumentException("Impossible to build ReplyToBindingElement");
}
var innerChannel = (IChannelFactory<IOutputChannel>)
context.BuildInnerChannelFactory<TChannel>();
return (IChannelFactory<TChannel>)
new ReplyToChannelFactory(context.ListenUriBaseAddress, innerChannel);
}
public override IChannelListener<TChannel>
BuildChannelListener<TChannel>(BindingContext context)
{
return base.BuildChannelListener<TChannel>(context);
}
public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
{
return typeof(TChannel) == typeof(IOutputChannel);
}
然后 ReplyToChannelFactory
包装了每个内部通道
public class ReplyToChannelFactory : ProxyChannelFactory<IOutputChannel>
{
readonly Uri _ReplyAddress;
public ReplyToChannelFactory(Uri replyAddress, IChannelFactory<IOutputChannel> inner)
: base(inner)
{
_ReplyAddress = replyAddress;
}
public override IOutputChannel CreateChannel(System.ServiceModel.EndpointAddress to)
{
return new ReplyToChannel(_ReplyAddress, base.CreateChannel(to));
}
public override IOutputChannel
CreateChannel(System.ServiceModel.EndpointAddress to, Uri via)
{
return new ReplyToChannel(_ReplyAddress, base.CreateChannel(to, via));
}
}
并且,通道将所有必要的标头添加到出站消息中
public class ReplyToChannel : ProxyOutputChannel
{
readonly Uri _ReplyAddress;
public ReplyToChannel(Uri replyAddress, IOutputChannel inner)
: base(inner)
{
_ReplyAddress = replyAddress;
}
public override void Send(Message message)
{
ApplyReplyTo(message);
base.Send(message);
}
public override void Send(Message message, TimeSpan timeout)
{
ApplyReplyTo(message);
base.Send(message, timeout);
}
void ApplyReplyTo(Message message)
{
if(message.Headers.MessageId == null)
{
message.Headers.MessageId = new System.Xml.UniqueId();
}
if(message.Headers.From == null)
{
message.Headers.From =
new System.ServiceModel.EndpointAddress(_ReplyAddress);
}
if(message.Headers.ReplyTo == null)
{
message.Headers.ReplyTo =
new System.ServiceModel.EndpointAddress(_ReplyAddress);
}
}
}
这是我的服务实现和契约
[ServiceContract(CallbackContract = typeof(IConversation))]
public interface IConversation
{
[OperationContract(IsOneWay = true)]
void Say(String something);
}
public class Conversation : IConversation
{
#region IConversation Members
public void Say(string something)
{
Console.WriteLine("Someone says \"{0}\" to you, what is your response ?",
something);
String response = Console.ReadLine();
OperationContext.Current.GetCallbackChannel<IConversation>().Say(response);
}
#endregion
}
但在服务器端,在尝试向回调通道发送消息时会抛出异常。我不知道为什么,但似乎 WCF 没有自动将出站消息的 To 标头设置为入站消息的 ReplyTo 标头的地址……我不知道为什么我只需要在服务器端这样做,而在客户端却不需要,如果有人能向我解释,我将不胜感激……所以,我需要创建一个带有消息检查器的行为来修复这个问题。
public class ReplyToBehavior : IEndpointBehavior
{
public class ReplyToInspector : IDispatchMessageInspector
{
#region IDispatchMessageInspector Members
public object AfterReceiveRequest(ref Message request,
IClientChannel channel, InstanceContext instanceContext)
{
var reply = request.Headers.ReplyTo;
OperationContext.Current.OutgoingMessageHeaders.To = reply.Uri;
OperationContext.Current.OutgoingMessageHeaders.RelatesTo =
request.Headers.MessageId;
return null;
}
public void BeforeSendReply(ref Message reply, object correlationState)
{
}
#endregion
}
#region IEndpointBehavior Members
public void AddBindingParameters(ServiceEndpoint endpoint,
BindingParameterCollection bindingParameters)
{
}
public void ApplyClientBehavior(ServiceEndpoint endpoint,
System.ServiceModel.Dispatcher.ClientRuntime clientRuntime)
{
}
public void ApplyDispatchBehavior(ServiceEndpoint endpoint,
System.ServiceModel.Dispatcher.EndpointDispatcher endpointDispatcher)
{
endpointDispatcher.DispatchRuntime.MessageInspectors.Add(new ReplyToInspector());
}
public void Validate(ServiceEndpoint endpoint)
{
}
#endregion
}
我的实现完成了!
最后两件事
在创建 BindingElement
堆栈期间,我需要两样东西:首先,一个 BindingElement
让我的客户端指定消息的回复地址以及要侦听的 MSMQ 队列。这是 ListenUriBindingElement
的职责。
我从这篇博客文章中获取了它。实现并不复杂,它只是设置 context.ListenUriBaseAddress
。堆栈中较低的 BindingElement
s 需要此值来知道在哪里侦听输入消息。
public override IChannelListener<TChannel>
BuildChannelListener<TChannel>(BindingContext context)
{
if(listenUriBaseAddress != null)
context.ListenUriBaseAddress = listenUriBaseAddress;
return base.BuildChannelListener<TChannel>(context);
}
我还必须指定我的服务器可以发送消息,并且我的客户端必须侦听 MSMQ 通道(此通道通过之前的 BindingElement
ListenUriBindingElement
设置的 context.ListenUriBaseAddress
指定)。ServiceHost
将启用服务通过回调发送消息,前提是 Binding
可以构建 IDuplexChannel
。唉,事实并非如此,因为堆栈中的一个 BindingElement
不支持它:MsmqTransportBindingElement.CanBuildChannelListener<IDuplexChannel>
返回 false
。所以,我需要一个 BindingElement
,它在接受创建 IInputChannel
和 IOuputChannel
的通道之上创建一个 IDuplexChannel
。这很容易,您只需使用 WCF 随附的CompositeDuplexBindingElement
。
这是 ChannelFactory<ServiceType>
创建我的 BindingElement
堆栈中的最顶层 IChannelFactory
的方式;如果您想知道它在服务端的实现方式,只需将 IChannelFactory
替换为 IChannelListener
,将 ChannelFactory<ServiceType>
替换为 ServiceHost
。
我们说 CompositeDuplexBindingElement
改变了通道的形状。有关更多信息,ReliableSessionBindingElement
也改变了通道的形状;它将 XXXChannel
转换为 IXXXChannel
再转换为 IXXXSessionChannel
来启用有会话的通信。
在客户端,我将使用 DuplexChannelFactory
而不是 ChannelFactory
来指定回调实现。与 ServiceHost
相同的问题通过相同的解决方案得到解决。
客户端和服务器创建
这是我为双方创建的 BindingElement
堆栈
public class BindingFactory
{
public static Binding Create(Uri baseClient)
{
CustomBinding binding = null;
binding = new CustomBinding(
new ListenUriBindingElement()
{
ListenUriBaseAddress = baseClient
},
new CompositeDuplexBindingElement(),
new ReplyToBindingElement(),
new TextMessageEncodingBindingElement(),
CreateMsmqBinding());
return binding;
}
private static MsmqTransportBindingElement CreateMsmqBinding()
{
var binding = new MsmqTransportBindingElement();
binding.MsmqTransportSecurity.MsmqAuthenticationMode =
MsmqAuthenticationMode.None;
binding.MsmqTransportSecurity.MsmqProtectionLevel =
System.Net.Security.ProtectionLevel.None;
binding.UseActiveDirectory = false;
binding.ExactlyOnce = false;
return binding;
}
}
显然,服务器端不需要 ListenUriBindingElement
,因为 ServiceHost
将设置 context.ListenUriBaseAddress
。
public override IChannelListener<TChannel>
BuildChannelListener<TChannel>(BindingContext context)
{
if(listenUriBaseAddress != null)
context.ListenUriBaseAddress = listenUriBaseAddress;
return base.BuildChannelListener<TChannel>(context);
}
为了不覆盖 context.ListenUriBaseAddress
,我将调用
BindingFactory.Create(null)
在服务器端。
客户端创建
static void Main(string[] args)
{
Console.WriteLine("Hit a key");
Console.ReadLine();
Binding bind = BindingFactory.Create(new Uri("net.msmq://SHIELDP/private/client"));
DuplexChannelFactory<IConversation> channel = new DuplexChannelFactory<IConversation>
(new Conversation(),
bind,
new EndpointAddress("net.msmq://SHIELDP/private/server"));
channel.CreateChannel().Say("Hello I'm Nico");
while(true)
{
}
}
服务器创建
class Program
{
static void Main(string[] args)
{
Binding bind = BindingFactory.Create(null);
ServiceHost host = new ServiceHost(typeof(Conversation),
new Uri("net.msmq://SHIELDP/private/server"));
var endpoint = host.AddServiceEndpoint(typeof(IConversation), bind, "");
endpoint.Behaviors.Add(new ReplyToBehavior());
host.Open();
Console.WriteLine("Open");
while(true)
{
}
}
}
现在,您可以通过 MSMQ 在客户端和服务器端使用回调。
结论
我确信这个项目对于有高可用性要求的人来说会很有用。微软没有实现这一点真是太遗憾了;MSMQ 非常适合双工通信……我希望您喜欢这篇文章。下一篇将讨论WCF over Twitter,并将展示如何创建自己的 TransportBindingElement
。如果您喜欢这篇文章,请告诉我!:)