65.9K
CodeProject 正在变化。 阅读更多。
Home

双工 gRPC

starIconstarIconstarIconstarIconstarIcon

5.00/5 (17投票s)

2020年11月27日

CPOL

8分钟阅读

viewsIcon

33579

关于如何创建 C# 双工(流式)gRPC 客户端/服务器的小型演示。

目录

引言

所以,如果你接触编程有一段时间了,你肯定会遇到过执行某种远程过程调用的需求。有很多框架允许这样做,比如 WebApi、WCF,几乎你可以想到的任何 REST API,Service Stack 等等。

这一切都相当标准,我们调用远程系统的某个方法,传入一些参数(或不传),然后得到一个响应(或不得到)。

那些也使用过 WCF 的人会记得,WCF 也支持双工通道,其中服务器端代码能够通过回调通道回叫客户端。这对于通知/价格更新之类的事情非常有用。

现在 WCF 已经过时,而且基本上只与 Windows 绑定,REST 就是 REST,这一点无可争议。然而,随着时间的推移,新框架的出现是自然的,Google 的人就推出了一个名为 gRPC 的框架,它允许你用他们熟知的 protobuf 语法定义你的契约,然后使用 Go/C++/Java/Python/C# 来构建基于 proto 文件的可用服务。

虽然有很多关于 gRPC 的优秀教程,但关于双工通信的教程却不多。本文将尝试演示双工 gRPC,并解释一些核心的 gRPC 概念。

代码在哪里?

本文的代码可以在 https://github.com/sachabarber/DuplexGRPC 找到。

什么是 GPC

本节内容摘自 此处的 入门指南。

在 gRPC 中,客户端应用程序可以直接调用不同机器上服务器应用程序的方法,就像调用本地对象一样,从而更容易创建分布式应用程序和服务。与许多 RPC 系统一样,gRPC 的基础是定义一个服务,指定可以远程调用的方法及其参数和返回类型。在服务器端,服务器实现此接口并运行 gRPC 服务器来处理客户端调用。在客户端,客户端有一个存根(在某些语言中称为客户端),它提供与服务器相同的方法。

gRPC 客户端和服务器可以在各种环境中运行和通信 - 从 Google 内部的服务器到你自己的桌面 - 并且可以用 gRPC 支持的任何语言编写。因此,例如,你可以轻松地用 Java 创建一个 gRPC 服务器,用 Go、Python 或 Ruby 编写客户端。此外,最新的 Google API 将有 gRPC 版本,可以让你轻松地将 Google 功能集成到你的应用程序中。

使用 Protocol Buffers

默认情况下,gRPC 使用 Protocol Buffers,这是 Google 成熟的开源结构化数据序列化机制(尽管它也可以用于 JSON 等其他数据格式)。以下是其工作原理的快速介绍。如果你已经熟悉 Protocol Buffers,可以跳到下一节。

使用 Protocol Buffers 的第一步是在 proto 文件中定义要序列化的数据结构:这是一个普通的文本文件,扩展名为 .proto。Protocol Buffer 数据被组织成消息,每个消息都是一个小的逻辑信息记录,包含一系列称为字段的名称-值对。这是一个简单的例子

message Person {
  string name = 1;
  int32 id = 2;
  bool has_ponycopter = 3;
}

然后,一旦你指定了数据结构,就可以使用 Protocol Buffer 编译器 protoc 从 proto 定义中生成你喜欢的语言(或多种语言)的数据访问类。这些类提供了每个字段的简单访问器,如 name()set_name(),以及将整个结构序列化/反序列化为/从原始字节的方法。因此,例如,如果你的首选语言是 C++,在上面的例子上运行编译器将生成一个名为 Person 的类。然后你可以在你的应用程序中使用这个类来填充、序列化和检索 Person Protocol Buffer 消息。你在普通的 proto 文件中定义 gRPC 服务,RPC 方法的参数和返回类型被指定为 Protocol Buffer 消息。

// The greeter service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

