C# 中的基于主题的发布/订阅设计模式实现 - 第二部分(使用 WCF)






4.95/5 (33投票s)
使用 WCF 回调实现基于主题的发布-订阅设计模式
引言
这是关于用 C# 实现基于主题的发布/订阅设计模式系列文章的第二部分。本系列将以两种方法展示发布/订阅设计模式的实现,使用相同的示例程序,并介绍发布/订阅设计模式的通用思想。这两种方法是使用 Socket 编程和使用 WCF。
众所周知,无论我们使用何种技术(Socket/Remoting/WCF)来实现发布/订阅设计模式,最终结果都会基本相同。因此,在本系列文章的每一部分,首先都会讨论实现发布/订阅设计模式的通用思想,然后展示使用特定技术的实现。在这里,发布/订阅设计模式使用两种不同的方法对相同的示例应用程序进行实现,以便您可以将每种方法的实现与其他方法进行比较,并选择最适合您需求的方法。每种方法的优缺点也将在每个部分中讨论。
您可以独立阅读每个部分吗?
在这里,您可以独立阅读每个部分。为了使每个部分都能独立阅读,必要的信息会在每个部分中重复。
背景
大约一年前,我写了一篇讨论 C# 中非基于主题的发布/订阅设计模式实现的博文。之后,我收到了一些请求,希望我能做一个基于主题的版本。我答应了,等我空闲的时候一定会做。几天前,我获得了一些空闲时间,开始将非基于主题的实现更改为基于主题的。但是,一年后再次看到我的代码和文章,我觉得代码冗长且文章信息量不足。然后,我开始写这个两部分的系列文章,以使我的工作做得更好。我写这个两部分的文章系列是为了作为一项学习练习,并期待您的评论。如果您有任何建议,请告诉我。
什么是基于主题的发布订阅设计模式?
在基于主题的发布-订阅模式中,发送方应用程序将每个消息标记为主题名称,而不是引用特定的接收方。然后,消息系统将消息发送给所有已注册接收该主题上消息的应用程序。消息发送方只需要关心创建原始消息,并将服务接收方的任务留给消息基础设施。
在此模式中,发布者和订阅者只能通过消息进行通信。发布-订阅模式解决了紧耦合问题。这是一种非常松耦合的架构,其中发送方甚至不知道他们的订阅者是谁。
上述图中有三种软件应用程序:消息发布者、消息订阅者和发布/订阅服务器。在这里,消息发布者在不知道消息订阅者的情况下将消息发送给消息服务器,而消息订阅者只会收到他们已注册的主题类型的消息。例如,假设我们有 3 个不同的主题(消息类型):a、b、c。但订阅者 1 只关心主题 a,订阅者 2 关心主题 b 和 c,订阅者 3 希望收到主题 a 和 c 的通知。因此,服务器将通知订阅者 1 主题 a,通知订阅者 2 主题 b 和 c,通知订阅者 3 主题 a 和 c。在这里,消息发布者不知道谁在接收消息,消息订阅者也不知道消息发布者是谁。
实施基本思想
要求 1:订阅者应用程序订阅一个或多个主题,并且只接收他们感兴趣的消息。
为了实现这一点,我们必须记录哪个订阅者对哪个主题感兴趣,然后根据这些记录,我们将决定将某个主题的事件中继到哪里。因此,我们必须按主题维护订阅者列表。对于每个主题,都会有一个列表,其中包含对该主题感兴趣的订阅者的地址。当发布者应用程序将某个主题的事件发送到发布/订阅服务器时,发布/订阅服务器需要将该事件中继给对该主题感兴趣的订阅者。为了实现这一点,发布/订阅服务器将获取该主题的订阅者列表,并将事件发送到此列表中的地址。为了按主题维护订阅者地址列表,字典是一个不错的选择。字典的键代表主题名称,值代表订阅者地址列表。
要求 2:发布者和订阅者之间松耦合。发布者不知道订阅者,包括有多少订阅者或他们住在哪里,订阅者也不知道发布者,包括有多少发布者或他们住在哪里。
为了实现这一点,不允许发布者和订阅者之间直接通信。因此,我们必须在他们之间放置一个独立的实体,该实体将维护按主题划分的订阅者地址列表,接收来自订阅者的事件,并将事件中继给订阅者,并公开供订阅者订阅主题的方法。订阅者和发布者只知道这个独立的实体。这个独立的实体称为发布/订阅服务或服务器。这个独立实体的功能被划分为发布服务和订阅服务。发布服务将接收来自发布者的事件并将其转发给订阅者。订阅服务将公开订阅者进行订阅/取消订阅的操作。我们如何筛选订阅者以中继特定主题的事件?为了实现按主题筛选订阅者以中继事件,实现了一个名为 Filter 的实体。Filter 实体同时被发布服务和订阅服务使用。
Filter 单元的功能
- 按主题维护订阅者列表
- 返回特定主题的订阅者列表
- 公开添加新订阅者的方法
- 公开移除订阅者的方法
发布服务的功能
- 接收来自应用程序(发布者)的事件,以通知订阅者
- 将特定主题的事件发送给已订阅该主题的应用程序(订阅者)
订阅服务的功能
- 公开应用程序(
Subscriber
)可以通过通信通道远程调用的方法,以便按主题订阅以接收事件 - 公开应用程序(
Subscriber
)可以通过通信通道远程调用的方法,以便按主题取消订阅
订阅者的功能
- 订阅者应用程序注册自己以接收一个或多个主题的事件
- 当发布者发送事件时接收事件
发布者的功能
- 将事件发送到发布服务进行发布
WCF 中的发布/订阅实现
在这里,WCF 回调用于通知订阅者事件。每个订阅者都实现一个回调接口。当订阅者向订阅服务发送订阅请求时,其回调引用将按主题进行存储,以便将来发生该主题的事件时,服务器可以使用该回调引用通知相应的订阅者。当发布者通过调用发布服务的方法发送某个主题的事件时,发布服务会获取该主题的回调引用列表,并通过调用回调引用上的 publish 方法来通知对此主题感兴趣的订阅者。
实施步骤
步骤 1:创建 Filter 类
Filter
类具有以下职责,并被发布服务和订阅服务使用。
- 按主题维护订阅者列表,并且
- 公开添加新订阅者的方法
- 公开移除订阅者的方法
- 返回特定主题的订阅者列表
class Filter
{
static Dictionary<string, List<IPublishing>> _subscribersList =
new Dictionary<string, List<IPublishing>>();
static public Dictionary<string, List<IPublishing>> SubscribersList
{
get {
lock (typeof(Filter))
{
return _subscribersList;
}
}
}
static public List<IPublishing> GetSubscribers(String topicName)
{
lock (typeof(Filter))
{
if (SubscribersList.ContainsKey(topicName))
{
return SubscribersList[topicName];
}
else
return null;
}
}
static public void AddSubscriber(String topicName, IPublishing subscriberCallbackReference)
{
lock (typeof(Filter))
{
if (SubscribersList.ContainsKey(topicName))
{
if (!SubscribersList[topicName].Contains(subscriberCallbackReference))
{
SubscribersList[topicName].Add(subscriberCallbackReference);
}
}
else
{
List<IPublishing> newSubscribersList = new List<IPublishing>();
newSubscribersList.Add(subscriberCallbackReference);
SubscribersList.Add(topicName, newSubscribersList);
}
}
}
static public void RemoveSubscriber(String topicName, IPublishing subscriberCallbackReference)
{
lock (typeof(Filter))
{
if (SubscribersList.ContainsKey(topicName))
{
if (SubscribersList[topicName].Contains(subscriberCallbackReference))
{
SubscribersList[topicName].Remove(subscriberCallbackReference);
}
}
}
}
}
字典 _subscribersList
按主题存储订阅者。在这里,键是主题名称 (String 类型)。值
是订阅者回调引用列表(List<IPublishing>
)。
GetSubscribers(String topicName)
方法根据主题名称返回回调引用列表。回调引用包含通知订阅者的必要机制。此方法首先检查字典中是否存在该主题名称。如果存在,则返回该主题相应的回调引用列表。如果不存在,则返回 null
。
名为 AddSubscriber
的方法将订阅者的回调引用按主题添加到列表中。它有两个参数:第一个是主题名称,第二个是回调引用。此方法首先检查该主题是否有列表。如果存在,则将此回调引用添加到该列表中。如果不存在,则为该主题创建一个新列表,并将回调引用添加到新列表中。
名为 RemoveSubscriber(String topicName, IPublishing subscriberCallbackReference)
的方法将订阅者的回调引用从相应的按主题列表的列表中删除。它有两个参数:第一个是主题名称,第二个是回调引用。它首先检查该主题是否存在列表。如果不存在,则不执行任何操作。如果存在,则检查该回调引用是否存在于列表中。如果存在,则将其删除,否则不执行任何操作。
步骤 2:订阅服务的实现
订阅服务实现 ISubscription
接口。订阅者应用程序调用此服务的这些方法来使其订阅和取消订阅。在订阅者订阅期间,订阅服务会保留订阅者的回调引用,以便以后发布者可以使用该回调引用通知 Subscriber
。
那么,我们如何获得订阅者的回调引用呢?
当订阅者调用订阅服务中的方法时,在调用中使用以下行 Subscriber
服务可以获取回调引用
IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>();
我们通过上述调用获取回调引用,因为每个订阅者都实现了回调接口。
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
class Subscription : ISubscription
{
#region ISubscription Members
public void Subscribe(string topicName)
{
IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>();
Filter.AddSubscriber(topicName, subscriber);
}
public void UnSubscribe(string topicName)
{
IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>();
Filter.RemoveSubscriber(topicName, subscriber);
}
#endregion
}
当从 Subscriber 应用程序调用 Subscribe(string topicName)
方法时,它首先通过调用 GetCallbackChannel
方法获取回调引用,然后将主题名称和回调引用传递给 filter 类进行注册。
当从 Subscriber
应用程序调用 UnSubscribe(string topicName)
方法时,它首先通过调用 GetCallbackChannel
方法获取回调引用,然后调用 filter 类的 RemoveSubscriber
方法,参数为:主题名称、要 Unsubscribed
的回调引用。
步骤 3:发布服务的实现
发布服务实现 IPublishing
接口。当发布者调用发布服务名为 Publish(Message e, string topicName)
的方法时,publish
方法会调用 Filter.GetSubscribers(topicName)
方法来获取该主题订阅者的回调引用列表。然后 publish
方法获取“Publish
”的方法信息。然后,对于每个回调引用,使用以下代码行和适当的参数触发“Publish
”方法。
publishMethodInfo.Invoke(subscriber, new object[] { e, topicName });
此调用通知对该主题感兴趣的订阅者有关事件的信息。
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
class Publishing : IPublishing
{
#region IPublishing Members
public void Publish(Message e, string topicName)
{
List<IPublishing> subscribers = Filter.GetSubscribers(topicName);
Type type = typeof(IPublishing);
MethodInfo publishMethodInfo = type.GetMethod("Publish");
foreach (IPublishing subscriber in subscribers)
{
try
{
publishMethodInfo.Invoke(subscriber, new object[] { e, topicName });
}
catch
{
}
}
}
#endregion
}
步骤 4:如何在订阅者应用程序中实现回调
WCF 具有回调客户端的机制。要在 WCF 中实现回调,WCF 服务会公开一个回调契约,所有客户端应用程序都必须实现该契约,并要求通过双工通道进行通信。您还必须使用 Duplex channelFactory
来构建通信通道,并且还需要提供一个 Reference Context。netTcpBinding
、wsdualHttpBinding
和 namedPipeBinding
支持回调,因为它们是双向的。
- 为此,您首先必须公开一个回调契约。回调契约如下所示
[ServiceContract] public interface IPublishing { [OperationContract(IsOneWay = true)] void Publish(Message e, string topicName); }
- 然后,在服务契约中指定回调契约。一个服务契约最多可以有一个回调契约。
[ServiceContract(CallbackContract = typeof(IPublishing))] public interface ISubscription { [OperationContract] void Subscribe(string topicName); [OperationContract] void UnSubscribe(string topicName); }
- 然后,在
Subscriber
应用程序中实现回调契约,因为双工服务的客户端必须实现回调契约类。在此项目中,通过以下代码实现public partial class Subscriber : Form, IPublishing { .........(Code) #region IMyEvents Members public void Publish(Message e, String topicName) { if (e != null) { int itemNum = (lstEvents.Items.Count < 1) ? 0 : lstEvents.Items.Count; lstEvents.Items.Add(itemNum.ToString()); lstEvents.Items[itemNum].SubItems.AddRange(new string[] { e.TopicName.ToString(), e.EventData }); _eventCount += 1; txtAstaEventCount.Text = _eventCount.ToString(); } } #endregion }
- 创建
Subscriber
(回调契约实现类)的实例,并使用它来创建System.ServiceModel.InstanceContext
对象,然后将其作为构造函数参数传递给DuplexChannelFactory
类。public void MakeProxy(string EndpoindAddress, object callbackinstance) { NetTcpBinding netTcpbinding = new NetTcpBinding(SecurityMode.None); EndpointAddress endpointAddress = new EndpointAddress(EndpoindAddress); InstanceContext context = new InstanceContext(callbackinstance); DuplexChannelFactory<ISubscription> channelFactory = new DuplexChannelFactory<ISubscription>( new InstanceContext(this),netTcpbinding, endpointAddress); _proxy = channelFactory.CreateChannel(); }
- 然后,调用服务契约(订阅服务)的操作,并在客户端代码中处理回调操作。
步骤 5:如何实现发布者
它使用 ChannelFactory
构建与发布服务的通信通道。为此目的使用以下代码
private void CreateProxy()
{
string endpointAddressInString = ConfigurationManager.AppSettings["EndpointAddress"];
EndpointAddress endpointAddress = new EndpointAddress(endpointAddressInString);
NetTcpBinding netTcpBinding = new NetTcpBinding();
_proxy = ChannelFactory<IPublishing>.CreateChannel(netTcpBinding, endpointAddress);
}
然后,使用此代理引用,可以使用以下代码调用发布服务的 publish
方法来发送事件
_proxy.Publish(alertData, topicName);
发布者和订阅者服务的托管
以下行用于托管发布服务。在此终结点中,地址为 net.tcp://:7001/Pub。绑定为 netTcpBinding
,契约是 IPublishing
。如果您想添加多种协议支持,则必须使用 AddServiceEndpoint
方法添加多个终结点。
private void HostPublishService()
{
_publishServiceHost = new ServiceHost(typeof(Publishing));
NetTcpBinding tcpBindingpublish = new NetTcpBinding();
_publishServiceHost.AddServiceEndpoint(typeof(IPublishing), tcpBindingpublish,
"net.tcp://:7001/Pub");
_publishServiceHost.Open();
}
以下三行用于托管订阅服务。在此终结点中,地址为 net.tcp://:7002/Sub。绑定为 netTcpBinding
,契约是 IPublishing
。如果您想添加多种协议支持,则必须使用 AddServiceEndpoint
方法添加多个终结点。
private void HostSubscriptionService()
{
_subscribeServiceHost = new ServiceHost(typeof(Subscription));
NetTcpBinding tcpBinding = new NetTcpBinding(SecurityMode.None);
_subscribeServiceHost.AddServiceEndpoint(typeof(ISubscription), tcpBinding,
"net.tcp://:7002/Sub");
_subscribeServiceHost.Open();
}
为什么 Publish 方法是单向的?
Publish
方法没有返回值,发布者不需要此方法调用的确认。
为什么在此处使用 Channel Factory 来构建通信通道?
如果我们使用发布者和订阅者的代理类,那么当我们更改数据契约、服务契约、回调契约时,我们就需要重新生成发布者和订阅者的代理。如果我们使用通道工厂来构建通道,那么当数据契约、服务契约和回调契约发生变化时,我们就无需更改通道工厂代码。在这里,要使用通道工厂在服务器和发布者/订阅者之间构建通道,所有契约都保存在一个单独的 DLL 中,该 DLL 由服务器、发布者和订阅者共享。
Socket 实现与 WCF 实现的比较
- 基于 Socket 的实现发送逗号分隔的纯文本消息,不冗长;而 WCF 实现发送 SOAP 消息,比较冗长。
- 基于 Socket 的实现不基于任何标准。为了实现互操作性,专有协议不是一个好的选择,而 SOAP 是一种标准。
- 基于 Socket 的实现不需要 WCF 运行时,因此可用于嵌入式编程,而 WCF 实现则不能。
- WCF 实现是面向对象的,而 Socket 实现则不是。
- WCF 实现可以通过添加额外的终结点轻松支持多种协议。但在 Socket 实现中,如果您想添加多种协议支持,您必须做很多工作,例如为每种协议编写不同的实现。
- 要在基于 Socket 的实现中提供安全功能,您必须做大量工作,而且它不会基于标准。但在 WCF 中,您可以基于标准轻松提供消息安全。
- 基于 Socket 的实现比 WCF 实现速度快,消耗的流量更少。
示例代码
此处附加了一个项目,该项目展示了 C# 中基于主题的发布/订阅设计模式的实现(使用 WCF 回调)。
结论
感谢阅读本文。我希望这篇文章能对一些人有所帮助。如果您有任何问题,我很乐意回答。我一直很欣赏您的评论。
历史
- 2009/03/22:初始发布
参考文献
- 发布-订阅词汇表
- 发布/订阅 - 维基百科,自由的百科全书
- 发布/订阅 (MSDN)
- [Baldoni03] Baldoni, R.; M. Contenti, and A. Virgillito. "The Evolution of Publish/Subscribe Communication Systems." Future Directions of Distributed Computing. Springer Verlag LNCS Vol. 2584, 2003.