65.9K
CodeProject 正在变化。 阅读更多。
Home

SOA:订阅者-发布者模型,介绍与实现

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.92/5 (21投票s)

2005年1月4日

9分钟阅读

viewsIcon

112269

downloadIcon

906

本文简要介绍了 SOA 和订阅者-发布者模型,以及如何使用 Windows .NET 应用程序中的 WSE SOAP 接收器和 WSE SOAP 发送器类来实现该模型。

引言

开发者圈子里又出现了一个新潮词。你听说了吗?它就是 SOA。它是服务导向架构(Service Oriented Architecture)的缩写。这个新概念其实并不新鲜;它的定义由来已久;然而,所有大公司的市场部门都在重新唱响它的调子。微软正在用 .NET 构建模块、WSE、ASP.NET 和 XML 演唱其 SOA 流行歌曲。IBM 正在使用另一位艺术家,名为 On Demand,其他人也紧随其后。

传统分布式应用

这是一种新的思维方式,特别是对于组件架构师来说。让我们思考一下。从 Windows 开发者的角度来看,在面向对象编程中,我们围绕基本的三层方法设计了基本的 COM 应用程序和 COM 组件。

三层

  • 一层称为用户界面层,
  • 第二层称为业务逻辑层,
  • 第三层称为数据层。

我们的基本设计概念简单,如果一个类有与用户界面交互的代码,例如 Windows 应用程序或网页,那么我们就将代码写在用户界面端。我们可以在 Windows Form、DHTML 或其他用户界面组件中编写代码。另一方面,如果我们需要处理业务规则、流程、日常逻辑,也就是我们试图解决的实际问题,我们会将这些代码放置在业务层中的一个或多个独立组件中,并将该文件安装在业务逻辑服务器上,例如 MTS/COM+ 服务。最后,任何涉及访问数据源的逻辑,例如文件、数据库、服务器等,我们都会将其放置在服务器上或在服务器上运行的组件中,例如使用存储过程。

在 OOP 中,我们处理过许多有时令人沮丧的术语和概念。我们不得不学习抽象、继承、实现、封装、接口、组合、聚合等术语。OOP 变得如此复杂,以至于我们创建了 UML 图来可视化地表示所有这些术语和条件。我们投入了大量的时间和精力学习这个范式,然而,我们仍然遇到了一个主要的微妙之处,一个主要的缺陷:互操作性。

COM 不与 EJB 通信,ISAM 数据库不与 COM 通信,CRM 系统不与 EJB 也不与 COM 通信。这些技术不能自动实现平滑的通信模式。我们必须巧妙地“调整”每种技术,才能将这些设计迥异的系统连接起来。为实现此目的所付出的努力和艰辛导致了许多压力和技术的变形。甚至达到了一些技术拥有许多附加功能,而这些功能与其最初设计的架构完全背离的程度。只需看看 ADO 从其第一个版本到当前版本(2.8 或更高版本)的变化,以及从 COM 技术到 .NET 的最重要变化。这最终导致了不可避免的。

SOA 简介

软件设计的新范式:SOA。你提到了所有大公司正在唱的这些歌曲,它们都讲了些什么?SOA 是一种设计系统的新方式。我们现在将系统视为一套精心设计的组件,它完全基于组件功能(服务)的消息通信模式。其核心思想是通过消息通信将系统设计围绕“编排”的服务套件进行。这些服务通过相互传递 XML 形式的消息进行通信;这是重点。

一个 SOA 系统。

左边的图片展示了服务的标准描绘,一个三角形伸出三个分支。这些存根或点被称为“端点”或“触点”。它们是允许 XML 消息使用任何协议进入和传出网络的门户。这与分布式 COM 完全相反,在分布式 COM 中,我们被迫使用专有协议和特殊端口来通过网络传输数据包。

我们可以谈论服务以及如何设计它们以及 SOA 提供所有新优势,但是本介绍的重点是讨论这些服务如何相互通信:消息。消息的基础是 XML。XML 是格式,在这种格式中我们有一个非常具体的格式,称为简单对象访问协议(SOAP)。SOAP 消息包含执行某些工作单元所需的所有数据。它还可以包含安全细节,如用户名和密码、证书信息以及其他安全概念,如加密和哈希。

SOAP 消息为不同的平台(例如 UNIX)提供了与像 Microsoft 这样的其他平台进行通信的方式。SOA 和 SOAP 解决了我们的互操作性问题。传递的数据都是文本,所有平台都理解文本。鉴于此,业界已制定了一套模板或模型,用于这些消息的来回传递。这些模型被称为消息交换模式。

消息交换模式