gRPC 使用 protoc 和一个特殊的 gRPC 插件来从你的 proto 文件生成代码:你将获得生成的 gRPC 客户端和服务器代码,以及用于填充、序列化和检索消息类型的常规 Protocol Buffer 代码。你将在下面看到一个例子。

要了解更多关于 Protocol Buffers 的信息,包括如何在您选择的语言中安装带有 gRPC 插件的 protoc,请参阅 Protocol Buffers 文档

深入了解 C# gRPC 流

你可以在 这里 了解更多。

演示

我创建了一个小型演示项目,你可以用它来跟随下面的步骤。

契约

如上所述,你需要从 Proto 文件定义开始,所以对于 C# 项目,这也意味着你需要以下 Nugets。

  • Google.Protobuf:允许你创建 proto 文件,并对它们进行编译。
  • Grpc:允许你编写 gRPC 服务。
  • Grpc.Tools:允许基于你的 proto 文件自动生成代理。

有了这三个 nugets,我们就可以开始创建一个小型服务了。

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.demo";
option java_outer_classname = "DemoProto";
option objc_class_prefix = "DMO";

import "google/protobuf/empty.proto";

package demos;

service PubSub {
	rpc GetAnEvent(google.protobuf.Empty) returns (Event) {}
	rpc Subscribe(Subscription) returns (stream Event) {}
	rpc Unsubscribe(Subscription) returns (Unsubscription) {}
}

message Event
{
	string Value = 1;
}
message Subscription
{
	string Id = 1;
}
message Unsubscription
{
	string Id = 1;
}

这个小小的 proto 文件包含一个带有 3 个方法的服务,以及可能在服务中使用的几个数据类型。眼尖的你会看到其中的工作流(work stream)作为返回类型。这就是我们想要允许服务器端流式传输时使用的语法。我们稍后将看到它如何影响代理生成。现在,我强烈建议你们所有人阅读 gRPC 的这一页

演示项目包含哪些内容?

演示解决方案实际上包含以下三个项目:

  • Grpc.Demo:proto 文件
  • Grpc.Demo.Client:客户端代码
  • Grpc.Demo.Server:服务器代码

我们将在下面深入探讨所有这些内容。

请求/响应

进行简单的请求响应非常简单,我们可以直接在客户端/服务器中使用 Grpc.Tools 编译自动生成的代理来完成。

客户端代码

Channel channel = new Channel("127.0.0.1:50051", ChannelCredentials.Insecure);
var client = new PubSub.PubSubClient(channel);
var reply = client.GetAnEvent(new Empty());
Console.WriteLine($"GetAnEvent : {reply}");

服务器代码

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Demos;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;

namespace Grpc.Demo.Server
{
    public class PubSubImpl : PubSub.PubSubBase
    {
        public override Task<Event> GetAnEvent(Empty request, ServerCallContext context)
        {
            return Task.FromResult(new Event { Value  = DateTime.Now.ToLongTimeString() });
        }
    }

    class Program
    {
        const int Port = 50051;

        public static void Main(string[] args)
        {
            //var subManager = new SubscriptionManager();
            var service = new PubSubImpl();
            Core.Server server = new Core.Server
            {
                Services = { Demos.PubSub.BindService(service) },
                Ports = { new ServerPort("localhost", Port, ServerCredentials.Insecure) }
            };
            server.Start();
            Console.ReadKey();
            server.ShutdownAsync().Wait();
        }
    }
}

这相当简单易懂,Grpc.Tools 编译完成它的工作,为我们创建了所需的代理/基类,然后只需在服务器端托管它,并在客户端调用它,很简单。

然而,当你想要在 gRPC 上进行双工(服务器发送消息)通信时,事情就开始变得有点意思了。正如我在引言中所说,有很多关于 gRPC 的文章,但很少有关于双工通信的。下一节将讨论这个。

双工请求 / 服务器推送

现在,如果你检查 gRPC 的文档,它会这样说:

服务器流式 RPC,客户端向服务器发送请求,并获得一个流来读取一系列消息。客户端从返回的流中读取,直到没有更多消息。gRPC 保证单个 RPC 调用中的消息顺序。

有了这个 proto 定义:

rpc ListFeatures(Rectangle) returns (stream Feature) {}

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

