TCP Socket 开箱即用 - 使用 Async-Await 和 .NET Core 重新审视





5.00/5 (21投票s)
方便的异步 TCP 连接组件。使用相同的代码,它可在 .NET Core 和桌面版本中使用。
引言
本文档和代码提供 .NET Core 的异步 TCP 通信实现。我之前关于此主题的文章《TCP Socket 开箱即用》介绍了一个用于同步 TCP 通信的组件。这项工作中的通信组件更简单,并具有重要的附加有用功能。特别是,它
- 使用 .NET Core,兼容所有主流操作系统;
- 应用 async-await 模式实现异步行为;
- 提供方便的远程过程调用(以下简称 RPC)和连续数据流基础设施;
- 自动处理分布在多个接收块中的序列化对象;
- 生成传输数据的带时间戳,以确定传输延迟。
该组件不需要每个连接一个专用线程,而是采用异步方法。通过使用 async-await 模式,其实现大大简化了。
要使用 .NET Core 版本的代码示例,需要安装 .NET Core 2.0。可以从这个 Microsoft 网站下载。开发过程中使用了 Microsoft Visual Studio 2017。
注意。 由于许多读者在桌面应用程序中使用套接字,我决定添加一个桌面版本。它本质上具有相同的代码。使用桌面版本不需要安装 .NET Core。文章中解释安装等部分是针对 .NET Core 版本的。
代码描述
TcpHelperLib 项目(DLL)实现了 TCP 通信。它对服务器和客户端通用。该组件的主要类型是 TcpHelper
。其构造函数允许用户通过显式提供参数(可以省略其中一些并使用其默认定义)或通过读取 JSON 配置文件隐式地配置实例。构造函数接受实例的 id
和用户提供的用于处理接收数据的 processMethod
处理程序作为参数。它由 TcpHelper
类型内部调用,并接受以下参数:
类型 | 描述 |
日期时间 | 数据发送时间(时间戳)。 |
List<byte> | 接收到的数据。 |
TcpClientWrapper | 代表通信对方的代理对象。用于发送数据回对方。 |
StateProperties | 保存连接状态的对象。 |
TcpHelper
类型提供了最重要的公共方法,即服务器的 async Listen()
和客户端的 async Connect()
。
TcpHelper
服务器对象通过激活其 Listen()
方法,开始监听给定 IP 地址上的传入客户端调用。
public async void Listen(int port, string host = null) { var listener = new TcpListener(GetHost(ref host), port); listener.Start(); // Continue listening. Log($"Server \"{Id}\" is listening on {host}:{port} ..."); TcpClient client = null; while ((client = await listener.AcceptTcpClientAsync()) != null && client.Connected) { var str = $"Client {client.Client.RemoteEndPoint} {ackConnection} \"{Id}\" {host}:{port}"; Log(str); isActive = true; var clientWrapper = new TcpClientWrapper(Delim, DelimRepeated) { Peer = client }; await clientWrapper.SendAsync(); await clientWrapper.SendAsync(str); Receive(client); } }
客户端也创建一个 TcpHelper
类型的对象,并调用其 Connect()
方法,向服务器发送连接请求。
public async Task<TcpClientWrapper> Connect(int port, string host = null) { var server = await GetConnectedServer(host, port); if (server.Connected) { Log($"Client \"{Id}\" connected to Server {host}:{port}."); Receive(server); } else LogError("Connection failed after reties."); return new TcpClientWrapper(Delim, DelimRepeated) { Peer = server }; } private async Task<TcpClient> GetConnectedServer(string host, int port) { var server = new TcpClient(); for (int i = 0; i < maxConnectionAttempts && !server.Connected && !cts.IsCancellationRequested; i++) { try { await server.Client.ConnectAsync(GetHost(ref host), port); } catch { await Task.Delay(intervalBetweenConnectionAttemptsInMs, cts.Token); } } return server; }
一旦来自客户端的传入调用到达,服务器端的 TcpHelper
对象就会创建一个连接,将其分配给一个端口与服务器监听的端口不同,并且具有一个新套接字。建立连接后,客户端和服务器会交换消息。Receive()
方法(服务器和客户端通用)提供消息处理。
private async void Receive(TcpClient client) { if (client == null || !client.Connected) return; var lstData = new List<byte>(); var stateProprties = new StateProperties(); using (var netStream = client.GetStream()) { var clientWrapper = new TcpClientWrapper(Delim, DelimRepeated) { Peer = client }; var buffer = new byte[receiveBufferSize]; int readBytes = 0; while ((readBytes = await ReadNetStreamAsync(netStream, buffer, cts.Token)) > 0) { lstData.AddRange(GetReceivedBuffer(buffer, readBytes)); try { ProcessReceived(lstData, clientWrapper, stateProprties); SetLastInteractionTime(); } catch (Exception e) { LogError("ProcessReceived() failed.", e); } } } }
它按连接调用,并在其 while
循环中读取接收到的数据。由于 async-await 模式,该循环不会阻塞线程,从而实现了异步套接字。
客户端和服务器的角色仅在通信开始时有所不同。客户端发起通信,服务器接受它。一旦建立通信,对等方就会交换消息。因此,我们可以讨论消息提供者和消息接收者,它们都可以是客户端或服务器。通常,这些消息包含序列化对象。为了通过顺序通道进行通信,提供者将对象序列化为字节数组,并将它们作为连续字节流发送给接收者。接收者应从这些字节中重构(反序列化)有意义的对象。为此,接收者需要知道传入字节流中序列化对象的边界。这可以通过使用可以序列化为固定大小字节数组的对象,或通过分隔字节流来实现。在本文中,实现了后一种方法。分隔符由字节流中连续重复几次的某个字节构成。字节及其重复次数是可配置的(默认情况下,字节是 char
对象“pipe” '|'
0x7C,重复次数为 3 次)。重复是为了区分分隔符和在传输的有意义数据中偶尔出现的定界符字节。与序列化数据传输相关的重要问题是将同一对象的各个部分连接起来,这些部分分布在不同的接收块中。
分隔符和对象连接问题都在 TcpHelperLib 中通过实现以下简单协议来内部解决。提供者会自动将重复的分隔符字节添加到发送的数据中,接收者会相应地解析接收到的数据。字节流如下图所示:
Receive()
方法在其 while
循环中调用 ProcessReceived()
方法。
private async void ProcessReceived(List<byte> lstData, TcpClientWrapper clientWrapper, StateProperties stateProperties) { Log($"{clientWrapper.RemoteEndPoint} ThreadId = {Thread.CurrentThread.ManagedThreadId}"); // Split to parts by delimiter and update lstData var lstParts = SplitToDelimitedParts(lstData); // Process parts foreach (var lstByte in lstParts) { ProcessingResult result = null; if (lstByte != null) { var timestamp = GetTimestamp(lstByte); if (timestamp > DateTime.MinValue) { CheckIfRpc(lstByte, stateProprties); try { result = processMethod?.Invoke(timestamp, lstByte, clientWrapper, stateProprties); } catch (Exception e) { LogError("ProcessReceived(): user supplied processMethod() callback failed.", e); } } else LogError("ProcessReceived(): timestamp == DateTime.MinValue"); } if (result != null) { byte[] bts = result.BytesToSendBack; if (bts != null && bts.Length > 0) { var task = clientWrapper.SendAsync(bts); if (result.IsSyncSend) await task; } } } }
ProcessReceived()
方法的参数包括一个字节列表 List<byte> lstData
,其中包含紧跟在分隔符之后的接收到的字节;一个 TcpClientWrapper clientWrapper
对象,代表通信对等方,用于发送响应;以及一个 StateProperties stateProperties
对象,用于保存通信状态。stateProperties
对象包含可以在 Receive()
方法(例如,在下面描述的适当场景中为 RPC 提供处理程序)或用户提供的 processMethod
回调中插入的属性。ProcessReceived()
方法调用 SplitToDelimitedParts()
方法以获取一个字节列表的列表——一个 List<List<byte>>
类型的对象。后者方法还会修改其参数 List<byte> lstData
,使其只留下接收到的字节的“尾部”(如果非空),稍后将作为“头部”添加到即将到来的接收块中。然后,对每个分隔的字节列表执行反序列化为对象。TcpClientWrapper
类型的 Task SendAsync(byte[] dataToSend)
方法用于发送字节,它会自动在发送数据的前面添加时间戳(作为 DateTime
对象的 long
表示,以 ticks 为单位),并在后面添加分隔符。接收者通过在 ProcessReceived()
方法内部调用 GetTimestamp()
方法来读取此时间戳。
在继续讲解 ProcessReceived()
方法之前,我们简要描述一下 TCP 通信对等方之间消息交换的主要场景。最常见的场景如下:
- 同步对话:当一个对等方仅响应接收到的消息而发送消息时,即顺序消息交换;
- 异步对话:当对等方非顺序地发送和接收消息时。异步对话的特例是流式传输,即一方请求另一方提供连续的消息流。这种情况可能会出现,例如,当一方订阅另一方发出的事件时。
- RPC:当通信方远程调用彼此的某些方法时。
ProcessReceived()
支持上述场景。它调用 CheckIfRpc(lstByte, stateProperties)
方法来确定是否发生了 RPC 场景。在这种情况下,接收到的字节被反序列化为 JSON 字符串,提供远程过程名称和参数。此 JSON 字符串被转换为 RemoteProcInfo
类型的实例,并放入 stateProperties
中。
用户提供的 processMethod
处理程序由 ProcessReceived()
方法在所有上述场景中调用。processMethod
不需要处理分隔和连接问题。它接收一个字节列表参数 lst
,对应于一个准备好反序列化的有用数据数组;在 RPC 的情况下,还有已经准备好的 RemoteProcInfo
对象,可以从 stateProperties
参数中检索。
对于 RPC 场景,processMethod
从 stateProperties
中提取 RemoteProcInfo
对象并执行实际的过程调用。在其他场景中,processMethod
使用 List<byte> lst
参数。作为其结果,processMethod
生成一个 ProcessingResult
类型的对象。此输出对象的内容(BytesToSendBack
或 StringToSendBack
属性)将根据 ProcessingResult
类型的 IsSyncSend
布尔属性的值,自动同步或异步地发送回通信对等方。如果 processMethod
的返回值是 null
,则在调用 processMethod
后不会将任何内容发送回通信对等方。例如,这在数据接收方的流式传输中使用,因为不需要确认收到数据。
processMethod
的 TcpClientWrapper clientWrapper
参数充当通信对等方的代理,并且可以显式用于异步发送数据给它。为了在我们示例中说明这一点,我们在计时器的处理程序中使用它来实现流式传输场景,如下面将要讨论的。
代码示例
示例展示了 TcpHelperLib 组件的用法。它是使用 .NET Core 开发的,因此可以在所有主流操作系统上运行(正如下面所示,它已在 Windows 10 和 Linux 上进行了测试)。示例解决方案由四个项目组成,即 TcpHelperLib 本身(位于 Lib 文件夹中)、Server 和 Client 控制台应用程序,以及辅助的 TestObjectLib。
服务器
下面展示了 Server 的完整代码。
using System; using System.Threading; using Newtonsoft.Json.Linq; using TcpHelperLib; using TestObjectLib; namespace AsyncSocketServer { class Server { const int port = 11511; const string JSON_CONFIG_FILE = "tcpHelperSettings.json"; const string TIMER_NAME = "Timer"; const string START_STREAMING = "Start Streaming"; static MethodCaller caller = new MethodCaller(); static void Main(string[] args) { Console.WriteLine("Async. Socket SERVER"); caller["Foo"] = rpi => { var p = rpi.Params; var jo = p[2] as JObject; var test = new Test { Str = jo.V<string>("Str"), Num = jo.V<int>("Num") }; return new ProcessingResult { StringToSendBack = $"{Implementation.Foo((string)p[0], (double)p[1], test)}", //IsSyncSend = true }; }; var server = new TcpHelper(id: "SVR", processMethod: (dt, lst, clientWrapper, stateProprties) => { var rpi = stateProprties.GetRpi("Foo"); if (rpi != null) // RPC return caller.ExecuteMethod(rpi); // STREAMING if (lst.ToStr() == START_STREAMING) // Streaming timer stateProprties[TIMER_NAME] = new Timer(async _ => { try { await clientWrapper.SendAsync($"{DateTime.Now}"); Console.WriteLine($"Streaming remote endpoint: {clientWrapper.RemoteEndPoint}"); } catch (Exception e) { (stateProprties[TIMER_NAME] as Timer)?.Dispose(); stateProprties[TIMER_NAME] = null; Console.WriteLine(e); } }, null, 1000, 500); return null; }, configFilePath: JSON_CONFIG_FILE); server.Listen(port); Console.WriteLine("Press any key to quit..."); Console.ReadKey(); server.Stop(); } } internal static class Implementation { public static string Foo(string s, double d, Test test) { return $"\"Foo()\": Echo from Server: {s} {d}, {test.Str} {test.Num}"; } } }
TcpHelper server
对象使用其 processMethod
回调创建,该回调支持 RPC 和 流式传输 场景。在 RPC 的情况下,回调从 stateProperties
中的状态获取 RemoteProcInfo
对象,并将其作为参数提供给预先创建的 MethodCaller caller
对象的 ExecuteMethod()
。caller
包含远程调用方法 Implementation.Foo()
的激活器。如果 stateProperties
中没有“Foo”的条目,则假定为流式传输场景。在这种情况下,一旦服务器收到“Start Streaming”消息,它就会启动一个计时器,定期向客户端发送异步消息(此时为服务器的时间)。在此示例中,每个连接都设置了自己的计时器,该计时器作为连接状态的一部分保存在 stateProperties
中。对于服务器,所有连接都具有相同的 processMethod
实现,但其实例是不同的。服务器在其 Listen()
方法被调用时,开始在指定 URL 上监听。
客户端
这是 Client 的完整代码。
#define INCLUDE_RPC using System; using System.Collections.Generic; using System.Threading; using TcpHelperLib; using TestObjectLib; namespace AsyncSocketClient { class Client { static Timer timer; static SemaphoreSlim semaphore = new SemaphoreSlim(1, 1); static bool isStreaming = false; const int port = 11511; const string START_STREAMING = "Start Streaming"; static TcpHelper clientStream = null; static TcpClientWrapper tcwStream = null; #if INCLUDE_RPC static TcpHelper clientRpc = null; static TcpClientWrapper tcwRpc = null; #endif static void Main(string[] args) { Console.WriteLine("Async. Socket CLIENT"); // Connection timer timer = new Timer(async _ => { int maxIdleTimeInSec = 10; await semaphore.WaitAsync(); if (clientStream != null && clientStream.TimeSpanSinceLastInteraction > TimeSpan.FromSeconds(maxIdleTimeInSec) && (tcwStream == null || !tcwStream.IsConnected)) { Console.WriteLine("Connection Timer: STREAMING"); isStreaming = false; tcwStream = await clientStream.Connect(port); } #if INCLUDE_RPC if (clientRpc != null && clientRpc.TimeSpanSinceLastInteraction > TimeSpan.FromSeconds(maxIdleTimeInSec) && (tcwRpc == null || !tcwRpc.IsConnected)) { Console.WriteLine("Connection Timer: RPC"); tcwRpc = await clientRpc.Connect(port); } #endif semaphore.Release(); }, null, 0, 1000); // STREAMING clientStream = new TcpHelper(id: "STREAMING-CLIENT", processMethod: (timestamp, lstByte, clientWrapper, dctState) => { CommonHandler("STREAMING", timestamp, clientStream, clientWrapper, lstByte); if (!isStreaming) { isStreaming = true; return new ProcessingResult { StringToSendBack = START_STREAMING /*, IsSyncSend = true*/ }; } return null; }); #if INCLUDE_RPC // RPC clientRpc = new TcpHelper(id: "RPC-CLIENT", processMethod: (timestamp, lstByte, clientWrapper, dctState) => { CommonHandler("RPC", timestamp, clientRpc, clientWrapper, lstByte); return new ProcessingResult { StringToSendBack = Proxy.ToJson("Foo", "Apricot", 15.11, new Test { Num = 5, Str = "Mango" }), //IsSyncSend = true }; }); #endif Console.WriteLine("Press any key to quit..."); Console.ReadKey(); clientStream.Stop(); #if INCLUDE_RPC clientRpc.Stop(); #endif } static void CommonHandler(string name, DateTime timestamp, TcpHelper clientHelper, TcpClientWrapper clientWrapper, List<byte> lstByte) { const int delayLimitinMs = 1500; var delay = DateTime.Now - timestamp; if (delay > TimeSpan.FromMilliseconds(delayLimitinMs)) Console.WriteLine($"******** {name}: Large Delay: {delay}, exceeds limit of {delayLimitinMs} ms"); Console.WriteLine($"{name}: {timestamp} ** {lstByte.ToStr()}"); } } }
客户端被创建为 TcpHelper
类型的对象。clientStream
对象提供流式传输场景,而 clientRpc
对象代表RPC 场景(后者在定义了 INCLUDE_RPC
时使用)。每个客户端支持一个到服务器的连接,并具有自己的 processMethod
回调实现。客户端通过调用其 Connect()
方法开始连接。作为对 Connect()
调用建立连接的响应,服务器会回复一个众所周知的确认消息,该消息由相应客户端的 processMethod
进行处理。clientStream
一旦向服务器发送开始流式传输的请求,然后其 processMethod
会持续收到相应的消息。在 clientRpc
中,其 processMethod
生成一个 ProcessingResult
输出对象,该对象将异步发送(因为 IsSyncSend
未设置为 true)。它发送 JSON 调用以执行远程过程,并以 ProcessingResult
类型对象的形式接收结果。
TcpHelperLib 组件配备了“内置”日志记录。其输出级别可以通过构造函数参数或配置文件进行配置。在我们的示例中,日志记录输出到控制台。TcpHelperLib 组件本身的日志消息(在服务器和客户端中)都显示在控制台窗口中,并带有缩进,以“TcpHelper: ”前缀开头。Server 和 Client 测试应用程序的消息则不带缩进地输出到控制台。
运行示例
Windows
可运行的示例二进制文件可以通过文章开头的Download demo链接下载。为了准备这些二进制文件,从Server和Client的项目文件夹执行了以下命令:
dotnet publish -o publish -c Release
(为方便起见,此命令已放入这些文件夹中的 publish.cmd 命令文件中),然后将两个新出现的 publish 文件夹的内容一起复制到一个通用文件夹(我们称之为 AsyncTcp_bin)。JSON 配置文件 tcpHelperSettings.json 也可以放置在这个通用文件夹中。要运行 Client 和 Server,应在 AsyncTcp_bin 文件夹中打开两个不同的控制台窗口,并执行以下命令:
dotnet Server.dll
和
dotnet Client.dll
Linux
为了在 Linux 上运行示例,我使用了一个安装了 Oracle VirtualBox 的 Linux 虚拟机。这里只需要服务器环境。可以从其网站下载 VirtualBox,并且其安装指南可以在例如这里找到。在安装完虚拟机后,我们需要安装一个用于 Windows 和 Linux 之间文件交换的工具。WinSCP 可以完成此任务(缩写代表 Windows Secure Copy)。可以从其网站安装该应用程序。虽然可以使用 VirtualBox 的显示窗口来输入命令,但 PuTTY 终端模拟器对此非常有帮助。可以从这里下载。
在 Windows 中准备好的 AsyncTcp_bin(或从Download demo下载的)文件夹的内容应该被传输到 Linux 环境。这可以使用 WinSCP 应用程序完成。可以使用 PuTTY 应用程序运行与 Windows 中相同的 dotnet Client.dll
和 dotnet Server.dll
命令。
测试用例
作为第一个测试用例,我们运行一个 Server 和一个 Client 应用程序。它们可以按任何顺序运行,因为 Client 会尝试连接 Server(如果它不可用,可配置相应参数)。如果定义了 INCLUDE_RPC
,则在 Client 中可以看到流式传输和RPC 场景的消息(否则仅为流式传输)。对于 RPC 场景,Server 会在控制台写入其 TcpHelperLib 组件的消息,指示 processMethod
回调正在执行的线程。正如您将看到的,线程会不时变化。这表明实现了基于 async-await 模式的异步消息接收机制。Client 和 Server 应用程序都可以通过按下控制台窗口中的任意键来停止。我们可以停止其中任何一个,在另一个中看到相应的异常消息,然后重新启动已停止的应用程序。连接将恢复,Client 和 Server 将像最初一样运行(请注意,在 Linux 虚拟机中,只有在一段时间后才能成功重新启动 - 可能之前的套接字没有立即被销毁)。我们可以启动一个 Server 并连接多个 Client。每个客户端与服务器建立自己的连接以进行 RPC 和流式传输。与前一个场景类似,重新启动任何应用程序都会导致自动重新连接。
有趣的是测试消息在多个接收块中出现的场景。对于这个测试,我们可以将接收缓冲区长度减小到大约 29 字节。这可以通过配置和 TcpHelper
构造函数的参数来实现。但为了简化测试,只需取消注释用注释 //1
标记的两行即可。现在,如果我们在此处用注释 //1*
标记的行处设置断点,它将被命中。即使使用如此小的接收缓冲区,我们也应该得到正确的结果,尽管我们会看到服务器端对 processMethod
回调的更多调用。
结论
本文档介绍了一个使用 .NET Core 开发的 TCP 连接组件,用于在不同操作系统中使用。它基于 async-await 模式,以避免线程阻塞。该组件为同步和异步消息交换以及远程过程调用提供了基础设施。附带了服务器和客户端的测试控制台应用程序。