65.9K
CodeProject 正在变化。 阅读更多。
Home

Griffin.Networking - .NET 网络库

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.96/5 (8投票s)

2012年5月7日

LGPL3

11分钟阅读

viewsIcon

27639

一个性能尚可的 .NET 网络库

免责声明:当前框架发布版为测试版。它应该是相当稳定的,但如果它导致您的计算机爆炸,我概不负责。

引言

Griffin.Networking 是一个用 C# 编写的网络库,其目的是

  1. 抽象化您在使用原生 .NET 时必须完成的重复性任务
  2. 创建一种更结构化的方式来处理入站和出站数据

这两个目标应该可以缩短开发网络应用的时间,并由于(希望如此)设计良好的网络层而提高性能。

框架还内置了面向对象控制反转(IoC)支持(请注意,慢速容器会大大损害性能)。IoC 支持由一个服务定位器接口提供(该接口不应暴露在框架外部)。

本文的目的是描述该框架并展示如何用它来开发应用程序。完成后,您将拥有一个可用的 JSON RPC 实现(服务器端)。客户端可以很容易地在之后创建。只需为框架创建一个 `RequestEncoder` 和一个 `ResponseDecoder`,其余所有内容都可以从服务器实现中重用。

背景

我过去构建了多个网络应用程序。从 C# 中的小型客户端应用程序到利用 IO Completion Ports 的 C++ 中的高性能套接字服务器。

它们运行良好,但在实现应用程序时我似乎总是在重复自己。最大的问题是很难获得一个可扩展的架构,您可以在其中将处理程序注入到协议处理中。我的 C# WebServer(搜索“C# webserver”并点击第一个搜索结果)很好地说明了这一点。很难跟踪通信流程。

因此,我决定尝试创建一个易于使用和扩展的网络库。在研究过程中,我偶然发现了 Netty for Java,我的库深受其启发(架构、代码全是我的)。

架构

本节的目的是简要概述将在整篇文章中使用的架构和术语。在阅读完整篇文章之前,本节中的某些内容可能没有意义。

通道

Channel 是 IO 层。在大多数情况下,它是一个套接字实现,但也可以是用于通信的任何东西。默认套接字实现使用经典的 `Begin`/`End` 类型方法。它们很可能会在以后被新的 `Async` 方法取代。

有两种 Channel:服务器 Channel 负责接受新连接并构建正确的客户端 Channel(及其 Pipeline)。客户端 Channel 负责向远程对等方发送和接收信息。

    public interface IChannel
    {
        void HandleDownstream(IPipelineMessage message);
    }

如您所见,Channel 接口非常小。原因是整个框架都是异步的。所有通信都通过消息进行。

Channel 的契约规定它只能接收和处理消息。消息(下文详见)可以是 `Connect`、`SendBuffer` 或 `Close`。所有 Channel 实现都在构造函数中接受一个 Pipeline(见下文),并使用它将消息发送到您的应用程序。

管道

Pipeline 是库中最核心的部分。所有的操作都发生在 Pipeline 中。在 Pipeline 中,您可以对用户进行授权,将传入的 `byte[]` 数组转换为更易用的内容,如 `HttpRequest` 等。

Pipeline 有两个方向(可以与双车道公路进行比较)。从 Channel 到应用程序的通道称为“上游”,因为消息从 Channel 流向您的应用程序。另一个方向称为“下游”,因为消息向下流向 Channel。

一个 Pipeline 可以包含任意数量的 handler,每个方向都有其独特的 handler 集。一个 HTTP 流服务器可能只包含上游的 `HttpHeaderDecoder` 和下游的 `HttpFileStreamer` 以提高性能,而一个完整的 `HttpServer` 则会包含会话管理、身份验证、日志记录、错误处理程序等作为上游 handler。

public interface IPipeline
{
  /// <summary>
  /// Send something from the channel to all handlers.
  /// </summary>
  /// <param name="message">Message to send to the client</param>
  void SendUpstream(IPipelineMessage message);
 
  /// <summary>
  /// Set down stream end point
  /// </summary>
  /// <param name="channel">channel which will handle all down stream messages</param>
  void SetChannel(IChannel channel);
 
  /// <summary>
  /// Send a message from the client and downwards.
  /// </summary>
  /// <param name="message">Message to send to the channel</param>
  void SendDownstream(IPipelineMessage message);
} 

该架构允许您完全控制传入和传出数据在到达您的应用程序或 Channel 之前是如何被处理的。

消息

