使用 .NET Standard 中的 TCPListener 和 TCPClient 实现高性能 TCP 客户端服务器
高性能跨平台 TCP 客户端服务器连接
引言
随着 .NET 5 的推出,以及需要从 .NET 4.8 和 .NET Core 迁移的用户,此源代码旨在提供一个示例,说明如何建立高性能的跨平台客户端服务器消息交换。该方案基于 .NET Standard,无需特定于 .NET Framework 或 .NET Core 或其他任何内容。
此外,它还解决了 TCP 会话中遇到的常见问题:消息自旋锁问题、异步内存泄漏问题以及/或在 TCP 客户端服务器实现中非常普遍的 CancellationToken
异常问题。
TCP 服务器
为了简化起见,我们将使用 CLI 项目。项目类型本身可以是 .NET 5、.NET Core 或 .NET Framework。客户端和服务器都使用 .NET Standard 语法编码,因此它们可以与这三者无缝集成。
服务器主机典型的 CLI Main()
块
using System;
public static class Program
{
public static void Main()
{
Console.WriteLine("Press esc key to stop");
int i = 0;
void PeriodicallyClearScreen()
{
i++;
if (i > 15)
{
Console.Clear();
Console.WriteLine("Press esc key to stop");
i = 0;
}
}
//Write the host messages to the console
void OnHostMessage(string input)
{
PeriodicallyClearScreen();
Console.WriteLine(input);
}
var BLL = new ServerHost.Host(OnHostMessage);
BLL.RunServerThread();
while (Console.ReadKey().Key != ConsoleKey.Escape)
{
Console.Clear();
Console.WriteLine("Press esc key to stop");
}
Console.WriteLine("Attempting clean exit");
BLL.WaitForServerThreadToStop();
Console.WriteLine("Exiting console Main.");
}
}
基本的 CLI 管道,没有什么不寻常的。
按 Esc 键退出客户端窗口,每 15 条消息清除一次窗口(仅用于调试/演示目的)。在生产版本中,请勿将实际网络消息写入控制台。
此代码块唯一需要注意的地方是,为了保持高性能,客户端和服务器托管在专用线程中。也就是说,与执行 Main 块的线程分开。这方面的逻辑包含在 RunServerThread()
函数中。
为此,我们将创建一个 Host
类并将其添加到 .NET Standard 库项目类型中。.NET Standard 库可以被 .NET 5、.NET Framework 和 .NET Core 项目引用,因此在撰写本文时,它是最灵活的选择。
在此处添加以下代码
using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;
public class Host
{
#region Public Functions
public virtual void RunServerThread()
{
this.ServerThread.Start();
this.OnHostMessages.Invoke("Server started");
}
public virtual void WaitForServerThreadToStop()
{
this.Server.ExitSignal = true;
this.OnHostMessages.Invoke("Exit Signal sent to server thread");
this.OnHostMessages.Invoke("Joining server thread");
this.ServerThread.Join();
this.OnHostMessages.Invoke("Server thread has exited gracefully");
}
#endregion
}
RunServerThread()
函数启动将运行服务器的 Thread
。
WaitForServerThreadToStop()
函数向服务器线程发出信号,使其尽快优雅地退出。然后,我们将服务器线程加入到调用线程(在此例中是 Main()
线程),否则 CLI 窗口将终止/中止,从清理/异常处理的角度来看,我们不希望这样做;优雅/干净的退出是首选。
向服务器主机类添加支持变量和构造函数
#region Public Delegates
public delegate void HostMessagesDelegate(string message);
#endregion
#region Variables
protected readonly StandardServer.Server Server;
protected readonly Thread ServerThread;
#region Callbacks
protected readonly HostMessagesDelegate OnHostMessages;
#endregion
#endregion
#region Constructor
public Host(HostMessagesDelegate onHostMessages)
{
this.OnHostMessages = onHostMessages ??
throw new ArgumentNullException(nameof(onHostMessages));
this.Server = new StandardServer.Server(this.OnMessage, this.ConnectionHandler);
this.ServerThread = new Thread(this.Server.Run);
}
#endregion
#region Protected Functions
protected virtual void OnMessage(string message)
{
this.OnHostMessages.Invoke(message);
}
protected virtual void ConnectionHandler(NetworkStream connectedAutoDisposedNetStream)
{
}
#endregion
ServerThread
:托管服务器的线程(Server
类)OnHostMessages
和OnMessage
:仅用于演示目的,将消息从 TCP 连接器线程推送到客户端 CLI 窗口。(注意:对于 WinForm 应用程序,您必须在 GUI 线程上执行ISynchronizeInvoke
才能向最终用户显示消息。Console.Write
没有此限制。)ConnectionHandler
:稍后我们会回到这里。Server
:我们要编写的 TCPServer
类代码。
创建一个名为 Server
的类,并将以下代码添加到其中
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
public class Server
{
#region Public Functions
public virtual void Run()
{
if (this.IsRunning)
return; //Already running, only one running instance allowed.
this.IsRunning = true;
this.ExitSignal = false;
while (!this.ExitSignal)
this.ConnectionLooper();
this.IsRunning = false;
}
#endregion
}
由于 Run()
运行在专用线程中,您可能需要添加一个 Try Catch
块来调试任何未处理的异常,并根据需要添加日志记录。为了清晰起见,我将此留给您。
Run()
函数处理 ExitSignal
检查,并将其余的 TCP 逻辑推送到 ConnectionLooper()
函数。此时,您可能会想:“但这会是一个无限的自旋循环”,您是正确的,但我们将使用 await
来解决这个问题。
向 Server
类添加支持变量和 Constructor
#region Public Properties
private volatile bool _ExitSignal;
public virtual bool ExitSignal
{
get => this._ExitSignal;
set => this._ExitSignal = value;
}
#endregion
#region Public Delegates
public delegate void ConnectionHandlerDelegate(NetworkStream connectedAutoDisposedNetStream);
public delegate void MessageDelegate(string message);
#endregion
#region Variables
#region Init/State
protected readonly int AwaiterTimeoutInMS;
protected readonly string Host;
protected readonly int Port;
protected readonly int MaxConcurrentListeners;
protected readonly TcpListener Listener;
protected bool IsRunning;
protected List<Task> TcpClientTasks = new List<Task>();
#endregion
#region Callbacks
protected readonly ConnectionHandlerDelegate OnHandleConnection;
protected readonly MessageDelegate OnMessage;
#endregion
#endregion
#region Constructor
public Server(
MessageDelegate onMessage,
ConnectionHandlerDelegate connectionHandler,
string host = "0.0.0.0",
int port = 8080,
int maxConcurrentListeners = 10,
int awaiterTimeoutInMS = 500
)
{
this.OnMessage = onMessage ?? throw new ArgumentNullException(nameof(onMessage));
this.OnHandleConnection = connectionHandler ??
throw new ArgumentNullException(nameof(connectionHandler));
this.Host = host ?? throw new ArgumentNullException(nameof(host));
this.Port = port;
this.MaxConcurrentListeners = maxConcurrentListeners;
this.AwaiterTimeoutInMS = awaiterTimeoutInMS;
this.Listener = new TcpListener(IPAddress.Parse(this.Host), this.Port);
}
#endregion
这里没什么特别的,唯一需要注意的是 _ExitSignal
成员变量是 volatile 类型,这有助于防止 CLI Main()
线程和服务器主机线程之间发生陈旧的 get/set。对于演示目的,这比 Lock 或 Mutex 更简单,并且 CPU/内存占用可能更少。
IP 0.0.0.0 是 IPAddress Any。您可以根据需要更改或删除默认值。
在这里,我们维护一个 TcpClient
连接任务(异步任务)的 List
,它可以是一个大小为(maxConcurrentListeners
)的数组,而不是一个 List
。如果这样做,它可能会运行快几微秒。
OnMessage
仅用于演示目的,以在 CLI 窗口中显示消息。
OnHandleConnection
是一个回调,该类的使用者将在此处编写特定于其业务场景的网络逻辑。
添加以下代码来实现 ConnectionLooper()
函数
#region Protected Functions
protected virtual void ConnectionLooper()
{
while (this.TcpClientTasks.Count < this.MaxConcurrentListeners)
{
var AwaiterTask = Task.Run(async () =>
{
this.ProcessMessagesFromClient(await this.Listener.AcceptTcpClientAsync());
});
this.TcpClientTasks.Add(AwaiterTask);
}
int RemoveAtIndex = Task.WaitAny(this.TcpClientTasks.ToArray(), this.AwaiterTimeoutInMS);
if (RemoveAtIndex > 0)
this.TcpClientTasks.RemoveAt(RemoveAtIndex);
}
#endregion
在这里,我们的服务器正在异步监听一定数量的 TCP 连接(限制为 int MaxConcurrentListeners
),这可以防止内部 .NET ThreadPool
线程耗尽。这取决于 vm/server/host 的 CPU 核心数量。您的主机越强大,您就能支持越多的并发监听器。
Task await
建立一个线程继续执行,当客户端成功连接时,将继续执行 ProcessMessagesFromClient
。该函数是实际处理网络通信的地方。
然后,我们对这些可等待的任务列表执行 WaitAny()
,但只在给定的毫秒数内进行,这部分是关键。如果没有超时,我们就不会检测到 ExitSignal
变量的变化,而在多线程环境中,这对于优雅退出至关重要。WaitAny
可防止自旋锁,同时还能检测已退出的 TCPListener Tasks
(.NET 内部 ThreadPool
线程)。
它还可以避免内存泄漏无限/过多的 await、线程或任务,同时仍然异步处理连接。
AcceptTcpClientAsync
如果任务被中止,则会抛出异常。您可能希望为每个任务提供一个 CancellationToken
,并在您的需求要求时优雅地退出任务列表中的每个任务。由于我们从不调用 Listener.Stop()
,当主机 Thread
优雅退出时,在此示例代码中,清理工作由 .NET GC 内部处理。
当 WaitAny
检测到任务已完成时,将返回 RemoveAt
索引,以便我们可以在下一轮执行 ExitSignal
检查后将其从列表中简单删除,并重新添加一个新任务。
添加以下代码来实现 ProcessMessagesFromClient()
函数
protected virtual void ProcessMessagesFromClient(TcpClient Connection)
{
using (Connection)
{
if (!Connection.Connected)
return;
using (var netstream = Connection.GetStream())
{
this.OnHandleConnection.Invoke(netstream);
}
}
}
当成功连接时,此函数将在继续执行的 ThreadPool
线程中运行,通常不会与 ServerThread
或 Main()
线程是同一个线程。如果需要严格强制执行,可以添加 ConfigureAwait(false)
。
TCPClient
和 NetworkStream
会自动使用 Using
语法关闭和处置,这样,该类的使用者就不必关心清理工作。
OnHandleConnection.Invoke
基本上调用 Host
类中的 ConnectionHandler()
函数,我们现在将编写该函数。
protected virtual void ConnectionHandler(NetworkStream connectedAutoDisposedNetStream)
{
if (!connectedAutoDisposedNetStream.CanRead && !connectedAutoDisposedNetStream.CanWrite)
return; //We need to be able to read and write
var writer = new StreamWriter(connectedAutoDisposedNetStream) { AutoFlush = true };
var reader = new StreamReader(connectedAutoDisposedNetStream);
var StartTime = DateTime.Now;
int i = 0;
while (!this.Server.ExitSignal) //Tight network message-loop (optional)
{
var JSON_Helper = new Helper.JSON();
string JSON = JSON_Helper.JSONstring();
string Response;
try //Communication block
{
//Synchronously send some JSON to the connected client
writer.WriteLine(JSON);
//Synchronously wait for a response from the connected client
Response = reader.ReadLine();
}
catch (IOException ex)
{
_ = ex; //Add Debug breakpoint and logging here
return; //Swallow exception and Exit function on network error
}
//Put breakpoint here to inspect the JSON string return by the connected client
Helper.SomeDataObject Data = JSON_Helper.DeserializeFromJSON(Response);
_ = Data;
//Update stats
i++;
var ElapsedTime = DateTime.Now - StartTime;
if (ElapsedTime.TotalMilliseconds >= 1000)
{
this.OnHostMessages.Invoke("Messages per second: " + i);
i = 0;
StartTime = DateTime.Now;
}
}
}
这主要是您需要自己编写的样板网络消息代码。如果您只需要发送 1 条消息,则删除 while
循环。保留 ExitSignal
检查作为 if
语句无害,volatile bool
可以根据需要频繁检查。
网络流支持字符串行和二进制字节数组的传输。
在此特定演示中,我们将一个对象序列化和反序列化为 JSON XML 作为 string
,并通过 Write/Read-Line 来回发送(.NET 会自动将其转换为 ASCII 字符 bytes[]
数组,如果您需要其他字符集,.NET 提供了多种转换辅助方法)。
这里有一个要点是,如果连接因任何原因丢失,写入和读取流可能会抛出 IOException
,您需要在 Try
...Catch
块中处理它。如果您需要处理或冒泡异常,可以使用 throw;
而不是 return;
如前所述,当 Thread
或任务继续执行或中止/抛出异常时,Usings
块将清理网络连接。
通常,我从不使用 throw ex;
,因为它会丢弃最顶层的异常信息,而 throw;
会保留完整的堆栈跟踪/内部异常。
TCP 客户端
TCP 客户端的 CLI 和 Host 类与 TCP 服务器基本相同,完整的版本已附加并可下载。
下面说明的唯一区别是客户端使用 TCP Client 而不是 TCP Listener,并且 ConnectionLooper()
函数更简单,因为我们不需要处理多个并发的入站连接。
using System;
using System.Net.Sockets;
using System.Threading;
#region Public Functions
public virtual void Run()
{
if (this.IsRunning)
return; //Already running, only one running instance allowed.
this.IsRunning = true;
this.ExitSignal = false;
while (!this.ExitSignal)
this.ConnectionLooper();
this.IsRunning = false;
}
#endregion
#region Protected Functions
protected virtual void ConnectionLooper()
{
this.OnMessage.Invoke("Attempting server connection... ");
using (var Client = new TcpClient())
{
try
{
Client.Connect(this.Host, this.Port);
}
catch(SocketException ex)
{
this.OnMessage.Invoke(ex.Message);
//Server is unavailable, wait before re-trying
Thread.Sleep(this.ConnectionAttemptDelayInMS);
return; //Swallow exception
}
if (!Client.Connected) //exit function if not connected
return;
using (var netstream = Client.GetStream())
{
//Process the connection
this.OnHandleConnection.Invoke(netstream);
}
}
}
#endregion
这里,我们只使用 Thread Sleep。如果您在任务(ThreadPool
管理器)中托管客户端,则首选 Task.Delay
。
如果您不需要外部 ExitSignal
进行优雅退出,请随时将其删除,这样可以简化实现,即您的进程将在准备好时自行退出。我仅将其添加到演示示例中,以展示需要该功能的人可以如何实现。
如果您只进行一次连接或间歇性连接,则不需要包含的 While
循环。在此示例中,我们无限期等待服务器上线,如果您的要求不同,您需要根据您的要求进行自定义。
此外,如果您需要从同一进程中建立多个并发客户端连接,也可以做到,在这种情况下,您肯定会想要使用与 Server
类类似的方案,该方案利用可等待的 WaitAny()
异步 TaskList
模式,而不是本示例代码中所示的简单同步 Connect()
循环。
结论
希望有人觉得这段代码有用。
我能够在一个单核 VM 上实现高达 74000 次双向 JSON 交换,因此性能应该不错。在此示例中,服务器使用 .NET4.8,客户端使用 Core3.1。
如果您不序列化对象,它应该运行得更快,可能只受限于网卡带宽或 CPU 速度。
历史
- 2020年6月10日 - 版本 1.0