65.9K
CodeProject 正在变化。 阅读更多。
Home

C# 中的基于主题的发布/订阅设计模式实现 - 第二部分(使用 WCF)

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.95/5 (33投票s)

2009 年 3 月 23 日

CPOL

12分钟阅读

viewsIcon

197194

downloadIcon

4600

使用 WCF 回调实现基于主题的发布-订阅设计模式

引言

这是关于用 C# 实现基于主题的发布/订阅设计模式系列文章的第二部分。本系列将以两种方法展示发布/订阅设计模式的实现,使用相同的示例程序,并介绍发布/订阅设计模式的通用思想。这两种方法是使用 Socket 编程和使用 WCF。

众所周知,无论我们使用何种技术(Socket/Remoting/WCF)来实现发布/订阅设计模式,最终结果都会基本相同。因此,在本系列文章的每一部分,首先都会讨论实现发布/订阅设计模式的通用思想,然后展示使用特定技术的实现。在这里,发布/订阅设计模式使用两种不同的方法对相同的示例应用程序进行实现,以便您可以将每种方法的实现与其他方法进行比较,并选择最适合您需求的方法。每种方法的优缺点也将在每个部分中讨论。

您可以独立阅读每个部分吗?

在这里,您可以独立阅读每个部分。为了使每个部分都能独立阅读,必要的信息会在每个部分中重复。

背景

大约一年前,我写了一篇讨论 C# 中非基于主题的发布/订阅设计模式实现的博文。之后,我收到了一些请求,希望我能做一个基于主题的版本。我答应了,等我空闲的时候一定会做。几天前,我获得了一些空闲时间,开始将非基于主题的实现更改为基于主题的。但是,一年后再次看到我的代码和文章,我觉得代码冗长且文章信息量不足。然后,我开始写这个两部分的系列文章,以使我的工作做得更好。我写这个两部分的文章系列是为了作为一项学习练习,并期待您的评论。如果您有任何建议,请告诉我。

什么是基于主题的发布订阅设计模式?

在基于主题的发布-订阅模式中,发送方应用程序将每个消息标记为主题名称,而不是引用特定的接收方。然后,消息系统将消息发送给所有已注册接收该主题上消息的应用程序。消息发送方只需要关心创建原始消息,并将服务接收方的任务留给消息基础设施。

在此模式中,发布者和订阅者只能通过消息进行通信。发布-订阅模式解决了紧耦合问题。这是一种非常松耦合的架构,其中发送方甚至不知道他们的订阅者是谁。

PubSubUsingWCF/pubsub.png

图:基于主题的发布订阅范例

上述图中有三种软件应用程序:消息发布者、消息订阅者和发布/订阅服务器。在这里,消息发布者在不知道消息订阅者的情况下将消息发送给消息服务器,而消息订阅者只会收到他们已注册的主题类型的消息。例如,假设我们有 3 个不同的主题(消息类型):a、b、c。但订阅者 1 只关心主题 a,订阅者 2 关心主题 b 和 c,订阅者 3 希望收到主题 a 和 c 的通知。因此,服务器将通知订阅者 1 主题 a,通知订阅者 2 主题 b 和 c,通知订阅者 3 主题 a 和 c。在这里,消息发布者不知道谁在接收消息,消息订阅者也不知道消息发布者是谁。

实施基本思想

要求 1:订阅者应用程序订阅一个或多个主题,并且只接收他们感兴趣的消息。

为了实现这一点,我们必须记录哪个订阅者对哪个主题感兴趣,然后根据这些记录,我们将决定将某个主题的事件中继到哪里。因此,我们必须按主题维护订阅者列表。对于每个主题,都会有一个列表,其中包含对该主题感兴趣的订阅者的地址。当发布者应用程序将某个主题的事件发送到发布/订阅服务器时,发布/订阅服务器需要将该事件中继给对该主题感兴趣的订阅者。为了实现这一点,发布/订阅服务器将获取该主题的订阅者列表,并将事件发送到此列表中的地址。为了按主题维护订阅者地址列表,字典是一个不错的选择。字典的键代表主题名称,值代表订阅者地址列表。

要求 2:发布者和订阅者之间松耦合。发布者不知道订阅者,包括有多少订阅者或他们住在哪里,订阅者也不知道发布者,包括有多少发布者或他们住在哪里。

为了实现这一点,不允许发布者和订阅者之间直接通信。因此,我们必须在他们之间放置一个独立的实体,该实体将维护按主题划分的订阅者地址列表,接收来自订阅者的事件,并将事件中继给订阅者,并公开供订阅者订阅主题的方法。订阅者和发布者只知道这个独立的实体。这个独立的实体称为发布/订阅服务或服务器。这个独立实体的功能被划分为发布服务和订阅服务。发布服务将接收来自发布者的事件并将其转发给订阅者。订阅服务将公开订阅者进行订阅/取消订阅的操作。我们如何筛选订阅者以中继特定主题的事件?为了实现按主题筛选订阅者以中继事件,实现了一个名为 Filter 的实体。Filter 实体同时被发布服务和订阅服务使用。