消息交换模式(MEP)有很多种。第一种 SOA 实现称为 XML Web 服务,它使用请求/响应(远程过程调用)MEP。还有死信通道模式,其中消息被发送到服务,在消息处理过程中发生的任何错误都会发送到特殊的“节点”或通道。这些错误,通常称为 SOAP 故障,然后排队到堆栈中。客户端应用程序可以从该队列中检索消息。还有消息路由器模式,其中消息根据其内容或安全凭据路由到其他服务。还有消息拆分器模式,它拆分或组合消息并将其发送到其他目的地,最值得注意的是,还有发布-订阅模式。

发布-订阅模式是指消息进入服务以通知它希望监听“发布者广播给其监听者的消息”。客户端应用程序发送一个关于它是谁以及它可以在何处接收来自发布者的“响应”的“订阅”请求消息。无论该应用程序物理安装在客户端还是服务器上,它都可以运行,并等待发布者服务生成这些响应并将其发送回其订阅者。

客户端应用程序发出订阅请求。

发布者服务将执行其逻辑,并最终遍历其订阅者集合并发送消息。发布者服务甚至可能向所有订阅者发送“即发即弃”类型的消息。这是因为发布者服务可能不一定关心谁成功接收了消息,例如消息确认。

发布者服务向其订阅者发送消息副本。

实现细节

通过微软对社区标准 WSA 的实现,我们可以实现 SOA 的发布者-订阅者模型。微软使用名为 Web 服务增强 (WSE) 的附加工具来实现此标准。微软目前处于 WSE 工具包的 2.0 服务包 2 版本,这也是我们将用来实现此模型的版本。

WSE 为我们提供了许多可在基于 .NET 的应用程序中使用的类和技术。我们将重点关注的两个类是 SoapReceiver 类和 SoapSender 类。

SoapReceivers

SoapReceiver 是一个从 IHttpHandler 接口继承/实现的类(请参阅我关于使用 WSE 与 SimpleHandlerFactory 的文章)。此类别提供了接收 SOAP 消息所需的所有功能。要使用此类别,只需创建一个自定义类并从 SoapReceiver 类继承。此类别要求您重写 Receive 方法。此方法从 SoapSender 类接收 SOAP 消息。以下是 Receive 方法的签名

protected override void Receive ( SoapEnvelope envelope )

Receive 方法将 WSE SoapEnvelope 类作为其参数,该类是 SoapSender 传入的 SOAP 消息。SoapEnvelope 类继承自 System.Xml.XmlDocument 类,并包含许多属性和实例方法,允许开发人员读取和解析传入的 XML SOAP 消息。

SoapSender

SoapSender 是一个继承自抽象类 SoapPort 的类。此类别基本上对应于一个过滤器,允许您修改 SOAP 消息的输入和输出。此基类允许您控制 SOAP 消息的发送和接收。要使用 SoapSender 类,请创建此类的实例并通过其构造函数或显式设置其属性来设置其 Destination 属性(URI)。接下来,分别同步和异步调用 SendBeginSend 方法,并将 SoapEnvelope 类发送到 Destination

源代码解释

演示应用程序分为两个独立的项目。一个承载发布者服务的发布者 Windows 应用程序,以及一个承载客户端订阅响应服务的客户端订阅应用程序。

发布者应用程序分为两部分

  • 一个发布者 Windows .NET 应用程序,
  • 和一个发布者类。

发布者应用程序是一个基本的 Windows 窗体应用程序,通过 ListBox 实时显示订阅者订阅和取消订阅发布者。它还包含一个 TextBox,使发布者能够向所有列出的订阅者发布文章或数据。当发布者单击“发布文章”按钮时,将在服务器上创建一个文件,然后将其内容副本发送给所有订阅者。

publisher 类是一个继承自 SoapReceiver 类的自定义类。它覆盖了 Receive 方法,并检查 SoapEnvelope 上的 SOAPAction。它解析 SoapAction 以确定传入的消息是订阅请求还是取消订阅请求。此外,当发布者应用程序继续运行时,如果发布者决定发布一篇文章,则使用 SoapSender 将其发送给所有监听的订阅者。

发布者代码

using System;
using Microsoft.Web.Services2;
using Microsoft.Web.Services2.Messaging;
using Microsoft.Web.Services2.Addressing;
using System.Web.Services.Protocols;
using System.Xml;
using System.Collections;
using System.IO;
using System.Collections.Specialized;

namespace ArticlePublisherApp
{
    internal class Literals
    {
        static Literals()
        {
            Literals.LocalhostTCP = "soap.tcp://" + 
        System.Net.Dns.GetHostName() + ":";
        }
        internal readonly static string LocalhostTCP;
    }


