65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 .NET Core 3.0 和 gRPC 实现双向消息流

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.56/5 (17投票s)

2019 年 7 月 26 日

CPOL

4分钟阅读

viewsIcon

42665

downloadIcon

1229

本文介绍了使用 .NET Core 3.0 和 gRPC 进行全双工消息流传输的简洁易用的基础设施。

引言

.NET Core 3.0 最重要的一个新特性是支持 gRPC 消息传递。 维基百科对 gRPC 的描述如下

gRPC(gRPC 远程过程调用)是一个开源的远程过程调用 (RPC) 系统,最初由 Google 开发。它使用 HTTP/2 进行传输,使用 Protocol Buffers 作为接口描述语言,并提供身份验证、双向流和流控制、阻塞或非阻塞绑定以及取消和超时等功能。它能为多种语言生成跨平台的客户端和服务器绑定。最常见的用例包括以微服务风格架构连接服务,以及将移动设备、浏览器客户端连接到后端服务。

gRPC 的“杀手级功能”据说就是支持同时进行全双工流。HTTP/2 协议被用作此目的的传输协议。Protocol Buffers 用于高效的序列化。

如上所述,.NET Core 3.0 框架支持基于 gRPC 的通信。Visual Studio 2019 提供了 gRPC 服务的样板代码。但是,此代码仅提供单向通信,没有消息流。 该项目是当前工作的出发点。在我的代码中,我试图提取服务器和客户端的通用代码,并将其组合成一个支持使用 gRPC 和 .NET Core 3.0 进行双向消息流的便捷基础设施。

本文最初发布时,.NET Core 3.0 仍处于预发布版本。现在,该示例已使用 .NET Core 3.0 的正式发布版本进行了测试,无需进行任何更改。

软件描述

基础设施项目(DLL)GrpcServerHelperGrpcClientHelper 位于 Helpers 文件夹中。它们可以被相应的服务器和客户端引用和使用。这些助手 DLL 是在假定客户端和服务器之间进行全双工 gRPC 消息交换的前提下开发的。但是,消息结构和处理没有限制。这两项功能都取决于具体的服务器和客户端。

gRPC 数据和处理在 proto 文件 communication.proto 中定义,该文件由具体的服务器和客户端共享。该文件提供了不同类型的客户端到服务器和服务器到客户端的消息,以及相应的 enum 和用于开始消息交换的函数 CreateStreaming()。Visual Studio 2019 会从 proto 文件生成 C# 类。

服务器基础设施 (GrpcServerHelper)

GeneralGrpcService<TRequest, TResponse> 为服务器端提供了双向流的基础设施。该类以请求和响应消息类型 TRequestTResponse 分别进行参数化。其方法 CreateDuplexStreaming() 处理主要的例程过程,如订阅者管理、请求消息处理、响应消息生成以及发送回客户端。

public async Task CreateDuplexStreaming(
	IAsyncStreamReader<TRequest> requestStream,
	IServerStreamWriter<TResponse> responseStream,
	ServerCallContext context)
{
    var httpContext = context.GetHttpContext();
    Logger.LogInformation($"Connection id: {httpContext.Connection.Id}");

    if (!await requestStream.MoveNext())
        return;

    var clientId = _messageProcessor.GetClientId(requestStream.Current);
    Logger.LogInformation($"{clientId} connected");
    var subscriber = new SubscribersModel<TResponse>
    {
        Subscriber = responseStream,
        Id = $"{clientId}"
    };

    _serverGrpcSubscribers.AddSubscriber(subscriber);

    do
    {
        if (requestStream.Current == null)
            continue;

        var resultMessage = _messageProcessor.Process(requestStream.Current);
        if (resultMessage == null)
            continue;

        await _serverGrpcSubscribers.BroadcastMessageAsync(resultMessage);
    } while (await requestStream.MoveNext());

    _serverGrpcSubscribers.RemoveSubscriber(subscriber);
    Logger.LogInformation($"{clientId} disconnected");
}

ServerGrpcSubscribersBase<TResponse> 管理订阅者(客户端),而 abstract class MessageProcessorBase<TRequest, TResponse> 暴露 abstract 方法 abstract TResponse Process(TRequest message) 用于处理请求消息并生成响应消息。为了灵活性,假定这些方法的具体实现会负责处理可能发生的任何异常。最后,class SubscribersModel<TResponse> 提供了一个订阅者模型,用于将响应消息发送回客户端。

