65.9K
CodeProject 正在变化。 阅读更多。
Home

用于 .NET 的轻量级 WebSocket RPC 库介绍

starIconstarIconstarIconstarIconstarIcon

5.00/5 (19投票s)

2018 年 1 月 11 日

CPOL

8分钟阅读

viewsIcon

64538

用于 .NET 的 WebSocket RPC 库,支持自动生成 JavaScript 客户端代码,并兼容 ASP.NET Core。

摘要

引言

本文介绍的轻量级 WebSocket RPC 库,具备建立原始连接、全双工 RPC 和自动生成 JavaScript 客户端代码的能力。该库的设计目标是

  • 轻量级
    唯一的依赖是用于序列化/反序列化的 JSON.NET 库。

  • 简单
    只有两个相关的方法:用于绑定对象/接口到连接的 **Bind**,以及用于调用 RPC 的 **CallAsync**。

此外,它还可以

  • 使用第三方程序集作为 API
    实现的 API(如果仅用于 RPC)不必了解库的任何信息。库只是将连接绑定到一个期望的对象。

  • **自动生成 JavaScript 代码**(WebSocketRPC.JS 包)
    JavaScript WebSocket 客户端代码是从现有的 .NET 接口(API 合同)自动生成的(带有 JsDoc 注释)。

建立连接

通过 WebSocket 服务器或客户端实例化一个用于发送/接收用户定义或 RPC 消息的连接。服务器和客户端的功能都具有相同的参数:地址取消令牌连接回调。连接具有异步事件(OpenCloseReceiveError),以便在发生错误时进行适当的异常处理。

在下面的代码片段中,服务器发送一条消息,客户端显示它并关闭连接。

Server.ListenAsync(8000, CancellationToken.None, (c, wsContext) => 
{
  c.OnOpen    += ()     => c.SendAsync("Hello world");
  c.OnError   += err    => Task(() => Console.WriteLine("Error: " + err.Message));
})
.Wait(0);
Client.ConnectAsync("ws://:8000/", CancellationToken.None, c => 
{
  c.OnOpen    += ()        => Task(() => Console.WriteLine("Opened"));
  c.OnClose   += (s, d)    => Task(() => Console.WriteLine("Closed: " + s));
  c.OnError   += err       => Task(() => Console.WriteLine("Error: " + err.Message));
  c.OnReceive += async msg => { Console.WriteLine(msg); await c.CloseAsync() };
})
.Wait(0);

/*
 Output: 
   Opened
   Hello world
   Closed: NormalClosure
*/

如果使用独立的服务器/客户端(如上例所示),每个连接都关联着自己的长时间运行任务,因此在需要大量处理的情况下,所有 CPU 核心都会被利用。

进行远程过程调用

RPC 通过将对象/接口与连接关联来初始化。有两个基本方法构成了整个 WebSocketRPC 库 API 中最关键的部分:

  1. Bind<TObj>(TObj obj) - 用于将本地对象绑定到连接(传入调用)。该调用创建一个本地调用实例,负责解释传入的文本消息和对象方法调用。

    Bind<TInterface>() - 用于将接口绑定到连接(传出调用)。该调用创建一个远程调用实例,负责将类型安全的代码转换为发送到远程调用者的文本消息。

  2. CallAsync<TInterface>(...) - 用于调用远程函数。该调用位于静态 RPC 类中。

以下示例代码片段展示了如何使用这两组函数的示例。

RPC - 单向

让我们从单向 RPC 连接开始:客户端调用服务器函数。两部分(应用程序)都使用 .NET 实现。消息流如下所示。

'RPC-单向' 示例的消息流。

服务器

服务器实现了一个包含单个函数的 Math API:

class MathAPI //:IMathAPI
{
    public int Add(int a, int b)
    {
        return a + b;
    }
}

在 main 方法中,我们启动服务器并等待新连接。当有新连接可用时,通过 Bind(new MathAPI()) 调用将连接与创建的 API 对象关联起来。拥有单个共享 API 实例也是可能的。

Server.ListenAsync(8000, CancellationToken.None, 
                   (c, wc) => c.Bind<MathAPI>(new MathAPI()))
      .Wait(0);

客户端

客户端必须具有匹配的合同(接口):

interface IMathAPI
{
    int Add(int a, int b);
}

我们使用客户端连接到服务器。新创建的连接通过 Bind<IMathAPI>() 调用与 IMathAPI 接口关联。

Client.ConnectAsync("ws://:8000/", CancellationToken.None, 
                    (c, ws) => c.Bind<IMathAPI>())
      .Wait(0);

