事件驱动通信基础






4.83/5 (11投票s)
2007年9月21日
11分钟阅读

87978

1535
本文介绍了如何为 WCF 应用程序添加事件驱动行为。
引言
众所周知,WCF 是一项非常强大的技术。不幸的是,如果没有代理、异步委托、双工通道和回调处理程序等复杂概念,几乎不可能实现这种强大。即使您精通这些概念,也需要编写大量代码来实现有效且可靠的消息传递。
此外,由于 GUI 和事件驱动架构已成为标准,您将需要设计具有多播和过滤功能的异步事件引擎。我相信这些术语不会吓倒您,您绝对能够胜任这项工作,但为什么不利用现成的解决方案来节省您的时间,让您能更多地专注于您的业务任务呢?让我们顺畅地了解其中一种解决方案。
我想介绍一个我将在所有项目中使用到的工具。简而言之,这是一个用于 WCF 的事件驱动框架。我们称之为 Event Driven Communication Foundation (EdCF),它基于发布/订阅模式。复杂的代码已被隐藏,现在您只需列出您的事件并编写事件处理程序,即可组织双向、异步、可靠的消息传递。
现在我将尝试将我们的框架划分为功能块,并深入探讨细节。
发布/订阅模式
首先,我想说几句关于发布/订阅模式。我们为什么选择这种模式作为框架的基础?因为存在多种不同的通信场景:一对一、一对多、多对多。一旦我们决定我们的框架应该能够实现所有这些场景,最合适的设计模式就是发布/订阅模式,并有一个中间的 pub/sub 服务。
它是如何工作的?任何远程对等方都扮演发布者或/和订阅者的角色。发布者通知中间的 Pub/Sub 服务它能够触发事件。如果订阅者希望收到特定事件的通知,它会请求 Pub/Sub 服务将其添加到感兴趣订阅者的列表中。每当发布者更改状态时,它就会触发一个事件。中间的 Pub/Sub 服务会存储订阅者列表,并将事件多播给该列表中的所有订阅者。有关更多信息,请参阅本文。
异步消息传递
如果您决定独自完成所有工作,这将是您将面临的第一个棘手的问题。场景很简单:您的服务器需要处理来自两个不同客户端的相同请求。
第一个客户端的连接速度很慢。如果您的服务器不知道异步消息传递,第二个客户端将等待第一个请求处理完毕。如果我们有数十个并发客户端该怎么办?我们应该从线程池中为每个请求执行一个单独的线程。这需要架构级别的决策和对异步委托的了解。为了说明这些代码可能多么令人困惑,我向您展示以下内容:
我有一个订阅者列表,并想发布 MyEvent
事件。我会写类似这样的代码:
public static Publish (TSub[] subscribers, string eventName)
{
foreach (TSub subscriber in subscribers)
{
Invoke(subscriber, eventName)
}
}
如果我决定异步地做同样的事情,我将需要输入更多内容:
public static Publish (TSub[] subscribers, string eventName)
{
WaitCallback callbackDelegate = delegate (TSub subscriber)
{
Invoke(subscriber, eventName);
}
Action<TSub>queueUp = delegate(TSub subscriber)
{
ThreadPool.QueueUserWorkItem(callbackDelegate, subscriber)
}
Array.ForEach(subscriber, queueUp);
}
幸运的是,所有这些都被隐藏在框架深处。从最终用户的角度来看,它看起来就像一次方法调用:
PublishingService.MyEvent();
结果收集器
发布/订阅模式与中间 Pub/Sub 服务的一个优点是发布者和订阅者的解耦。订阅者不知道发布者,反之亦然。这是非常重要的行为,因为您需要构建一个稳定且可扩展的解决方案,包含数百个客户端。
但是,如果您需要实现一些紧密耦合的对等方之间的通信呢?在这种情况下,获取事件触发结果的能力会被禁用。例如,想象一个经典的计算器场景。一个客户端连接到服务器,请求它对两个值求和并返回结果。正如您所记得的,我们希望框架能够在广泛的场景中工作。因此,上述问题不应困扰我们的用户。我们应该提供一个解决方案,并能够将事件的结果传递给发布者。
例如,任何单个订阅者都应返回一个字符串。发布者希望从所有订阅者那里获得字符串列表作为事件触发的结果。从最终用户的角度来看,最有用的将是以下代码:
string [] myResult = PublishingService.MyEvent();
但是,这段代码将冻结发布者线程,直到所有订阅者都回复。我们不会有这种缺点。我们希望异步收集结果,并在它们准备好后立即获取它们。解决方案是使用回调。发布者能够触发 IMyEvents_Pub
接口中描述的任何事件。
[ServiceContract]
public interface IMyEvents_Pub
{
[OperationContract(IsOneWay = true)]
[EventDrivenOperationContract(IsWithReply = true)]
string MyEvent();
}
如果您需要获取事件的结果,您需要使用 [EventDrivenOperationContract(IsWithReply = true)]
属性标记 IMyEvents_Pub
接口的相应方法。基于这些信息,框架将自动生成所有基础设施、接口、回调契约、代理和绑定。您唯一需要做的就是在发布者端定义一个实现 IMyEvents_PubCallback
接口的类。有关更多信息,请参阅本文的“代码生成”部分。
public class MyPublishCallbackHandler : IMyEvents_PubCallback
{
public void MyEventReply(string[] returnedResults)
{
Console.WriteLine("Returned results: ");
foreach (string s in returnedResults)
Console.WriteLine("\t" + s);
}
}
发布者触发事件。一旦结果准备好,Pub/Sub 服务将使用结果数组作为参数回调发布者。
订阅者过滤
订阅者过滤对于分布式应用程序来说是一项相当常见的任务。它在订阅者列表不是永久性的并且取决于我们需要的事件时设置。例如,以一个简单的 Chat
示例,包含两个 ChatRoom
。如果一个订阅者进入第一个 ChatRoom
,它不关心第二个 ChatRoom
中发布的任何消息。因此,我们应该能够阻止接收错误的消息。要利用框架的过滤选项,最终用户应该这样做:
- 定义一个
Filter
枚举作为 Pub/Sub 服务的成员:[Flags] public enum Filter { ChatRoom1 = 0x1, ChatRoom2 = 0x2 }
- 用
[EventDrivenOperationContract(IsWithFilter = true)]
属性标记相应的事件:[ServiceContract] public interface IMyEvents_Pub { [OperationContract(IsOneWay = true)] [EventDrivenOperationContract(IsWithFilter = true)] void MyEvent(Filter filter); }
- 使用相应的过滤器订阅订阅者:
SubscribeWithFilter("MyEvent", (int)Filter.ChatRoom1);
要使用多个过滤器进行订阅,只需使用按位 OR
运算符:
SubscribeWithFilter("MyEvent",
(int)(Filter.ChatRoom1 | Filter.ChatRoom2));
代码生成
这种简单性似乎是不言而喻的,但框架在后台完成了繁重的工作。正如我之前提到的,该框架在我们日常工作中被积极使用。有时编写相同的代码片段一次又一次会非常乏味。因此,我们决定应用自动代码生成来消除一些繁琐的工作。尽管这种代码生成被广泛使用,但最终用户永远不会遇到它。因此,为了节省您我的时间,我将不揭示所有地方,也不会深入研究所有代码。我只会给您举一个例子,您就会明白其中的窍门。
让我们回到“结果收集器”部分。要使用此功能,最终用户只需编写五行简短的代码:
//Define an event and mark it with special atribute
[EventDrivenOperationContract(IsWithReply = true)]
string MyEvent();
//Write a callback handler
public class MyPublishCallbackHandler : IMyEvents_PubCallback
{
public void MyEventReply(string[] returnedResults)
{
}
}
很简单,不是吗?这是因为框架代替我们完成了所有工作。
- 它生成
IPublishCallback
接口。发布者应该实现这个接口来收集结果。这个接口包含带有Reply
后缀的方法,对应于用[EventDrivenOperationContract(IsWithReply = true)]
属性标记的每个事件。 - 它提供了
PublishCallbackBiulder
构建器,它生成IPublishCallback
类型的透明代理实例。代理是从调用的OperationContext
中检索的,并在框架中积极使用。 - 它包装了
IMyEvent_Pub
接口,因为它需要在发布者端实现IPublishCallback
。 - 它生成
ReturnedResultsConverter
类。正如您所理解的,任何单个订阅者返回一个值,但发布者应该有一个来自所有订阅者的所有结果的列表。ReturnResultConverter
类在这里非常有帮助。它使得能够将订阅者返回的结果(类型为Object[] (System.Reflection.MethodInfo.Invoke() 返回 Object)
)转换为正确返回类型的数组集合(例如,如果订阅者返回String
,则发布者期望String[]
)。 ReturnedResultsConverter
类的内容对应于用ReplyResultsConverter
后缀标记的每个事件方法。
现在看看隐藏的代码。让我们想象一下,我们有一个单独的 MyEvent
事件,正如本节开头所定义的。
- 生成的
IPublishCallback
如下所示:public interface IPublishCallback { [OperationContract(IsOneWay=true)] void MyEventReply(string[] returnedResults); }
- 生成的
PublishCallBackBuilder
如下所示:public class PublishCallbackBiulder { public static IPublishCallback publishCallback; public static void BuildCallback() { publishCallback = OperationContext.Current.GetCallbackChannel
(); } } - 生成的代理如下所示:
[System.ServiceModel.ServiceContract(CallbackContract= typeof(EDSOAFW.IPublishCallback), Name="IMyEvents_Pub")] public interface IMyEvents_Pub_Wrap : EDSOA.IMyEvents_Pub {}
- 生成的
ReturnedResultsConverter
如下所示:public class ReturnedResultsConverter { public static string[] MyEventReplyResultsConverter( object[] returnedResults) { string[] returnArray = new string[returnedResults.Length]; for (int i = 0; (i < returnedResults.Length); i = (i + 1)) { returnArray[i] = (String)returnedResults[i]; } return returnArray; } }
当然,我们可以手动完成,但想象一下您有几十个不同的事件。您将不得不为每个事件不断重写此代码。框架将不会让您在乏味的编码中因无聊而死。
最后的乐章:我想通过一个简单的聊天示例来展示所有这些东西。
聊天示例简介
首先,让我们定义基本术语。这将确保我们说着同一种语言。以下术语将在将来广泛使用,因此请快速浏览列表,以检查您的理解是否与我相同。
发布/订阅服务是一个中间远程对等方,负责管理发布者和订阅者列表。它将消息从发布者传递给订阅者。该服务还负责消息路由、批处理和排队。如果我们用客户端-服务器架构来说,这就是我们的服务器。
发布者是初始化消息发送的远程方。简单来说,这是一个能够向服务器发送消息的客户端。
订阅者是等待消息的远程方。这是一个能够接收消息的客户端。
发布/订阅客户端是一个结合了发布者和订阅者功能的应用程序。
事件是指满足以下声明的过程:
- 发布者向多个订阅者发送相同的消息。
- 所有订阅者都同时被调用。这意味着,如果与特定订阅者的连接很慢(或中断),发送给其余客户端的邮件不会等待有问题订阅者回复。
我们的示例实现了一个简单的聊天。因此,从业务角度来看,我们有稍微不同的术语。我们有 ChatRoom
,它只不过是发布/订阅服务。我们还有 ChatClient
,这是一个可以同时充当发布者和订阅者的客户端应用程序。
在继续之前,我想提一件事。在我的写作中,我假设您熟悉基本的 WCF 概念。如果您不熟悉,当然可以继续使用 EdCF,但我建议您查找一些关于 WCF 基础知识的信息。您可以从 MSDN 开始。
好的,我们已经就基本术语达成了一致。现在,我们需要采取一系列步骤来构建聊天示例。我将每个步骤的描述放在一个单独的 HOWTO
中。
HOWTO:创建和配置发布/订阅服务(ChatRoom)
目前,该示例只有一个事件:OnMessageSent
。如果您想添加其他事件,只需将相应的添加到 IMyEvents_Pub
和 IMyEvents_Sub
接口中即可。假设您想添加一个名为 MyNewEvent
的新事件。您需要做的唯一更改就是修改用于发布的契约接口……
[erviceContract]
public interface IMyEvents_Pub
{
[OperationContract(IsOneWay = true)]
[EventDrivenOperationContract(IsWithReply = true)]
void OnMessageSent(string message);
[OperationContract(IsOneWay = true)]
void MyNewEvent();
}
……以及相应的订阅契约:
[erviceContract]
public interface IMyEvents_Sub
{
[OperationContract(IsOneWay = false)]
string OnMessageSent(string message);
[OperationContract(IsOneWay = true)]
void MyNewEvent();
}
您需要做的最后一件事是在 App.config 文件中将 BaseAddress
值更改为适当的 IP 地址。
<service name="ChatRoom.MySubscriptionService"
behaviorConfiguration="ForDebuggingBehavior">
<host>
<baseAddresses>
<add baseAddress="https://:8000/MySubscriptionService"/>
</baseAddresses>
</host>
…
<service name="ChatRoom.MyPublishService_Wrapper"
behaviorConfiguration="ForDebuggingBehavior">
<host>
<baseAddresses>
<add baseAddress="https://:8000/MyPublishService"/>
</baseAddresses>
</host>
HOHOWTO:创建用于发布/订阅的代理
好了,我们已经配置并托管了 WCF 服务。现在我们想要一个允许我们的客户端连接到服务的代理。正如 MSDN 所说,您有三个选项:
- 从服务中检索 WSDL 并手工创建代理。
- 使用 Visual Studio 2005 的“添加服务引用”功能。
- 使用 SvcUtil.exe 工具生成代理类。
但是,我建议选择另一个选项。ChatSample
解决方案已经创建了用于发布和订阅的代理。如果您想在您的解决方案中添加一个新事件,只需使用这些代理并进行少量修改。例如,假设您想添加一个新的 MyNewEvent
。
ChatClient
>> clientForSubscription.cs
public interface IMySubscriptionServiceCallback
{
[System.ServiceModel.OperationContractAttribute(
Action="http://tempuri.org/IMySubscriptionService/OnMessageSent",
ReplyAction=
"http://tempuri.org/IMySubscriptionService/OnMessageSentResponse")]
string OnMessageSent(string message);
[System.ServiceModel.OperationContractAttribute(
Action="http://tempuri.org/IMySubscriptionService/MyNewEvent",
ReplyAction=
"http://tempuri.org/IMySubscriptionService/MyNewEventResponse")]
void MyNewEvent();
}
ChatClient
>> clientForPublishing.cs
public interface IMyEvents_Pub
{
[System.ServiceModel.OperationContractAttribute(IsOneWay=true,
Action="http://tempuri.org/IMyEvents_Pub/OnMessageSent")]
void OnMessageSent(string message);
[System.ServiceModel.OperationContractAttribute(IsOneWay=true,
Action="http://tempuri.org/IMyEvents_Pub/MyNewEvent")]
void MyNewEvent();
}
HOWTO:配置发布/订阅客户端(ChatClient)
既然您已经准备了代理,您的客户端就可以连接到服务了。但是,您需要设置传输设置和服务的地址。所有可配置值都位于 App.config 文件中。在这里您可以更改许多参数。这个数字真的很大,如果您想微调传输,我建议您参考官方文档。但是,如果您想立即开始,只需要设置服务的地址和用于回调的客户端基地址。
<client>
<endpoint address="https://:8000/MySubscriptionService"
binding="wsDualHttpBinding" bindingConfiguration="sub1Binding"
contract="IMySubscriptionService" name="MainEndpoint" />
<endpoint address="https://:8000/MyPublishService"
binding="wsDualHttpBinding"
bindingConfiguration="pub1Binding" contract="IMyEvents_Pub" />
</client>
<bindings>
<wsDualHttpBinding>
<binding name="sub1Binding" openTimeout="00:10:00"
receiveTimeout="00:10:00" sendTimeout="00:10:00"
bypassProxyOnLocal="false"
clientBaseAddress="https://:8000/myClient1sub/"
useDefaultWebProxy="true" />
<binding name="pub1Binding" openTimeout="00:10:00"
receiveTimeout="00:10:00" sendTimeout="00:10:00"
bypassProxyOnLocal="false"
clientBaseAddress="https://:8000/myClient1pub/"
useDefaultWebProxy="true" />
</wsDualHttpBinding>
</bindings>
HOWTO:订阅事件并发布事件
我们快接近尾声了。整个基础设施已经准备就绪,现在我们必须完成最后四个步骤。看看我们还需要编写的代码。
- 第一步是创建一个用于事件订阅的代理。
MySubscriptionServiceClient subscriptionServiceClient = new MySubscriptionServiceClient( subInstanceContext, endpointConfigurationName);
- 第二步是订阅我们的事件。
subscriptionServiceClient.Subscribe("MyNewEvent");
- 第三步是创建一个用于事件发布的代理。
MyEvents_PubClient eventsClient = new MyEvents_PubClient(pubInstanceContext);
- 最后一步是发布事件。
eventsClient.MyNewEvent();
就这样,您现在拥有了支持多播的异步、可靠的消息传递。
关注点
在文章的结尾,我想请 CodeProject 社区不要犹豫写下任何评论和问题。我对 SOA 和事件驱动架构了解很多,但我希望能了解更多。我能够分享我的知识,我相信您也有可以分享的东西。请在本文底部的讨论区给我留言。
历史
- 2007 年 9 月 21 日 -- 原始版本发布