65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 .NET Standard 中的 TCPListener 和 TCPClient 实现高性能 TCP 客户端服务器

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.92/5 (15投票s)

2020年6月10日

CPOL

8分钟阅读

viewsIcon

67534

downloadIcon

4136

高性能跨平台 TCP 客户端服务器连接

引言

随着 .NET 5 的推出,以及需要从 .NET 4.8 和 .NET Core 迁移的用户,此源代码旨在提供一个示例,说明如何建立高性能的跨平台客户端服务器消息交换。该方案基于 .NET Standard,无需特定于 .NET Framework 或 .NET Core 或其他任何内容。

此外,它还解决了 TCP 会话中遇到的常见问题:消息自旋锁问题、异步内存泄漏问题以及/或在 TCP 客户端服务器实现中非常普遍的 CancellationToken 异常问题。

TCP 服务器

为了简化起见,我们将使用 CLI 项目。项目类型本身可以是 .NET 5、.NET Core 或 .NET Framework。客户端和服务器都使用 .NET Standard 语法编码,因此它们可以与这三者无缝集成。

服务器主机典型的 CLI Main()

using System;

public static class Program
{
    public static void Main()
    {
        Console.WriteLine("Press esc key to stop");

        int i = 0;
        void PeriodicallyClearScreen()
        {
            i++;
            if (i > 15)
            {
                Console.Clear();
                Console.WriteLine("Press esc key to stop");
                i = 0;
            }
        }

        //Write the host messages to the console
        void OnHostMessage(string input)
        {
            PeriodicallyClearScreen();
            Console.WriteLine(input);
        }

        var BLL = new ServerHost.Host(OnHostMessage);
        BLL.RunServerThread();

        while (Console.ReadKey().Key != ConsoleKey.Escape)
        {
            Console.Clear();
            Console.WriteLine("Press esc key to stop");
        }

        Console.WriteLine("Attempting clean exit");
        BLL.WaitForServerThreadToStop();

        Console.WriteLine("Exiting console Main.");
    }
}

基本的 CLI 管道,没有什么不寻常的。

Esc 键退出客户端窗口,每 15 条消息清除一次窗口(仅用于调试/演示目的)。在生产版本中,请勿将实际网络消息写入控制台。

此代码块唯一需要注意的地方是,为了保持高性能,客户端和服务器托管在专用线程中。也就是说,与执行 Main 块的线程分开。这方面的逻辑包含在 RunServerThread() 函数中。

为此,我们将创建一个 Host 类并将其添加到 .NET Standard 库项目类型中。.NET Standard 库可以被 .NET 5、.NET Framework 和 .NET Core 项目引用,因此在撰写本文时,它是最灵活的选择。

在此处添加以下代码

using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;

public class Host
{
    #region Public Functions
    public virtual void RunServerThread()
    {
        this.ServerThread.Start();
        this.OnHostMessages.Invoke("Server started");
    }

    public virtual void WaitForServerThreadToStop()
    {
         this.Server.ExitSignal = true;
         this.OnHostMessages.Invoke("Exit Signal sent to server thread");
         this.OnHostMessages.Invoke("Joining server thread");
         this.ServerThread.Join();
         this.OnHostMessages.Invoke("Server thread has exited gracefully");
    }
    #endregion
}

RunServerThread() 函数启动将运行服务器的 Thread

WaitForServerThreadToStop() 函数向服务器线程发出信号,使其尽快优雅地退出。然后,我们将服务器线程加入到调用线程(在此例中是 Main() 线程),否则 CLI 窗口将终止/中止,从清理/异常处理的角度来看,我们不希望这样做;优雅/干净的退出是首选。

向服务器主机类添加支持变量和构造函数

#region Public Delegates
public delegate void HostMessagesDelegate(string message);
#endregion

#region Variables

protected readonly StandardServer.Server Server;
protected readonly Thread ServerThread;

#region Callbacks
protected readonly HostMessagesDelegate OnHostMessages;
#endregion

#endregion

#region Constructor
public Host(HostMessagesDelegate onHostMessages)
{
    this.OnHostMessages = onHostMessages ?? 
         throw new ArgumentNullException(nameof(onHostMessages));
    this.Server = new StandardServer.Server(this.OnMessage, this.ConnectionHandler);
    this.ServerThread = new Thread(this.Server.Run);
}
#endregion