    public delegate void NewSubscriberEventHandler(string subscriberName,
         string ID, Uri replyTo );
    public delegate void RemoveSubscriberEventHandler( string ID);

    /// <SUMMARY>
    /// Summary description for Publisher.
    /// </SUMMARY>
    public class Publisher : SoapReceiver
    {
    
      public event NewSubscriberEventHandler NewSubscriberEvent;
      public event RemoveSubscriberEventHandler RemoveSubscriberEvent;

    public Publisher()
    {
     _subscribers = new Hashtable();
     fsw = new FileSystemWatcher();
     System.Configuration.AppSettingsReader configurationAppSettings = 
      new System.Configuration.AppSettingsReader();
      string folderWatch =  
        ((string)(configurationAppSettings.GetValue("Publish." + 
                                  "PublishFolder", typeof(string))));
     try
     {
       fsw = new System.IO.FileSystemWatcher(folderWatch);
      }
      catch
      {
      throw new Exception("Directory '" + folderWatch 
          + "' referenced does not exist. " +
        "Change the fileName variable or create this directory in " + 
            "order to run this demo.");
      }      fsw.Filter = "*.txt";
      fsw.Created += new FileSystemEventHandler(fsw_Created);
      fsw.Changed += new FileSystemEventHandler(fsw_Created);
      fsw.EnableRaisingEvents = true;
      }

    protected void OnNewSubscriberEvent(string Name, string ID, Uri replyTo)
    {
     if (NewSubscriberEvent    != null)
       NewSubscriberEvent(Name, ID, replyTo);
    }
     protected void OnRemoveSubscriberEvent(string ID)
    {
         if (RemoveSubscriberEvent != null)
        RemoveSubscriberEvent(ID);
    }

    private void AddSubscriber(string ID, Uri replytoAddress, string Name)
    {
      SoapSender ssend = new SoapSender(replytoAddress);
      SoapEnvelope response = new SoapEnvelope();
      response.CreateBody();
      response.Body.InnerXml = String.Format("<?xml:namespace prefix=x />" +
          "<x:AddSubscriber xmlns:x=\"urn:ArticlePublisherApp:Publisher\">" 
          "<NOTIFY>Name: {0} ID: {1}</NOTIFY></x:AddSubscriber>", Name, ID);
       Action act = new Action("response");
      response.Context.Addressing.Action = act;
      ssend.Send(response);
      _subscribers.Add ( ID, new Subscriber(Name,replytoAddress, ID)  );
      OnNewSubscriberEvent(Name, ID, replytoAddress);
      }

    private void RemoveSubscriber(string ID, Uri replytoAddress)
    {
        if (_subscribers.Contains(ID) )
        {
         _subscribers.Remove(ID);
         SoapSender ssend = new SoapSender(replytoAddress);
         SoapEnvelope response = new SoapEnvelope();
         response.CreateBody();
         response.Body.InnerXml = 
           String.Format("<x:RemoveSubscriber xmlns:x=\"" +
                "urn:ArticlePublisherApp:Publisher\">" +
                 "<NOTIFY>ID: {0} Removed</NOTIFY>" +
                   "</x:RemoveSubscriber>", ID);
         Action act = new Action("response");
         response.Context.Addressing.Action = act;
         ssend.Send(response);
         OnRemoveSubscriberEvent(ID);
        }
    }

    protected override void Receive( SoapEnvelope envelope )
    {
        
    //Determine Action if no SoapAction throw exception
        Action act = envelope.Context.Addressing.Action;
      if (act == null)
        throw new SoapHeaderException("Soap Action must be set",
         new XmlQualifiedName());
            
      string subscriberName = String.Empty ;
      string subscriberID = String.Empty;
      switch (act.ToString().ToLower())
      {
       case "subscribe":
       //add new subscriber
         subscriberName = 
               envelope.SelectSingleNode ( "//name").InnerText ;
         subscriberID = System.Guid.NewGuid().ToString();
         AddSubscriber(subscriberID, 
         envelope.Context.Addressing.From.Address.Value,
          subscriberName);
        break;;        case "unsubscribe":
          subscriberID = 
                envelope.SelectSingleNode("//name") .InnerText ;
          RemoveSubscriber(subscriberID, 
          envelope.Context.Addressing.From.Address.Value);
        break;
        default:
        break;
       }
            
     }
        
