65.9K
CodeProject 正在变化。 阅读更多。
Home

一个简化的、非主题驱动的事件通知服务器(WCF实现),支持多种协议。

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.67/5 (30投票s)

2008年5月18日

CPOL

7分钟阅读

viewsIcon

133411

downloadIcon

1893

当系统中生成警报时,您将收到通知,而无需过滤您注册的事件类型。

目录

引言

第一部分 - 基础

什么是事件通知服务器 (ENS)?
多协议支持
它是如何工作的?
使用 ENS 和测试客户端的步骤。
局限性

第二部分 - 内部。

数据结构与算法
事件通知服务器的架构。

发布者 & 订阅者模型

第三部分 - 代码文档。

引言

一个简化的事件通知服务器可以让您在系统中生成警报时收到通知,而无需过滤您注册的事件类型。在各种场景下,我们需要这种类型的发布-订阅方案。在这里,我们使用支持多种协议的发布/订阅方案在 WCF 中实现了一个简化的事件通知服务器。在此实现中,如果客户端订阅了服务器,客户端将收到发布者发送的所有事件。

来源

我阅读了 Juval Lowy 在 MSDN 杂志上的文章,但该文章基于主题驱动的发布/订阅方案。然而,在许多情况下,需要一个简单的、非主题驱动的发布/订阅方案。因此,我修改了代码的许多部分,最终创建了一个简化的版本。我希望这个支持多协议的简化实现能对像我一样在项目中需要它的许多人有所帮助。本文中的代码基于 Juval Lowy 的文章,但有些部分未作修改。

第一部分 - 基础

什么是事件通知服务器?

事件通知服务器是一个“事件通知”系统。它连接了随时间变化提供信息的程序(发布者)和想要接收这些变化通知的程序(订阅者)。它是处理发布者和订阅者的中间件。

多协议支持

本质上有两大类应用程序通过网络或互联网进行通信——客户端-服务器和点对点。事件通知服务器采用客户端-服务器架构以获得更好的性能。

在这里,事件通知服务器暴露了基于不同协议的连接。客户端
将根据其需求选择协议进行连接。

图 1 - 多个客户端通过多种协议连接到事件通知服务器。

如果客户端与事件通知服务器 (ENS) 在同一台机器上,它将通过命名管道协议连接到 ENS,因为命名管道协议在同一机器边界上速度最快。


如果客户端与 ENS 在同一局域网 (LAN) 内,它将通过 TCP 协议连接到 ENS,因为 TCP 是面向连接的可靠协议。


如果客户端与 ENS 在同一虚拟专用网络 (VPN) 内,它将通过 TCP 协议连接到 ENS,因为 TCP 是面向连接的可靠协议。

如果客户端与 ENS 在互联网上,它将通过 HTTP 协议连接到 ENS,因为 HTTP 是防火墙友好的协议。默认情况下它是单向的,但使用 wsdualhttp 绑定可以实现双向通信。

它是如何工作的?

在这里,每个客户端都会向 ENS 注册,然后当有事件到达 ENS 时,ENS 会将其发送给已注册的客户端。
例如,我们有 10 个客户端,所有 10 个客户端都会向 ENS 注册以接收通知(客户端程序是独立的程序),然后每当发生发布者事件时,ENS 就会通知所有这些客户端。

使用 ENS 和测试客户端的步骤

如何启动整个系统

在 VS2008 中,只需按 F5 即可启动整个系统,然后它就可以使用了。

事件通知服务器

无需进行任何配置。


它将以启动它的机器 IP 开始。


用于事件接收(发布者服务)
它将按照以下方式打开端口:
8000 用于 HTTP。
8001 用于 TCP。
Mypipe1 用于命名管道。

用于事件广播(订阅服务)
它将按照以下方式打开端口:

8003 用于 HTTP。
8002 用于 TCP。
Mypipe2 用于命名管道。

如何发送事件

1. 要发送单个事件,请单击“Fire a single Event”按钮。


