65.9K
CodeProject 正在变化。 阅读更多。
Home

事件驱动通信基础

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.83/5 (11投票s)

2007年9月21日

11分钟阅读

viewsIcon

87978

downloadIcon

1535

本文介绍了如何为 WCF 应用程序添加事件驱动行为。

引言

众所周知,WCF 是一项非常强大的技术。不幸的是,如果没有代理、异步委托、双工通道和回调处理程序等复杂概念,几乎不可能实现这种强大。即使您精通这些概念,也需要编写大量代码来实现有效且可靠的消息传递。

此外,由于 GUI 和事件驱动架构已成为标准,您将需要设计具有多播和过滤功能的异步事件引擎。我相信这些术语不会吓倒您,您绝对能够胜任这项工作,但为什么不利用现成的解决方案来节省您的时间,让您能更多地专注于您的业务任务呢?让我们顺畅地了解其中一种解决方案。

我想介绍一个我将在所有项目中使用到的工具。简而言之,这是一个用于 WCF 的事件驱动框架。我们称之为 Event Driven Communication Foundation (EdCF),它基于发布/订阅模式。复杂的代码已被隐藏,现在您只需列出您的事件并编写事件处理程序,即可组织双向、异步、可靠的消息传递。

现在我将尝试将我们的框架划分为功能块,并深入探讨细节。

发布/订阅模式

首先,我想说几句关于发布/订阅模式。我们为什么选择这种模式作为框架的基础?因为存在多种不同的通信场景:一对一、一对多、多对多。一旦我们决定我们的框架应该能够实现所有这些场景,最合适的设计模式就是发布/订阅模式,并有一个中间的 pub/sub 服务。

它是如何工作的?任何远程对等方都扮演发布者或/和订阅者的角色。发布者通知中间的 Pub/Sub 服务它能够触发事件。如果订阅者希望收到特定事件的通知,它会请求 Pub/Sub 服务将其添加到感兴趣订阅者的列表中。每当发布者更改状态时,它就会触发一个事件。中间的 Pub/Sub 服务会存储订阅者列表,并将事件多播给该列表中的所有订阅者。有关更多信息,请参阅本文

异步消息传递

如果您决定独自完成所有工作,这将是您将面临的第一个棘手的问题。场景很简单:您的服务器需要处理来自两个不同客户端的相同请求。

第一个客户端的连接速度很慢。如果您的服务器不知道异步消息传递,第二个客户端将等待第一个请求处理完毕。如果我们有数十个并发客户端该怎么办?我们应该从线程池中为每个请求执行一个单独的线程。这需要架构级别的决策和对异步委托的了解。为了说明这些代码可能多么令人困惑,我向您展示以下内容:

我有一个订阅者列表,并想发布 MyEvent 事件。我会写类似这样的代码:

public static Publish (TSub[] subscribers, string eventName)
{
    foreach (TSub subscriber in subscribers)
    {
        Invoke(subscriber, eventName)
    }
}

如果我决定异步地做同样的事情,我将需要输入更多内容:

public static Publish (TSub[] subscribers, string eventName)
{
    WaitCallback callbackDelegate = delegate (TSub subscriber) 
    {
        Invoke(subscriber, eventName);
    }

    Action<TSub>queueUp = delegate(TSub subscriber)
    {
        ThreadPool.QueueUserWorkItem(callbackDelegate, subscriber)
    }

    Array.ForEach(subscriber, queueUp);
}

幸运的是,所有这些都被隐藏在框架深处。从最终用户的角度来看,它看起来就像一次方法调用:

PublishingService.MyEvent();

结果收集器

发布/订阅模式与中间 Pub/Sub 服务的一个优点是发布者和订阅者的解耦。订阅者不知道发布者,反之亦然。这是非常重要的行为,因为您需要构建一个稳定且可扩展的解决方案,包含数百个客户端。

但是,如果您需要实现一些紧密耦合的对等方之间的通信呢?在这种情况下,获取事件触发结果的能力会被禁用。例如,想象一个经典的计算器场景。一个客户端连接到服务器,请求它对两个值求和并返回结果。正如您所记得的,我们希望框架能够在广泛的场景中工作。因此,上述问题不应困扰我们的用户。我们应该提供一个解决方案,并能够将事件的结果传递给发布者。

例如,任何单个订阅者都应返回一个字符串。发布者希望从所有订阅者那里获得字符串列表作为事件触发的结果。从最终用户的角度来看,最有用的将是以下代码:

string [] myResult = PublishingService.MyEvent();