当连接打开时,使用包含所有绑定器的 static RPC 类调用远程函数。首先,使用 For<TInterface>() 调用选择所有与 IMathAPI 接口关联的连接,然后使用 CallAsync<IMathAPI>(...) 调用。由于我们只有一个客户端,因此只有一个连接,选择器 First() 会选择结果。

var apis = await RPC.For<IMathAPI>();   
int r = apis.CallAsync(x => x.Add(5, 3)).First(); //r = 8

RPC - 双向

双向绑定表示客户端-服务器的相互 RPC 调用(例如,客户端调用服务器的 API 方法,服务器调用客户端的进度更新方法)。其示意图如下所示。

'RPC-双向' 示例的消息流。

服务器

服务器的 TaskAPI 包含一个函数,该函数在执行过程中会更新进度,并仅报告给调用了该方法的客户端。

interface IProgressAPI
{
  void WriteProgress(float progress);
}

class TaskAPI //:ITaskAPI
{
  public async Task<int> LongRunningTask(int a, int b)
  {
    for (var p = 0; p <= 100; p += 5)
    {
       await Task.Delay(250);
       await RPC.For<IProgressAPI>(this).CallAsync(x => x.WriteProgress((float)p / 100));
    }
		
    return a + b;
  }
}

RPC.For(...) 选择器仅选择与 IProgressAPI 和此对象关联的连接。这种选择会过滤掉实现 IProgressAPI 但未实际调用方法的客户端。因此,只有发起调用的客户端才会收到进度更新。

await RPC.For<IRemoteAPI>(this).CallAsync(x => x.WriteProgress((float)p / 100));

当服务器启动并且新连接打开时,使用 Bind<TObj, TInterface>(TObj obj) 调用将连接绑定到本地(对象)和远程(接口)API。此调用只是以下调用的简写:Bind<TObj>(TObj obj); Bind<TInterface>();

Server.ListenAsync(8000, CancellationToken.None, 
                  (c, wc) => c.Bind<TaskAPI, IProgressAPI>(new TaskAPI()))
      .Wait(0);

客户端

客户端实现了 IProgressAPI,并拥有一个与服务器 TaskAPI 匹配的接口。

class ProgressAPI //:IProgressAPI
{
  void WriteProgress(float progress)
  {
      Console.Write("Completed: " + progress * 100 + "%\r");
  }
}

interface ITaskAPI
{
  Task<int> LongRunningTask(int a, int b);
}

使用 Client 类建立连接,并以与上一示例非常相似的方式绑定。主要区别在于客户端还实现了自己的 ProgressAPI,因此我们有了双向绑定。

Client.ConnectAsync("ws://:8000/", CancellationToken.None, 
                   (c, wc) => c.Bind<ProgressAPI, ITaskAPI>(new ProgressAPI()))
      .Wait(0);
...
var r = RPC.For<ITaskAPI>().CallAsync(x => LongRunningTask(5, 3)).First();
Console.WriteLine("Result: " + r);


/*
 Output:
   Completed: 0%
   Completed: 5%
     ...
   Completed: 100%
   Result: 8
*/ 

JavaScript 客户端

在许多场景中,客户端不是用 .NET 实现的,而是用 JavaScript 实现的。该库允许您从声明的接口或类创建客户端。创建的 API **也将拥有从 XML .NET 注释生成的 JsDoc 注释**(如果存在)- *XML 文件生成必须首先启用*。

服务器

让我们使用与双向绑定示例相同的服务器实现,但这次客户端将用 JavaScript 编写。GenerateCallerWithDoc<T> 函数会生成 JavaScript 代码,剩下的就是将代码保存到文件中。

//the server code is the same as in the previous sample

//generate JavaScript client (file)
var code = RPCJs.GenerateCallerWithDoc<TaskAPI>();
File.WriteAllText("TaskAPI.js", code);

'RPC-双向' 示例的自动生成的 JavaScript 客户端代码。

客户端

首先实例化包含 WebSocket RPC 代码的生成 API。为了实现 IProgressAPI 接口,API 实例只是扩展了必需的 writeProgress 函数。最后一步是调用 connect(onConnect) 函数并调用远程 API 函数。

//init API
var api = new TaskAPI("ws://:8001");

//implement the interface by extending the 'TaskAPI' object
api.writeProgress = function (p)
{
  console.log("Completed: " + p * 100 + "%");
  return true;
}

//connect and excecute (when connection is opened)
api.connect(async () => 
{
  var r = await api.longRunningTask(5, 3);
  console.log("Result: " + r);
});

序列化

到目前为止,只使用了简单的对象类型作为参数或返回值。在实际场景中,情况可能并非如此。

让我们设想一个图像处理 API。定义的函数 ProcessImage 返回一个 2D RGB 图像数组(Bgr<byte>[,] - DotImaging 框架)。