正如前一节所述,Pipeline 用于向/从您的应用程序发送消息。这些消息是包含要处理信息的小类。可以将消息与 .NET 事件机制中的 `EventArg` 类进行比较。实现了 `IPipelineMessage` 接口的 POCO 类。需要执行操作的消息应命名为动词(`Send`),而事件消息应命名为过去式(`Received`)。

总的指导原则是,每条消息只能包含一种类型的信息。您不能有一个名为 `Received` 的消息,其中包含一个对象属性,该属性从 `byte[]` 开始,到 `SuperDeluxeObject` 结束。而是创建一个名为 `ReceivedSuperDeluxe` 的新消息,该消息包含 `SuperDeluxeObject` 对象。这使得处理更清晰、更易于遵循。

示例消息

public class Connect : IPipelineMessage
{
    private readonly EndPoint _remoteEndPoint;
    public Connect(EndPoint remoteEndPoint)
    {
        if (remoteEndPoint == null)
            throw new ArgumentNullException("remoteEndPoint");
        _remoteEndPoint = remoteEndPoint;
    }
    public EndPoint RemoteEndPoint
    {
        get { return _remoteEndPoint; }
    }
} 

Pipeline Handlers

Pipeline handler 用于处理通过 Pipeline 发送的消息。它们可以是单例(在 Channel 之间共享)或按 Channel 创建。与 Pipeline 一起构造的 handler 可以存储状态信息,因为它们只被一个 Channel 使用。

追踪接收信息的示例上游 handler

public class BufferTracer : IUpstreamHandler
{
    private readonly ILogger _logger = LogManager.GetLogger<BufferTracer>();
    public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
    {
        var msg = message as Received;
        if (msg != null)
        {
            var str = Encoding.UTF8.GetString
            (msg.BufferSlice.Buffer, msg.BufferSlice.Position, msg.BufferSlice.RemainingLength);
            _logger.Trace(str);
        }
        context.SendUpstream(message);
    }
}

请注意,它如何使用 `context.SendUpstream(message)` 将所有消息发送到下一个 handler。这非常重要。每个 handler 都可以决定消息是否应该传播到调用堆栈的上方。这也是消息如何被转换为更易用的内容。

让我们看看 HTTP `HeaderDecoder` handler

public class HeaderDecoder : IUpstreamHandler
{
    private readonly IHttpParser _parser;
    private int _bodyBytesLeft = 0;
    public HeaderDecoder(IHttpParser parser)
    {
        if (parser == null) throw new ArgumentNullException("parser");
        _parser = parser;
    }
    public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
    {
        if (message is Closed)
        {
            _bodyBytesLeft = 0;
            _parser.Reset();
        }
        else if (message is Received)
        {
            var msg = (Received) message;
            // complete the body
            if (_bodyBytesLeft > 0)
            {
                _bodyBytesLeft -= msg.BufferSlice.Count;
                context.SendUpstream(message);
                return;
            }
            var httpMsg = _parser.Parse(msg.BufferSlice);
            if (httpMsg != null)
            {
                var recivedHttpMsg = new ReceivedHttpRequest((IRequest) httpMsg);
                _bodyBytesLeft = recivedHttpMsg.HttpRequest.ContentLength;
                _parser.Reset();
                // send up the message to let someone else handle the body
                context.SendUpstream(recivedHttpMsg);
                msg.BytesHandled = msg.BufferSlice.Count;
                context.SendUpstream(msg);
            }
            return;
        }
        context.SendUpstream(message);
    }
} 

这里有两个重要的地方

它遵循单一职责原则

它实际上并不解析 HTTP 消息,而是使用外部解析器进行解析。由于它不违反单一职责原则,因此很容易理解 handler 的作用,并且我们可以随时切换解析器,如果找到更快的解析器。

它将 `Received` 消息转换为 `ReceivedHttpRequest`

所有消息都应视为不可变的。除非有非常充分的理由,否则不要更改其内容。不要将原始包传播到上游,而是创建一个新消息(如果您已处理该消息)。

切换方向

Pipeline handler 可以在任何时候从下游切换到上游(或反之)。切换方向将始终调用另一侧的第一个 handler。这使我们能够简化流程并避免混淆。

public class AuthenticationHandler : IUpstreamHandler
{
  private readonly IAuthenticator _authenticator;
  private readonly IPrincipalFactory _principalFactory;
 
  public AuthenticationHandler(IAuthenticator authenticator, IPrincipalFactory principalFactory)
  {
    _authenticator = authenticator;
    _principalFactory = principalFactory;
  }
 