#region Protected Functions
protected virtual void OnMessage(string message)
{
    this.OnHostMessages.Invoke(message);
}
        
protected virtual void ConnectionHandler(NetworkStream connectedAutoDisposedNetStream)
{
}
#endregion 
  • ServerThread:托管服务器的线程(Server 类)
  • OnHostMessagesOnMessage:仅用于演示目的,将消息从 TCP 连接器线程推送到客户端 CLI 窗口。(注意:对于 WinForm 应用程序,您必须在 GUI 线程上执行 ISynchronizeInvoke 才能向最终用户显示消息。Console.Write 没有此限制。)
  • ConnectionHandler:稍后我们会回到这里。
  • Server:我们要编写的 TCP Server 类代码。

创建一个名为 Server 的类,并将以下代码添加到其中

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

public class Server
{
    #region Public Functions
    public virtual void Run()
    {
        if (this.IsRunning)
            return; //Already running, only one running instance allowed.

        this.IsRunning = true;
        this.ExitSignal = false;

        while (!this.ExitSignal)
            this.ConnectionLooper();

        this.IsRunning = false;
   }
   #endregion
}

由于 Run() 运行在专用线程中,您可能需要添加一个 Try Catch 块来调试任何未处理的异常,并根据需要添加日志记录。为了清晰起见,我将此留给您。

Run() 函数处理 ExitSignal 检查,并将其余的 TCP 逻辑推送到 ConnectionLooper() 函数。此时,您可能会想:“但这会是一个无限的自旋循环”,您是正确的,但我们将使用 await 来解决这个问题。

Server 类添加支持变量和 Constructor

#region Public Properties
private volatile bool _ExitSignal;
public virtual bool ExitSignal
{
    get => this._ExitSignal;
    set => this._ExitSignal = value;
}
#endregion

#region Public Delegates
public delegate void ConnectionHandlerDelegate(NetworkStream connectedAutoDisposedNetStream);
public delegate void MessageDelegate(string message);
#endregion

#region Variables

#region Init/State
protected readonly int AwaiterTimeoutInMS;
protected readonly string Host;
protected readonly int Port;
protected readonly int MaxConcurrentListeners;
protected readonly TcpListener Listener;
        
protected bool IsRunning;
protected List<Task> TcpClientTasks = new List<Task>();
#endregion

#region Callbacks
protected readonly ConnectionHandlerDelegate OnHandleConnection;
protected readonly MessageDelegate OnMessage;
#endregion

#endregion

#region Constructor
public Server(
              MessageDelegate onMessage, 
              ConnectionHandlerDelegate connectionHandler, 
              string host = "0.0.0.0",
              int port = 8080, 
              int maxConcurrentListeners = 10,
              int awaiterTimeoutInMS = 500
             )
{
     this.OnMessage = onMessage ?? throw new ArgumentNullException(nameof(onMessage));
     this.OnHandleConnection = connectionHandler ?? 
          throw new ArgumentNullException(nameof(connectionHandler));
     this.Host = host ?? throw new ArgumentNullException(nameof(host));
     this.Port = port;
     this.MaxConcurrentListeners = maxConcurrentListeners;
     this.AwaiterTimeoutInMS = awaiterTimeoutInMS;
     this.Listener = new TcpListener(IPAddress.Parse(this.Host), this.Port);
}
#endregion

这里没什么特别的,唯一需要注意的是 _ExitSignal 成员变量是 volatile 类型,这有助于防止 CLI Main() 线程和服务器主机线程之间发生陈旧的 get/set。对于演示目的,这比 Lock 或 Mutex 更简单,并且 CPU/内存占用可能更少。

IP 0.0.0.0 是 IPAddress Any。您可以根据需要更改或删除默认值。

在这里,我们维护一个 TcpClient 连接任务(异步任务)的 List,它可以是一个大小为(maxConcurrentListeners)的数组,而不是一个 List。如果这样做,它可能会运行快几微秒。

OnMessage 仅用于演示目的,以在 CLI 窗口中显示消息。

