gRPC 在 C#、JavaScript 和 Python 中的简单示例





5.00/5 (10投票s)
本文提供了创建 Google RPC 的客户端和服务器代码的简单示例。
引言
gRPC 的优势
微软几乎放弃了 WCF,没有将其代码包含在 .NET CORE 中。不同进程之间(可能运行在不同机器上)进行远程通信的最佳且最流行的解决方案是 gRPC 或 Google Remote Procedure Calls。
gRPC 具有以下优点
- GRPC 适用于所有流行平台,支持大多数编程语言,包括但不限于
- C#
- Java
- JavaScript/TypeScript
- Python
- Go
- Objective-C
- gRPC 非常快速且占用带宽小 - 数据由 Protobuffer 软件以最合理的方式打包。
- gRPC 及其底层 Protobuffer 非常易于学习和使用。
- 除了常规的请求-回复模式外,gRPC 还使用发布/订阅模式,订阅的客户端可以接收异步发布的邮件流。这样的流可以轻松转换为 Rx 的
IObservable
,并相应地应用所有允许的 Rx LINQ 转换。
Proto 文件
gRPC 和 Protobuffer 使用简单的 .proto 文件来定义服务和消息。或者,在 C# 和其他语言中,可以使用代码优先方法,从带有属性的语言代码定义 gRPC 服务和消息。
当一个人希望在所有 gRPC 客户端和服务器之间坚持使用同一种语言时,代码优先方法会更好。我更感兴趣的是不同语言编写的客户端可以访问用 C# 编写的同一服务器的情况。特别是,我对 C#、Python 和 JavaScript/TypeScript 客户端感兴趣。因此,本文中的所有示例都将使用 .proto 文件来定义消息和服务。
文章大纲
本文介绍了两个示例——一个演示请求/回复模式,另一个演示发布/订阅模式。对于每个示例,服务器都用 C# 编写,客户端用三种不同的语言编写:C#、NodeJS(JavaScript)和 Python。
代码位置
所有示例代码都位于 NP.Samples 的 GRPC 文件夹下。下面所有的文件夹引用都将相对于存储库的 GRPC 文件夹。
简单(请求/回复)gRPC 示例
SimpleGrpcServer.sln 解决方案位于 SimpleRequestReplySample\SimpleGrpcServer 文件夹下。该解决方案包含五个项目
Protos
- 包含 service.proto gRPC protobuffer 文件SimpleGrpcServer
- C# Grpc 服务器SimpleGrpcClient
- C# Grpc 客户端SimpleNodeJSGrpcClient
- Node JS Grpc 客户端SimplePythonGrpcClient
- Python Grpc 客户端
Protos 项目
Protos.csproj 项目是一个 .NET 项目,将其 service.proto 文件编译成 C# 服务器和客户端项目使用的 .NET 代码。非 C# 项目只需使用其 service.proto 文件生成相应语言的客户端存根。
Protos.csproj 引用了三个 nuget 包——Google.Protobuf
、Grpc
和 Grpc.Tools
。
我提出的例子是一个非常流行的 Greeter Grpc 服务示例,可以在例如 .NET 中的 gRPC 概述和 gRPC 快速入门中找到。
客户端发送一个名字,例如 "Joe Doe
" 到服务器,服务器回复 "Hello Joe Doe
" 消息。
// taken from
// https://grpc-ecosystem.github.io/grpc-gateway/docs/tutorials/simple_hello_world/
syntax = "proto3";
package greet;
service Greeter
{
// client takes HelloRequest and returns HelloReply
rpc SayHello (HelloRequest) returns (HelloReply);
}
// HelloRequest has only one string field - name
message HelloRequest
{
string name = 1;
}
// HelloReply has only one string name - msg
message HelloReply
{
string msg = 1;
}
proto 文件本质上包含消息(请求和回复)和服务 Greeter
。请注意,消息非常类似于 Java 或 C#,只有字段的名称后面跟着数字——例如,string name = 1;
。数字在同一消息的每个字段中都必须是唯一的,并将用于按数字顺序存储和恢复消息字段。
在我们的例子中,每个消息只有一个字段,因此任何数字都可以(我们使用数字 1 来表示这是消息中的第一个(也是唯一一个)字段)。
服务 Greeter
包含一个名为 SayHello
的 rpc
(远程过程调用),它以消息 HelloRequest
作为输入,并返回 HelloReply
消息作为输出。请注意,虽然 RPC 的接口由 proto 文件定义,但实现仍然取决于服务器。
为了让 Visual Studio 自动生成 .NET 代码,创建客户端和服务器存根,service.proto 文件的 BuildAction
(生成操作)应设置为 "Protobuf Compiler
"(一旦您引用 Grpc.Tools
,此选项将出现在生成操作中)。
如上所述,存根将仅为 C# 客户端和服务器自动生成。其他语言将使用自己的方法直接从 service.proto 文件生成存根。
运行 C# 服务器和 C# 客户端
要运行服务器,请在 **Solution Explorer** 中右键单击 **SimpleGrpcServer** 项目,然后选择 **Debug**->**Start Without Debugging**(调试->不带调试运行)。
将打开一个空的命令提示符(因为服务器是一个控制台应用程序)。
现在从同一解决方案运行 C# 客户端(通过在 **Solution Explorer** 中右键单击 **SimpleGrpcClient** 并选择 **Debug** -> **Run Without Debugging**(调试->运行而不调试)。
客户端将显示从服务器返回的 "Hello C# Client
" string
。
SimpleGrpcClient 代码
所有 C# Client
代码都位于 SimpleGrpcClient
项目的 Program.cs 文件中。
using Grpc.Core;
using static Greet.Greeter;
// get the channel connecting the client to the server
var channel = new Channel("localhost", 5555, ChannelCredentials.Insecure);
// create the GreeterClient service
var client = new GreeterClient(channel);
// call SetHello RPC on the server asynchronously and wait for the reply.
var reply =
await client.SayHelloAsync(new Greet.HelloRequest { Name = "C# Client" });
// print the Msg within the reply.
Console.WriteLine(reply.Msg);
SimpleGrpcServer 代码
服务器代码包含在两个文件中——GreeterImplementation.cs 和 Program.cs。
GreeterImplementation
- 是一个派生自 abstract GreeterBase
(生成的服务器存根类)的类。它提供了 SayHello(...)
方法的实现(该方法在超类 GreeterBase
中是 abstract
的。这里的实现会在 request.Name
中包含的内容前面加上 "Hello
" string
。
internal class GreeterImplementation : Greeter.GreeterBase
{
// provides implementation for the abstract method SayHello(...)
// from the generated server stub
public override async Task<HelloReply> SayHello
(
HelloRequest request,
ServerCallContext context)
{
// return HelloReply with Msg consisting of the word Hello
// and the name passed by the request
return new HelloReply
{
Msg = $"Hello {request.Name}"
};
}
}
这是 Program.cs 代码。
using Greet;
using Grpc.Core;
using SimpleGrpcServerTest;
// create GreeterImplementation object providing the
// RPC SayHello implementation
GreeterImplementation greeterImplementation = new GreeterImplementation();
// bind the server with the greeterImplementation so that SayHello RPC called on
// the server will be channeled over to greeterImplementation.SayHello
Server server = new Server
{
Services = { Greeter.BindService(greeterImplementation) }
};
// set the server host, port and security (insecure)
server.Ports.Add(new ServerPort("localhost", 5555, ServerCredentials.Insecure));
// start the server
server.Start();
// wait with shutdown until the user presses a key
Console.ReadLine();
// shutdown the server
server.ShutdownAsync().Wait();
最重要的行是将服务和 GreeterImplementation
绑定的那一行。
// bind the server with the greeterImplementation so that SayHello RPC called on
// the server will be channeled over to greeterImplementation.SayHello
Server server = new Server
{
Services = { Greeter.BindService(greeterImplementation) }
};
Node JS gRPC 客户端
要安装所需的包,请在 Solution Explorer 中右键单击 SimpleNodeJSGrpcClient
项目下的 npm,然后选择 "Install npm packages"(安装 npm 包)菜单项。
最后,右键单击 SimpleNodeJSGrpcClient
项目,然后选择 **Debug**->**Start Without Debugging**(调试->不带调试运行)。
客户端控制台应打印从服务器返回的 "Hello Java Script
" string
。
这是 Node JS 客户端的代码(带注释)。
module;
// import grpc functionality
let grpc = require('@grpc/grpc-js');
// import protoLoader functionality
let protoLoader = require('@grpc/proto-loader');
// load the services from service.proto file
const root =
protoLoader.loadSync
(
'../Protos/service.proto', // path to service.proto file
{
keepCase: true, // service loading parameters
longs: String,
enums: String,
defaults: true,
oneofs: true
});
// get the client package definitions for greet package
// defined within the services.proto file
const greet = grpc.loadPackageDefinition(root).greet;
// connect the client to the server
const client = new greet.Greeter("localhost:5555", grpc.credentials.createInsecure());
// call sayHello RPC passing "Java Script" as the name parameter
client.sayHello({ name: "Java Script" }, function (err, response) {
// obtain the response and print its msg field
console.log(response.msg);
});
// prevent the program from exiting right away
var done = (function wait() { if (!done) setTimeout(wait, 1000) })();
Python gRPC 客户端
要准备 Python gRCP 客户端解决方案,首先在 SimplePythonGrpcClient
项目下右键单击 Python Environment 下的 **env**,然后选择 "Install from requirements.txt"(从 requirements.txt 安装)(仅需执行一次)。
这将恢复所有必需的 Python 包。
然后,您可以以与任何其他项目相同的方式运行它,只需确保服务器已在运行。
客户端 Python 项目应在控制台窗口中打印 "Hello Python
" string
。
这是 Python 代码(在 Python 注释中解释)。
# import require packages
import grpc
import grpc_tools.protoc
# generate service_pb2 (for proto messages) and
# service_pb2_grpc (for RPCs) stubs
grpc_tools.protoc.main([
'grpc_tools.protoc',
'-I{}'.format("../Protos/."),
'--python_out=.',
'--grpc_python_out=.',
'../Protos/service.proto'
])
# import the generated stubs
import service_pb2;
import service_pb2_grpc;
# create the channel connecting to the server at localhost:5555
channel = grpc.insecure_channel('localhost:5555')
# get the server gRCP stub
greeterStub = service_pb2_grpc.GreeterStub(channel)
# call SayHello RPC on the server passing HelloRequest message
# whose name is set to 'Python'
response = greeterStub.SayHello(service_pb2.HelloRequest(name='Python'))
# print the result
print(response.msg)
简单中继 gRPC 示例
我们的下一个示例演示了发布/订阅 gRPC 架构。简单的中继服务器将客户端发布的邮件传递给订阅它的每个客户端。
该示例的解决方案 StreamingRelayServer.sln 位于 StreamingSample/StreamingRelayServer 文件夹下。
启动解决方案,您会发现它由服务器项目 - StreamingRelayServer
、包含 protobuf service.proto 文件的 Protos
项目以及三个文件夹组成:CSHARP、NodeJS 和 Python。其中每个文件夹都将包含两个客户端——发布客户端和订阅客户端。
Relay 示例 Protos
与上一个示例相同,service.proto 文件仅为 C# 项目编译成 .NET——Python 和 NodeJS 客户端使用它们自己的机制来解析文件。
// gRPC service (RelayService)
service RelayService
{
// Publish RPC - takes a Message with a msg string field
rpc Publish (Message) returns (PublishConfirmed) {}
// Subscribe RPC take SubscribeRequest and returns a stream
// of Message objects
rpc Subscribe(SubscribeRequest) returns (stream Message){}
}
// Relay Message class that
// contains a single msg field
message Message
{
string msg = 1;
}
// Empty class used to confirm that Published Message has been received
message PublishConfirmed
{
}
// Empty message that requests a subscription to Relay Messages.
message SubscribeRequest
{
}
请注意,Subscribe
rpc 返回的是 Messages
的一个**流**(而不是单个 Message
)。
运行服务器和客户端
要启动服务器,请右键单击 StreamingRelayServer
项目,然后选择 **Debug**->**Start Without Debugging**(调试->不带调试运行)。客户端应以完全相同的方式启动。为了观察到有事情发生,您需要先启动订阅客户端,然后再启动发布客户端。
例如,启动服务器,然后启动 C# CSHARP/SubscribeSample。然后运行 CSHARP/PublishSample。订阅客户端将在控制台窗口中打印 "Published from C# Client
"。
请记住,在第一次启动 Node JS 项目之前,它们必须先构建(以下载 JavaScript 包)。另外,在第一次启动 Python 项目之前,您需要右键单击其 **Python Environments**->**env**,然后选择 "Install from requirements.txt"(从 requirements.txt 安装)来下载和安装 Python 包。
C# 发布客户端
发布客户端代码包含在 PublishSample
项目的 Program.cs 文件中。这是 C# 发布客户端的带注释代码。
using Grpc.Core;
using Service;
using static Service.RelayService;
// Channel contains information for establishing a connection to the server
Channel channel = new Channel("localhost", 5555, ChannelCredentials.Insecure);
// create the RelayServiceClient from the channel
RelayServiceClient client = new RelayServiceClient(channel);
// call PublishAsync and get the confirmation reply
PublishConfirmed confirmation =
await client.PublishAsync(new Message { Msg = "Published from C# Client" });
C# 订阅客户端
订阅客户端代码位于 SubscribeSample
项目的 Program.cs 文件中。它从服务器获取 replies
流,并打印该流中的消息。
// channel contains info for connecting to the server
Channel channel = new Channel("localhost", 5555, ChannelCredentials.Insecure);
// create RelayServiceClient
RelayServiceClient client = new RelayServiceClient(channel);
// replies is an async stream
using var replies = client.Subscribe(new Service.SubscribeRequest());
// move to the next message within the reply stream
while(await replies.ResponseStream.MoveNext())
{
// get the current message within reply stream
var message = replies.ResponseStream.Current;
// print the current message
Console.WriteLine(message.Msg);
}
replies
流可能是无限的,并且只有在客户端或服务器终止连接时才会结束。如果服务器没有新的回复,客户端将等待 await replies.ResponseStream.MoveNext()
。
服务器代码
一旦我们通过演示其客户端弄清楚了服务器的作用,让我们看一下 StreamingRelayServer
项目中的服务器代码。
这是 Program.cs 代码,它启动服务器,将其绑定到 RelayServiceImplementations
(RelayServer
的 gRPC 实现),并将其连接到 localhost 的 5555 端口。
// bind RelayServiceImplementations to the gRPC server.
Server server = new Server
{
Services = { RelayService.BindService(new RelayServiceImplementations()) }
};
// set the server to be connected to port 5555 on the localhost without
// any security
server.Ports.Add(new ServerPort("localhost", 5555, ServerCredentials.Insecure));
// start the server
server.Start();
// prevent the server from exiting.
Console.ReadLine();
RelayServiceImplementations
类包含最有趣的代码,实现了 gRPC 存根(从 service.proto 文件中定义的 RelayService
生成)的 abstract
方法 Publish(...)
和 Subscribe(...)
。
internal class RelayServiceImplementations : RelayServiceBase
{
// all client subscriptions
List<Subscription> _subscriptions = new List<Subscription>();
// Publish implementation
public override async Task<PublishConfirmed> Publish
(
Message request,
ServerCallContext context)
{
// add a published message to every subscription
foreach (Subscription subscription in _subscriptions)
{
subscription.AddMessage(request.Msg);
}
// return PublishConfirmed reply
return new PublishConfirmed();
}
// Subscribe implementation
public override async Task Subscribe
(
SubscribeRequest request,
IServerStreamWriter<Message> responseStream,
ServerCallContext context)
{
// create subscription object for a client subscription
Subscription subscription = new Subscription();
// add subscription to the list of subscriptions
_subscriptions.Add(subscription);
// subscription loop
while (true)
{
try
{
// take message one by one from subscription
string msg = subscription.TakeMessage(context.CancellationToken);
// create Message reply
Message message = new Message { Msg = msg };
// write the message into the output stream.
await responseStream.WriteAsync(message);
}
catch when(context.CancellationToken.IsCancellationRequested)
{
// if subscription is cancelled, break the loop
break;
}
}
// once the subscription is broken, remove it
// from the list of subscriptions
_subscriptions.Remove(subscription);
}
}
Publish(...)
方法遍历 _subscriptions
列表中的每个订阅,并将新发布的邮件添加到其中每个订阅。
Subscribe(...)
方法创建一个单独的订阅,并检查它是否有新邮件(由 Publish(...)
方法插入)。如果找到邮件,它会将其删除并推送到响应流。如果找不到邮件,它会等待。
一旦 Subscribe
连接中断,订阅将被移除。
这是单个 subscription
对象的代码。
// represents a single client subscription
internal class Subscription
{
private BlockingCollection<string> _messages =
new BlockingCollection<string>();
// add a message to the _messages collection
public void AddMessage(string message)
{
_messages.Add(message);
}
// remove the first message from the _messages collection
// If there are no message in the collection, TakeMessage will wait
// blocking the thread.
public string TakeMessage(CancellationToken cancellationToken)
{
return _messages.Take(cancellationToken);
}
}
BlockingCollection
将阻塞订阅线程,直到其中有邮件。由于每个订阅(或任何客户端操作)都在自己的线程中运行,因此其他订阅不会受到影响。
发布 Node JS 示例
项目 PublishNodeJsSample
- 包含其 app.js 文件中的相关代码。
// import grpc packages
let grpc = require('@grpc/grpc-js');
let protoLoader = require('@grpc/proto-loader');
// load and parse the service.proto file
const root = protoLoader.loadSync
(
'../../Protos/service.proto',
{
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
// get the service package containing RelayService object
const service = grpc.loadPackageDefinition(root).service;
// create the RelayService client connected to localhost:5555 port
const client = new service.RelayService
("localhost:5555", grpc.credentials.createInsecure());
// publish the Message object "Published from JS Client"
// (as long as the Json structure matches the Message object structure it will be
// converted to Message object)
client.Publish({ msg: "Published from JS Client" }, function (err, response) {
});
有趣的部分是 client.Publish(...)
代码。请注意,我们正在创建一个 JSON 对象 { msg: "Published from JS Client" }
作为 Publish(Message msg, ...)
方法的输入。由于其 JSON 结构与 service.proto 文件中定义的 Message
对象结构匹配,因此该对象将在服务器上自动转换为 Message
对象。
这里再次提醒 service.proto Message
的样子。
// Relay Message class that
// contains a single msg field
message Message
{
string msg = 1;
}
订阅 Node JS 示例
此示例的重要代码位于 SubscribeNodeJsSample
项目的 app.js 文件中。
// import grpc packages
let grpc = require('@grpc/grpc-js');
let protoLoader = require('@grpc/proto-loader');
// load and parse the service.proto file
const root = protoLoader.loadSync
(
'../../Protos/service.proto',
{
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
// get the service package containing RelayService object
const service = grpc.loadPackageDefinition(root).service;
// create the RelayService client connected to localhost:5555 port
const client = new service.RelayService
("localhost:5555", grpc.credentials.createInsecure());
// create the client subcription by passing
// an empty Json message (matching empty SubscribeRequest from service.proto)
var call = client.Subscribe({});
// process data stream combing from the server in
// response to calling Subscribe(...) gRPC
call.on('data', function (response) {
console.log(response.msg);
});
请注意,调用 Subscribe(...)
gRPC 将返回一个 JS 委托,每次从服务器接收到响应消息时都会调用该委托。
发布 Python 示例
此示例的代码位于 PublishPythonSample
项目的 PublishPythonSample.py 文件中。
# import python packages
import grpc
import grpc_tools.protoc
# generate the client stubs for service.proto file in python
grpc_tools.protoc.main([
'grpc_tools.protoc',
'-I{}'.format("../../Protos/."),
'--python_out=.',
'--grpc_python_out=.',
'../../Protos/service.proto'
])
# import the client stubs (service_pb2 contains messages,
# service_pb2_grpc contains RPCs)
import service_pb2;
import service_pb2_grpc;
# create the channel
channel = grpc.insecure_channel('localhost:5555')
# create the client stub object for RelayService
stub = service_pb2_grpc.RelayServiceStub(channel);
# create and publish the message
response = stub.Publish(service_pb2.Message(msg='Publish from Python Client'));
重要提示:我们使用以下命令创建通道:
channel = grpc.insecure_channel('localhost:5555')
这意味着创建的通道和从中获得的 stub
只允许对 stub
进行**阻塞调用**——我们稍后使用的 stub.Publish(...)
调用是一个**阻塞调用**,它会等待直到与服务器的往返完成才返回。
如果想使用**异步**(非阻塞但可等待)调用,应该通过使用以下方式创建通道:
channel = grpc.aio.insecure_channel('localhost:5555')
请注意区别——我们上面使用的 insecure_channel(...)
方法路径中缺少 .aio.
部分。但是,我们将在下面给出长期订阅连接的示例时使用此调用到异步版本的 insecure_channel(...)
方法。
这看起来是一个小细节,但花了我一些时间才弄明白,所以希望这个提示能让其他人避免犯同样的错误。
订阅 Python 示例
这是代码(来自同一名称项目的 SubscribePythonSample.py 文件)。
# import python packages
import asyncio
import grpc
import grpc_tools.protoc
# generate the client stubs for service.proto file in python
grpc_tools.protoc.main([
'grpc_tools.protoc',
'-I{}'.format("../../Protos/."),
'--python_out=.',
'--grpc_python_out=.',
'../../Protos/service.proto'
])
# import the client stubs (service_pb2 contains messages,
# service_pb2_grpc contains RPCs)
import service_pb2;
import service_pb2_grpc;
# define async loop
async def run() -> None:
#create the channel
async with grpc.aio.insecure_channel('localhost:5555') as channel:
# create the client stub object for RelayService
stub = service_pb2_grpc.RelayServiceStub(channel);
# call Subscribe gRCP and print the responses asynchronously
async for response in stub.Subscribe(service_pb2.SubscribeRequest()):
print(response.msg)
# run the async method calling that subscribes and
# prints the messages coming from the server
asyncio.run(run())
当然,请注意,为了创建允许异步调用的通道,我们在 grpc.aio.
路径中使用 insecure_channel(...)
版本的方法。
async with grpc.aio.insecure_channel('localhost:5555') as channel:
结论
在微软几乎弃用 WCF 后,gRPC - google RPC 是创建各种服务器/客户端通信的最佳框架。除了常规的请求/回复模式外,它还促进了带输出和输入流的发布/订阅模式的实现。
在本文中,我演示了如何将 gRPC 服务器(用 C# 实现)与用不同语言(C#、JavaScript 和 Python)编写的客户端一起使用。我提供了请求/回复和发布/订阅模式的示例。唯一缺少的模式是客户端向服务器发送流作为输入,但这种情况不那么常见,我可能会在将来添加一个涵盖它们的章节。
历史
- 2023 年 1 月 24 日:初始版本