  public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
  {
    var msg = message as ReceivedHttpRequest;
    if (msg == null)
    {
      context.SendUpstream(message);
      return;
    }
 
    var authHeader = msg.HttpRequest.Headers["Authorization"];
    if (authHeader == null)
    {
      context.SendUpstream(message);
      return;
    }
 
    var user = _authenticator.Authenticate(msg.HttpRequest);
    if (user == null)
    {
      //Not authenticated, send error downstream and abort handling
      var response = msg.HttpRequest.CreateResponse(HttpStatusCode.Unauthorized, 
                                                    "Invalid username or password.");
      context.SendDownstream(new SendHttpResponse(msg.HttpRequest, response));
    }
    else
    {
      var principal =
        _principalFactory.Create(new PrincipalFactoryContext {Request = msg.HttpRequest, User = user});
      Thread.CurrentPrincipal = principal;
    }
  }
} 

Pipeline Factories

每次创建新 Channel 时都需要构建一个 Pipeline(及其所有 handler)。框架中有两个内置的 factory。

一个使用名为 `IServiceLocator` 的接口,允许您添加对您喜欢的 IoC 容器的支持。另一个使用委托来创建有状态 handler。

var factory = new DelegatePipelineFactory();
factory.AddDownstreamHandler(() => new ResponseEncoder());
factory.AddUpstreamHandler(() => new HeaderDecoder(new HttpParser()));
factory.AddUpstreamHandler(new HttpErrorHandler(new SimpleErrorFormatter())); //singleton
factory.AddUpstreamHandler(() => new BodyDecoder(new CompositeBodyDecoder(), 65535, 6000000));
factory.AddUpstreamHandler(() => new FileHandler());
factory.AddUpstreamHandler(() => new MessageHandler());
factory.AddUpstreamHandler(new PipelineFailureHandler());                     //singleton

缓冲区 (Buffers)

高性能网络库的一个基本部分是数据处理方式。所有较大的内存分配都会损害性能。我们不希望每次读取或发送新数据包时都创建一个新的 `byte[65535]`。分配需要时间,垃圾回收器需要工作更多,并且内存最终会变得碎片化。

框架通过使用缓冲区池和一个名为 `BufferSlice` 的类来解决这个问题。我们可以分配一个 5MB 大小的缓冲区,并将其分割成更小的块,然后在处理中使用它们。我们可以将缓冲区池设置为单例,或者让每个 handler 分配自己的缓冲区池(如果只有五个 handler,仍然是五个分配而不是 5000 个)。

当 `BufferSlice` 被处置时,它会将缓冲区返回给池。因此,所有使用 `BufferSlice` 类的消息都必须实现 `IDisposable`,因为 Channel 在完成处理后会处置所有消息。

性能

框架仍然很新(大约一个月=))。性能尚未达到顶峰。

但是,我使用 Apache 的 ab 工具向 HTTP 侦听器发送了 5000 个请求。框架处理了大约每秒 280 个 HTTP 请求(localhost),我认为在这个项目的早期阶段这是可以接受的。内存消耗约为 80MB(工作集)。(请注意,除非与其他框架进行比较,否则这些数字没有任何实际意义。)欢迎您帮助提高性能或进行自己的基准测试。我很乐意收到一个示例应用程序,可用于性能调优(并与其他框架进行性能比较)。

请不要将 `HttpListener` 与 .NET 中的进行比较,因为 .NET 版本使用 `http.sys`,它在内核模式下运行。任何纯 .NET 解决方案的性能都无法接近它。

构建 JSON RPC 服务器

是时候开始构建一个 JSON RPC 服务器了。创建一个名为 `JsonRpcServer` 之类的新控制台应用程序。打开 nuget 包控制台并运行 `install-package griffin.networking` 来安装框架。

JSON RPC 的规范可以在 官方网站 上找到。本文档不会帮助您理解它,只会展示如何实现它。规范没有说明消息是如何传输的,因此我们将创建一个简单的头部来包装消息。该头部是一个简单的二进制头部,包含一个版本(`byte`)字段和一个长度(`int`)字段。

解码 / 编码

我们需要做的第一件事是处理传入的字节。我们必须将它们解码成我们可以处理的内容。如前所述,我们将使用一个简单的信封。类似这样的

public class SimpleHeader
{
  public int Length { get; set; }
  public byte Version { get; set; }
} 

但是,为了能够使用该类,我们需要以某种方式解码传入的字节。所以,让我们创建第一个 Pipeline handler,我们将仅为此目的使用它

