65.9K
CodeProject 正在变化。 阅读更多。
Home

基于 WCF 的服务总线架构(3): 构建您的 ESB

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.87/5 (17投票s)

2011年2月9日

CPOL

4分钟阅读

viewsIcon

78274

downloadIcon

8290

基于 WCF 的服务总线架构(3): 构建您的 ESB

引言

距离我完成上一章已经快一年了。一年太久了,我得花几个小时来回顾一年前我做的事情。好消息是我还记得如何启动每个应用程序并将它们连接在一起,更好的消息是它们仍然运行得很好。最好的消息是,我可以继续写这篇文章了。

在上一章中,我们已经成功地将WCF服务从业务逻辑中抽象出来。基于我们所做的工作,我将创建我们的服务总线。让我们回到FuctionServiceMessageServiceFunctionService负责为客户端调用提供服务。MessageService负责向客户端订阅消息。

例如,有一个HubBeijing服务管理北京的产品库存,还有一个HubChina服务管理中国的产品库存。它们分别公开了FunctionServiceMessageService。它们分别管理自己的库存。我设置了一个简单的规则:如果客户端请求的产品数量超过产品库存,每个Hub都会发送一个缺货消息。假设这两个系统是独立的。北京是中国的一部分,当北京的Hub缺货时,中国的Hub将提供客户端请求的数量。由于北京的Hub和中国的Hub是独立的,它们不会直接通信,我们需要第三方将它们连接起来。这个第三方就是我们说的服务总线。我们已经将服务从业务逻辑中抽象出来,现在我们要将服务总线从业务逻辑中抽象出来,使其成为一个独立的组件。

背景

请查看如何将WCF服务从业务逻辑中抽象出来以及如何基于WCF实现订阅以获取更多上下文。

Using the Code

ESB组件实际上是一个连接不同服务并接受不同订阅的客户端。ESB组件能够连接许多服务提供商。它能够调用不同服务提供商的方法,并且也能够监听不同服务提供商的订阅。

BusEngine.RegisterNewService("Beijing Hub", "net.tcp://:8886/Inventory");
BusEngine.RegisterNewService("China Hub", "net.tcp://:8887/Inventory");
//In RegisterNewService, there is a reference to SAOTEnpoint. 
//The method will open RemoteCall service as well as Subscription Service
BusEngine.RegisterNewService
//Disconnected event will happen when any registered services connection is broken, 
//in the event, arg sender will tell you which service is broken.
BusEngine.Disconnected += new EventHandler(BusEngine_Disconnected);
//HeartBeatMessage will tell you if one service connection is still connected.
BusEngine.HeartBeatMessage += new EventHandler(BusEngine_HeartBeatMessage);
//MessageDelivered Event will get all messages from all registered services. 
//In the event, argument Trilobita.SAOT.Object.Message e will let you know 
//the detail information of the message.
//Trilobita.SAOT.Object.Message.ServiceName indicates the message is from which 
//registered service.
//Trilobita.SAOT.Object.Message. FunctionCode indicates customer defined flag like: 
//OUTOFSTOCK means the message is to notify one particular product is out of stock.
BusEngine.MessageDelivered += new EventHandler<trilobita.saot.object.message>
				(BusEngine_MessageDelivered);

//In order to provide an abstracted service bus, The message structure is predefined. 
//Users can use the predefined structure to define the customized data. 
//Content and Context is prepared for complex customized data. See the below diagram:

public class Message: EventArgs
    {
        #region Properties
        [DataMember]
        public string Title { get; set; }
        [DataMember]
        public string Content { get; set; }
        [DataMember]
        public string Context { get; set; }
        [DataMember]
        public DateTime Createtime { get; set; }
        [DataMember]
        public string PrimaryID { get; set; }
        [DataMember]
        public string Sender { get; set; }
        [DataMember]
        public string To { get; set; }
        [DataMember]
        public string CC { get; set; }
        [DataMember]
        public DateTime ExpiredTime { get; set; }
        [DataMember]
        public string Attachment { get; set; }
        [DataMember]
        public string FunctionCode { get; set; }
        [DataMember]
        public string ServiceName { get; set; }
        #endregion
    }