OnHandleConnection 是一个回调,该类的使用者将在此处编写特定于其业务场景的网络逻辑。

添加以下代码来实现 ConnectionLooper() 函数

#region Protected Functions
protected virtual void ConnectionLooper()
{
     while (this.TcpClientTasks.Count < this.MaxConcurrentListeners)
     {
        var AwaiterTask = Task.Run(async () =>
        {
             this.ProcessMessagesFromClient(await this.Listener.AcceptTcpClientAsync());
        });
        this.TcpClientTasks.Add(AwaiterTask);
     }

     int RemoveAtIndex = Task.WaitAny(this.TcpClientTasks.ToArray(), this.AwaiterTimeoutInMS);
     
     if (RemoveAtIndex > 0) 
        this.TcpClientTasks.RemoveAt(RemoveAtIndex);
}
#endregion

在这里,我们的服务器正在异步监听一定数量的 TCP 连接(限制为 int MaxConcurrentListeners),这可以防止内部 .NET ThreadPool 线程耗尽。这取决于 vm/server/host 的 CPU 核心数量。您的主机越强大,您就能支持越多的并发监听器。

Task await 建立一个线程继续执行,当客户端成功连接时,将继续执行 ProcessMessagesFromClient。该函数是实际处理网络通信的地方。

然后,我们对这些可等待的任务列表执行 WaitAny(),但只在给定的毫秒数内进行,这部分是关键。如果没有超时,我们就不会检测到 ExitSignal 变量的变化,而在多线程环境中,这对于优雅退出至关重要。WaitAny 可防止自旋锁,同时还能检测已退出的 TCPListener Tasks(.NET 内部 ThreadPool 线程)。

它还可以避免内存泄漏无限/过多的 await、线程或任务,同时仍然异步处理连接。

AcceptTcpClientAsync 如果任务被中止,则会抛出异常。您可能希望为每个任务提供一个 CancellationToken,并在您的需求要求时优雅地退出任务列表中的每个任务。由于我们从不调用 Listener.Stop(),当主机 Thread 优雅退出时,在此示例代码中,清理工作由 .NET GC 内部处理。

WaitAny 检测到任务已完成时,将返回 RemoveAt 索引,以便我们可以在下一轮执行 ExitSignal 检查后将其从列表中简单删除,并重新添加一个新任务。

添加以下代码来实现 ProcessMessagesFromClient() 函数

protected virtual void ProcessMessagesFromClient(TcpClient Connection)
{
    using (Connection)
    {
         if (!Connection.Connected)
            return;

         using (var netstream = Connection.GetStream())
         {
            this.OnHandleConnection.Invoke(netstream);
         }
    }
}

当成功连接时,此函数将在继续执行的 ThreadPool 线程中运行,通常不会与 ServerThreadMain() 线程是同一个线程。如果需要严格强制执行,可以添加 ConfigureAwait(false)

TCPClientNetworkStream 会自动使用 Using 语法关闭和处置,这样,该类的使用者就不必关心清理工作。

OnHandleConnection.Invoke 基本上调用 Host 类中的 ConnectionHandler() 函数,我们现在将编写该函数。

protected virtual void ConnectionHandler(NetworkStream connectedAutoDisposedNetStream)
{
     if (!connectedAutoDisposedNetStream.CanRead && !connectedAutoDisposedNetStream.CanWrite)
        return; //We need to be able to read and write

     var writer = new StreamWriter(connectedAutoDisposedNetStream) { AutoFlush = true };
     var reader = new StreamReader(connectedAutoDisposedNetStream);

     var StartTime = DateTime.Now;
     int i = 0;
     while (!this.Server.ExitSignal) //Tight network message-loop (optional)
     {
          var JSON_Helper = new Helper.JSON();
          string JSON = JSON_Helper.JSONstring();

          string Response;
          try //Communication block
          {
              //Synchronously send some JSON to the connected client
              writer.WriteLine(JSON);
              //Synchronously wait for a response from the connected client
              Response = reader.ReadLine();
          }
          catch (IOException ex)
          {
              _ = ex; //Add Debug breakpoint and logging here
             return; //Swallow exception and Exit function on network error
          }

          //Put breakpoint here to inspect the JSON string return by the connected client
          Helper.SomeDataObject Data = JSON_Helper.DeserializeFromJSON(Response);
          _ = Data;

          //Update stats
          i++;
          var ElapsedTime = DateTime.Now - StartTime;
          if (ElapsedTime.TotalMilliseconds >= 1000)
          {
              this.OnHostMessages.Invoke("Messages per second: " + i);
              i = 0;
              StartTime = DateTime.Now;
          }
     }
}