message Rectangle {
  Point lo = 1;
  Point hi = 2;
}

message Feature {
  string name = 1;
  Point location = 2;
}

你会看到这个示例指导代码。

public override async Task ListFeatures
 (Rectangle request, IServerStreamWriter<Feature> responseStream, ServerCallContext context)
        {
            var responses = features.FindAll
                ( (feature) => feature.Exists() && request.Contains(feature.Location) );
            foreach (var response in responses)
            {
                await responseStream.WriteAsync(response);
            }
        }

在这里,你可以看到我们在 ListFeatures 方法的上下文中编写了 responseStream。这似乎很好,但如果你曾经处理过一个需要稍后从服务器发送东西回客户端的严肃服务,你很可能不会处于原始调用方法的上下文中。实际上,你可能会远离这个你调用的原始方法很多类/逻辑路径。甚至 微软官方的双工 WCF -> gRPC 指南 也利用了这个方法上下文来将结果发送回客户端,其中微软的指导显示了这个:

public override async Task Subscribe(SubscribeRequest request, 
IServerStreamWriter<StockTickerUpdate> responseStream, ServerCallContext context)
    {
        var subscriber = _subscriberFactory.GetSubscriber(request.Symbols.ToArray());

        subscriber.Update += async (sender, args) =>
            await WriteUpdateAsync(responseStream, args.Symbol, args.Price);

        await AwaitCancellation(context.CancellationToken);
    }    

这个例子使用了事件等来实现,在我看来这也是一个次优的解决方案。我想要的是能够将某个东西添加到某个缓冲区,然后流式 gRPC 方法就会拾取它并将其发送回客户端。

我实现了这一点,这是我做到的方式。

第一步:订阅

我们需要一个客户端发送的订阅对象。

using Demos;

namespace Grpc.Demo.Server
{
    public class SubscriptionEvent
    {
        public Event Event { get; set; }
        public string SubscriptionId { get; set; }
    }
}

第二步:客户端需要发送订阅并处理订阅回调

对我来说,这意味着创建一个如下的小类:

public class Subscriber
{
	private static Demos.PubSub.PubSubClient _pubSubClient;
	private Subscription _subscription;

	public Subscriber(Demos.PubSub.PubSubClient pubSubClient)
	{
		_pubSubClient = pubSubClient;
	}

	public async Task Subscribe(string subscriptionId)
	{
		_subscription = new Subscription() { Id = subscriptionId };
		Console.WriteLine($">> SubscriptionId : {subscriptionId}");
		using (var call = _pubSubClient.Subscribe(_subscription))
		{
			//Receive
			var responseReaderTask = Task.Run(async () =>
			{
				while (await call.ResponseStream.MoveNext())
				{
					Console.WriteLine("Event received: " + call.ResponseStream.Current);
				}
			});

			await responseReaderTask;
		}
	}

	public void Unsubscribe()
	{
		_pubSubClient.Unsubscribe(_subscription);
	}
}

然后我可以在客户端代码中像这样使用它:

class Program
{
	public static void Main(string[] args)
	{
		Channel channel = new Channel("127.0.0.1:50051", ChannelCredentials.Insecure);
		var client = new PubSub.PubSubClient(channel);
		var subscriber = new Subscriber(client);

		Task.Run(async () =>
		{
			await subscriber.Subscribe(Guid.NewGuid().ToString("N"));
		}).ConfigureAwait(false).GetAwaiter();		

		Console.WriteLine("Hit key to unsubscribe");
		Console.ReadLine();

		subscriber.Unsubscribe();

		Console.WriteLine("Unsubscribed...");

		Console.WriteLine("Hit key to exit...");
		Console.ReadLine();
	}
}

所以,客户端方面就到此为止,现在转到服务器端。

第三步:服务器端

那么,既然我们有了来自客户端的订阅,服务器应该如何处理它呢?请记住,我想要的是能够将数据推送到某个缓冲区,并将这些数据发送回客户端。我想要能够从任何地方、任何线程执行此操作。

