65.9K
CodeProject 正在变化。 阅读更多。
Home

WCF 实现发布/订阅模型

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.45/5 (16投票s)

2008 年 1 月 9 日

CPOL

5分钟阅读

viewsIcon

161370

downloadIcon

5047

发布者和订阅者模型的实现。

pubsubmodel.png

引言

发布/订阅模型是指,订阅者是所有客户端订阅的服务,而发布者是向所有已订阅了订阅者服务的客户端发送消息的服务。也就是说,假设我们有 10 个客户端,所有 10 个客户端都会订阅订阅者服务以接收通知(客户端程序是独立的程序),然后每当发生所需事件时,发布者就会通知所有已注册的客户端。

背景

在仔细阅读了 这里 的内容后,我理解了发布/订阅模型的实现方式。想了解更多关于此模型的信息,请看 这里。阅读这些内容应该能让你理解这个模型。

然后,我继续探索了 Windows Vista SDK,并在其中找到了 List Publisher/Subscribe 示例。通过探索它,我能够理解其中的逻辑。

这是其示意图

publish-subscriber-logic.png

简要说明

  • 我们有一个 *WCF 服务*(上面的图显示 WCF 服务托管在 IIS 中)
  • 设计服务和客户端之间的 *回调契约*
  • 设置 *委托和事件*,委托方法处理程序将引发该特定事件
  • 因此,当客户端订阅时,我们增加事件处理程序,该处理程序将指向客户端的回调函数
  • 现在,我们构建一个单独的 *数据源*,它实现了服务回调契约
  • 此数据源负责调用 *Publish* 函数
  • Publish 函数将简单地引发事件,从而现在将调用事件中的所有回调

使用代码

我主要构建了三个组件

  • PubSubService
  • PubSubClient
  • 出版社
PubSubService

这是我们的主服务,它允许客户端 *订阅*、*退订*,以及发布者向已订阅的客户端 *发布* 消息。我们的服务还有一个 CallBackContract,它将成为服务发送通知的客户端源。

下面是我们的 ServiceContract 接口

[ServiceContract(Namespace = “http://ListPublishSubscribe.Service”,
SessionMode = SessionMode.Required, CallbackContract = typeof
(IPubSubContract))]
public interface IPubSubService
{
        [OperationContract(IsOneWay = false, IsInitiating=true)]
        void Subscribe();
        [OperationContract(IsOneWay = false, IsInitiating=true)]
        void Unsubscribe();
        [OperationContract(IsOneWay = false)]
        void PublishNameChange(string Name);
}

我们已经很清楚了。我们的 ServiceContract SessionType 设置为 SessionMode.Required,因此您必须连接客户端,并且需要一个会话,它有三个 OperationContract,分别是

  • Subscribe (订阅)
  • UnSubscribe
  • PublishNameChange

IsOneWay 表示它不是单向契约,而是一个双向契约,这意味着此契约可以由客户端调用,也可以将某些值返回给客户端。IsInitiating 表示此契约可以启动服务器/服务的会话。

我们已将 IPubSubContract 接口指定为我们的 CallbackContract。这意味着它将是 *服务和客户端之间的数据交换媒介*,并且使用这些回调,我们可以让服务 *通知客户端* 关于特定事件。

下面是我们的 IPubSubContract 接口

public interface IPubSubContract
{
        [OperationContract(IsOneWay = true)]
        void NameChange(string Name);
}

我们只有一个名为 NameChangeOperationContract。我们很快就会详细介绍。

既然我们已经定义了接口,现在是时候实现我们的 ServiceContract 了,下面是实现上述 IPubSubService 接口的类。

[ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
 public class PubSubService : IPubSubService
 {
    public delegate void NameChangeEventHandler(object sender, ServiceEventArgs e);
    public static event NameChangeEventHandler NameChangeEvent;

    IPubSubContract ServiceCallback = null;
    NameChangeEventHandler NameHandler = null;

    public void Subscribe()
    {
        ServiceCallback = OperationContext.Current.GetCallbackChannel<IPubSubContract>();
        NameHandler = new NameChangeEventHandler(PublishNameChangeHandler);
        NameChangeEvent += NameHandler;
    }

    public void Unsubscribe()
    {
        NameChangeEvent -= NameHandler;
    }

    public void PublishNameChange(string Name)
    {
        ServiceEventArgs se = new ServiceEventArgs();
        se.Name = Name;
        NameChangeEvent(this, se);
    }

    public void PublishNameChangeHandler(object sender,ServiceEventArgs se)
    {
        ServiceCallback.NameChange(se.Name);

    }
}

我们已经实现了前面定义的三个 OperationContract,并且还定义了一些委托和事件。它们是什么?

逻辑很简单。每当客户端订阅时,我们都会创建一个事件处理程序并将其添加到我们的事件 NameChangeEvent 中。那么,这里发生的是,如果“n”个客户端加入,我们就有“n”个事件处理程序引用这些客户端。

如何实现这一点?

很简单。创建一个委托,将其关联到一个事件,并在 DelegateHandler 中,当我们想要发布时,调用 ServiceCallbackNameChange 契约,由于这是一个 CallbackContract 并且在客户端实现,因此该函数将在客户端调用,这样我们就将消息发布到了客户端。

所以,如果您查看 PublishNameChangeHandler,您会看到我们调用了 ServiceCallback.NameChange,这使得它很清楚。

为了简化起见,我已将 PubSubService 托管在 IIS 中。

现在,让我们转向 PubSubClient。

PubSubClient

在这里,我们将添加 WCF 服务,实现 IPubSubContract 接口以从服务获取回调,订阅服务,并准备好在发布者发布消息时接收它们。

通过 *添加服务引用* 添加您的 PubSubService。如果您不熟悉,请参阅我关于向应用程序添加 WCF 服务的帖子。

我们的 PubSubClient 是一个普通的 Windows 窗体应用程序,在窗体上只有一个 Label,当发布者 *发布* 消息时,此 Label 将被更新。

我已将 PubSubService 用作我的服务的引用。添加 PubSubService 后,我们将通过 PubSubService.mapPubSubService.cs 来实现 IPubSubContract

[CallbackBehaviorAttribute(UseSynchronizationContext = false)]
public class ServiceCallback : IPubSubServiceCallback
{
      public void NameChange(string Name)
      {
             Client.MyEventCallbackEvent(Name);
      }
}

我们的实现很简单。我们在这里的客户端实现 NameChange 契约,现在您应该能够理解我之前所说的意思了。这样,当您从服务通过回调调用时,客户端中的函数就会被调用。

我在上面的 NameChange 函数中做了什么?

我进行了封送(Marshaling)。当您想要访问另一个环境或线程中的变量时,可以使用封送。由于我们的 Windows 窗体运行在单独的线程中,为了安全线程,我在这里使用了封送。

实现封送很简单。我们创建一个委托,将其关联到一个事件,并将事件设为 static。我们从服务回调引发事件,并更新窗体中的 Label 控件。

下面是我们的客户端类,您可以看到在此处完成的所有委托、事件以及我们的 ServiceCallback 实现。

public partial class Client : Form
{
     public delegate void MyEventCallbackHandler(string Name);
     public static event MyEventCallbackHandler MyEventCallbackEvent;

     delegate void SafeThreadCheck(string Name);

     [CallbackBehaviorAttribute(UseSynchronizationContext = false)]
     public class ServiceCallback : IPubSubServiceCallback
     {
         public void NameChange(string Name)
         {
             Client.MyEventCallbackEvent(Name);
         }
     }

     public Client()
     {
          InitializeComponent();            

          InstanceContext context = new InstanceContext(new ServiceCallback());
          PubSubServiceClient client = new PubSubServiceClient(context);

          MyEventCallbackHandler callbackHandler = new MyEventCallbackHandler(UpdateForm);
          MyEventCallbackEvent += callbackHandler;

          client.Subscribe();
     }

     public void UpdateForm(string Name)
     {
          if (lblDisplay.InvokeRequired)
          {
              SafeThreadCheck sc = new SafeThreadCheck(UpdateForm);
              this.BeginInvoke(sc, new object[] { Name });
          }
          else
          {
              lblDisplay.Text += Name;
          }
     }

 }

唯一需要注意的就是封送。在这里,我使用 BeginInvoke 方法,这样它就不会等待该委托处理程序完成,UI 线程会继续执行其工作。如果您使用 Invoke 方法,UI 线程会等待直到其操作完成。实际上,是否使用 BeginInvokeInvoke 取决于具体情况。您也可以在这里使用 Invoke 代替 BeginInvoke

出版社

发布者是最简单的。我们添加 PubSubService,创建服务对象,然后调用 PublishNameChange 契约,这样就会调用已订阅客户端的 ServiceCallbackNameChange 契约,从而与客户端交换数据。

下面是发布者代码

public partial class Publisher : Form
{
     InstanceContext context = null;
     PubSubServiceClient client = null;

     public class ServiceCallback : IPubSubServiceCallback
     {
         public void NameChange(string Name)
         {
              MessageBox.Show(Name);
         }
     }

     public Publisher()
     {
         InitializeComponent();
     }

     private void btnPublish_Click(object sender, EventArgs e)
     {
         context = new InstanceContext(new ServiceCallback());
         client = new PubSubServiceClient(context);
         client.PublishNameChange(txtMessage.Text);
         client.Close();
     }
}

现在,我们已准备好订阅和发布 ;)

希望您阅读我关于如何使用 WCF 实现发布/订阅的文章愉快。如果您有任何疑问,请随时给我发送电子邮件或在下方开始讨论。

历史

  • 2008 年 1 月 10 日 - 添加了文章。
© . All rights reserved.