public class ImageProcessingAPI
{
    public Bgr<byte>[,] ProcessImage(Uri imgUri)
    {..}
}

为了传输图像数据,将 2D RGB 数组转换为 base-64 JPG 图像。序列化机制围绕 JSON.NET 构建,因此我们只需要编写一个简单的数据转换器:JpgBase64Converter

class JpgBase64Converter : JsonConverter
{
    private Type supportedType = typeof(Bgr<byte>[,]);
    ...
    //omitted for the simplicity and the lack of importance
}

最后,要注册转换器,只需调用 RPC.AddConverter(...) 并传入一个对象转换器实例。

RPC.AddConverter(new JpgBase64Converter());
...
//the rest of the Client/Server code

设置

设置可分为两组:连接设置和 RPC 设置。所有设置都是其各自类的静态成员。

连接设置

  • MaxMessageSize

    支持的最大消息大小(字节)。如果超过该数量,连接将以消息过大为由关闭。

  • 编码

    消息文本编码/解码。所有消息必须使用此编码传输才能正确解码。

RPC 设置

  • RpcTerminationDelay

    远程过程必须完成的最大时间,否则会抛出 OperationCancelledException

异常处理

如果远程过程中发生异常,消费者端也会抛出异常。这样,调试体验就类似于调试本地代码。

调试 RPC 调用时出现异常(.NET)。

调试 RPC 调用时出现异常(JavaScript)。

如果代码在未附加调试器的情况下运行,异常最终会出现在 OnError 事件中。所有事件都是基于异步的函数,以便更好地管理异常。

如果需要日志记录,只需使用连接提供的可以多次定义的事件之一。

HTTP 支持

在许多情况下,WebSocket 服务器/客户端也需要 HTTP 功能。服务器和客户端实现将 HTTP 请求处理程序实现为一个参数,如下所示:

static async Task ListenAsync(
                      ..., 
                      Func<HttpListenerRequest, HttpListenerResponse, Task> onHttpRequestAsync, 
                      ...) 
{...}   

static async Task ConnectAsync(
                    ..., 
                    Func<HttpListenerRequest, HttpListenerResponse, Task> onHttpRequestAsync, 
                    ...) 
{...}    

虽然可以直接使用,但为了保持简单,可以使用作为 NuGet 包提供的 SimpleHTTP 库。下面是一个示例:

//add GET route which returns the requested route as a text. (SimpleHTTP library)
Route.Add("/{route}", (rq, rp, args) => rp.AsText(args["route"]));    

//assign 'OnHttpRequestAsync' to the HTTP handler action.
Server.ListenAsync(8000, CancellationToken.None, (c, wsContext) => c.Bind(...), 
                   Route.OnHttpRequestAsync)
.Wait(0);

HTTPS/WSS 支持

要为独立服务器启用安全(HTTPS/WSS)连接,需要为 HttpListener 设置证书。将解释基于 Windows 的方法,因为在编写本文时(2018 年 1 月)通用操作系统支持尚未准备就绪。当前状态可在:Github Issue Tracker - .NET Core 中查看。

基于 Windows 的解决方案包括将证书导入本地证书存储,并使用 netsh 实用程序进行适当的 HTTPS 预留。该库包含两个脚本,位于 SimpleHTTP 库的存储库的 **脚本映射** 中。第一个脚本生成一个测试证书,另一个将证书导入存储区并进行 HTTPS 预留。如何在 Richard Astbury 的博客文章 中手动(非脚本方式)执行的步骤

ASP.NET Core 支持

ASP.NET 支持由 WebSocketRPC.AspCore NuGet 包提供。初始化在启动类中的 Configure 方法中完成。首先,通过调用 UseWebSockets() 来初始化套接字支持。最后,为我们可能拥有的每个 API 调用 MapWebSocketRPC(...) 扩展方法。该调用将一个路由与一个 API 连接关联起来。对于我们可能拥有的每个 API,都可以多次使用该调用。

class Startup
{
    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
        //the MVC initialization, etc.

        //initialize web-sockets
        app.UseWebSockets();
        //define route for a new connection and bind the API
        app.MapWebSocketRPC("/taskService", 
                            (httpCtx, c) => c.Bind<TaskAPI, IProgressAPI>(new TaskAPI()));
    }
}    

结论

本文介绍了轻量级 WebSocket RPC 库,该库具备建立原始连接、全双工 RPC 和自动生成 JavaScript 客户端代码的能力。主要目标是展示 RPC 绑定的简单概念,并通过示例演示库的功能。完整的示例在存储库中,等待测试和有效利用。:)

历史

  • 2018 年 1 月 11 日 - 发布第一个版本
© . All rights reserved.