使用 .NET Core 3.0 和 gRPC 实现双向消息流






4.56/5 (17投票s)
本文介绍了使用 .NET Core 3.0 和 gRPC 进行全双工消息流传输的简洁易用的基础设施。
引言
.NET Core 3.0 最重要的一个新特性是支持 gRPC 消息传递。 维基百科对 gRPC 的描述如下:
gRPC(gRPC 远程过程调用)是一个开源的远程过程调用 (RPC) 系统,最初由 Google 开发。它使用 HTTP/2 进行传输,使用 Protocol Buffers 作为接口描述语言,并提供身份验证、双向流和流控制、阻塞或非阻塞绑定以及取消和超时等功能。它能为多种语言生成跨平台的客户端和服务器绑定。最常见的用例包括以微服务风格架构连接服务,以及将移动设备、浏览器客户端连接到后端服务。
gRPC 的“杀手级功能”据说就是支持同时进行全双工流。HTTP/2 协议被用作此目的的传输协议。Protocol Buffers 用于高效的序列化。
如上所述,.NET Core 3.0 框架支持基于 gRPC 的通信。Visual Studio 2019 提供了 gRPC 服务的样板代码。但是,此代码仅提供单向通信,没有消息流。 该项目是当前工作的出发点。在我的代码中,我试图提取服务器和客户端的通用代码,并将其组合成一个支持使用 gRPC 和 .NET Core 3.0 进行双向消息流的便捷基础设施。
本文最初发布时,.NET Core 3.0 仍处于预发布版本。现在,该示例已使用 .NET Core 3.0 的正式发布版本进行了测试,无需进行任何更改。
软件描述
基础设施项目(DLL)GrpcServerHelper
和 GrpcClientHelper
位于 Helpers 文件夹中。它们可以被相应的服务器和客户端引用和使用。这些助手 DLL 是在假定客户端和服务器之间进行全双工 gRPC 消息交换的前提下开发的。但是,消息结构和处理没有限制。这两项功能都取决于具体的服务器和客户端。
gRPC 数据和处理在 proto 文件 communication.proto 中定义,该文件由具体的服务器和客户端共享。该文件提供了不同类型的客户端到服务器和服务器到客户端的消息,以及相应的 enum
和用于开始消息交换的函数 CreateStreaming()
。Visual Studio 2019 会从 proto 文件生成 C# 类。
服务器基础设施 (GrpcServerHelper)
类 GeneralGrpcService<TRequest, TResponse>
为服务器端提供了双向流的基础设施。该类以请求和响应消息类型 TRequest
和 TResponse
分别进行参数化。其方法 CreateDuplexStreaming()
处理主要的例程过程,如订阅者管理、请求消息处理、响应消息生成以及发送回客户端。
public async Task CreateDuplexStreaming(
IAsyncStreamReader<TRequest> requestStream,
IServerStreamWriter<TResponse> responseStream,
ServerCallContext context)
{
var httpContext = context.GetHttpContext();
Logger.LogInformation($"Connection id: {httpContext.Connection.Id}");
if (!await requestStream.MoveNext())
return;
var clientId = _messageProcessor.GetClientId(requestStream.Current);
Logger.LogInformation($"{clientId} connected");
var subscriber = new SubscribersModel<TResponse>
{
Subscriber = responseStream,
Id = $"{clientId}"
};
_serverGrpcSubscribers.AddSubscriber(subscriber);
do
{
if (requestStream.Current == null)
continue;
var resultMessage = _messageProcessor.Process(requestStream.Current);
if (resultMessage == null)
continue;
await _serverGrpcSubscribers.BroadcastMessageAsync(resultMessage);
} while (await requestStream.MoveNext());
_serverGrpcSubscribers.RemoveSubscriber(subscriber);
Logger.LogInformation($"{clientId} disconnected");
}
类 ServerGrpcSubscribersBase<TResponse>
管理订阅者(客户端),而 abstract class MessageProcessorBase<TRequest, TResponse>
暴露 abstract
方法 abstract TResponse Process(TRequest message)
用于处理请求消息并生成响应消息。为了灵活性,假定这些方法的具体实现会负责处理可能发生的任何异常。最后,class SubscribersModel<TResponse>
提供了一个订阅者模型,用于将响应消息发送回客户端。
客户端基础设施 (GrpcClientHelper)
唯一的 abstract class GrpcClientBase<TRequest, TResponse>
提供了 Do()
方法来确保从客户端进行消息流。
public async Task Do(
Channel channel,
Action onConnection = null,
Action onShuttingDown = null)
{
using (var duplex = CreateDuplexClient(channel))
{
onConnection?.Invoke();
var responseTask = Task.Run(async () =>
{
while (await duplex.ResponseStream.MoveNext(CancellationToken.None))
Console.WriteLine($"{duplex.ResponseStream.Current}");
});
string payload;
while (!string.IsNullOrEmpty(payload = MessagePayload))
await duplex.RequestStream.WriteAsync(CreateMessage(payload));
await duplex.RequestStream.CompleteAsync();
}
onShuttingDown?.Invoke();
await channel.ShutdownAsync();
}
GrpcServer 和 GrpcClient
具体的服务器 GrpcServer
是通过 Visual Studio 2019 的 ASP.MVC Core 模板创建的 gRPC 项目。它实现了 class MessageProcessor : MessageProcessorBase<RequestMessage, ResponseMessage>
和 class ServerGrpcSubscribers : ServerGrpcSubscribersBase<ResponseMessage>
,并扩展了 GrpcServerHelper
的相应存根类。MessageProcessor
类提供了客户端请求消息所需的处理,并在需要时发送响应消息。ServerGrpcSubscribers
类调整了订阅者管理(在我们的具体情况下,它与父类没有区别,仅用于说明目的)。这两个类 ServerGrpcSubscribers
和 MessageProcessor
都应该在 Startup
类的 ConfigureServices(IServiceCollection services)
方法中作为单例添加到 services
集合中。
services.AddSingleton<ServerGrpcSubscribers>();
services.AddSingleton<MessageProcessor>();
服务类 DuplexService
扩展了生成的 Messaging.MessagingBase
类型,应在 Startup.Configure()
方法中映射到端点,如下所示:
app.UseEndpoints(endpoints =>
{
// ...
endpoints.MapGrpcService<DuplexService>();
});
服务器和客户端支持交换加密消息(HTTPS)。为此,服务器在其 Kestrel 配置中使用 grpcServer.pfx 文件,客户端使用证书文件 certificate.crt。这两个文件都使用 OpenSSL 应用程序创建,该应用程序根据例如 此处提供的指南安装。
为了使用 Proto Buffer 序列化,服务器和客户端共享同一个 proto 文件 communication.proto。该文件定义了流式消息服务,
service Messaging
{
rpc CreateStreaming (stream RequestMessage) returns (stream ResponseMessage);
}
请求和响应消息格式以及相应的枚举器。
我未能找到一种方法在 Visual Studio 中将 proto 文件包含到服务器和客户端项目中以确保生成相应的 C# 代码。因此,我在 GrpcServer.csproj 和 GrpcClient.csproj 项目文件中手动执行了此操作。在这些文件中,我添加了用注释 <!-- PROTO: The following section was added manually -->
标记的部分。
运行示例
要运行示例,请生成整个解决方案(这将同时导致根据 proto 文件生成类)。然后使用...命令运行 GrpcServer...
dotnet GrpcServer.dll
...命令,并在其完全启动后,继续使用一个或多个...命令。
dotnet GrpcClient.dll
...命令。
在客户端控制台中,输入以 Enter 结尾的某条消息。消息将被服务器接收和处理(在 MessageProcessor.Process()
方法中有预留的处理代码位置,如您可能看到的注释所示)。如果您的消息包含问号 ?
,则服务器会将响应消息广播给其所有当前订阅者。客户端将在其控制台中输出响应。
结论
这项工作提出了使用 gRPC 过程在 .NET 3.0 中进行同时双向消息流的基础设施。使用此基础设施将具体服务器和客户端的代码减少到只有几行。希望这对我的尊敬的同行们有所帮助。
历史
- 2019 年 7 月 26 日:初版