Griffin.Networking - .NET 网络库
一个性能尚可的 .NET 网络库
引言
Griffin.Networking
是一个用 C# 编写的网络库,其目的是
- 抽象化您在使用原生 .NET 时必须完成的重复性任务
- 创建一种更结构化的方式来处理入站和出站数据
这两个目标应该可以缩短开发网络应用的时间,并由于(希望如此)设计良好的网络层而提高性能。
框架还内置了面向对象控制反转(IoC)支持(请注意,慢速容器会大大损害性能)。IoC 支持由一个服务定位器接口提供(该接口不应暴露在框架外部)。
本文的目的是描述该框架并展示如何用它来开发应用程序。完成后,您将拥有一个可用的 JSON RPC 实现(服务器端)。客户端可以很容易地在之后创建。只需为框架创建一个 `RequestEncoder` 和一个 `ResponseDecoder`,其余所有内容都可以从服务器实现中重用。
背景
我过去构建了多个网络应用程序。从 C# 中的小型客户端应用程序到利用 IO Completion Ports 的 C++ 中的高性能套接字服务器。
它们运行良好,但在实现应用程序时我似乎总是在重复自己。最大的问题是很难获得一个可扩展的架构,您可以在其中将处理程序注入到协议处理中。我的 C# WebServer(搜索“C# webserver”并点击第一个搜索结果)很好地说明了这一点。很难跟踪通信流程。
因此,我决定尝试创建一个易于使用和扩展的网络库。在研究过程中,我偶然发现了 Netty for Java,我的库深受其启发(架构、代码全是我的)。
架构
本节的目的是简要概述将在整篇文章中使用的架构和术语。在阅读完整篇文章之前,本节中的某些内容可能没有意义。
通道
Channel 是 IO 层。在大多数情况下,它是一个套接字实现,但也可以是用于通信的任何东西。默认套接字实现使用经典的 `Begin`/`End` 类型方法。它们很可能会在以后被新的 `Async` 方法取代。
有两种 Channel:服务器 Channel 负责接受新连接并构建正确的客户端 Channel(及其 Pipeline)。客户端 Channel 负责向远程对等方发送和接收信息。
public interface IChannel
{
void HandleDownstream(IPipelineMessage message);
}
如您所见,Channel 接口非常小。原因是整个框架都是异步的。所有通信都通过消息进行。
Channel 的契约规定它只能接收和处理消息。消息(下文详见)可以是 `Connect`、`SendBuffer` 或 `Close`。所有 Channel 实现都在构造函数中接受一个 Pipeline(见下文),并使用它将消息发送到您的应用程序。
管道
Pipeline 是库中最核心的部分。所有的操作都发生在 Pipeline 中。在 Pipeline 中,您可以对用户进行授权,将传入的 `byte[]` 数组转换为更易用的内容,如 `HttpRequest` 等。
Pipeline 有两个方向(可以与双车道公路进行比较)。从 Channel 到应用程序的通道称为“上游”,因为消息从 Channel 流向您的应用程序。另一个方向称为“下游”,因为消息向下流向 Channel。
一个 Pipeline 可以包含任意数量的 handler,每个方向都有其独特的 handler 集。一个 HTTP 流服务器可能只包含上游的 `HttpHeaderDecoder` 和下游的 `HttpFileStreamer` 以提高性能,而一个完整的 `HttpServer` 则会包含会话管理、身份验证、日志记录、错误处理程序等作为上游 handler。
public interface IPipeline
{
/// <summary>
/// Send something from the channel to all handlers.
/// </summary>
/// <param name="message">Message to send to the client</param>
void SendUpstream(IPipelineMessage message);
/// <summary>
/// Set down stream end point
/// </summary>
/// <param name="channel">channel which will handle all down stream messages</param>
void SetChannel(IChannel channel);
/// <summary>
/// Send a message from the client and downwards.
/// </summary>
/// <param name="message">Message to send to the channel</param>
void SendDownstream(IPipelineMessage message);
}
该架构允许您完全控制传入和传出数据在到达您的应用程序或 Channel 之前是如何被处理的。
消息
正如前一节所述,Pipeline 用于向/从您的应用程序发送消息。这些消息是包含要处理信息的小类。可以将消息与 .NET 事件机制中的 `EventArg` 类进行比较。实现了 `IPipelineMessage` 接口的 POCO 类。需要执行操作的消息应命名为动词(`Send`),而事件消息应命名为过去式(`Received`)。
总的指导原则是,每条消息只能包含一种类型的信息。您不能有一个名为 `Received` 的消息,其中包含一个对象属性,该属性从 `byte[]` 开始,到 `SuperDeluxeObject` 结束。而是创建一个名为 `ReceivedSuperDeluxe` 的新消息,该消息包含 `SuperDeluxeObject` 对象。这使得处理更清晰、更易于遵循。
示例消息
public class Connect : IPipelineMessage
{
private readonly EndPoint _remoteEndPoint;
public Connect(EndPoint remoteEndPoint)
{
if (remoteEndPoint == null)
throw new ArgumentNullException("remoteEndPoint");
_remoteEndPoint = remoteEndPoint;
}
public EndPoint RemoteEndPoint
{
get { return _remoteEndPoint; }
}
}
Pipeline Handlers
Pipeline handler 用于处理通过 Pipeline 发送的消息。它们可以是单例(在 Channel 之间共享)或按 Channel 创建。与 Pipeline 一起构造的 handler 可以存储状态信息,因为它们只被一个 Channel 使用。
追踪接收信息的示例上游 handler
public class BufferTracer : IUpstreamHandler
{
private readonly ILogger _logger = LogManager.GetLogger<BufferTracer>();
public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
{
var msg = message as Received;
if (msg != null)
{
var str = Encoding.UTF8.GetString
(msg.BufferSlice.Buffer, msg.BufferSlice.Position, msg.BufferSlice.RemainingLength);
_logger.Trace(str);
}
context.SendUpstream(message);
}
}
请注意,它如何使用 `context.SendUpstream(message)` 将所有消息发送到下一个 handler。这非常重要。每个 handler 都可以决定消息是否应该传播到调用堆栈的上方。这也是消息如何被转换为更易用的内容。
让我们看看 HTTP `HeaderDecoder` handler
public class HeaderDecoder : IUpstreamHandler
{
private readonly IHttpParser _parser;
private int _bodyBytesLeft = 0;
public HeaderDecoder(IHttpParser parser)
{
if (parser == null) throw new ArgumentNullException("parser");
_parser = parser;
}
public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
{
if (message is Closed)
{
_bodyBytesLeft = 0;
_parser.Reset();
}
else if (message is Received)
{
var msg = (Received) message;
// complete the body
if (_bodyBytesLeft > 0)
{
_bodyBytesLeft -= msg.BufferSlice.Count;
context.SendUpstream(message);
return;
}
var httpMsg = _parser.Parse(msg.BufferSlice);
if (httpMsg != null)
{
var recivedHttpMsg = new ReceivedHttpRequest((IRequest) httpMsg);
_bodyBytesLeft = recivedHttpMsg.HttpRequest.ContentLength;
_parser.Reset();
// send up the message to let someone else handle the body
context.SendUpstream(recivedHttpMsg);
msg.BytesHandled = msg.BufferSlice.Count;
context.SendUpstream(msg);
}
return;
}
context.SendUpstream(message);
}
}
这里有两个重要的地方
它遵循单一职责原则
它实际上并不解析 HTTP 消息,而是使用外部解析器进行解析。由于它不违反单一职责原则,因此很容易理解 handler 的作用,并且我们可以随时切换解析器,如果找到更快的解析器。
它将 `Received` 消息转换为 `ReceivedHttpRequest`
所有消息都应视为不可变的。除非有非常充分的理由,否则不要更改其内容。不要将原始包传播到上游,而是创建一个新消息(如果您已处理该消息)。
切换方向
Pipeline handler 可以在任何时候从下游切换到上游(或反之)。切换方向将始终调用另一侧的第一个 handler。这使我们能够简化流程并避免混淆。
public class AuthenticationHandler : IUpstreamHandler
{
private readonly IAuthenticator _authenticator;
private readonly IPrincipalFactory _principalFactory;
public AuthenticationHandler(IAuthenticator authenticator, IPrincipalFactory principalFactory)
{
_authenticator = authenticator;
_principalFactory = principalFactory;
}
public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
{
var msg = message as ReceivedHttpRequest;
if (msg == null)
{
context.SendUpstream(message);
return;
}
var authHeader = msg.HttpRequest.Headers["Authorization"];
if (authHeader == null)
{
context.SendUpstream(message);
return;
}
var user = _authenticator.Authenticate(msg.HttpRequest);
if (user == null)
{
//Not authenticated, send error downstream and abort handling
var response = msg.HttpRequest.CreateResponse(HttpStatusCode.Unauthorized,
"Invalid username or password.");
context.SendDownstream(new SendHttpResponse(msg.HttpRequest, response));
}
else
{
var principal =
_principalFactory.Create(new PrincipalFactoryContext {Request = msg.HttpRequest, User = user});
Thread.CurrentPrincipal = principal;
}
}
}
Pipeline Factories
每次创建新 Channel 时都需要构建一个 Pipeline(及其所有 handler)。框架中有两个内置的 factory。
一个使用名为 `IServiceLocator` 的接口,允许您添加对您喜欢的 IoC 容器的支持。另一个使用委托来创建有状态 handler。
var factory = new DelegatePipelineFactory();
factory.AddDownstreamHandler(() => new ResponseEncoder());
factory.AddUpstreamHandler(() => new HeaderDecoder(new HttpParser()));
factory.AddUpstreamHandler(new HttpErrorHandler(new SimpleErrorFormatter())); //singleton
factory.AddUpstreamHandler(() => new BodyDecoder(new CompositeBodyDecoder(), 65535, 6000000));
factory.AddUpstreamHandler(() => new FileHandler());
factory.AddUpstreamHandler(() => new MessageHandler());
factory.AddUpstreamHandler(new PipelineFailureHandler()); //singleton
缓冲区 (Buffers)
高性能网络库的一个基本部分是数据处理方式。所有较大的内存分配都会损害性能。我们不希望每次读取或发送新数据包时都创建一个新的 `byte[65535]`。分配需要时间,垃圾回收器需要工作更多,并且内存最终会变得碎片化。
框架通过使用缓冲区池和一个名为 `BufferSlice` 的类来解决这个问题。我们可以分配一个 5MB 大小的缓冲区,并将其分割成更小的块,然后在处理中使用它们。我们可以将缓冲区池设置为单例,或者让每个 handler 分配自己的缓冲区池(如果只有五个 handler,仍然是五个分配而不是 5000 个)。
当 `BufferSlice` 被处置时,它会将缓冲区返回给池。因此,所有使用 `BufferSlice` 类的消息都必须实现 `IDisposable`,因为 Channel 在完成处理后会处置所有消息。
性能
框架仍然很新(大约一个月=))。性能尚未达到顶峰。
但是,我使用 Apache 的 ab 工具向 HTTP 侦听器发送了 5000 个请求。框架处理了大约每秒 280 个 HTTP 请求(localhost),我认为在这个项目的早期阶段这是可以接受的。内存消耗约为 80MB(工作集)。(请注意,除非与其他框架进行比较,否则这些数字没有任何实际意义。)欢迎您帮助提高性能或进行自己的基准测试。我很乐意收到一个示例应用程序,可用于性能调优(并与其他框架进行性能比较)。
请不要将 `HttpListener` 与 .NET 中的进行比较,因为 .NET 版本使用 `http.sys`,它在内核模式下运行。任何纯 .NET 解决方案的性能都无法接近它。
构建 JSON RPC 服务器
是时候开始构建一个 JSON RPC 服务器了。创建一个名为 `JsonRpcServer` 之类的新控制台应用程序。打开 nuget 包控制台并运行 `install-package griffin.networking` 来安装框架。
JSON RPC 的规范可以在 官方网站 上找到。本文档不会帮助您理解它,只会展示如何实现它。规范没有说明消息是如何传输的,因此我们将创建一个简单的头部来包装消息。该头部是一个简单的二进制头部,包含一个版本(`byte`)字段和一个长度(`int`)字段。
解码 / 编码
我们需要做的第一件事是处理传入的字节。我们必须将它们解码成我们可以处理的内容。如前所述,我们将使用一个简单的信封。类似这样的
public class SimpleHeader
{
public int Length { get; set; }
public byte Version { get; set; }
}
但是,为了能够使用该类,我们需要以某种方式解码传入的字节。所以,让我们创建第一个 Pipeline handler,我们将仅为此目的使用它
public class HeaderDecoder : IUpstreamHandler
{
public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
{
var msg = message as Received;
if (msg == null)
{
context.SendUpstream(message);
return;
}
// byte + int
if (msg.BufferSlice.RemainingLength < 5)
{
return;
}
var header = new SimpleHeader
{
Version = msg.BufferSlice.Buffer[msg.BufferSlice.Position++],
Length = BitConverter.ToInt32(msg.BufferSlice.Buffer, msg.BufferSlice.Position)
};
msg.BufferSlice.Position += 4;
context.SendUpstream(new ReceivedHeader(header));
if (msg.BufferSlice.RemainingLength > 0)
context.SendUpstream(msg);
}
}}
相当直接。我们不会处理任何内容,直到我们获得至少五个字节(Channel 将在末尾继续填充缓冲区,直到我们处理某项内容)。然后我们只需解码头部,发送一个 `RecievedHeader` 消息,并将剩余的字节传递过去。请注意,我首先使用了版本字节。通过这样做,我们可以在未来的版本中随意更改头部,而不会搞砸一切。
头部除了 JSON 消息的大小之外,没有说明任何其他内容。所以我们需要一些东西来处理 JSON。让我们为该目的创建另一个上游 handler(从而遵守单一职责原则)。它将被调用…… `BodyDecoder` " /> (我作弊了,创建了 JSON RPC 规范中描述的 `Request`/`Response`/`Error` 对象。)
public class BodyDecoder : IUpstreamHandler
{
private static readonly BufferPool _bufferPool = new BufferPool(65535, 50, 50);
private readonly BufferPoolStream _stream;
private SimpleHeader _header;
public BodyDecoder()
{
var slice = _bufferPool.PopSlice();
_stream = new BufferPoolStream(_bufferPool, slice);
}
public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
{
var headerMsg = message as ReceivedHeader;
if (headerMsg != null)
{
_header = headerMsg.Header;
if (_header.Length > 65535)
{
var error = new ErrorResponse("-9999", new RpcError
{
Code = RpcErrorCode.InvalidRequest,
Message =
"Support requests which is at most 655355 bytes.",
});
context.SendDownstream(new SendResponse(error));
}
return;
}
var received = message as Received;
if (received != null)
{
var count = Math.Min(received.BufferSlice.RemainingLength, _header.Length);
_stream.Write(received.BufferSlice.Buffer, received.BufferSlice.Position, count);
received.BufferSlice.Position += count;
if (_stream.Length == _header.Length)
{
_stream.Position = 0;
var request = DeserializeRequest(_stream);
context.SendUpstream(new ReceivedRequest(request));
}
return;
}
context.SendUpstream(message);
}
protected virtual Request DeserializeRequest(BufferPoolStream body)
{
var reader = new StreamReader(body);
var json = reader.ReadToEnd();
return JsonConvert.DeserializeObject<Request>(json);
}
}
在这里,我们使用 `BufferPool` 而不是每次都创建一个新的缓冲区。因此,性能会大大提高,并且如果服务器运行一段时间,内存碎片会大大减少。还要注意,框架有一个 `BufferPoolStream`,它使用 `BufferPool` 来获取 `byte[]` 缓冲区。该流的未来版本很可能能够在后台使用多个缓冲区(因此能够处理大量数据而不会创建过大的缓冲区)。
在继续实际应用程序之前,让我们添加唯一的下游 handler。响应编码器。
public class ResponseEncoder : IDownstreamHandler
{
private static readonly BufferPool _bufferPool = new BufferPool(65535, 50, 100);
public void HandleDownstream(IPipelineHandlerContext context, IPipelineMessage message)
{
var msg = message as SendResponse;
if (msg == null)
{
context.SendDownstream(message);
return;
}
var result = JsonConvert.SerializeObject(msg.Response, Formatting.None);
// send header
var header = new byte[5];
header[0] = 1;
var lengthBuffer = BitConverter.GetBytes(result.Length);
Buffer.BlockCopy(lengthBuffer, 0, header, 1, lengthBuffer.Length);
context.SendDownstream(new SendBuffer(header, 0, 5));
// send JSON
var slice = _bufferPool.PopSlice();
Encoding.UTF8.GetBytes(result, 0, result.Length, slice.Buffer, slice.StartOffset);
slice.Position = slice.StartOffset;
slice.Count = result.Length;
context.SendDownstream(new SendSlice(slice));
}
}
现在,我们只需要在 Pipeline 中做一件事。那就是处理请求。让我们先创建一个非常简单的 handler
class MyApplication : IUpstreamHandler
{
public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
{
var msg = message as ReceivedRequest;
if (msg == null)
return;
var parray = msg.Request.Parameters as object[];
if (parray == null)
return; // muhahaha, violating the API specification
object result;
switch (msg.Request.Method)
{
case "add":
result = int.Parse(parray[0].ToString()) + int.Parse(parray[0].ToString());
break;
case "subtract":
result = int.Parse(parray[0].ToString()) + int.Parse(parray[0].ToString());
break;
default:
result = "Nothing useful.";
break;
}
var response = new Response(msg.Request.Id, result);
context.SendDownstream(new SendResponse(response));
}
}
那么,我们如何运行应用程序呢?我们需要创建一个服务器 Channel 并定义客户端 Pipeline。我通常将其放在一个名为 `XxxxListener` 的类中,以遵循 .NET 标准。所以让我们创建一个 `JsonRpcListener`。
public class JsonRpcListener : IUpstreamHandler, IDownstreamHandler
{
private TcpServerChannel _serverChannel;
private Pipeline _pipeline;
public JsonRpcListener(IPipelineFactory clientFactory)
{
_pipeline = new Pipeline();
_pipeline.AddDownstreamHandler(this);
_pipeline.AddUpstreamHandler(this);
_serverChannel = new TcpServerChannel(_pipeline, clientFactory, 2000);
}
public void Start(IPEndPoint endPoint)
{
_pipeline.SendDownstream(new BindSocket(endPoint));
}
public void Stop()
{
_pipeline.SendDownstream(new Close());
}
public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
{
var msg = message as PipelineFailure;
if (msg != null)
throw new TargetInvocationException("Pipeline failed", msg.Exception);
}
public void HandleDownstream(IPipelineHandlerContext context, IPipelineMessage message)
{
context.SendDownstream(message);
}
}
所以现在,我们可以在 *Program.cs* 中定义客户端 Pipeline,并将其注入到 `RpcListener` 中
class Program
{
static void Main(string[] args)
{
LogManager.Assign(new SimpleLogManager<ConsoleLogger>());
var factory = new DelegatePipelineFactory();
factory.AddUpstreamHandler(() => new HeaderDecoder());
factory.AddUpstreamHandler(() => new BodyDecoder());
factory.AddUpstreamHandler(new MyApplication());
factory.AddDownstreamHandler(new ResponseEncoder());
JsonRpcListener listener = new JsonRpcListener(factory);
listener.Start(new IPEndPoint(IPAddress.Any, 3322));
Console.ReadLine();
}
}
前两个上游 handler 是有状态的,所以我们需要为每个生成的 Channel 创建它们。这就是我们使用委托的原因。后两个不是有状态的,因此可以是单例。
就这样!您现在拥有了一个可用的 JSON RPC 服务器。当然。它相当基础,但实际的远程处理层与网络层关系不大。
我花了些时间创建了一个概念验证 RPC 系统。让我们先定义我们的 RPC 服务
public class MathModule
{
[OperationContract]
public int Sum(int x, int y)
{
return x + y;
}
}
然后,我们需要重新定义客户端 Pipeline
var invoker = new RpcServiceInvoker(new DotNetValueConverter(), new SimpleServiceLocator());
invoker.Map<MathModule>();
factory.AddUpstreamHandler(() => new HeaderDecoder());
factory.AddUpstreamHandler(() => new BodyDecoder());
factory.AddUpstreamHandler(new RequestHandler(invoker));
factory.AddDownstreamHandler(new ResponseEncoder());
就这样!从这里,我们可以包含 HTTP 协议实现,并将我们简单的头部替换为 HTTP 实现中的 `HeaderDecoder`,从而获得一个在 HTTP 上运行的实现,而不是我们的基本二进制头部。我们需要进行一些小的更改来实现这一点,同时保持大部分 JSON RPC 实现不变。
摘要
我希望我已经成功地演示了如何使用 `Griffin.Networking` 开发网络应用程序,并展示了与原生 .NET 套接字服务器相比,它所提供的强大功能。
代码可作为 nuget 包 `griffin.networking` 获得,HTTP 实现(即将)可作为 `griffin.networking.http` 获得。JSON RPC 实现仍然只是一个概念,因此尚未作为发布版添加。欢迎您参与完成它。
所有代码也都可以从 github 获取。请在 github 上报告任何错误和功能请求。