基于 WCF 的服务总线架构(3): 构建您的 ESB






4.87/5 (17投票s)
基于 WCF 的服务总线架构(3):
引言
距离我完成上一章已经快一年了。一年太久了,我得花几个小时来回顾一年前我做的事情。好消息是我还记得如何启动每个应用程序并将它们连接在一起,更好的消息是它们仍然运行得很好。最好的消息是,我可以继续写这篇文章了。
在上一章中,我们已经成功地将WCF服务从业务逻辑中抽象出来。基于我们所做的工作,我将创建我们的服务总线。让我们回到FuctionService
和MessageService
。FunctionService
负责为客户端调用提供服务。MessageService
负责向客户端订阅消息。
例如,有一个HubBeijing服务管理北京的产品库存,还有一个HubChina服务管理中国的产品库存。它们分别公开了FunctionService
和MessageService
。它们分别管理自己的库存。我设置了一个简单的规则:如果客户端请求的产品数量超过产品库存,每个Hub都会发送一个缺货消息。假设这两个系统是独立的。北京是中国的一部分,当北京的Hub缺货时,中国的Hub将提供客户端请求的数量。由于北京的Hub和中国的Hub是独立的,它们不会直接通信,我们需要第三方将它们连接起来。这个第三方就是我们说的服务总线。我们已经将服务从业务逻辑中抽象出来,现在我们要将服务总线从业务逻辑中抽象出来,使其成为一个独立的组件。
背景
请查看如何将WCF服务从业务逻辑中抽象出来以及如何基于WCF实现订阅以获取更多上下文。
Using the Code
ESB组件实际上是一个连接不同服务并接受不同订阅的客户端。ESB组件能够连接许多服务提供商。它能够调用不同服务提供商的方法,并且也能够监听不同服务提供商的订阅。
BusEngine.RegisterNewService("Beijing Hub", "net.tcp://:8886/Inventory");
BusEngine.RegisterNewService("China Hub", "net.tcp://:8887/Inventory");
//In RegisterNewService, there is a reference to SAOTEnpoint.
//The method will open RemoteCall service as well as Subscription Service
BusEngine.RegisterNewService
//Disconnected event will happen when any registered services connection is broken,
//in the event, arg sender will tell you which service is broken.
BusEngine.Disconnected += new EventHandler(BusEngine_Disconnected);
//HeartBeatMessage will tell you if one service connection is still connected.
BusEngine.HeartBeatMessage += new EventHandler(BusEngine_HeartBeatMessage);
//MessageDelivered Event will get all messages from all registered services.
//In the event, argument Trilobita.SAOT.Object.Message e will let you know
//the detail information of the message.
//Trilobita.SAOT.Object.Message.ServiceName indicates the message is from which
//registered service.
//Trilobita.SAOT.Object.Message. FunctionCode indicates customer defined flag like:
//OUTOFSTOCK means the message is to notify one particular product is out of stock.
BusEngine.MessageDelivered += new EventHandler<trilobita.saot.object.message>
(BusEngine_MessageDelivered);
//In order to provide an abstracted service bus, The message structure is predefined.
//Users can use the predefined structure to define the customized data.
//Content and Context is prepared for complex customized data. See the below diagram:
public class Message: EventArgs
{
#region Properties
[DataMember]
public string Title { get; set; }
[DataMember]
public string Content { get; set; }
[DataMember]
public string Context { get; set; }
[DataMember]
public DateTime Createtime { get; set; }
[DataMember]
public string PrimaryID { get; set; }
[DataMember]
public string Sender { get; set; }
[DataMember]
public string To { get; set; }
[DataMember]
public string CC { get; set; }
[DataMember]
public DateTime ExpiredTime { get; set; }
[DataMember]
public string Attachment { get; set; }
[DataMember]
public string FunctionCode { get; set; }
[DataMember]
public string ServiceName { get; set; }
#endregion
}
我们仍然使用Hub的例子。ESB组件连接HubBeijing服务和HubChina服务。当北京的客户端请求的产品数量超过HubBeijing的库存时,HubBeijing会向所有注册者订阅一条消息。ESB组件也会收到消息并触发MessageEvent
。现在,我们需要HubChina提供所请求的产品。因此,我们需要一个外部应用程序来捕获ESB组件中的MessageEvent
,并在其中编写相应的逻辑来调用HubChina服务提供产品。请看下面的图
步骤0:启动服务 [项目:SAOTEngine]
//service engine has been abstracted, it is not able to recognize unknown type object.
//AddKnowType method will tell service engine to be able to serialize the
//given type object
ServiceController.AddKnowType(typeof(Inventory));
ServiceController.AddKnowType(typeof(Inventory[]));
//open, close event to notify external application if the service is successfully opened.
ServiceController.ServiceOpenedEvent +=
new EventHandler(ServiceController_ServiceOpenedEvent);
ServiceController.ServiceFaultEvent +=
new EventHandler(ServiceController_ServiceFaultEvent);
//open service for client invoking
ServiceController.OpenFunctionService(int.Parse(this.txtPort.Text), this.txtName.Text);
//open service for subscription
ServiceController.OpenMessageService(int.Parse(this.txtPort.Text), this.txtName.Text);
步骤0:客户端连接服务 [项目:SAOTEndPoint]
//client also need know the object type to deserialize the object
FunctionServiceConnector.AddKnownType(typeof(SAOT.Inventory));
FunctionServiceConnector.AddKnownType(typeof(SAOT.Inventory[]));
//connect service, including subscription service
this.MyService = new SAOTServiceStructre<istandardservice>
("myservice",this.txtServer.Text);
//get subscription service client
MessageDistributor messager = this.MyService.MessageService;
//HeartBeat message indicate the connection is still ok
messager.HeartBeatFromServer += new EventHandler(Instance_HeartBeatFromServer);
//Disconnected message indicate the connection is broken
messager.ServerDisconnectedEvent += new EventHandler(messager_ServerDisconnectedEvent);
//Normal Message, the structure is predefined, please refer to the previous section
messager.MessageDelivered += new EventHandler<trilobita.saot.object.message>
(messager_MessageDelivered);
步骤0:服务总线连接北京Hub和中国Hub [项目:ServiceBusMonitor, SAOTServiceBus]
//client also need know the object type to deserialize the object
FunctionServiceConnector.AddKnownType(typeof(SAOT.Inventory));
FunctionServiceConnector.AddKnownType(typeof(SAOT.Inventory[]));
//add beijing service
BusEngine.RegisterNewService("Beijing Hub", "net.tcp://:8886/Inventory");
//add china service
BusEngine.RegisterNewService("China Hub", "net.tcp://:8887/Inventory");
BusEngine.Disconnected += new EventHandler(BusEngine_Disconnected);
BusEngine.HeartBeatMessage += new EventHandler(BusEngine_HeartBeatMessage);
BusEngine.MessageDelivered += new EventHandler
<trilobita.saot.object.message>(BusEngine_MessageDelivered);
步骤1:北京客户端请求新产品 [项目:InventoryClient]
//since the service is abstracted, we use reflect to invoke remote method
this.GetService().Inovke("ChangeInventory", this.txtUser.Text, inventory.ProductID,
inventory.CurrentInventory-inv.CurrentInventory);
//T is the unknown type. we use AddKnowType to let service know T
object obj = typeof(T).InvokeMember(methodname, BindingFlags.Static |
BindingFlags.Public | BindingFlags.IgnoreCase | BindingFlags.InvokeMethod,
null, null, args);
return obj;
步骤2:北京Hub服务订阅库存不足 [项目:InventoryServer]
//Inventory.cs
public bool ChangeInventory(string user, int number)
{
int restnumber = this.CurrentInventory + number;
if (restnumber > 0)
{
this.CurrentInventory += number;
if (InventoryChangedEvent != null)
InventoryChangedEvent(null, new InventoryArgs
{ Inventory = this, ChangedNumber = number, Operator=user });
return true;
}
else
{
//when the inventory is out of stock, trigger OutofStockEvent
if (OutofStockEvent != null)
OutofStockEvent(null, new InventoryArgs
{ Inventory = this, ChangedNumber = number, Operator = user });
return false;
}
}
//ServiceAdapter.cs
Inventory.InventoryChangedEvent +=
new EventHandler<inventoryargs>(Inventory_InventoryChangedEvent);
static void Inventory_OutofStockEvent(object sender, InventoryArgs e)
{
Trilobita.SAOT.Object.Message message = new Trilobita.SAOT.Object.Message();
message.Sender = e.Operator;
message.PrimaryID = e.Inventory.ProductID.ToString();
message.Title = e.ChangedNumber.ToString();
Trilobita.SAOT.Service.MessageBox box =
new Trilobita.SAOT.Service.MessageBox("OutofStock");
box.MessageEnable = true;
box.EmailEnable = false;
box.DatabaseEnable = false;
//subscribe that inventory is out of stock
box.SendMessage(message);
}
</inventoryargs>
步骤3:服务总线向中国Hub请求库存 [项目:ServiceBusMonitor]
//get china hub service and request for change inventory
//FmMonitor.cs
this.ChinaFunction.Inovke("ChangeInventory",e.Sender,productid,changednumber);
步骤4:… 我实际上没有实现它。默认情况下,中国Hub会满足请求。
如何演示应用程序
- 双击SAOT.exe,确认
ServicePort
文本框的值为8886,服务名称为Inventory。点击确认打开服务,如果服务成功打开,表单上会显示一条消息。SAOT服务用于远程调用,MessageSender
用于订阅。此服务模拟北京Hub。 - 重复步骤1,只需将
ServicePort
值更改为8887。此服务模拟中国Hub。 - 双击ServiceBusMonitor.exe,确保服务列表已连接。这是ESB应用程序。
- 如果您严格按照上述步骤操作,请双击ClientTest.exe并单击确认按钮。如果连接良好,左上角的指示灯会闪烁。
- 在
ClientTest
表单中,输入一个产品名称和默认库存,例如产品名称:computer,库存:50。切换到第一个SAOT.exe(北京Hub),您将看到computer,库存为50。切换到第二个SAOT.exe(中国Hub),您将看到computer,库存为500,因为我在ESB应用程序中加入了一个逻辑,如果北京Hub有新产品,中国Hub将准备10倍的库存。关键在于,第一个SAOT.exe(北京Hub)没有连接第二个SAOT.exe(中国Hub),而ClientTest.exe只连接了第一个SAOT.exe(北京Hub)。新产品的库存是从ClientTest
开始的。第二个SAOT.exe(中国Hub)之所以可以增加10倍的库存,是因为ServiceBusMonitor
检测到了北京Hub的新产品库存消息,并自动增加了中国Hub的库存。 - 在
ClientTest
表单中,将computer的库存从50更改为40(为简化演示,我没有弹出表单来请求产品数量)。这相当于我们请求10台computer。切换到北京Hub,您将看到产品库存已更改为40。 - 在
ClientTest
表单中,将computer的库存从40更改为-10(抱歉,我只是想拿走50台computer)。切换到北京Hub,数量没有改变。切换到中国Hub,数量是450。这里的逻辑是,当客户端请求50台computer时,北京Hub将发送OutOfStock
消息,ServicebusMonitor
收到消息并将50台computer转移给北京Hub,以保持北京Hub库存的安全水平。