这主要是您需要自己编写的样板网络消息代码。如果您只需要发送 1 条消息,则删除 while 循环。保留 ExitSignal 检查作为 if 语句无害,volatile bool 可以根据需要频繁检查。

网络流支持字符串行和二进制字节数组的传输。

在此特定演示中,我们将一个对象序列化和反序列化为 JSON XML 作为 string,并通过 Write/Read-Line 来回发送(.NET 会自动将其转换为 ASCII 字符 bytes[] 数组,如果您需要其他字符集,.NET 提供了多种转换辅助方法)。

这里有一个要点是,如果连接因任何原因丢失,写入和读取流可能会抛出 IOException,您需要在 Try...Catch 块中处理它。如果您需要处理或冒泡异常,可以使用 throw; 而不是 return;

如前所述,当 Thread 或任务继续执行或中止/抛出异常时,Usings 块将清理网络连接。

通常,我从不使用 throw ex;,因为它会丢弃最顶层的异常信息,而 throw; 会保留完整的堆栈跟踪/内部异常。

TCP 客户端

TCP 客户端的 CLI 和 Host 类与 TCP 服务器基本相同,完整的版本已附加并可下载。

下面说明的唯一区别是客户端使用 TCP Client 而不是 TCP Listener,并且 ConnectionLooper() 函数更简单,因为我们不需要处理多个并发的入站连接。

using System;
using System.Net.Sockets;
using System.Threading;

#region Public Functions
public virtual void Run()
{
    if (this.IsRunning)
       return; //Already running, only one running instance allowed.

    this.IsRunning = true;
    this.ExitSignal = false;

    while (!this.ExitSignal)
        this.ConnectionLooper();

    this.IsRunning = false;
}
#endregion

#region Protected Functions
protected virtual void ConnectionLooper()
{
     this.OnMessage.Invoke("Attempting server connection... ");
     using (var Client = new TcpClient())
     {
         try
         {
            Client.Connect(this.Host, this.Port);
         }
         catch(SocketException ex)
         {
             this.OnMessage.Invoke(ex.Message);
             //Server is unavailable, wait before re-trying
             Thread.Sleep(this.ConnectionAttemptDelayInMS); 
             return; //Swallow exception
         }

         if (!Client.Connected) //exit function if not connected
             return;

         using (var netstream = Client.GetStream())
         {
             //Process the connection
             this.OnHandleConnection.Invoke(netstream); 
         }
    }
}
#endregion

这里,我们只使用 Thread Sleep。如果您在任务(ThreadPool管理器)中托管客户端,则首选 Task.Delay

如果您不需要外部 ExitSignal 进行优雅退出,请随时将其删除,这样可以简化实现,即您的进程将在准备好时自行退出。我仅将其添加到演示示例中,以展示需要该功能的人可以如何实现。

如果您只进行一次连接或间歇性连接,则不需要包含的 While 循环。在此示例中,我们无限期等待服务器上线,如果您的要求不同,您需要根据您的要求进行自定义。

此外,如果您需要从同一进程中建立多个并发客户端连接,也可以做到,在这种情况下,您肯定会想要使用与 Server 类类似的方案,该方案利用可等待的 WaitAny() 异步 TaskList 模式,而不是本示例代码中所示的简单同步 Connect() 循环。

结论

希望有人觉得这段代码有用。

我能够在一个单核 VM 上实现高达 74000 次双向 JSON 交换,因此性能应该不错。在此示例中,服务器使用 .NET4.8,客户端使用 Core3.1。

如果您不序列化对象,它应该运行得更快,可能只受限于网卡带宽或 CPU 速度。

历史

  • 2020年6月10日 - 版本 1.0
© . All rights reserved.