PubSubUsingWCF/implementation2.png

图:实施图

Filter 单元的功能

  • 按主题维护订阅者列表
  • 返回特定主题的订阅者列表
  • 公开添加新订阅者的方法
  • 公开移除订阅者的方法

发布服务的功能

  • 接收来自应用程序(发布者)的事件,以通知订阅者
  • 将特定主题的事件发送给已订阅该主题的应用程序(订阅者)

订阅服务的功能

  • 公开应用程序(Subscriber)可以通过通信通道远程调用的方法,以便按主题订阅以接收事件
  • 公开应用程序(Subscriber)可以通过通信通道远程调用的方法,以便按主题取消订阅

订阅者的功能

  • 订阅者应用程序注册自己以接收一个或多个主题的事件
  • 当发布者发送事件时接收事件

发布者的功能

  • 将事件发送到发布服务进行发布

WCF 中的发布/订阅实现

在这里,WCF 回调用于通知订阅者事件。每个订阅者都实现一个回调接口。当订阅者向订阅服务发送订阅请求时,其回调引用将按主题进行存储,以便将来发生该主题的事件时,服务器可以使用该回调引用通知相应的订阅者。当发布者通过调用发布服务的方法发送某个主题的事件时,发布服务会获取该主题的回调引用列表,并通过调用回调引用上的 publish 方法来通知对此主题感兴趣的订阅者。

实施步骤

步骤 1:创建 Filter 类

Filter 类具有以下职责,并被发布服务和订阅服务使用。

  • 按主题维护订阅者列表,并且
  • 公开添加新订阅者的方法
  • 公开移除订阅者的方法
  • 返回特定主题的订阅者列表
 class Filter  
    {
        static Dictionary<string, List<IPublishing>> _subscribersList = 
                                        new Dictionary<string, List<IPublishing>>();
        static public Dictionary<string, List<IPublishing>> SubscribersList
        {
            get {
                lock (typeof(Filter))
                {
                    return _subscribersList;
                }
            }
        }

        static public List<IPublishing> GetSubscribers(String topicName)
        {
            lock (typeof(Filter))
            {
                if (SubscribersList.ContainsKey(topicName))
                {
                    return SubscribersList[topicName];
                }
                else
                    return null;
            }
        }

        static public void AddSubscriber(String topicName, IPublishing subscriberCallbackReference)
        {
            lock (typeof(Filter))
            {
                if (SubscribersList.ContainsKey(topicName))
                {
                    if (!SubscribersList[topicName].Contains(subscriberCallbackReference))
                    {
                        SubscribersList[topicName].Add(subscriberCallbackReference);
                    }
                }
                else
                {
                    List<IPublishing> newSubscribersList = new List<IPublishing>();
                    newSubscribersList.Add(subscriberCallbackReference);
                    SubscribersList.Add(topicName, newSubscribersList);
                }
            }
        }

        static public void RemoveSubscriber(String topicName, IPublishing subscriberCallbackReference)
        {
            lock (typeof(Filter))
            {
                if (SubscribersList.ContainsKey(topicName))
                {
                    if (SubscribersList[topicName].Contains(subscriberCallbackReference))
                    {
                        SubscribersList[topicName].Remove(subscriberCallbackReference);
                    }
                }
            }
        }
    } 

字典 _subscribersList 按主题存储订阅者。在这里,键是主题名称 (String 类型)。值 是订阅者回调引用列表(List<IPublishing>)。

GetSubscribers(String topicName) 方法根据主题名称返回回调引用列表。回调引用包含通知订阅者的必要机制。此方法首先检查字典中是否存在该主题名称。如果存在,则返回该主题相应的回调引用列表。如果不存在,则返回 null

名为 AddSubscriber 的方法将订阅者的回调引用按主题添加到列表中。它有两个参数:第一个是主题名称,第二个是回调引用。此方法首先检查该主题是否有列表。如果存在,则将此回调引用添加到该列表中。如果不存在,则为该主题创建一个新列表,并将回调引用添加到新列表中。

名为 RemoveSubscriber(String topicName, IPublishing subscriberCallbackReference) 的方法将订阅者的回调引用从相应的按主题列表的列表中删除。它有两个参数:第一个是主题名称,第二个是回调引用。它首先检查该主题是否存在列表。如果不存在,则不执行任何操作。如果存在,则检查该回调引用是否存在于列表中。如果存在,则将其删除,否则不执行任何操作。