但是,这段代码将冻结发布者线程,直到所有订阅者都回复。我们不会有这种缺点。我们希望异步收集结果,并在它们准备好后立即获取它们。解决方案是使用回调。发布者能够触发 IMyEvents_Pub 接口中描述的任何事件。

[ServiceContract]
public interface IMyEvents_Pub
{
    [OperationContract(IsOneWay = true)]
    [EventDrivenOperationContract(IsWithReply = true)]
    string MyEvent();
}

如果您需要获取事件的结果,您需要使用 [EventDrivenOperationContract(IsWithReply = true)] 属性标记 IMyEvents_Pub 接口的相应方法。基于这些信息,框架将自动生成所有基础设施、接口、回调契约、代理和绑定。您唯一需要做的就是在发布者端定义一个实现 IMyEvents_PubCallback 接口的类。有关更多信息,请参阅本文的“代码生成”部分。

public class MyPublishCallbackHandler : IMyEvents_PubCallback
{
    public void MyEventReply(string[] returnedResults)
    {
        Console.WriteLine("Returned results: ");
        foreach (string s in returnedResults)
            Console.WriteLine("\t" + s);
    }
}

发布者触发事件。一旦结果准备好,Pub/Sub 服务将使用结果数组作为参数回调发布者。

订阅者过滤

订阅者过滤对于分布式应用程序来说是一项相当常见的任务。它在订阅者列表不是永久性的并且取决于我们需要的事件时设置。例如,以一个简单的 Chat 示例,包含两个 ChatRoom。如果一个订阅者进入第一个 ChatRoom,它不关心第二个 ChatRoom 中发布的任何消息。因此,我们应该能够阻止接收错误的消息。要利用框架的过滤选项,最终用户应该这样做:

  1. 定义一个 Filter 枚举作为 Pub/Sub 服务的成员:
    [Flags]
    public enum Filter
    {
        ChatRoom1 = 0x1,
        ChatRoom2 = 0x2
    }
  2. [EventDrivenOperationContract(IsWithFilter = true)] 属性标记相应的事件:
    [ServiceContract]
    public interface IMyEvents_Pub
    {
        [OperationContract(IsOneWay = true)]
        [EventDrivenOperationContract(IsWithFilter = true)]
        void MyEvent(Filter filter);
    }
  3. 使用相应的过滤器订阅订阅者:
    SubscribeWithFilter("MyEvent", (int)Filter.ChatRoom1);

要使用多个过滤器进行订阅,只需使用按位 OR 运算符:

SubscribeWithFilter("MyEvent", 
    (int)(Filter.ChatRoom1 | Filter.ChatRoom2));

代码生成

这种简单性似乎是不言而喻的,但框架在后台完成了繁重的工作。正如我之前提到的,该框架在我们日常工作中被积极使用。有时编写相同的代码片段一次又一次会非常乏味。因此,我们决定应用自动代码生成来消除一些繁琐的工作。尽管这种代码生成被广泛使用,但最终用户永远不会遇到它。因此,为了节省您我的时间,我将不揭示所有地方,也不会深入研究所有代码。我只会给您举一个例子,您就会明白其中的窍门。

让我们回到“结果收集器”部分。要使用此功能,最终用户只需编写五行简短的代码:

//Define an event and mark it with special atribute 

[EventDrivenOperationContract(IsWithReply = true)]
string MyEvent();

//Write a callback handler 

public class MyPublishCallbackHandler : IMyEvents_PubCallback
{
public void MyEventReply(string[] returnedResults)
{            
}
}

很简单,不是吗?这是因为框架代替我们完成了所有工作。

  1. 它生成 IPublishCallback 接口。发布者应该实现这个接口来收集结果。这个接口包含带有 Reply 后缀的方法,对应于用 [EventDrivenOperationContract(IsWithReply = true)] 属性标记的每个事件。
  2. 它提供了 PublishCallbackBiulder 构建器,它生成 IPublishCallback 类型的透明代理实例。代理是从调用的 OperationContext 中检索的,并在框架中积极使用。
  3. 它包装了 IMyEvent_Pub 接口,因为它需要在发布者端实现 IPublishCallback
  4. 它生成 ReturnedResultsConverter 类。正如您所理解的,任何单个订阅者返回一个值,但发布者应该有一个来自所有订阅者的所有结果的列表。ReturnResultConverter 类在这里非常有帮助。它使得能够将订阅者返回的结果(类型为 Object[] (System.Reflection.MethodInfo.Invoke() 返回 Object))转换为正确返回类型的数组集合(例如,如果订阅者返回 String,则发布者期望 String[])。
  5. ReturnedResultsConverter 类的内容对应于用 ReplyResultsConverter 后缀标记的每个事件方法。