那么如何实现呢?我们可以利用 DataFlow API,特别是 BufferBlock<T> 类,它提供了一个用于存储 Dataflow 数据的缓冲区。通过使用这个类,我们可以启动一个线程将数据发送到客户端。这个线程实际上只是为了表明我们可以强制从服务器将数据发送到客户端,并且是在 gRPC 调用范围之外,并且在一个你选择的新线程中。

好的,这就是我的做法。这是相关的服务器端代码:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Demos;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;

namespace Grpc.Demo.Server
{
    public class PubSubImpl : PubSub.PubSubBase
    {
        private readonly BufferBlock<SubscriptionEvent> _buffer = 
                                           new BufferBlock<SubscriptionEvent>();

        public PubSubImpl()
        {
            SubscriberWritersMap = new Dictionary<string, IServerStreamWriter<Event>>();
        }

        public override async Task Subscribe(Subscription subscription, 
               IServerStreamWriter<Event> responseStream, ServerCallContext context)
        {
            //Dict to hold a streamWriter for each subscriber.
            SubscriberWritersMap[subscription.Id] = responseStream;

            while (SubscriberWritersMap.Count > 0)
            {
                //Wait on BufferBlock from MS Dataflow package.
                var subscriptionEvent = await _buffer.ReceiveAsync();
                if (SubscriberWritersMap.ContainsKey(subscriptionEvent.SubscriptionId))
                {
                    await SubscriberWritersMap[subscriptionEvent.SubscriptionId].WriteAsync
                                                                 (subscriptionEvent.Event);
                }
            }
        }

        public override Task<Unsubscription> Unsubscribe
                        (Subscription request, ServerCallContext context)
        {
            SubscriberWritersMap.Remove(request.Id);
            return Task.FromResult(new Unsubscription() { Id = request.Id });
        }

        public void Publish(SubscriptionEvent subscriptionEvent)
        {
            _buffer.Post(subscriptionEvent);
        }

        public Dictionary<string, IServerStreamWriter<Event>> SubscriberWritersMap 
                         { get; private set; }       
    } 

    class Program
    {
        const int Port = 50051;

        public static void Main(string[] args)
        {
            //var subManager = new SubscriptionManager();

            var service = new PubSubImpl();
            Core.Server server = new Core.Server
            {
                Services = { Demos.PubSub.BindService(service) },
                Ports = { new ServerPort("localhost", Port, ServerCredentials.Insecure) }
            };
            server.Start();

            bool shouldRun = true;

            Random rand = new Random(1000);

			//Create a new thread here to simulate random pushes back to the clients
			//this will randomly pick a subscriber to push data to
            Thread t = new Thread(() =>
            {
                while (shouldRun)
                {
                    if (service.SubscriberWritersMap.Any())
                    {
                        var indexedKeys = service.SubscriberWritersMap.Select((kvp, idx) => new
                        {
                            Idx = idx,
                            Key = kvp.Key

                        });

                        var subscriptionIdx = rand.Next(service.SubscriberWritersMap.Count);
                        var randomSubscriptionId = indexedKeys.Single
                                                   (x => x.Idx == subscriptionIdx).Key;
                        service.Publish(new SubscriptionEvent()
                        {
                            Event = new Event()
                                {Value = $"And event for '{randomSubscriptionId}' 
                                         {Guid.NewGuid().ToString("N")}"},
                            SubscriptionId = randomSubscriptionId
                        });
                    }

                    Thread.Sleep(2000);
                }
            });
            t.Start();

            Console.WriteLine("Server listening on port " + Port);
            Console.WriteLine("Press any key to stop the server...");

            Console.ReadKey();
            shouldRun = false;

            server.ShutdownAsync().Wait();
        }
    }
}

有了这个,我们就可以启动几个客户端,并看到服务器端将随机数据回传给客户端。

点击查看大图

结论

gRPC 确实很酷,而且能够使用多种语言也很棒,我希望这篇文章能展示如何更好地控制双工 gRPC 服务,使其更像传统的发布-订阅式 RPC(例如 WCF)。

历史

  • 2020 年 11 月 27 日:初始版本
© . All rights reserved.