public class HeaderDecoder : IUpstreamHandler
{
  public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
  {
    var msg = message as Received;
    if (msg == null)
    {
      context.SendUpstream(message);
      return;
    }
 
    // byte + int
    if (msg.BufferSlice.RemainingLength < 5)
    {
      return;
    }
 
    var header = new SimpleHeader
             {
               Version = msg.BufferSlice.Buffer[msg.BufferSlice.Position++],
               Length = BitConverter.ToInt32(msg.BufferSlice.Buffer, msg.BufferSlice.Position)
             };
    msg.BufferSlice.Position += 4;
    context.SendUpstream(new ReceivedHeader(header));
 
    if (msg.BufferSlice.RemainingLength > 0)
      context.SendUpstream(msg);
  }
}} 

相当直接。我们不会处理任何内容,直到我们获得至少五个字节(Channel 将在末尾继续填充缓冲区,直到我们处理某项内容)。然后我们只需解码头部,发送一个 `RecievedHeader` 消息,并将剩余的字节传递过去。请注意,我首先使用了版本字节。通过这样做,我们可以在未来的版本中随意更改头部,而不会搞砸一切。

头部除了 JSON 消息的大小之外,没有说明任何其他内容。所以我们需要一些东西来处理 JSON。让我们为该目的创建另一个上游 handler(从而遵守单一职责原则)。它将被调用…… `BodyDecoder` Wink | <img src= " /> (我作弊了,创建了 JSON RPC 规范中描述的 `Request`/`Response`/`Error` 对象。)

public class BodyDecoder : IUpstreamHandler
{
  private static readonly BufferPool _bufferPool = new BufferPool(65535, 50, 50);
  private readonly BufferPoolStream _stream;
  private SimpleHeader _header;
 
  public BodyDecoder()
  {
    var slice = _bufferPool.PopSlice();
    _stream = new BufferPoolStream(_bufferPool, slice);
  }
 
  public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
  {
    var headerMsg = message as ReceivedHeader;
    if (headerMsg != null)
    {
      _header = headerMsg.Header;
      if (_header.Length > 65535)
      {
        var error = new ErrorResponse("-9999", new RpcError
                               {
                                 Code = RpcErrorCode.InvalidRequest,
                                 Message =
                                   "Support requests which is at most 655355 bytes.",
                               });
        context.SendDownstream(new SendResponse(error));
      }
 
      return;
    }
 
    var received = message as Received;
    if (received != null)
    {
      var count = Math.Min(received.BufferSlice.RemainingLength, _header.Length);
      _stream.Write(received.BufferSlice.Buffer, received.BufferSlice.Position, count);
      received.BufferSlice.Position += count;
 
      if (_stream.Length == _header.Length)
      {
        _stream.Position = 0;
        var request = DeserializeRequest(_stream);
        context.SendUpstream(new ReceivedRequest(request));
      }
 
      return;
    }
 
    context.SendUpstream(message);
  }
 
  protected virtual Request DeserializeRequest(BufferPoolStream body)
  {
    var reader = new StreamReader(body);
    var json = reader.ReadToEnd();
    return JsonConvert.DeserializeObject<Request>(json);
  }
} 

在这里,我们使用 `BufferPool` 而不是每次都创建一个新的缓冲区。因此,性能会大大提高,并且如果服务器运行一段时间,内存碎片会大大减少。还要注意,框架有一个 `BufferPoolStream`,它使用 `BufferPool` 来获取 `byte[]` 缓冲区。该流的未来版本很可能能够在后台使用多个缓冲区(因此能够处理大量数据而不会创建过大的缓冲区)。

在继续实际应用程序之前,让我们添加唯一的下游 handler。响应编码器。

public class ResponseEncoder : IDownstreamHandler
{
  private static readonly BufferPool _bufferPool = new BufferPool(65535, 50, 100);
 
  public void HandleDownstream(IPipelineHandlerContext context, IPipelineMessage message)
  {
    var msg =  message as SendResponse;
    if (msg == null)
    {
      context.SendDownstream(message);
      return;
    }
 
    var result = JsonConvert.SerializeObject(msg.Response, Formatting.None);
 
    // send header
    var header = new byte[5];
    header[0] = 1;
    var lengthBuffer = BitConverter.GetBytes(result.Length);
    Buffer.BlockCopy(lengthBuffer, 0, header, 1, lengthBuffer.Length);
    context.SendDownstream(new SendBuffer(header, 0, 5));
 
    // send JSON
    var slice = _bufferPool.PopSlice();
    Encoding.UTF8.GetBytes(result, 0, result.Length, slice.Buffer, slice.StartOffset);
    slice.Position = slice.StartOffset;
    slice.Count = result.Length;
    context.SendDownstream(new SendSlice(slice));
  }
}

现在,我们只需要在 Pipeline 中做一件事。那就是处理请求。让我们先创建一个非常简单的 handler

