使用 WCF 回调服务的节流






4.97/5 (29投票s)
在使用具有瞬态订阅者的回调时,您必须保持通道打开。这是管理和使用服务节流以使您的服务能够处理超过 10 个订阅者的方法。
引言
我在一个利用 BizTalk 处理来自各种源的消息并将其分发到多个发送端口的环境中工作。现在,有人可以使用 BizTalk 作为 ESB;然而,这带来了一些问题,主要是需要为每个需要将消息路由到的新应用程序创建一个新的发送端口。也就是说,除非有人可以保证所有开发人员只查看一个消息队列而绝不将消息出列。老实说,这会带来更多问题,而不是解决问题,更不用说这是一种糟糕的做法了。这促使我使用 WCF 回调服务作为我们 ESB(企业服务总线)的单一入口点。
背景
考虑到 WCF(Windows Communication Foundation)支持回调契约,从而支持事件驱动架构,人们可以轻松地在 BizTalk 中创建一个发送端口到消息队列(此处为 MSMQ),然后公开一个 WCF 事件驱动的 Web 服务,该服务可以轻松地被许多客户端订阅并接收消息。在我的旅程中,我遇到了一些问题。这就是我今天写这篇文章的原因,希望能让你们中的一些人避免浪费太多时间去寻找。
Using the Code
让我们从简单部分开始,即服务契约。这包括回调契约以及允许客户端订阅和取消订阅的操作。我将使用我能想到最基本的小型 Web 服务,以免偏离我遇到的实际问题。
[ServiceContract(SessionMode=SessionMode.Required,
CallbackContract=typeof(IEventSystemCallback))]
interface IEventSystem
{
[OperationContract(IsOneWay=true)]
void Subscribe();
[OperationContract(IsOneWay = true)]
void Unsubscribe();
}
interface IEventSystemCallback
{
[OperationContract(IsOneWay = true)]
void OnMessageReceived(string message);
}
现在,这就是在 WCF 中生成事件驱动 Web 服务所需的一切;至少,就契约而言。如果您注意到,服务将公开两个操作(Subscribe
和 Unsubscribe
)。请记住,为了演示目的,这是最基本的。在实际的 ESB 解决方案中,我有一个对象代表实际想要订阅的事件以及用于跟踪目的的其他详细信息。
IEventSystemCallback
接口是回调契约,正如您将在 IEventSystem
接口的服务契约中注意到的那样。因此,您不需要在 IEventSystemCallback
契约上设置服务契约属性。您只需要公开客户端预期会收到的所有方法(或事件)。
现在,让我们进入困难的部分。我们将从服务本身开始。我将向您展示完整的服务实现,然后我们再进行分解。
[ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
internal sealed class EventService:IEventSystem
{
public delegate void CallbackDelegate<T>(T t);
public static CallbackDelegate<string> MessageReceived;
#region IEventSystem Members
public void Subscribe()
{
IEventSystemCallback callback =
OperationContext.Current.GetCallbackChannel<IEventSystemCallback>();
MessageReceived += callback.OnMessageReceived;
ICommunicationObject obj = (ICommunicationObject)callback;
obj.Closed += new EventHandler(EventService_Closed);
obj.Closing += new EventHandler(EventService_Closing);
}
void EventService_Closing(object sender, EventArgs e)
{
Console.WriteLine("Client Closing...");
}
void EventService_Closed(object sender, EventArgs e)
{
MessageReceived -= ((IEventSystemCallback)sender).OnMessageReceived;
Console.WriteLine("Closed Client Removed!");
}
public void Unsubscribe()
{
IEventSystemCallback callback =
OperationContext.Current.GetCallbackChannel<IEventSystemCallback>();
MessageReceived -= callback.OnMessageReceived;
}
#endregion
public static void SendMessage(string Message)
{
try
{
MessageReceived(Message);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
public static void NotifyServiceStop()
{
SendMessage("Service Stopping...");
}
}
首先,我们从服务本身开始
[ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
internal sealed class EventService:IEventSystem
在这里,我们实现了 IEventSystem
接口并添加了一个服务行为属性来设置 InstanceContextMode
。您可以将 InstanceContextMode
设置为 PerCall
、PerSession
或 Single
。我们将在后面的节流部分更详细地讨论这一点。现在,您只需知道我选择了 PerSession
,因为回调契约的使用需要一个会话,而我只是想为稍后解释节流提供便利。
*注意:InstanceContextMode.PerSession
和 SessionMode=SessionMode.Required
之间没有关联。您可以使用 Required
SessionMode
组合任何 InstanceContextMode
,正如我们稍后将看到的。
public delegate void CallbackDelegate<T>(T t);
public static CallbackDelegate<string> MessageReceived;
关于回调服务,最难理解的事情之一是如何存储所有订阅者。这就是我创建静态 GenericDelegate
的原因。通常,我会将所有委托放在另一个命名空间和文件位置,但为了方便演示,我将委托创建直接放在服务类中。您也可以将订阅者引用(或者如果您愿意,可以将 InstanceContext
)存储在静态集合对象(List<T>
、ArrayList
、Hashtable
等)中。我选择委托是因为它非常易于使用,并且与事件服务的工作方式一致。所有 Instance Contexts 都将作为 InvocationList
对象的目标存储,当您连接事件时,正如我们在这里将看到的。
public void Subscribe()
{
IEventSystemCallback callback =
OperationContext.Current.GetCallbackChannel<IEventSystemCallback>();
MessageReceived += callback.OnMessageReceived;
ICommunicationObject obj = (ICommunicationObject)callback;
obj.Closed += new EventHandler(EventService_Closed);
obj.Closing += new EventHandler(EventService_Closing);
}
void EventService_Closing(object sender, EventArgs e)
{
Console.WriteLine("Client Closing...");
}
void EventService_Closed(object sender, EventArgs e)
{
MessageReceived -= ((IEventSystemCallback)sender).OnMessageReceived;
Console.WriteLine("Closed Client Removed!");
}
public void Unsubscribe()
{
IEventSystemCallback callback =
OperationContext.Current.GetCallbackChannel<IEventSystemCallback>();
MessageReceived -= callback.OnMessageReceived;
}
ClientChannel
由订阅者自动在每次服务方法调用时传递。因此,当他们订阅时,我们会继续连接本次演示中使用的所有事件。只需要连接回调契约的 OnMessageReceived
方法,但是如何处理断开连接的客户端?我看到过一种方法,即服务在每次回调时尝试监视通道状态,但为什么要在可以使用 ICommunicationObject
(或如果您选择此强制类型转换,则为 IDuplexContextChannel)的 Closing
和/或 Closed
事件时才需要付出如此大的开销呢?您还可以在通道上连接其他有用的事件,例如:Faulted
、Opening
和 Opened
。
最后,我创建了几个方法来实际将消息发送给订阅者。在实际的 ESB 服务中,这就是 MSMQ 监视发生的地方,并将所有新消息触发给订阅者。
public static void SendMessage(string Message)
{
try
{
MessageReceived(Message);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
public static void NotifyServiceStop()
{
SendMessage("Service Stopped!");
}
我还添加了一个 NotifyServiceStop
方法,原因与我连接订阅者的 Closing
和 Closed
事件相同。最好通知订阅者服务即将关闭。在我的实现中,我有一个 ServiceNotification
对象,可以发送多种类型的通知,就像我拥有 EventNotifications
用于传输多个事件一样。
我已将此演示托管在控制台应用程序中使用 ServiceHost
。您可以使用 Windows 服务以相同的方式自托管,或利用 IIS 7 中的 WAS 来托管 netTcp 服务。还有其他几种方法可以实现这一点,但这超出了本文的范围。
class Program
{
private static int count;
public static int Count
{
get
{
count += 1;
return count;
}
}
static void Main(string[] args)
{
ServiceHost host = new ServiceHost(typeof(EventService));
host.Closing += new EventHandler(host_Closing);
host.Open();
Console.WriteLine("Service Started");
Timer timer = new Timer(delegate(object state)
{
string message = "Message " + Count.ToString();
try
{
if (EventService.MessageReceived != null)
{
Console.WriteLine(
EventService.MessageReceived.GetInvocationList().Length.ToString() +
" Subscribers");
EventService.SendMessage(message);
Console.WriteLine("Sent: " + message);
}
else
{
Console.WriteLine("0 Subscribers");
Console.WriteLine("Skipped: " + message);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}, null, 5000, 5000);
Console.ReadKey(true);
host.Close();
}
static void host_Closing(object sender, EventArgs e)
{
try
{
EventService.NotifyServiceStop();
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
}
这里没什么特别的。只是普通的 ServiceHost
实现。唯一需要指出的是,我连接了 host.Closing
事件以通知订阅者服务正在关闭,并且通过使用静态委托来存储订阅者,您可以检查 InvocationList
的不同方面。在这里,我只是显示有多少订阅者正在收听。
接下来是配置……
这是让我感到困惑最多的地方,事实上,我仍然有一些问题可能需要您帮助解答。
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<system.serviceModel>
<behaviors>
<serviceBehaviors>
<behavior name="ExposeMexAndThrottleBehavior">
<serviceMetadata httpGetEnabled="true"
httpGetUrl="https://:1111/EventService/Mex"/>
<serviceThrottling maxConcurrentCalls="3"
maxConcurrentInstances="100"
maxConcurrentSessions="100"/>
</behavior>
</serviceBehaviors>
</behaviors>
<services>
<service name="ESBService.EventService"
behaviorConfiguration="ExposeMexAndThrottleBehavior">
<endpoint address="net.tcp://:9999/EventService/"
binding="netTcpBinding"
contract="ESBService.IEventSystem"/>
<endpoint address="https://:1111/EventService/Mex"
binding="mexHttpBinding"
contract="IMetadataExchange"/>
</service>
</services>
</system.serviceModel>
</configuration>
我不会介绍如何创建终结点或公开元数据,因为网络上有很多示例。在此配置中,值得关注的是 serviceThrottling
。在我使用第一个 WCF 双工服务时,这是最令人困惑的。事实上,我甚至不认为我需要它,因为我根本不想“节流”我的服务。我会一个接一个地启动我的服务并开始连接客户端,但我仍然被限制在最多 10 个订阅客户端。我尝试在 netTcpBinding
上设置 MaxConnections
属性,但没有成功。
<bindings>
<netTcpBinding>
<binding name="tcpSettings" maxConnections="50"/>
</netTcpBinding>
</bindings>
这个设置对我来说仍然很困惑,因为无论我将其设置为多少,它都不会影响我的服务。我读到您可以使用 serviceThrottling
或 maxConnections
属性来“节流”服务,并且服务将使用两者之间的最低设置。无论我将 MaxConnections
设置为多少,似乎都没有任何区别,所以根本不要使用它。
解决方案是通过使用 serviceThrottling
设置来呈现的,这实际上具有误导性。对我来说,“节流”这个词意味着限制实例、会话和调用,而实际上它们默认已经“节流”了。如果您移除 serviceThrottling
行为并在服务运行时检查 ServiceHost.Description.Behaviors
,您将看不到 ServiceThrottlingBehavior
。这使我误以为 serviceThrottling
没有实现,除非明确指定。因此,我一直认为这与我的 TcpBinding
设置有关。结果发现,即使您在服务中看不到节流行为,它仍然存在,并将您的服务节流到不切实际的限制。也许,为 serviceThrottling
行为起一个更好的名字可能是 serviceLimiting
。
为了描述这些节流设置的含义,我们现在开始介绍它们。
请记住,我使用的是 PerSession
InstanceContextMode
。这会影响我设置节流参数的方式。
<serviceThrottling maxConcurrentCalls="3"
maxConcurrentInstances="100"
maxConcurrentSessions="100"/>
maxConcurrentSessions
在这里最重要。在此示例中,它允许 100 个订阅者连接到我的服务。每次订阅,服务都会创建一个新会话,从而创建一个新实例,因此 maxConcurrentInstances
也设置为相同。在实际情况中,您将根据硬件限制和预期的用户数以及增长空间来设置这些设置。
maxConcurrentCalls
应设置为您 maxConcurrentInstances
的 1% 到 3% 之间。这就是为什么我的设置为 3。
未指定(或未创建 serviceThrottling
行为)时的默认值
maxConcurrentCalls
= 16maxConcurrentSessions
= 10maxConcurrentInstances
= 无限制
现在,如果您使用 PerCall
或 Single
InstanceContextMode
,您的节流设置将有所不同。在此演示中,我将不详细介绍每种配置。如果您需要帮助,请随时发帖,我会尽快回复您。
客户端代码
public partial class Form1 : Form,ESB.IEventSystemCallback
{
public Form1()
{
InitializeComponent();
}
ESB.EventSystemClient client;
private void Form1_Load(object sender, EventArgs e)
{
try
{
client =
new ESBClient.ESB.EventSystemClient(new InstanceContext(this));
client.Subscribe();
}
catch (Exception ex)
{
listBox1.Items.Add(ex.ToString());
}
}
#region IEventSystemCallback Members
public void OnMessageReceived(string message)
{
listBox1.Items.Add(message);
}
#endregion
* 要使用示例代码,请在两个单独的 VS2008 IDE 中打开客户端和服务解决方案。运行服务,然后按 CTRL+F5 打开多个客户端。随意打开和关闭任意数量的客户端。您还可以调整节流设置,然后重复此过程。