步骤 2:订阅服务的实现

订阅服务实现 ISubscription 接口。订阅者应用程序调用此服务的这些方法来使其订阅和取消订阅。在订阅者订阅期间,订阅服务会保留订阅者的回调引用,以便以后发布者可以使用该回调引用通知 Subscriber

那么,我们如何获得订阅者的回调引用呢?

当订阅者调用订阅服务中的方法时,在调用中使用以下行 Subscriber 服务可以获取回调引用

IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>(); 

我们通过上述调用获取回调引用,因为每个订阅者都实现了回调接口。

 [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
class Subscription : ISubscription
{
    #region ISubscription Members

    public void Subscribe(string topicName)
    {
        IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>();
        Filter.AddSubscriber(topicName, subscriber);
    }

    public void UnSubscribe(string topicName)
    {
        IPublishing subscriber = OperationContext.Current.GetCallbackChannel<IPublishing>();
        Filter.RemoveSubscriber(topicName, subscriber);
    }

    #endregion
} 

当从 Subscriber 应用程序调用 Subscribe(string topicName) 方法时,它首先通过调用 GetCallbackChannel 方法获取回调引用,然后将主题名称和回调引用传递给 filter 类进行注册。

当从 Subscriber 应用程序调用 UnSubscribe(string topicName) 方法时,它首先通过调用 GetCallbackChannel 方法获取回调引用,然后调用 filter 类的 RemoveSubscriber 方法,参数为:主题名称、要 Unsubscribed 的回调引用。

步骤 3:发布服务的实现

发布服务实现 IPublishing 接口。当发布者调用发布服务名为 Publish(Message e, string topicName) 的方法时,publish 方法会调用 Filter.GetSubscribers(topicName) 方法来获取该主题订阅者的回调引用列表。然后 publish 方法获取“Publish”的方法信息。然后,对于每个回调引用,使用以下代码行和适当的参数触发“Publish”方法。

 publishMethodInfo.Invoke(subscriber, new object[] { e, topicName }); 

此调用通知对该主题感兴趣的订阅者有关事件的信息。

 [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
class Publishing : IPublishing
{
    #region IPublishing Members
    public void Publish(Message e, string topicName)
    {
        List<IPublishing> subscribers = Filter.GetSubscribers(topicName);

        Type type = typeof(IPublishing);
        MethodInfo publishMethodInfo = type.GetMethod("Publish");
        foreach (IPublishing subscriber in subscribers)
        {
            try
            {
                publishMethodInfo.Invoke(subscriber, new object[] { e, topicName });
            }
            catch
            {

            }
        }
    }

    #endregion
} 
步骤 4:如何在订阅者应用程序中实现回调

WCF 具有回调客户端的机制。要在 WCF 中实现回调,WCF 服务会公开一个回调契约,所有客户端应用程序都必须实现该契约,并要求通过双工通道进行通信。您还必须使用 Duplex channelFactory 来构建通信通道,并且还需要提供一个 Reference Context。netTcpBindingwsdualHttpBindingnamedPipeBinding 支持回调,因为它们是双向的。

  1. 为此,您首先必须公开一个回调契约。回调契约如下所示
    [ServiceContract]
    public interface IPublishing
    {
        [OperationContract(IsOneWay = true)]
        void Publish(Message e, string topicName);
    } 
  2. 然后,在服务契约中指定回调契约。一个服务契约最多可以有一个回调契约。
    [ServiceContract(CallbackContract = typeof(IPublishing))]
    public interface ISubscription
    {
        [OperationContract]
        void Subscribe(string topicName);
    
        [OperationContract]
        void UnSubscribe(string topicName);
    }
  3. 然后,在 Subscriber 应用程序中实现回调契约,因为双工服务的客户端必须实现回调契约类。在此项目中,通过以下代码实现
    public partial class Subscriber : Form, IPublishing
        {
           .........(Code)
    
            #region IMyEvents Members
    
            public void Publish(Message e, String topicName)
            {
                if (e != null)
                {
                    int itemNum = (lstEvents.Items.Count < 1) ? 0 : lstEvents.Items.Count;
                    lstEvents.Items.Add(itemNum.ToString());
                    lstEvents.Items[itemNum].SubItems.AddRange(new string[] 
                                       { e.TopicName.ToString(), e.EventData });
                    _eventCount += 1;
                    txtAstaEventCount.Text = _eventCount.ToString();
    
                }
            }
    
            #endregion
        }
  4. 创建 Subscriber(回调契约实现类)的实例,并使用它来创建 System.ServiceModel.InstanceContext 对象,然后将其作为构造函数参数传递给 DuplexChannelFactory 类。
     public void MakeProxy(string EndpoindAddress, object callbackinstance)
            {
                NetTcpBinding netTcpbinding = new NetTcpBinding(SecurityMode.None);
                EndpointAddress endpointAddress = new EndpointAddress(EndpoindAddress);
                InstanceContext context = new InstanceContext(callbackinstance);        
    
                DuplexChannelFactory<ISubscription> channelFactory = 
                                        new DuplexChannelFactory<ISubscription>(
                    new InstanceContext(this),netTcpbinding, endpointAddress);
                _proxy  = channelFactory.CreateChannel();
            }
  5. 然后,调用服务契约(订阅服务)的操作,并在客户端代码中处理回调操作。
步骤 5:如何实现发布者

它使用 ChannelFactory 构建与发布服务的通信通道。为此目的使用以下代码

   private void CreateProxy()
        {
            string endpointAddressInString = ConfigurationManager.AppSettings["EndpointAddress"];
            EndpointAddress endpointAddress = new EndpointAddress(endpointAddressInString);
            NetTcpBinding netTcpBinding = new NetTcpBinding();
            _proxy = ChannelFactory<IPublishing>.CreateChannel(netTcpBinding, endpointAddress);
        } 

然后,使用此代理引用,可以使用以下代码调用发布服务的 publish 方法来发送事件

_proxy.Publish(alertData, topicName); 

发布者和订阅者服务的托管

以下行用于托管发布服务。在此终结点中,地址为 net.tcp://:7001/Pub。绑定为 netTcpBinding,契约是 IPublishing。如果您想添加多种协议支持,则必须使用 AddServiceEndpoint 方法添加多个终结点。

 private void HostPublishService()
        {
            _publishServiceHost = new ServiceHost(typeof(Publishing));
            NetTcpBinding tcpBindingpublish = new NetTcpBinding();
            _publishServiceHost.AddServiceEndpoint(typeof(IPublishing), tcpBindingpublish,
                                    "net.tcp://:7001/Pub");
            _publishServiceHost.Open();
        }

以下三行用于托管订阅服务。在此终结点中,地址为 net.tcp://:7002/Sub。绑定为 netTcpBinding,契约是 IPublishing。如果您想添加多种协议支持,则必须使用 AddServiceEndpoint 方法添加多个终结点。

private void HostSubscriptionService()
        {
            _subscribeServiceHost = new ServiceHost(typeof(Subscription));
            NetTcpBinding tcpBinding = new NetTcpBinding(SecurityMode.None);
            _subscribeServiceHost.AddServiceEndpoint(typeof(ISubscription), tcpBinding,
                                "net.tcp://:7002/Sub");
            _subscribeServiceHost.Open();
        }

为什么 Publish 方法是单向的?

Publish 方法没有返回值,发布者不需要此方法调用的确认。

为什么在此处使用 Channel Factory 来构建通信通道?

如果我们使用发布者和订阅者的代理类,那么当我们更改数据契约、服务契约、回调契约时,我们就需要重新生成发布者和订阅者的代理。如果我们使用通道工厂来构建通道,那么当数据契约、服务契约和回调契约发生变化时,我们就无需更改通道工厂代码。在这里,要使用通道工厂在服务器和发布者/订阅者之间构建通道,所有契约都保存在一个单独的 DLL 中,该 DLL 由服务器、发布者和订阅者共享。

Socket 实现与 WCF 实现的比较

  • 基于 Socket 的实现发送逗号分隔的纯文本消息,不冗长;而 WCF 实现发送 SOAP 消息,比较冗长。
  • 基于 Socket 的实现不基于任何标准。为了实现互操作性,专有协议不是一个好的选择,而 SOAP 是一种标准。
  • 基于 Socket 的实现不需要 WCF 运行时,因此可用于嵌入式编程,而 WCF 实现则不能。
  • WCF 实现是面向对象的,而 Socket 实现则不是。
  • WCF 实现可以通过添加额外的终结点轻松支持多种协议。但在 Socket 实现中,如果您想添加多种协议支持,您必须做很多工作,例如为每种协议编写不同的实现。
  • 要在基于 Socket 的实现中提供安全功能,您必须做大量工作,而且它不会基于标准。但在 WCF 中,您可以基于标准轻松提供消息安全。
  • 基于 Socket 的实现比 WCF 实现速度快,消耗的流量更少。

示例代码

此处附加了一个项目,该项目展示了 C# 中基于主题的发布/订阅设计模式的实现(使用 WCF 回调)。

结论

感谢阅读本文。我希望这篇文章能对一些人有所帮助。如果您有任何问题,我很乐意回答。我一直很欣赏您的评论。

历史

  • 2009/03/22:初始发布

参考文献

© . All rights reserved.