2. 要发送自动事件,请设置间隔然后单击按钮
“Fire Auto Event”。


如何接收事件


1. 单击测试客户端的“Subscribe”按钮。


2. 然后它将开始接收事件。

局限性

在 NAT 下无法工作。
无身份验证、无加密。
无容错支持。
无机制可防止 DOS(拒绝服务攻击)攻击。

第二部分 - 内部。

数据结构

一个简单的列表包含订阅者列表。

算法

通过引入专用的订阅服务和专用的发布服务来解耦发布者和订阅者。

希望订阅事件的订阅者将向订阅服务注册。

订阅服务将管理订阅者列表。

当发布者出现事件时

o 发布者将从订阅服务请求订阅者列表。

o 然后发布者将事件发送给列表中的每个订阅者。

事件通知服务器的架构

architecture.PNG


发布者 & 订阅者模型

发布/订阅(或 pub/sub)是一种异步消息传递范例,其中消息的发送者(发布者)不被编程为将消息发送到特定的接收者(订阅者)。相反,已发布的邮件的特征在于不知道存在哪些(如果有的话)订阅者。订阅者表达兴趣并接收邮件,而不知道存在哪些(如果有的话)发布者。这种发布者和订阅者的解耦可以实现更大的可伸缩性和更动态的网络拓扑。同样,发布者只能通知它知道的订阅者。发布者无法将事件传递给任何希望接收它的人,也无法广播事件。此外,所有发布者都必须重复编写必要的代码来管理订阅者列表和发布行为本身。订阅者无法询问是否会触发事件,应用程序应该创建该订阅者的实例并让它处理该事件。
设置订阅必须以编程方式完成。

第三部分 - 代码文档。

DataContract

[DataContract]
public class AlertData
 {
   private string _SeqNo;
  private string _Description;
 
   [DataMember]
   public string SeqNo { get { return _SeqNo; } set { _SeqNo = value; } }

  [DataMember]
   public string Description { get { return _Description; } set { _Description    = value; } }
   }
 

事件发布


发布服务在终结点公开事件契约,您需要将事件契约标记为服务契约。

   /// <summary>
   /// This is the service contract of Publish Service
 
   [ServiceContract]
   interface IEvent
   {
   [OperationContract(IsOneWay = true)]
   void OnEvent(AlertData e);
   }
   
 

要提供您自己的发布服务,请派生自 IEvent 并使用该方法将事件传递给所有订阅者。

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
class Publishing : IEvent
{
 
   /// This method is called from the publisher client to send the event.
   public void OnEvent(AlertData e)
   {

       IEvent[] subscribers = Subscription.GetClientList(); 
       Type type = typeof(IEvent);
       MethodInfo methodInfo = type.GetMethod("OnEvent");
       foreach ( IEvent subscriber in subscribers)
       {
           try
           {
               methodInfo.Invoke(subscriber, new object[] { e });
           }
           catch 
           {
       
              }
       }
   }
}

管理注册(订阅)

为了管理注册,我定义了 IRegistration 接口,如示例 1 所示。

/// <summary>
   /// This is the Service contract for SubscriptionService
   /// </summary>
   [ServiceContract(CallbackContract = typeof(IEvent))]
   public interface IRegistration
   {
   [OperationContract]
   void Register( );

  [OperationContract]
   void UnRegister();

   [OperationContract]
   string GetDateTime();
   }
 

回调接口是

   /// <summary>
   /// This is the service contract of Publish Service
   /// </summary>
   [ServiceContract]
   interface IEvent
   {
   [OperationContract(IsOneWay = true)]
   void OnEvent(AlertData e);
}

