使用 x2net 编写可分发的 .NET 应用程序
分布式新方法介绍
引言
编写分布式应用程序,尤其是跨网络部署的应用程序,往往会带来挑战,这不仅是因为网络编程的复杂性,更重要的是,你的代码,业务逻辑与通信细节混杂在一起,可能不够灵活,难以单独复用和测试。
与此同时,大多数程序员已经知道如何使他们的代码变得灵活、可复用和可测试。是的,减少代码耦合,通常通过引入额外的间接层来实现,这是明确的方向。那么,为什么我们不将相同的技术应用于整体应用程序架构呢?简单地将通信细节与应用程序逻辑解耦,将有助于我们构建一个灵活可分发、完全可测试的由可复用模块组成的应用程序。
在本文中,我们将通过几个简单的 x2net 应用程序示例,了解分布是如何在 x2 方式下工作的。
背景
x2
x2 是一组概念和规范,它促进了高度灵活的跨平台、跨语言分布式系统的开发。在进一步深入之前,建议您先了解它的 README.md 和 concepts.md。
x2net
x2net 是 x2 的参考端口,用 C# 编写,面向通用的 .NET 环境。
Using the Code
为了专注于结构方面,我们从一个极其简单的应用程序开始,名为 Hello
。
public class Hello
{
public static void Main()
{
while (true)
{
var input = Console.ReadLine();
if (input == "bye")
{
break;
}
var greeting = String.Format("Hello, {0}!", input);
Console.WriteLine(greeting);
}
}
}
定义事件
x2 应用程序由逻辑处理单元(或流程)组成,这些单元之间仅通过事件进行通信。因此,在设计时定义共享事件层次结构是关键活动。在这个简单的例子中,我们可以抓住构成问候句的关键特征,它源自名称输入。我们为这个功能定义一个请求/响应事件对,如下所示:
<?xml version="1.0" encoding="utf-8"?>
<x2 namespace="hello">
<definitions>
<!-- Hello request event. -->
<event name="HelloReq" id="1">
<!-- Input name. -->
<property name="Name" type="string"/>
</event>
<!-- Hello response event. -->
<event name="HelloResp" id="2">
<!-- Resultant greeting sentence. -->
<property name="Greeting" type="string"/>
</event>
</definitions>
</x2>
运行此 XML 定义文件的 x2net.xpiler 将生成一个相应的 C# 源文件,我们可以将其包含到我们的项目中。
准备核心逻辑模块
一旦我们定义了事件,我们就可以编写应用程序逻辑处理单元来处理这些事件。这里,我们编写一个简单的处理单元来创建问候语。
public class HelloCase : Case
{
protected override void Setup()
{
// Bind the HelloReq event handler.
Bind(new HelloReq(), OnHelloReq);
}
void OnHelloReq(HelloReq req)
{
// Create a new HelloResp event.
new HelloResp {
// Set its Greeting property as a generated sentence.
Greeting = String.Format("Hello, {0}!", req.Name)
}
.InResponseOf(req) // Copy the req._Handle builtin property.
.Post(); // And post it to the hub.
}
}
请注意,逻辑处理单元通过发布另一个事件来响应它们感兴趣的事件。它们不知道通信细节:请求事件来自何处,响应事件将发送到何处。因此,这些逻辑处理单元可以自由地位于分布式应用程序的任何位置,而无需任何更改。它们也可以轻松地单独进行测试。
第一个 x2net 应用程序
拥有相关的事件和处理单元后,我们现在就可以用这些结构来设置我们的第一个 x2net 应用程序了。
public class HelloStandalone
{
class LocalCase : Case
{
protected override void Setup()
{
// To print the content of HelloResp to the console output.
Bind(new HelloResp(), (e) => {
Console.WriteLine(e.Greeting);
});
}
}
public static void Main()
{
Hub.Instance
.Attach(new SingleThreadFlow()
.Add(new HelloCase())
.Add(new LocalCase()));
using (new Hub.Flows().Startup())
{
while (true)
{
var input = Console.ReadLine();
if (input == "bye")
{
break;
}
new HelloReq { Name = input }.Post();
}
}
}
}
这与我们原来的控制台应用程序的工作方式完全相同,但采用了 x2 的方式。
- 控制台输入会生成一个
HelloReq
事件。 HelloCase
接收HelloReq
事件,并返回一个HelloResp
事件,其中包含生成的问候语。LocalCase
接收HelloResp
事件,并将其内容打印到控制台输出。
现在我们有了一个 x2 应用程序,我们可以轻松地更改应用程序的线程模型或分发拓扑。例如,应用以下更改将使我们的每个处理单元都在一个单独的线程中运行:
...
public static void Main()
{
Hub.Instance
.Attach(new SingleThreadFlow()
.Add(new HelloCase()))
.Attach(new SingleThreadFlow()
.Add(new LocalCase()));
...
更改其线程模型可能不是很有趣。但如何在几分钟内将其变成一个客户端/服务器应用程序呢?
2 层分发:客户端/服务器
首先,我们准备一个服务器,它将 HelloCase
作为其主要逻辑处理单元运行。
public class HelloTcpServer : AsyncTcpServer
{
public HelloTcpServer() : base("HelloServer")
{
}
protected override void Setup()
{
// Will receive HelloReq events through this link.
EventFactory.Register<HelloReq>();
// Will send out HelloResp events through this link.
// Events will be dispatched according to the _Handle property.
Bind(new HelloResp(), Send);
// Listen on the port 6789 on start.
Listen(6789);
}
public static void Main()
{
Hub.Instance
.Attach(new SingleThreadFlow()
.Add(new HelloCase())
.Add(new HelloTcpServer()));
using (new Hub.Flows().Startup())
{
while (true)
{
var input = Console.ReadLine();
if (input == "bye")
{
break;
}
}
}
}
}
然后,我们可以编写一个简单的客户端来连接服务器以完成工作。
public class HelloTcpClient : TcpClient
{
class LocalCase : Case
{
protected override void Setup()
{
// To print the content of HelloResp to the console output.
Bind(new HelloResp(), (e) => {
Console.WriteLine(e.Greeting);
});
}
}
public HelloTcpClient() : base("HelloClient")
{
}
protected override void Setup()
{
// Will receive HelloResp events through this link.
EventFactory.Register<HelloResp>();
// Will send out every HelloReq events through this link.
Bind(new HelloReq(), Send);
// Connect to localhost:6789 on start.
Connect("127.0.0.1", 6789);
}
public static void Main()
{
Hub.Instance
.Attach(new SingleThreadFlow()
.Add(new LocalCase())
.Add(new HelloTcpClient()));
using (new Hub.Flows().Startup())
{
while (true)
{
var input = Console.ReadLine();
if (input == "bye")
{
break;
}
new HelloReq { Name = input }.Post();
}
}
}
}
请注意,无论 HelloCase
是在独立应用程序中运行还是在服务器中运行,它都不会改变。
在上面的服务器链接中,您可能会想知道我们如何将响应事件发送给最初发出请求的客户端。内置的事件属性 _Handle
可以做到这一点。当一个 x2net 链接从网络接收到事件时,其 _Handle
属性被设置为链接会话句柄。如果响应事件的 _Handle
属性与原始请求的 _Handle
属性相同(这由 InResponseOf
扩展方法完成),服务器就可以通过 _Handle
属性找到目标链接会话。
添加功能
假设我们要添加一项将结果字符串转换为大写字母的新功能。我们将两个事件追加到定义文件中,如下所示:
<?xml version="1.0" encoding="utf-8"?>
<x2 namespace="hello">
<definitions>
<!-- Hello request event. -->
<event name="HelloReq" id="1">
<!-- Input name. -->
<property name="Name" type="string"/>
</event>
<!-- Hello response event. -->
<event name="HelloResp" id="2">
<!-- Resultant greeting sentence. -->
<property name="Greeting" type="string"/>
</event>
<!-- Capitalize request event. -->
<event name="CapitalizeReq" id="3">
<!-- Input string. -->
<property name="Input" type="string"/>
</event>
<!-- Capitalize response event. -->
<event name="CapitalizeResp" id="4">
<!-- Output string. -->
<property name="Output" type="string"/>
</event>
</definitions>
</x2>
然后,我们在共享模块中添加一个新的逻辑处理单元:
public class CapitalizerCase : Case
{
protected override void Setup()
{
// Bind the CapitalizeReq event handler.
Bind(new CapitalizeReq(), OnCapitalizeReq);
}
void OnCapitalizeReq(CapitalizeReq req)
{
// Create a new CapitalizeResp event.
new CapitalizeResp {
// Set its Output property, applying ToUpper() method.
Output = req.Input.ToUpper()
}
.InResponseOf(req) // Copy the req._Handle builtin property.
.Post(); // And post it to the hub.
}
}
然后,我们可以重写我们的独立应用程序,如下所示:
public class HelloStandalone
{
class LocalCase : Case
{
protected override void Setup()
{
// To chain a new CapitalizeReq in response of a HelloResp event.
Bind(new HelloResp(), (e) => {
new CapitalizeReq {
Input = e.Greeting
}.InResponseOf(e).Post();
});
// To print the content of CapitalizeResp to the console output.
Bind(new CapitalizeResp(), (e) => {
Console.WriteLine(e.Output);
});
}
}
public static void Main()
{
Hub.Instance
.Attach(new SingleThreadFlow()
.Add(new HelloCase())
.Add(new CapitalizerCase())
.Add(new LocalCase()));
using (new Hub.Flows().Startup())
{
while (true)
{
var input = Console.ReadLine();
if (input == "bye")
{
break;
}
new HelloReq { Name = input }.Post();
}
}
}
}
3 层分发:客户端/前端服务器/后端服务器
在 x2 应用程序中,添加或删除分发层并不是什么大问题。您需要做的就是设置必要的链接来正确地发送/接收事件。
这是我们的后端服务器,它将 CapitalizerCase
作为其主要逻辑处理单元运行。
public class HelloTcpBackend : AsyncTcpServer
{
public HelloTcpBackend() : base("HelloBackend")
{
}
protected override void Setup()
{
EventFactory.Register<CapitalizeReq>();
Bind(new CapitalizeResp(), Send);
Listen(7890);
}
public static void Main()
{
Hub.Instance
.Attach(new SingleThreadFlow()
.Add(new CapitalizerCase())
.Add(new HelloTcpBackend()));
using (new Hub.Flows().Startup())
{
while (true)
{
var input = Console.ReadLine();
if (input == "bye")
{
break;
}
}
}
}
}
我们还构建了一个前端服务器,它将 HelloCase
作为其主要逻辑处理单元运行,并将大写任务委托给后端服务器。
class BackendClient : AsyncTcpClient
{
public BackendClient() : base("BackendClient") {}
protected override void Setup()
{
EventFactory.Register<CapitalizeResp>();
Bind(new CapitalizeReq(), Send);
Connect("127.0.0.1", 7890);
}
}
public class HelloTcpFrontend : AsyncTcpServer
{
public HelloTcpFrontend() : base("HelloFrontend")
{
}
protected override void Setup()
{
EventFactory.Register<HelloReq>();
Bind(new HelloResp(), OnHelloResp);
Listen(6789);
}
public static void Main()
{
Hub.Instance
.Attach(new SingleThreadFlow()
.Add(new HelloCase())
.Add(new BackendClient())
.Add(new HelloTcpFrontend()));
using (new Hub.Flows().Startup())
{
while (true)
{
var input = Console.ReadLine();
if (input == "bye")
{
break;
}
}
}
}
IEnumerator OnHelloResp(Coroutine coroutine, HelloResp e)
{
// Backup the _Handle builtin property of the original response.
int handle = e._Handle;
// Post a CapitalizeReq event in chain
// and wait for the corresponding response.
yield return coroutine.WaitForSingleResponse(
new CapitalizeReq { Input = e.Greeting },
new CapitalizeResp());
var result = coroutine.Result as CapitalizeResp;
if (result == null)
{
// Timeout
yield break;
}
// Now we got the CapitalizeResp event.
// Set the _Handle property to match the original response.
result._Handle = handle;
// And send the resultant event to the client,
// according to the _Handle builtin property.
Send(result);
}
}
在之前的客户端/服务器分发中,我们依赖于内置的事件属性 _Handle
将响应事件分派到适当的会话。但在这种拓扑结构中,我们不能这样做。如果这是一个基于身份验证的真实应用程序,我们可能会通过经过身份验证的用户标识符来绑定事件。但是,为了处理这个简单示例中的情况,我们提出了上面所示的一个特殊的 x2net 协程处理器。
然后,我们可以使用类似的客户端连接到前端服务器来完成工作。
public class HelloTcpClient : TcpClient
{
class LocalCase : Case
{
protected override void Setup()
{
// To print the content of CapitalizeResp to the console output.
Bind(new CapitalizeResp(), (e) => {
Console.WriteLine(e.Output);
});
}
}
public HelloTcpClient() : base("HelloClient")
{
}
protected override void Setup()
{
EventFactory.Register<CapitalizeResp>();
Bind(new HelloReq(), Send);
Connect("127.0.0.1", 6789);
}
public static void Main()
{
Hub.Instance
.Attach(new SingleThreadFlow()
.Add(new LocalCase())
.Add(new HelloTcpClient()));
using (new Hub.Flows().Startup())
{
while (true)
{
var input = Console.ReadLine();
if (input == "bye")
{
break;
}
new HelloReq { Name = input }.Post();
}
}
}
}
关注点
逻辑-通信解耦本身既不是一个新概念,也不是一个流行的概念。如果您习惯于 SendPacket 式的通信,可能需要一些时间才能适应 x2 式的分发。这种转变有点像从消息传递转向生成式通信,而且绝对值得尝试。