ESBComponent.jpg

我们仍然使用Hub的例子。ESB组件连接HubBeijing服务和HubChina服务。当北京的客户端请求的产品数量超过HubBeijing的库存时,HubBeijing会向所有注册者订阅一条消息。ESB组件也会收到消息并触发MessageEvent。现在,我们需要HubChina提供所请求的产品。因此,我们需要一个外部应用程序来捕获ESB组件中的MessageEvent,并在其中编写相应的逻辑来调用HubChina服务提供产品。请看下面的图

步骤0:启动服务 [项目:SAOTEngine]

//service engine has been abstracted, it is not able to recognize unknown type object. 
//AddKnowType method will tell service engine to be able to serialize the 
//given type object 
ServiceController.AddKnowType(typeof(Inventory));
            ServiceController.AddKnowType(typeof(Inventory[]));
//open, close event to notify external application if the service is successfully opened.
            ServiceController.ServiceOpenedEvent += 
		new EventHandler(ServiceController_ServiceOpenedEvent);
            ServiceController.ServiceFaultEvent += 
		new EventHandler(ServiceController_ServiceFaultEvent);

//open service for client invoking
ServiceController.OpenFunctionService(int.Parse(this.txtPort.Text), this.txtName.Text);
//open service for subscription
ServiceController.OpenMessageService(int.Parse(this.txtPort.Text), this.txtName.Text);

步骤0:客户端连接服务 [项目:SAOTEndPoint]

//client also need know the object type to deserialize the object 
FunctionServiceConnector.AddKnownType(typeof(SAOT.Inventory));
FunctionServiceConnector.AddKnownType(typeof(SAOT.Inventory[]));

//connect service, including subscription service 
this.MyService = new SAOTServiceStructre<istandardservice>
			("myservice",this.txtServer.Text);

//get subscription service client
MessageDistributor messager = this.MyService.MessageService;
            
//HeartBeat message indicate the connection is still ok
messager.HeartBeatFromServer += new EventHandler(Instance_HeartBeatFromServer);
//Disconnected message indicate the connection is broken
messager.ServerDisconnectedEvent += new EventHandler(messager_ServerDisconnectedEvent);
//Normal Message, the structure is predefined, please refer to the previous section
messager.MessageDelivered += new EventHandler<trilobita.saot.object.message>
				(messager_MessageDelivered);

步骤0:服务总线连接北京Hub和中国Hub [项目:ServiceBusMonitor, SAOTServiceBus]

//client also need know the object type to deserialize the object 
FunctionServiceConnector.AddKnownType(typeof(SAOT.Inventory));
FunctionServiceConnector.AddKnownType(typeof(SAOT.Inventory[]));

//add beijing service
BusEngine.RegisterNewService("Beijing Hub", "net.tcp://:8886/Inventory");
//add china service
BusEngine.RegisterNewService("China Hub", "net.tcp://:8887/Inventory");
BusEngine.Disconnected += new EventHandler(BusEngine_Disconnected);
BusEngine.HeartBeatMessage += new EventHandler(BusEngine_HeartBeatMessage);
BusEngine.MessageDelivered += new EventHandler
		<trilobita.saot.object.message>(BusEngine_MessageDelivered);

步骤1:北京客户端请求新产品 [项目:InventoryClient]

//since the service is abstracted, we use reflect to invoke remote method
this.GetService().Inovke("ChangeInventory", this.txtUser.Text, inventory.ProductID, 
                    inventory.CurrentInventory-inv.CurrentInventory);
//T  is the unknown type. we use AddKnowType to let service know T
object obj = typeof(T).InvokeMember(methodname, BindingFlags.Static |
BindingFlags.Public | BindingFlags.IgnoreCase | BindingFlags.InvokeMethod, 
		null, null, args);
return obj;

步骤2:北京Hub服务订阅库存不足 [项目:InventoryServer]