应用程序需要以终结点形式公开自己的订阅服务,该终结点支持其特定的 IRegistration 接口。为此,应用程序需要提供一个派生自 IRegistration 的服务类,并将回调契约指定为类型参数。

  
   /// This is the class for Subscription Service that is deployed to listen.
   
 [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
   class Subscription : IRegistration
   {
 
   /// This the data structue that is used for pub/sub scheme. 
   /// Here list is used to hold the subscribers.
 
   static List<IEvent> m_ClientList;
 
  
   /// This is constructor of the class.It is used here to create the instance
   /// of the pub/sub data structure 
   static Subscription()
   {
   m_ClientList = new List<IEvent>();
   }
  
   /// This method return the complete subscriber list to publisher service.
   
   internal static IEvent[] GetClientList()
   {
   lock (typeof(Subscription))
   {
   return m_ClientList.ToArray();
   }
   }
 
   /// This method is called by subscriber to register itself with Server.
   /// It register the client with pub/sub service. 
   public void Register( )
   {
   lock (typeof(Subscription))
   {
   IEvent subscriber = OperationContext.Current.GetCallbackChannel<IEvent>();
   if (m_ClientList.Contains(subscriber))
   {
   return;
   }
   m_ClientList.Add(subscriber);
}
   }

  
   /// This method is called by subscriber to Unsubscribe itself from Server.
   /// It Unsubscribe the client with pub/sub service.  
   public void UnRegister()
   {
   lock (typeof(Subscription))
   {
   IEvent subscriber = OperationContext.Current.GetCallbackChannel<IEvent>();
   m_ClientList.Remove(subscriber);
    }
   }

   public string GetDateTime()
   {
   return System.DateTime.Now.ToString();
   }

}
 

将订阅者存储在一个通用的静态列表中。

/// This the data structue that is used for pub/sub scheme. 
/// Here list is used to hold the subscribers. 


 
     static List<IEvent> m_ClientList;

Register() 方法从操作调用上下文中提取回调引用。然后从存储中检索事件的订阅者列表。如果列表不包含订阅者,则将其添加。UnRegister() 的操作方式类似。

托管

对于托管,您必须为多种协议公开多次发布服务。

  
   eventServiceHost = new ServiceHost(typeof(Publishing)); 

   ///Here diferent binding is created for different protocol.
   /// For example NetTcpBinding is created for TCP protocol.
   /// For example WSDualHttpBinding is created for HTTP protocol.
   // For example NetNamedPipeBinding is created for Named Pipe protocol.

   System.ServiceModel.Channels.Binding wsDualBindingpublish = new WSDualHttpBinding();
   System.ServiceModel.Channels.Binding tcpBindingpublish = new NetTcpBinding();
   System.ServiceModel.Channels.Binding namedPipeBindingpublish = new NetNamedPipeBinding();
      
///By the following line i add the address of PublishService to eventServiceHost for
///differnt protocol
                eventServiceHost.AddServiceEndpoint(typeof(IEvent), wsDualBindingpublish,
                                        "https://:8000/PublishingService/");
                eventServiceHost.AddServiceEndpoint(typeof(IEvent), tcpBindingpublish,
                                        "net.tcp://:8001/PublishingService");
                eventServiceHost.AddServiceEndpoint(typeof(IEvent), namedPipeBindingpublish,
                                        "net.pipe:///MyPipe1");

//This line is used to open pub service to listen.
   eventServiceHost.Open();

对于托管,您必须为多种协议公开多次订阅服务。

 subscriptionManagerHost = new ServiceHost(typeof(Subscription));
   
   System.ServiceModel.Channels.Binding wsDualBinding = new WSDualHttpBinding(
       WSDualHttpSecurityMode.None);
   System.ServiceModel.Channels.Binding tcpBinding = new NetTcpBinding(SecurityMode.None);
   System.ServiceModel.Channels.Binding namedPipeBinding = new NetNamedPipeBinding();


  ///By the following line i add the address of Subscription Service 
  ///to subscriptionManagerHost for differnt protocol
  ///subscriptionManagerHost.AddServiceEndpoint(typeof(IRegistration), wsDualBinding,
               
                 subscriptionManagerHost.AddServiceEndpoint(typeof(IRegistration),
                                        wsDualBinding,
                                        "https://:8003/SubscriptionServie/");
                subscriptionManagerHost.AddServiceEndpoint(typeof(IRegistration),
                                    tcpBinding,
                                    "net.tcp://:8002/SubscriptionServie");
                subscriptionManagerHost.AddServiceEndpoint(typeof(IRegistration),
                                        namedPipeBinding,
                                        "net.pipe:///MyPipe2");
   
   
   
   //This line is used to open sub service to listen.
   subscriptionManagerHost.Open();
 

HTTP 客户端连接

我们知道 HTTP 是单向的。要使其双向,我们必须使用 WSDualHttpBinding,并在客户端和服务器上打开两个套接字连接。因此,这里的 ClientBaseAddress 用于在客户端上设置客户端的监听地址。

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
    <appSettings>
        <add key ="EndpointAddress" value ="https://:8003/SubscriptionServie/"/>
    </appSettings>
 
</configuration>


   WSDualHttpBinding namedpipebinding = new WSDualHttpBinding(WSDualHttpSecurityMode.None);
   EndpointAddress endpointAddress = new EndpointAddress(EndpoindAddress);
   InstanceContext context = new InstanceContext(callbackinstance);
   m_Proxy = new RegistrationProxy(context, namedpipebinding, endpointAddress);  
   string strHostName = Dns.GetHostName();
   IPHostEntry ipEntry = Dns.GetHostByName(strHostName);
   IPAddress[] addr = ipEntry.AddressList;
   namedpipebinding.ClientBaseAddress = new Uri("http://" + 
           addr[0].ToString()    + ":" + "4000" + "/");

TCP 客户端连接

此处使用 SecurityMode.None,以便它可以在跨域和 VPN 上进行通信。

<?xml version="1.0" encoding="utf-8" ?>
   <configuration>
   <appSettings>
   <add key ="EndpointAddress" value ="net.tcp://:8002/SubscriptionServie"/>
   </appSettings>
   </configuration>
           NetTcpBinding NetTcpbinding = new NetTcpBinding(SecurityMode.None);
            EndpointAddress endpointAddress = new EndpointAddress(EndpoindAddress);
            InstanceContext context = new InstanceContext(callbackinstance);
            m_Proxy = new RegistrationProxy(context, NetTcpbinding, endpointAddress);

IPC 客户端连接

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
    <appSettings>
           <add key ="EndpointAddress" value ="net.pipe:///MyPipe2"/>
    </appSettings>    
</configuration>
           NetNamedPipeBinding namedpipebinding = new NetNamedPipeBinding();
            EndpointAddress endpointAddress = new EndpointAddress(EndpoindAddress);
            InstanceContext context = new InstanceContext(callbackinstance);
            m_Proxy = new RegistrationProxy(context, namedpipebinding, endpointAddress);

HTTP 发布者连接

我们知道 HTTP 是单向的。要使其双向,我们必须使用 WSDualHttpBinding,并在客户端和服务器上打开两个套接字连接。因此,这里的 ClientBaseAddress 用于在客户端上设置客户端的监听地址。

WSDualHttpBinding wsDualBindingpublish = new WSDualHttpBinding();
EndpointAddress endpointAddress = 
  new EndpointAddress("https://:8000/PublishingService/");
string strHostName = Dns.GetHostName();

  IPHostEntry ipEntry = Dns.GetHostByName(strHostName);

  IPAddress[] addr = ipEntry.AddressList;

  wsDualBindingpublish.ClientBaseAddress = new Uri("http://" + addr[0].ToString() 
  + ":" + "3000" + "/");
proxy = new PublisherProxy(wsDualBindingpublish, endpointAddress);

结论

这里的代码实现了发布/订阅方案最简单的场景,没有主题驱动,这在许多场景下很有用。

在未来的帖子中,我将提供此功能的容错实现

然后我将展示如何使用此实现来实现 Windows 和证书身份验证。

参考

关于单向调用、回调和事件您需要了解的内容

编程 WCF 服务 - 由微软软件传奇 Juval Lowy 撰写

wiki

© . All rights reserved.