WCF 实现发布/订阅模型






4.45/5 (16投票s)
发布者和订阅者模型的实现。
引言
发布/订阅模型是指,订阅者是所有客户端订阅的服务,而发布者是向所有已订阅了订阅者服务的客户端发送消息的服务。也就是说,假设我们有 10 个客户端,所有 10 个客户端都会订阅订阅者服务以接收通知(客户端程序是独立的程序),然后每当发生所需事件时,发布者就会通知所有已注册的客户端。
背景
在仔细阅读了 这里 的内容后,我理解了发布/订阅模型的实现方式。想了解更多关于此模型的信息,请看 这里。阅读这些内容应该能让你理解这个模型。
然后,我继续探索了 Windows Vista SDK,并在其中找到了 List Publisher/Subscribe 示例。通过探索它,我能够理解其中的逻辑。
这是其示意图
简要说明
- 我们有一个 *WCF 服务*(上面的图显示 WCF 服务托管在 IIS 中)
- 设计服务和客户端之间的 *回调契约*
- 设置 *委托和事件*,委托方法处理程序将引发该特定事件
- 因此,当客户端订阅时,我们增加事件处理程序,该处理程序将指向客户端的回调函数
- 现在,我们构建一个单独的 *数据源*,它实现了服务回调契约
- 此数据源负责调用 *Publish* 函数
- Publish 函数将简单地引发事件,从而现在将调用事件中的所有回调
使用代码
我主要构建了三个组件
- PubSubService
- PubSubClient
- 出版社
PubSubService
这是我们的主服务,它允许客户端 *订阅*、*退订*,以及发布者向已订阅的客户端 *发布* 消息。我们的服务还有一个 CallBackContract
,它将成为服务发送通知的客户端源。
下面是我们的 ServiceContract
接口
[ServiceContract(Namespace = “http://ListPublishSubscribe.Service”,
SessionMode = SessionMode.Required, CallbackContract = typeof
(IPubSubContract))]
public interface IPubSubService
{
[OperationContract(IsOneWay = false, IsInitiating=true)]
void Subscribe();
[OperationContract(IsOneWay = false, IsInitiating=true)]
void Unsubscribe();
[OperationContract(IsOneWay = false)]
void PublishNameChange(string Name);
}
我们已经很清楚了。我们的 ServiceContract
SessionType
设置为 SessionMode.Required
,因此您必须连接客户端,并且需要一个会话,它有三个 OperationContract
,分别是
Subscribe (订阅)
UnSubscribe
PublishNameChange
IsOneWay
表示它不是单向契约,而是一个双向契约,这意味着此契约可以由客户端调用,也可以将某些值返回给客户端。IsInitiating
表示此契约可以启动服务器/服务的会话。
我们已将 IPubSubContract
接口指定为我们的 CallbackContract
。这意味着它将是 *服务和客户端之间的数据交换媒介*,并且使用这些回调,我们可以让服务 *通知客户端* 关于特定事件。
下面是我们的 IPubSubContract
接口
public interface IPubSubContract
{
[OperationContract(IsOneWay = true)]
void NameChange(string Name);
}
我们只有一个名为 NameChange
的 OperationContract
。我们很快就会详细介绍。
既然我们已经定义了接口,现在是时候实现我们的 ServiceContract
了,下面是实现上述 IPubSubService
接口的类。
[ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
public class PubSubService : IPubSubService
{
public delegate void NameChangeEventHandler(object sender, ServiceEventArgs e);
public static event NameChangeEventHandler NameChangeEvent;
IPubSubContract ServiceCallback = null;
NameChangeEventHandler NameHandler = null;
public void Subscribe()
{
ServiceCallback = OperationContext.Current.GetCallbackChannel<IPubSubContract>();
NameHandler = new NameChangeEventHandler(PublishNameChangeHandler);
NameChangeEvent += NameHandler;
}
public void Unsubscribe()
{
NameChangeEvent -= NameHandler;
}
public void PublishNameChange(string Name)
{
ServiceEventArgs se = new ServiceEventArgs();
se.Name = Name;
NameChangeEvent(this, se);
}
public void PublishNameChangeHandler(object sender,ServiceEventArgs se)
{
ServiceCallback.NameChange(se.Name);
}
}
我们已经实现了前面定义的三个 OperationContract
,并且还定义了一些委托和事件。它们是什么?
逻辑很简单。每当客户端订阅时,我们都会创建一个事件处理程序并将其添加到我们的事件 NameChangeEvent
中。那么,这里发生的是,如果“n”个客户端加入,我们就有“n”个事件处理程序引用这些客户端。
如何实现这一点?
很简单。创建一个委托,将其关联到一个事件,并在 DelegateHandler
中,当我们想要发布时,调用 ServiceCallback
的 NameChange
契约,由于这是一个 CallbackContract
并且在客户端实现,因此该函数将在客户端调用,这样我们就将消息发布到了客户端。
所以,如果您查看 PublishNameChangeHandler
,您会看到我们调用了 ServiceCallback.NameChange
,这使得它很清楚。
为了简化起见,我已将 PubSubService 托管在 IIS 中。
现在,让我们转向 PubSubClient。
PubSubClient
在这里,我们将添加 WCF 服务,实现 IPubSubContract
接口以从服务获取回调,订阅服务,并准备好在发布者发布消息时接收它们。
通过 *添加服务引用* 添加您的 PubSubService。如果您不熟悉,请参阅我关于向应用程序添加 WCF 服务的帖子。
我们的 PubSubClient 是一个普通的 Windows 窗体应用程序,在窗体上只有一个 Label
,当发布者 *发布* 消息时,此 Label
将被更新。
我已将 PubSubService 用作我的服务的引用。添加 PubSubService 后,我们将通过 PubSubService.map 和 PubSubService.cs 来实现 IPubSubContract
。
[CallbackBehaviorAttribute(UseSynchronizationContext = false)]
public class ServiceCallback : IPubSubServiceCallback
{
public void NameChange(string Name)
{
Client.MyEventCallbackEvent(Name);
}
}
我们的实现很简单。我们在这里的客户端实现 NameChange
契约,现在您应该能够理解我之前所说的意思了。这样,当您从服务通过回调调用时,客户端中的函数就会被调用。
我在上面的 NameChange
函数中做了什么?
我进行了封送(Marshaling)。当您想要访问另一个环境或线程中的变量时,可以使用封送。由于我们的 Windows 窗体运行在单独的线程中,为了安全线程,我在这里使用了封送。
实现封送很简单。我们创建一个委托,将其关联到一个事件,并将事件设为 static
。我们从服务回调引发事件,并更新窗体中的 Label
控件。
下面是我们的客户端类,您可以看到在此处完成的所有委托、事件以及我们的 ServiceCallback
实现。
public partial class Client : Form
{
public delegate void MyEventCallbackHandler(string Name);
public static event MyEventCallbackHandler MyEventCallbackEvent;
delegate void SafeThreadCheck(string Name);
[CallbackBehaviorAttribute(UseSynchronizationContext = false)]
public class ServiceCallback : IPubSubServiceCallback
{
public void NameChange(string Name)
{
Client.MyEventCallbackEvent(Name);
}
}
public Client()
{
InitializeComponent();
InstanceContext context = new InstanceContext(new ServiceCallback());
PubSubServiceClient client = new PubSubServiceClient(context);
MyEventCallbackHandler callbackHandler = new MyEventCallbackHandler(UpdateForm);
MyEventCallbackEvent += callbackHandler;
client.Subscribe();
}
public void UpdateForm(string Name)
{
if (lblDisplay.InvokeRequired)
{
SafeThreadCheck sc = new SafeThreadCheck(UpdateForm);
this.BeginInvoke(sc, new object[] { Name });
}
else
{
lblDisplay.Text += Name;
}
}
}
唯一需要注意的就是封送。在这里,我使用 BeginInvoke
方法,这样它就不会等待该委托处理程序完成,UI 线程会继续执行其工作。如果您使用 Invoke
方法,UI 线程会等待直到其操作完成。实际上,是否使用 BeginInvoke
或 Invoke
取决于具体情况。您也可以在这里使用 Invoke
代替 BeginInvoke
。
出版社
发布者是最简单的。我们添加 PubSubService
,创建服务对象,然后调用 PublishNameChange
契约,这样就会调用已订阅客户端的 ServiceCallback
的 NameChange
契约,从而与客户端交换数据。
下面是发布者代码
public partial class Publisher : Form
{
InstanceContext context = null;
PubSubServiceClient client = null;
public class ServiceCallback : IPubSubServiceCallback
{
public void NameChange(string Name)
{
MessageBox.Show(Name);
}
}
public Publisher()
{
InitializeComponent();
}
private void btnPublish_Click(object sender, EventArgs e)
{
context = new InstanceContext(new ServiceCallback());
client = new PubSubServiceClient(context);
client.PublishNameChange(txtMessage.Text);
client.Close();
}
}
现在,我们已准备好订阅和发布 ;)
希望您阅读我关于如何使用 WCF 实现发布/订阅的文章愉快。如果您有任何疑问,请随时给我发送电子邮件或在下方开始讨论。
历史
- 2008 年 1 月 10 日 - 添加了文章。