客户端基础设施 (GrpcClientHelper)

唯一的 abstract class GrpcClientBase<TRequest, TResponse> 提供了 Do() 方法来确保从客户端进行消息流。

public async Task Do(
	Channel channel, 
	Action onConnection = null, 
	Action onShuttingDown = null)
{
    using (var duplex = CreateDuplexClient(channel))
    {
        onConnection?.Invoke();

        var responseTask = Task.Run(async () =>
        {
            while (await duplex.ResponseStream.MoveNext(CancellationToken.None))
            Console.WriteLine($"{duplex.ResponseStream.Current}");
        });

        string payload;
        while (!string.IsNullOrEmpty(payload = MessagePayload))
        await duplex.RequestStream.WriteAsync(CreateMessage(payload));

        await duplex.RequestStream.CompleteAsync();
    }

    onShuttingDown?.Invoke();
    await channel.ShutdownAsync();
}

GrpcServer 和 GrpcClient

具体的服务器 GrpcServer 是通过 Visual Studio 2019 的 ASP.MVC Core 模板创建的 gRPC 项目。它实现了 class MessageProcessor : MessageProcessorBase<RequestMessage, ResponseMessage>class ServerGrpcSubscribers : ServerGrpcSubscribersBase<ResponseMessage>,并扩展了 GrpcServerHelper 的相应存根类。MessageProcessor 类提供了客户端请求消息所需的处理,并在需要时发送响应消息。ServerGrpcSubscribers 类调整了订阅者管理(在我们的具体情况下,它与父类没有区别,仅用于说明目的)。这两个类 ServerGrpcSubscribersMessageProcessor 都应该在 Startup 类的 ConfigureServices(IServiceCollection services) 方法中作为单例添加到 services 集合中。

services.AddSingleton<ServerGrpcSubscribers>();
services.AddSingleton<MessageProcessor>();

服务类 DuplexService 扩展了生成的 Messaging.MessagingBase 类型,应在 Startup.Configure() 方法中映射到端点,如下所示:

app.UseEndpoints(endpoints =>
{
    // ...
    endpoints.MapGrpcService<DuplexService>();
});

服务器和客户端支持交换加密消息(HTTPS)。为此,服务器在其 Kestrel 配置中使用 grpcServer.pfx 文件,客户端使用证书文件 certificate.crt。这两个文件都使用 OpenSSL 应用程序创建,该应用程序根据例如 此处提供的指南安装。

为了使用 Proto Buffer 序列化,服务器和客户端共享同一个 proto 文件 communication.proto。该文件定义了流式消息服务,

service Messaging
{
    rpc CreateStreaming (stream RequestMessage) returns (stream ResponseMessage);
}

请求和响应消息格式以及相应的枚举器。

我未能找到一种方法在 Visual Studio 中将 proto 文件包含到服务器和客户端项目中以确保生成相应的 C# 代码。因此,我在 GrpcServer.csprojGrpcClient.csproj 项目文件中手动执行了此操作。在这些文件中,我添加了用注释 <!-- PROTO: The following section was added manually --> 标记的部分。

运行示例

要运行示例,请生成整个解决方案(这将同时导致根据 proto 文件生成类)。然后使用...命令运行 GrpcServer...

dotnet GrpcServer.dll

...命令,并在其完全启动后,继续使用一个或多个...命令。

dotnet GrpcClient.dll

...命令。

在客户端控制台中,输入以 Enter 结尾的某条消息。消息将被服务器接收和处理(在 MessageProcessor.Process() 方法中有预留的处理代码位置,如您可能看到的注释所示)。如果您的消息包含问号 ?,则服务器会将响应消息广播给其所有当前订阅者。客户端将在其控制台中输出响应。

结论

这项工作提出了使用 gRPC 过程在 .NET 3.0 中进行同时双向消息流的基础设施。使用此基础设施将具体服务器和客户端的代码减少到只有几行。希望这对我的尊敬的同行们有所帮助。

历史

  • 2019 年 7 月 26 日:初版
© . All rights reserved.