现在看看隐藏的代码。让我们想象一下,我们有一个单独的 MyEvent 事件,正如本节开头所定义的。

  1. 生成的 IPublishCallback 如下所示:
    public interface IPublishCallback 
    {
        [OperationContract(IsOneWay=true)]
        void MyEventReply(string[] returnedResults);        
    }
  2. 生成的 PublishCallBackBuilder 如下所示:
    public class PublishCallbackBiulder 
    { 
        public static IPublishCallback publishCallback;
           
        public static void BuildCallback() 
        {
            publishCallback = OperationContext.Current.GetCallbackChannel();
        }
    }
  3. 生成的代理如下所示:
    [System.ServiceModel.ServiceContract(CallbackContract=
        typeof(EDSOAFW.IPublishCallback), Name="IMyEvents_Pub")]
        public interface IMyEvents_Pub_Wrap : EDSOA.IMyEvents_Pub {}
  4. 生成的 ReturnedResultsConverter 如下所示:
    public class ReturnedResultsConverter 
    {
        public static string[] MyEventReplyResultsConverter(
            object[] returnedResults) 
        {
            string[] returnArray = new string[returnedResults.Length];
            for (int i = 0; (i < returnedResults.Length); i = (i + 1))
            {
                returnArray[i] = (String)returnedResults[i];
            }
            return returnArray;
        }
    }

当然,我们可以手动完成,但想象一下您有几十个不同的事件。您将不得不为每个事件不断重写此代码。框架将不会让您在乏味的编码中因无聊而死。

最后的乐章:我想通过一个简单的聊天示例来展示所有这些东西。

聊天示例简介

首先,让我们定义基本术语。这将确保我们说着同一种语言。以下术语将在将来广泛使用,因此请快速浏览列表,以检查您的理解是否与我相同。

发布/订阅服务是一个中间远程对等方,负责管理发布者和订阅者列表。它将消息从发布者传递给订阅者。该服务还负责消息路由、批处理和排队。如果我们用客户端-服务器架构来说,这就是我们的服务器。

发布者是初始化消息发送的远程方。简单来说,这是一个能够向服务器发送消息的客户端。

订阅者是等待消息的远程方。这是一个能够接收消息的客户端。

发布/订阅客户端是一个结合了发布者和订阅者功能的应用程序。

事件是指满足以下声明的过程:

  1. 发布者向多个订阅者发送相同的消息。
  2. 所有订阅者都同时被调用。这意味着,如果与特定订阅者的连接很慢(或中断),发送给其余客户端的邮件不会等待有问题订阅者回复。

我们的示例实现了一个简单的聊天。因此,从业务角度来看,我们有稍微不同的术语。我们有 ChatRoom,它只不过是发布/订阅服务。我们还有 ChatClient,这是一个可以同时充当发布者和订阅者的客户端应用程序。

在继续之前,我想提一件事。在我的写作中,我假设您熟悉基本的 WCF 概念。如果您不熟悉,当然可以继续使用 EdCF,但我建议您查找一些关于 WCF 基础知识的信息。您可以从 MSDN 开始

好的,我们已经就基本术语达成了一致。现在,我们需要采取一系列步骤来构建聊天示例。我将每个步骤的描述放在一个单独的 HOWTO 中。

HOWTO:创建和配置发布/订阅服务(ChatRoom)

目前,该示例只有一个事件:OnMessageSent。如果您想添加其他事件,只需将相应的添加到 IMyEvents_PubIMyEvents_Sub 接口中即可。假设您想添加一个名为 MyNewEvent 的新事件。您需要做的唯一更改就是修改用于发布的契约接口……

[erviceContract]
public interface IMyEvents_Pub
{
    [OperationContract(IsOneWay = true)]
    [EventDrivenOperationContract(IsWithReply = true)]
    void OnMessageSent(string message);

    [OperationContract(IsOneWay = true)]
    void MyNewEvent();
}

……以及相应的订阅契约:

[erviceContract]
public interface IMyEvents_Sub
{
    [OperationContract(IsOneWay = false)]
    string OnMessageSent(string message);

    [OperationContract(IsOneWay = true)]
    void MyNewEvent();
}

您需要做的最后一件事是在 App.config 文件中将 BaseAddress 值更改为适当的 IP 地址。

<service name="ChatRoom.MySubscriptionService" 
    behaviorConfiguration="ForDebuggingBehavior">
<host>
    <baseAddresses>
        <add baseAddress="https://:8000/MySubscriptionService"/>
    </baseAddresses>
</host><service name="ChatRoom.MyPublishService_Wrapper" 
    behaviorConfiguration="ForDebuggingBehavior">