//Inventory.cs
public bool ChangeInventory(string user, int number)
        {
            int restnumber = this.CurrentInventory + number;
            if (restnumber > 0)
            {
                this.CurrentInventory += number;
                if (InventoryChangedEvent != null) 
                    InventoryChangedEvent(null, new InventoryArgs 
		{ Inventory = this, ChangedNumber = number, Operator=user });
                return true;
            }
            else
            {
            //when the inventory is out of stock, trigger OutofStockEvent
                if (OutofStockEvent != null)
                    OutofStockEvent(null, new InventoryArgs 
		{ Inventory = this, ChangedNumber = number, Operator = user });
                return false;
            }
            
        }
 //ServiceAdapter.cs       
Inventory.InventoryChangedEvent += 
	new EventHandler<inventoryargs>(Inventory_InventoryChangedEvent);
static void Inventory_OutofStockEvent(object sender, InventoryArgs e)
        {
            Trilobita.SAOT.Object.Message message = new Trilobita.SAOT.Object.Message();
            message.Sender = e.Operator;
            message.PrimaryID = e.Inventory.ProductID.ToString();
            message.Title = e.ChangedNumber.ToString();

            Trilobita.SAOT.Service.MessageBox box = 
		new Trilobita.SAOT.Service.MessageBox("OutofStock");
            box.MessageEnable = true;
            box.EmailEnable = false;
            box.DatabaseEnable = false;
            //subscribe that inventory is out of stock
            box.SendMessage(message);
        }
</inventoryargs>

步骤3:服务总线向中国Hub请求库存 [项目:ServiceBusMonitor]

//get china hub service and request for change inventory
//FmMonitor.cs
this.ChinaFunction.Inovke("ChangeInventory",e.Sender,productid,changednumber);

步骤4:… 我实际上没有实现它。默认情况下,中国Hub会满足请求。

ESB.jpg

如何演示应用程序

  1. 双击SAOT.exe,确认ServicePort文本框的值为8886,服务名称为Inventory。点击确认打开服务,如果服务成功打开,表单上会显示一条消息。SAOT服务用于远程调用,MessageSender用于订阅。此服务模拟北京Hub。
  2. 重复步骤1,只需将ServicePort值更改为8887。此服务模拟中国Hub。
  3. 双击ServiceBusMonitor.exe,确保服务列表已连接。这是ESB应用程序。
  4. 如果您严格按照上述步骤操作,请双击ClientTest.exe并单击确认按钮。如果连接良好,左上角的指示灯会闪烁。
  5. ClientTest表单中,输入一个产品名称和默认库存,例如产品名称:computer,库存:50。切换到第一个SAOT.exe(北京Hub),您将看到computer,库存为50。切换到第二个SAOT.exe(中国Hub),您将看到computer,库存为500,因为我在ESB应用程序中加入了一个逻辑,如果北京Hub有新产品,中国Hub将准备10倍的库存。关键在于,第一个SAOT.exe(北京Hub)没有连接第二个SAOT.exe(中国Hub),而ClientTest.exe只连接了第一个SAOT.exe(北京Hub)。新产品的库存是从ClientTest开始的。第二个SAOT.exe(中国Hub)之所以可以增加10倍的库存,是因为ServiceBusMonitor检测到了北京Hub的新产品库存消息,并自动增加了中国Hub的库存。
  6. ClientTest表单中,将computer的库存从50更改为40(为简化演示,我没有弹出表单来请求产品数量)。这相当于我们请求10台computer。切换到北京Hub,您将看到产品库存已更改为40。
  7. ClientTest表单中,将computer的库存从40更改为-10(抱歉,我只是想拿走50台computer)。切换到北京Hub,数量没有改变。切换到中国Hub,数量是450。这里的逻辑是,当客户端请求50台computer时,北京Hub将发送OutOfStock消息,ServicebusMonitor收到消息并将50台computer转移给北京Hub,以保持北京Hub库存的安全水平。

Picture_2.png - Click to enlarge image

© . All rights reserved.