    private void fsw_Created(object sender, System.IO.FileSystemEventArgs e)
    {
      Uri uriThis =  new Uri (Literals.LocalhostTCP + "9090/Publisher" );
      // Send each subscriber a message
      foreach(object o in _subscribers)
      {
        DictionaryEntry de = (DictionaryEntry)o;
        Subscriber s = (Subscriber)_subscribers[de.Key];
        SoapEnvelope responseMsg = new SoapEnvelope ();
          FileStream fs = new FileStream(e.FullPath ,FileMode.Open,
        FileAccess.Read , FileShare.ReadWrite );
        StreamReader sr = new StreamReader(fs);
        string strContents = sr.ReadToEnd() ;
        sr.Close();
        fs.Close();
            // Set the From Addressing value
        responseMsg.Context.Addressing.From = new From ( uriThis );
        responseMsg.Context.Addressing.Action  = new Action( "notify");
        responseMsg.CreateBody();
        responseMsg.Body.InnerXml = "<x:ArticlePublished xmlns:x=\"" + 
            "urn:ArticlePublisherApp:Publisher\">" +
             "<NOTIFY><FILE>" + e.Name + "</FILE><CONTENTS>" 
              + strContents + "</CONTENTS></NOTIFY></x:ArticlePublished>";

        // Send a Response Message
        SoapSender msgSender = new SoapSender (s.ReplyTo );
        msgSender.Send ( responseMsg );
       }
    }
    internal StringCollection GetSubscribers()
    {
     StringCollection coll = new StringCollection();
     foreach(Subscriber s in _subscribers)
     {
      coll.Add(String.Format("Name - {0}\t ID - {1}\t Reply To Uri {2}",
       s.Name,  s.ID,  s.ReplyTo.ToString()));
     }
     return coll;
    }
    private Hashtable _subscribers;
    private FileSystemWatcher fsw;
    }

    public class Subscriber
    {
        public string Name;
        public Uri ReplyTo;
        public string ID;
        public Subscriber(string name, Uri replyTo, string id)
        {
            Name = name;
            ReplyTo = replyTo;
            ID = id;
        }
    }
    }

客户端订阅者

客户端订阅者应用程序也分为两部分

  • 一个客户端订阅者 Windows .NET 应用程序。
  • 和一个 Subscriber 类。

客户端订阅应用程序是一个基本的 Windows 窗体应用程序,包含一个 MainMenu 和一个 StatusBar,以及一个只读 TextBoxMainMenu 有一个 MenuItem,可以向发布者发送订阅请求,以及取消订阅请求以停止订阅发布者。StatusBar 在客户端注册时显示客户端的注册 ID。只读 TextBox 显示发布者在任何给定时间决定发布文章/数据时发布的任何文章。Subscriber 类是一个继承自 SoapReceiver 类的自定义类。它重写了 Receive 方法,并检查 SoapEnvelope 上的 SoapAction 头部。它解析 SoapAction 以确定从发布者发送回的消息是订阅或取消订阅请求的简单响应,还是通知消息,告知订阅窗体发布者正在发送文章。要真正测试该应用程序,请启动客户端订阅应用程序的多个实例。

订阅者类代码

using Microsoft.Web.Services2.Messaging;
using Microsoft.Web.Services2.Addressing;
using System.Web.Services.Protocols;
using System.Xml;

namespace ClientSubscriptionApp
{

  public delegate void ResponseFromServerEventHandler(
                                        string Response);
  public delegate void SubscriptionNotificationEventHandler(
                                    string Notification);

  public class SubscriberNotification : SoapReceiver 
  {
    public event ResponseFromServerEventHandler ResponseFromServerEvent;
    public event SubscriptionNotificationEventHandler SubscriptionNotificationEvent;

    public SubscriberNotification()
    {    }
    
    protected void OnResponseFromServer (string Response)
    {
    if (ResponseFromServerEvent != null)
        ResponseFromServerEvent(Response);
    }

    protected void OnSubscriptionNotification(string Notification)
    {
    if (SubscriptionNotificationEvent != null)
        SubscriptionNotificationEvent(Notification);
    }

    protected override void Receive(Microsoft.Web.Services2.SoapEnvelope envelope)
    {
    string sResponse = string.Empty;
    Action act = envelope.Context.Addressing.Action;
    if (act == null)
       throw new  SoapHeaderException("Soap Action must be present",
         new XmlQualifiedName()) ;
    switch (act.Value.ToLower() )
    {
      case "response":
        sResponse = envelope.SelectSingleNode("//notify").InnerText ;
        OnResponseFromServer(sResponse);
      break;
      case "notify":
        sResponse = envelope.SelectSingleNode("//notify").InnerText ;
        OnSubscriptionNotification(sResponse);
      break;
      default :
      break;
     }
    }
  }
}

祝您编码愉快!

如果您喜欢这种 SOA 调调…敬请关注 BizTalk Server 2004 的文章!!!

© . All rights reserved.