class MyApplication : IUpstreamHandler
{
  public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
  {
    var msg = message as ReceivedRequest;
    if (msg == null)
      return;
 
    var parray = msg.Request.Parameters as object[];
    if (parray == null)
      return; // muhahaha, violating the API specification
 
    object result;
    switch (msg.Request.Method)
    {
      case "add":
        result = int.Parse(parray[0].ToString()) + int.Parse(parray[0].ToString());
        break;
      case "subtract":
        result = int.Parse(parray[0].ToString()) + int.Parse(parray[0].ToString());
        break;
      default:
        result = "Nothing useful.";
        break;
    }
 
    var response = new Response(msg.Request.Id, result);
    context.SendDownstream(new SendResponse(response));
  }
} 

那么,我们如何运行应用程序呢?我们需要创建一个服务器 Channel 并定义客户端 Pipeline。我通常将其放在一个名为 `XxxxListener` 的类中,以遵循 .NET 标准。所以让我们创建一个 `JsonRpcListener`。

public class JsonRpcListener : IUpstreamHandler, IDownstreamHandler
{
  private TcpServerChannel _serverChannel;
  private Pipeline _pipeline;
 
  public JsonRpcListener(IPipelineFactory clientFactory)
  {
    _pipeline = new Pipeline();
    _pipeline.AddDownstreamHandler(this);
    _pipeline.AddUpstreamHandler(this);
    _serverChannel = new TcpServerChannel(_pipeline, clientFactory, 2000);
 
  }
 
  public void Start(IPEndPoint endPoint)
  {
    _pipeline.SendDownstream(new BindSocket(endPoint));
  }
 
  public void Stop()
  {
    _pipeline.SendDownstream(new Close());
  }
 
  public void HandleUpstream(IPipelineHandlerContext context, IPipelineMessage message)
  {
    var msg = message as PipelineFailure;
    if (msg != null)
      throw new TargetInvocationException("Pipeline failed", msg.Exception);
  }
 
  public void HandleDownstream(IPipelineHandlerContext context, IPipelineMessage message)
  {
    context.SendDownstream(message);
  }
}

所以现在,我们可以在 *Program.cs* 中定义客户端 Pipeline,并将其注入到 `RpcListener` 中

class Program
{
  static void Main(string[] args)
  {
    LogManager.Assign(new SimpleLogManager<ConsoleLogger>());
 
    var factory = new DelegatePipelineFactory();
    factory.AddUpstreamHandler(() => new HeaderDecoder());
    factory.AddUpstreamHandler(() => new BodyDecoder());
    factory.AddUpstreamHandler(new MyApplication());
    factory.AddDownstreamHandler(new ResponseEncoder());
 
    JsonRpcListener listener = new JsonRpcListener(factory);
    listener.Start(new IPEndPoint(IPAddress.Any, 3322));
 
    Console.ReadLine();
  }
} 

前两个上游 handler 是有状态的,所以我们需要为每个生成的 Channel 创建它们。这就是我们使用委托的原因。后两个不是有状态的,因此可以是单例。

就这样!您现在拥有了一个可用的 JSON RPC 服务器。当然。它相当基础,但实际的远程处理层与网络层关系不大。

我花了些时间创建了一个概念验证 RPC 系统。让我们先定义我们的 RPC 服务

public class MathModule
{
  [OperationContract]
  public int Sum(int x, int y)
  {
    return x + y;
  }
}

然后,我们需要重新定义客户端 Pipeline

var invoker = new RpcServiceInvoker(new DotNetValueConverter(), new SimpleServiceLocator());
invoker.Map<MathModule>();
factory.AddUpstreamHandler(() => new HeaderDecoder());
factory.AddUpstreamHandler(() => new BodyDecoder());
factory.AddUpstreamHandler(new RequestHandler(invoker));
factory.AddDownstreamHandler(new ResponseEncoder());

就这样!从这里,我们可以包含 HTTP 协议实现,并将我们简单的头部替换为 HTTP 实现中的 `HeaderDecoder`,从而获得一个在 HTTP 上运行的实现,而不是我们的基本二进制头部。我们需要进行一些小的更改来实现这一点,同时保持大部分 JSON RPC 实现不变。

摘要

我希望我已经成功地演示了如何使用 `Griffin.Networking` 开发网络应用程序,并展示了与原生 .NET 套接字服务器相比,它所提供的强大功能。

代码可作为 nuget 包 `griffin.networking` 获得,HTTP 实现(即将)可作为 `griffin.networking.http` 获得。JSON RPC 实现仍然只是一个概念,因此尚未作为发布版添加。欢迎您参与完成它。

所有代码也都可以从 github 获取。请在 github 上报告任何错误和功能请求。

© . All rights reserved.