<host>
    <baseAddresses>
        <add baseAddress="https://:8000/MyPublishService"/>
    </baseAddresses>
</host>

HOHOWTO:创建用于发布/订阅的代理

好了,我们已经配置并托管了 WCF 服务。现在我们想要一个允许我们的客户端连接到服务的代理。正如 MSDN 所说,您有三个选项:

  1. 从服务中检索 WSDL 并手工创建代理。
  2. 使用 Visual Studio 2005 的“添加服务引用”功能。
  3. 使用 SvcUtil.exe 工具生成代理类。

但是,我建议选择另一个选项。ChatSample 解决方案已经创建了用于发布和订阅的代理。如果您想在您的解决方案中添加一个新事件,只需使用这些代理并进行少量修改。例如,假设您想添加一个新的 MyNewEvent

ChatClient >> clientForSubscription.cs

public interface IMySubscriptionServiceCallback
{
    [System.ServiceModel.OperationContractAttribute(
        Action="http://tempuri.org/IMySubscriptionService/OnMessageSent", 
        ReplyAction=
        "http://tempuri.org/IMySubscriptionService/OnMessageSentResponse")]
        string OnMessageSent(string message);

    [System.ServiceModel.OperationContractAttribute(
        Action="http://tempuri.org/IMySubscriptionService/MyNewEvent",
        ReplyAction=
        "http://tempuri.org/IMySubscriptionService/MyNewEventResponse")]
    void MyNewEvent();
}

ChatClient >> clientForPublishing.cs

public interface IMyEvents_Pub
{    
    [System.ServiceModel.OperationContractAttribute(IsOneWay=true,
        Action="http://tempuri.org/IMyEvents_Pub/OnMessageSent")]
    void OnMessageSent(string message);

    [System.ServiceModel.OperationContractAttribute(IsOneWay=true,
        Action="http://tempuri.org/IMyEvents_Pub/MyNewEvent")]
    void MyNewEvent();
}

HOWTO:配置发布/订阅客户端(ChatClient)

既然您已经准备了代理,您的客户端就可以连接到服务了。但是,您需要设置传输设置和服务的地址。所有可配置值都位于 App.config 文件中。在这里您可以更改许多参数。这个数字真的很大,如果您想微调传输,我建议您参考官方文档。但是,如果您想立即开始,只需要设置服务的地址和用于回调的客户端基地址。

<client>
    <endpoint address="https://:8000/MySubscriptionService"
        binding="wsDualHttpBinding" bindingConfiguration="sub1Binding"
        contract="IMySubscriptionService" name="MainEndpoint" />

    <endpoint address="https://:8000/MyPublishService" 
        binding="wsDualHttpBinding"
        bindingConfiguration="pub1Binding" contract="IMyEvents_Pub" /> 
</client>

<bindings>
    <wsDualHttpBinding>
        <binding name="sub1Binding" openTimeout="00:10:00" 
            receiveTimeout="00:10:00" sendTimeout="00:10:00" 
            bypassProxyOnLocal="false" 
            clientBaseAddress="https://:8000/myClient1sub/"
            useDefaultWebProxy="true" />

        <binding name="pub1Binding" openTimeout="00:10:00" 
            receiveTimeout="00:10:00" sendTimeout="00:10:00" 
            bypassProxyOnLocal="false" 
            clientBaseAddress="https://:8000/myClient1pub/"
            useDefaultWebProxy="true" />

    </wsDualHttpBinding>
</bindings>

HOWTO:订阅事件并发布事件

我们快接近尾声了。整个基础设施已经准备就绪,现在我们必须完成最后四个步骤。看看我们还需要编写的代码。

  1. 第一步是创建一个用于事件订阅的代理。
    MySubscriptionServiceClient subscriptionServiceClient = 
        new MySubscriptionServiceClient(
        subInstanceContext, endpointConfigurationName);
  2. 第二步是订阅我们的事件。
    subscriptionServiceClient.Subscribe("MyNewEvent");
  3. 第三步是创建一个用于事件发布的代理。
    MyEvents_PubClient eventsClient = 
        new MyEvents_PubClient(pubInstanceContext);
  4. 最后一步是发布事件。
    eventsClient.MyNewEvent();

就这样,您现在拥有了支持多播的异步、可靠的消息传递。

关注点

在文章的结尾,我想请 CodeProject 社区不要犹豫写下任何评论和问题。我对 SOA 和事件驱动架构了解很多,但我希望能了解更多。我能够分享我的知识,我相信您也有可以分享的东西。请在本文底部的讨论区给我留言。

历史

  • 2007 年 9 月 21 日 -- 原始版本发布
© . All rights reserved.