65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 x2net 编写可分发的 .NET 应用程序

starIconstarIconstarIconstarIconstarIcon

5.00/5 (6投票s)

2018 年 4 月 5 日

CPOL

5分钟阅读

viewsIcon

16587

downloadIcon

149

分布式新方法介绍

引言

编写分布式应用程序,尤其是跨网络部署的应用程序,往往会带来挑战,这不仅是因为网络编程的复杂性,更重要的是,你的代码,业务逻辑与通信细节混杂在一起,可能不够灵活,难以单独复用和测试。

与此同时,大多数程序员已经知道如何使他们的代码变得灵活、可复用和可测试。是的,减少代码耦合,通常通过引入额外的间接层来实现,这是明确的方向。那么,为什么我们不将相同的技术应用于整体应用程序架构呢?简单地将通信细节与应用程序逻辑解耦,将有助于我们构建一个灵活可分发、完全可测试的由可复用模块组成的应用程序。

在本文中,我们将通过几个简单的 x2net 应用程序示例,了解分布是如何在 x2 方式下工作的。

背景

x2

x2 是一组概念和规范,它促进了高度灵活的跨平台、跨语言分布式系统的开发。在进一步深入之前,建议您先了解它的 README.mdconcepts.md

x2net

x2net 是 x2 的参考端口,用 C# 编写,面向通用的 .NET 环境。

Using the Code

为了专注于结构方面,我们从一个极其简单的应用程序开始,名为 Hello

public class Hello
{
    public static void Main()
    {
        while (true)
        {
            var input = Console.ReadLine();
            if (input == "bye")
            {
                break;
            }
            var greeting = String.Format("Hello, {0}!", input);
            Console.WriteLine(greeting);
        }
    }
}

定义事件

x2 应用程序由逻辑处理单元(或流程)组成,这些单元之间仅通过事件进行通信。因此,在设计时定义共享事件层次结构是关键活动。在这个简单的例子中,我们可以抓住构成问候句的关键特征,它源自名称输入。我们为这个功能定义一个请求/响应事件对,如下所示:

<?xml version="1.0" encoding="utf-8"?>
<x2 namespace="hello">
  <definitions>
    <!-- Hello request event. -->
    <event name="HelloReq" id="1">
      <!-- Input name. -->
      <property name="Name" type="string"/>
    </event>
    <!-- Hello response event. -->
    <event name="HelloResp" id="2">
      <!-- Resultant greeting sentence. -->
      <property name="Greeting" type="string"/>
    </event>
  </definitions>
</x2>

运行此 XML 定义文件的 x2net.xpiler 将生成一个相应的 C# 源文件,我们可以将其包含到我们的项目中。

准备核心逻辑模块

一旦我们定义了事件,我们就可以编写应用程序逻辑处理单元来处理这些事件。这里,我们编写一个简单的处理单元来创建问候语。

public class HelloCase : Case
{
    protected override void Setup()
    {
        // Bind the HelloReq event handler.
        Bind(new HelloReq(), OnHelloReq);
    }

    void OnHelloReq(HelloReq req)
    {
        // Create a new HelloResp event.
        new HelloResp {
            // Set its Greeting property as a generated sentence.
            Greeting = String.Format("Hello, {0}!", req.Name)
        }
            .InResponseOf(req)  // Copy the req._Handle builtin property.
            .Post();            // And post it to the hub.
    }
}

请注意,逻辑处理单元通过发布另一个事件来响应它们感兴趣的事件。它们不知道通信细节:请求事件来自何处,响应事件将发送到何处。因此,这些逻辑处理单元可以自由地位于分布式应用程序的任何位置,而无需任何更改。它们也可以轻松地单独进行测试。

第一个 x2net 应用程序

拥有相关的事件和处理单元后,我们现在就可以用这些结构来设置我们的第一个 x2net 应用程序了。

public class HelloStandalone
{
    class LocalCase : Case
    {
        protected override void Setup()
        {
            // To print the content of HelloResp to the console output.
            Bind(new HelloResp(), (e) => {
                Console.WriteLine(e.Greeting);
            });
        }
    }

    public static void Main()
    {
        Hub.Instance
            .Attach(new SingleThreadFlow()
                .Add(new HelloCase())
                .Add(new LocalCase()));

        using (new Hub.Flows().Startup())
        {
            while (true)
            {
                var input = Console.ReadLine();
                if (input == "bye")
                {
                    break;
                }
                new HelloReq { Name = input }.Post();
            }
        }
    }
}

这与我们原来的控制台应用程序的工作方式完全相同,但采用了 x2 的方式。

  • 控制台输入会生成一个 HelloReq 事件。
  • HelloCase 接收 HelloReq 事件,并返回一个 HelloResp 事件,其中包含生成的问候语。
  • LocalCase 接收 HelloResp 事件,并将其内容打印到控制台输出。

现在我们有了一个 x2 应用程序,我们可以轻松地更改应用程序的线程模型或分发拓扑。例如,应用以下更改将使我们的每个处理单元都在一个单独的线程中运行:

...
    public static void Main()
    {
        Hub.Instance
            .Attach(new SingleThreadFlow()
                .Add(new HelloCase()))
            .Attach(new SingleThreadFlow()
                .Add(new LocalCase()));
...

更改其线程模型可能不是很有趣。但如何在几分钟内将其变成一个客户端/服务器应用程序呢?

2 层分发:客户端/服务器

首先,我们准备一个服务器,它将 HelloCase 作为其主要逻辑处理单元运行。

public class HelloTcpServer : AsyncTcpServer
{
    public HelloTcpServer() : base("HelloServer")
    {
    }

    protected override void Setup()
    {
        // Will receive HelloReq events through this link.
        EventFactory.Register<HelloReq>();
        // Will send out HelloResp events through this link.
        // Events will be dispatched according to the _Handle property.
        Bind(new HelloResp(), Send);
        // Listen on the port 6789 on start.
        Listen(6789);
    }

    public static void Main()
    {
        Hub.Instance
            .Attach(new SingleThreadFlow()
                .Add(new HelloCase())
                .Add(new HelloTcpServer()));

        using (new Hub.Flows().Startup())
        {
            while (true)
            {
                var input = Console.ReadLine();
                if (input == "bye")
                {
                    break;
                }
            }
        }
    }
}

然后,我们可以编写一个简单的客户端来连接服务器以完成工作。

public class HelloTcpClient : TcpClient
{
    class LocalCase : Case
    {
        protected override void Setup()
        {
            // To print the content of HelloResp to the console output.
            Bind(new HelloResp(), (e) => {
                Console.WriteLine(e.Greeting);
            });
        }
    }

    public HelloTcpClient() : base("HelloClient")
    {
    }

    protected override void Setup()
    {
        // Will receive HelloResp events through this link.
        EventFactory.Register<HelloResp>();
        // Will send out every HelloReq events through this link.
        Bind(new HelloReq(), Send);
        // Connect to localhost:6789 on start.
        Connect("127.0.0.1", 6789);
    }

    public static void Main()
    {
        Hub.Instance
            .Attach(new SingleThreadFlow()
                .Add(new LocalCase())
                .Add(new HelloTcpClient()));

        using (new Hub.Flows().Startup())
        {
            while (true)
            {
                var input = Console.ReadLine();
                if (input == "bye")
                {
                    break;
                }
                new HelloReq { Name = input }.Post();
            }
        }
    }
}

请注意,无论 HelloCase 是在独立应用程序中运行还是在服务器中运行,它都不会改变。

在上面的服务器链接中,您可能会想知道我们如何将响应事件发送给最初发出请求的客户端。内置的事件属性 _Handle 可以做到这一点。当一个 x2net 链接从网络接收到事件时,其 _Handle 属性被设置为链接会话句柄。如果响应事件的 _Handle 属性与原始请求的 _Handle 属性相同(这由 InResponseOf 扩展方法完成),服务器就可以通过 _Handle 属性找到目标链接会话。

添加功能

假设我们要添加一项将结果字符串转换为大写字母的新功能。我们将两个事件追加到定义文件中,如下所示:

<?xml version="1.0" encoding="utf-8"?>
<x2 namespace="hello">
  <definitions>
    <!-- Hello request event. -->
    <event name="HelloReq" id="1">
      <!-- Input name. -->
      <property name="Name" type="string"/>
    </event>
    <!-- Hello response event. -->
    <event name="HelloResp" id="2">
      <!-- Resultant greeting sentence. -->
      <property name="Greeting" type="string"/>
    </event>

    <!-- Capitalize request event. -->
    <event name="CapitalizeReq" id="3">
      <!-- Input string. -->
      <property name="Input" type="string"/>
    </event>
    <!-- Capitalize response event. -->
    <event name="CapitalizeResp" id="4">
      <!-- Output string. -->
      <property name="Output" type="string"/>
    </event>
  </definitions>
</x2>

然后,我们在共享模块中添加一个新的逻辑处理单元:

public class CapitalizerCase : Case
{
    protected override void Setup()
    {
        // Bind the CapitalizeReq event handler.
        Bind(new CapitalizeReq(), OnCapitalizeReq);
    }

    void OnCapitalizeReq(CapitalizeReq req)
    {
        // Create a new CapitalizeResp event.
        new CapitalizeResp {
            // Set its Output property, applying ToUpper() method.
            Output = req.Input.ToUpper()
        }
            .InResponseOf(req)  // Copy the req._Handle builtin property.
            .Post();            // And post it to the hub.
    }
}

然后,我们可以重写我们的独立应用程序,如下所示:

public class HelloStandalone
{
    class LocalCase : Case
    {
        protected override void Setup()
        {
            // To chain a new CapitalizeReq in response of a HelloResp event.
            Bind(new HelloResp(), (e) => {
                new CapitalizeReq {
                    Input = e.Greeting
                }.InResponseOf(e).Post();
            });
            // To print the content of CapitalizeResp to the console output.
            Bind(new CapitalizeResp(), (e) => {
                Console.WriteLine(e.Output);
            });
        }
    }

    public static void Main()
    {
        Hub.Instance
            .Attach(new SingleThreadFlow()
                .Add(new HelloCase())
                .Add(new CapitalizerCase())
                .Add(new LocalCase()));

        using (new Hub.Flows().Startup())
        {
            while (true)
            {
                var input = Console.ReadLine();
                if (input == "bye")
                {
                    break;
                }
                new HelloReq { Name = input }.Post();
            }
        }
    }
}

3 层分发:客户端/前端服务器/后端服务器

在 x2 应用程序中,添加或删除分发层并不是什么大问题。您需要做的就是设置必要的链接来正确地发送/接收事件。

这是我们的后端服务器,它将 CapitalizerCase 作为其主要逻辑处理单元运行。

public class HelloTcpBackend : AsyncTcpServer
{
    public HelloTcpBackend() : base("HelloBackend")
    {
    }

    protected override void Setup()
    {
        EventFactory.Register<CapitalizeReq>();
        Bind(new CapitalizeResp(), Send);
        Listen(7890);
    }

    public static void Main()
    {
        Hub.Instance
            .Attach(new SingleThreadFlow()
                .Add(new CapitalizerCase())
                .Add(new HelloTcpBackend()));

        using (new Hub.Flows().Startup())
        {
            while (true)
            {
                var input = Console.ReadLine();
                if (input == "bye")
                {
                    break;
                }
            }
        }
    }
}

我们还构建了一个前端服务器,它将 HelloCase 作为其主要逻辑处理单元运行,并将大写任务委托给后端服务器。

class BackendClient : AsyncTcpClient
{
    public BackendClient() : base("BackendClient") {}

    protected override void Setup()
    {
        EventFactory.Register<CapitalizeResp>();
        Bind(new CapitalizeReq(), Send);
        Connect("127.0.0.1", 7890);
    }
}

public class HelloTcpFrontend : AsyncTcpServer
{
    public HelloTcpFrontend() : base("HelloFrontend")
    {
    }

    protected override void Setup()
    {
        EventFactory.Register<HelloReq>();
        Bind(new HelloResp(), OnHelloResp);
        Listen(6789);
    }

    public static void Main()
    {
        Hub.Instance
            .Attach(new SingleThreadFlow()
                .Add(new HelloCase())
                .Add(new BackendClient())
                .Add(new HelloTcpFrontend()));

        using (new Hub.Flows().Startup())
        {
            while (true)
            {
                var input = Console.ReadLine();
                if (input == "bye")
                {
                    break;
                }
            }
        }
    }

    IEnumerator OnHelloResp(Coroutine coroutine, HelloResp e)
    {
        // Backup the _Handle builtin property of the original response.
        int handle = e._Handle;

        // Post a CapitalizeReq event in chain
        // and wait for the corresponding response.
        yield return coroutine.WaitForSingleResponse(
            new CapitalizeReq { Input = e.Greeting },
            new CapitalizeResp());

        var result = coroutine.Result as CapitalizeResp;
        if (result == null)
        {
            // Timeout
            yield break;
        }

        // Now we got the CapitalizeResp event.
        // Set the _Handle property to match the original response.
        result._Handle = handle;
        // And send the resultant event to the client,
        // according to the _Handle builtin property.
        Send(result);
    }
}

在之前的客户端/服务器分发中,我们依赖于内置的事件属性 _Handle 将响应事件分派到适当的会话。但在这种拓扑结构中,我们不能这样做。如果这是一个基于身份验证的真实应用程序,我们可能会通过经过身份验证的用户标识符来绑定事件。但是,为了处理这个简单示例中的情况,我们提出了上面所示的一个特殊的 x2net 协程处理器。

然后,我们可以使用类似的客户端连接到前端服务器来完成工作。

public class HelloTcpClient : TcpClient
{
    class LocalCase : Case
    {
        protected override void Setup()
        {
            // To print the content of CapitalizeResp to the console output.
            Bind(new CapitalizeResp(), (e) => {
                Console.WriteLine(e.Output);
            });
        }
    }

    public HelloTcpClient() : base("HelloClient")
    {
    }

    protected override void Setup()
    {
        EventFactory.Register<CapitalizeResp>();
        Bind(new HelloReq(), Send);
        Connect("127.0.0.1", 6789);
    }

    public static void Main()
    {
        Hub.Instance
            .Attach(new SingleThreadFlow()
                .Add(new LocalCase())
                .Add(new HelloTcpClient()));

        using (new Hub.Flows().Startup())
        {
            while (true)
            {
                var input = Console.ReadLine();
                if (input == "bye")
                {
                    break;
                }
                new HelloReq { Name = input }.Post();
            }
        }
    }
}

关注点

逻辑-通信解耦本身既不是一个新概念,也不是一个流行的概念。如果您习惯于 SendPacket 式的通信,可能需要一些时间才能适应 x2 式的分发。这种转变有点像从消息传递转向生成式通信,而且绝对值得尝试。

© . All rights reserved.