现成的 TCP 套接字





5.00/5 (42投票s)
用于方便使用 TCP 套接字的包装器。
引言
在现代软件系统中,数据传输通常使用客户端和服务器都已知类型的序列化对象进行。然而,在许多情况下,仍然使用连续的字节流和原始 TCP 套接字(例如,出于性能或兼容性考虑)。我最近遇到了这种用于传输金融信息、医疗数据和“软”传感器输出的通信方式。尽管 TCP 套接字有详细的文档,但使用它们需要编写繁琐且有时棘手的代码。本文介绍了一个 TCP 套接字包装器,它允许开发人员将其用作“黑盒”,而无需处理实现细节。
该包装器提供以下功能:
- 实现 TCP 客户端(连接发起者)和服务器(连接接受者)
- 同步(阻塞)和异步(非阻塞)数据发送
- 为每个套接字在独立的专用线程中处理接收到的数据
- 处理连续数据流
- 通知重要事件、错误和异常
- 发送方识别慢速接收者
- 等待服务器启动的可能性
- 连接丢失时自动可配置的重连尝试
上述大部分功能将在本文中详细讨论。
代码描述
包装器本身位于程序集 IL.TcpCommunicationLib.dll 中。它包含一个实现大部分功能的基类 TcpChannel
,以及两个派生类,即 TcpServer
和 TcpClient
。程序集 IL.WorkerThreadLib.dll 提供了一个用于多线程的支持类 WorkerThread
(该类型可独立使用,与通信目的无关,可用于专用线程中的任何循环处理)。要建立通信,开发人员需要提供服务器和客户端的本地 IP 地址(或者可以自动获取),用于接收数据处理和通知的回调方法(可选),以及:
- 对于服务器:调用
static
方法TcpServer.StartAcceptSubscribersOnPort()
以在给定端口上开始监听传入连接 - 对于客户端:创建
TcpClient
类型的对象,并调用其Connect()
方法以发起与服务器的连接
使用包装器类进行通信的过程如下:服务器和客户端都提供/自动获取各自的本地 IP 地址。服务器通过调用 static
方法 TcpServer.StartAcceptSubscribersOnPort()
开始在指定端口上监听传入连接。客户端调用 Connect()
方法,提供服务器正在监听的 IP 地址和端口。服务器接受客户端的呼叫,并实际与客户端建立连接,自动为此连接分配一个与监听端口不同的端口。建立连接后,服务器和客户端都会在内部调用 Receive()
方法以接收对方发送的字节数组。 TcpChannel.Receive()
同步接收套接字上的数据。这样做是为了确保严格的字节顺序处理,并避免框架线程池使用过多的线程。 Receive()
方法提供连续监听传入字节流的功能,并将接收到的字节放入线程安全的队列中进行处理。接收到的字节在 WorkerThread
类型对象的专用线程中处理。 Receive()
为读取和处理线程提供回调委托。处理线程的回调方法依次调用用户在 TcpClient
(并向下到 TcpChannel
)构造函数中提供的 onReceived
事件(对于客户端),或者在 AcceptBegin()
中的回调(对于服务器)。
在数据交换期间,类型为 EventHandler<TcpChannelReceivedEventArgs>
的 onReceived
事件会由处理线程定期调用,以解析接收到的字节。事件处理程序应由用户实现。其第一个参数为 TcpChannel
类型,第二个参数为 TcpChannelReceivedEventArgs
类型,包含接收到的字节块。由于 onReceived
始终从同一个处理线程调用,因此其处理程序是线程安全的。
连续接收的字节应被解析以获取所需数据,通常是已知类型对象的形式。传入流可以通过分隔符或固定字段长度分割成特定的记录(对象)。给定的传入 string
片段可能在开头和结尾包含不完整的记录。为了应对这种情况,TcpChannel
类提供了一个 UnparsedBytes
属性,类型为 byte[]
,用于保存最后一个完整记录和当前接收到的数据块末尾之间的字节。这些字节(如果存在)将放在下一个接收到的数据块之前,在 TcpChannelReceivedEventArgs.BtsReceived
属性中,以确保该属性始终以新记录开头。用户提供的 onReceived
事件处理程序负责在当前数据块解析结束时将最后一个(不完整)记录的前导字节填充到 TcpChannel.UnparsedBytes
属性中。最好将未解析的字节保留在 TcpChannel
对象本身中,而不是保留在调用者对象中,因为调用者对象可能包含多个 TcpChannel
对象,强制使用额外的(同步)字典。
事件和诊断
严格来说,onReceived
事件的处理程序是 TcpChannel
用户唯一必须实现的强制性回调。但是套接字包装器类型提供了一系列关于其内部各种事件的通知,为调用者提供相关数据。这些通知可以通过类型为 EventHandler<TcpChannelNotifyEventArgs>
的事件获得。它们可用于状态切换、诊断和日志记录。除了 TcpChannel
通知事件外,TcpServer
还可以使用 static
onServerNotifies
来报告发生在 TcpServer.static
方法中(在连接建立之前)的事件。由调用者实现的 onServerNotifies
事件处理程序是 TcpServer.StartAcceptSubscribersOnPort()
静态方法的第四个可选参数。 TcpServer
可能提供一个更特殊的事件 onInitConnectionToServer
。当服务器接受客户端的传入连接请求,创建适当的 TcpServer
对象(事件调用中的第一个“sender
”参数),并为此连接分配套接字时,它的委托会被调用。在其处理程序中,服务器可以向客户端发送一些连接确认消息。 onInitConnectionToServer
是 TcpServer.StartAcceptSubscribersOnPort()
静态方法的第三个可选参数。
对于发送方来说,识别“慢速接收者”的情况通常很重要,即套接字执行 send 操作所需的时间超过套接字的 SendTimeout
参数。例如,这可能由于网络带宽不够宽或接收方接收数据太慢而发生。 TcpChannel
类型可以诊断同步(通过分析 onSyncSendException
事件引发的异常)和异步(onAsyncSendSendingTimeoutExceeded
事件)发送模式下的这种情况。
代码示例
小型应用程序 SampleApp.exe 说明了套接字包装器类型的使用。根据其参数,该应用程序可以充当服务器或客户端。以下代码片段展示了其 Main()
方法的一部分。
// Set local host for all channels (both server and client) in the machine
TcpChannel.LocalHost = localHost;
onReceived = new TcpChannelEventHandler<tcpchannelreceivedeventargs>((tcpChannelSender, e) =>
{
if (e.AreBytesAvailable)
{
// Some processing of received bytes
if (tcpChannelSender != null)
tcpChannelSender.Send(/* bytes or string to send */));
}
});
switch (role)
{
case Role.Server:
onInitConnectionToServer = new TcpChannelEventHandler<EventArgs>((tcpServerSender, e) =>
{
SetEventHandlers(tcpServerSender);
tcpServerSender.Send(/* bytes or string to send */);
});
onServerNotifies = new TcpChannelEventHandler<TcpChannelNotifyEventArgs>(
(tcpServerSender, e) =>
{
// Some notification processing, e.g. logging
});
TcpServer.StartAcceptSubscribersOnPort(localPort, onReceived,
onInitConnectionToServer, onServerNotifies);
isListening = true;
break;
case Role.Client:
TcpClient tcpClient = new TcpClient(onReceived,
localPort.ToString(),// id
15, // receiveTimeoutInSec
9, // reconnectionAttempts
10); // delayBetweenReconnectionAttemptsInSec
SetEventHandlers(tcpClient);
tcpClient.Connect(remoteHost, remotePort);
break;
}
我们来讨论上面的代码。服务器和客户端调用应用程序都需要实现 onReceived
事件的处理程序。服务器还实现了 onInitConnectionToServer
和 onServerNotifies
事件的处理程序。服务器调用 static
方法 TcpServer.StartAcceptSubscribersOnPort()
以在 localPort
上开始监听来自客户端的传入连接请求。当建立连接后,onInitConnectionToServer
事件会被调用,并将新创建的 TcpServer
对象作为发送方(第一个参数)。客户端首先通过调用 TcpClient
公共构造函数来创建一个 TcpClient
对象。为了提供一定程度的灵活性,static
方法 TcpServer.StartAcceptSubscribersOnPort()
和 TcpClient
类型的构造函数有几个参数,但其中大多数都有默认值。下表提供了有关参数的信息:
参数 | 类型 | 描述 | 默认值 | 相关 |
onReceived | TcpChannelEventHandler <TcpChannelReceivedEventArgs> | 由调用者实现的事件。接收到数据块时调用。 | 不适用 | 服务器 & 客户端 |
id | 字符串 | 用于标识给定 TcpChannel 对象的参数。 | null | 服务器 & 客户端 |
receiveTimeoutInSec | int | 如果在该时间间隔(秒)内 TcpClient 未收到传入数据,则认为通道已关闭。 | 15 秒 | 客户端 |
reconnectionAttempts | int | TcpClient 在认为通道已关闭后将尝试的最大连续重连次数。 | 0 | 客户端 |
delayBetweenReconnectionAttemptsInSec | int | 两次连续重连尝试之间的时间间隔(秒)。 | 15 秒 | 客户端 |
socketReceiveTimeoutInSec | int | 转换为毫秒并分配给套接字的 ReceiveTimeout 参数。 | 15 秒 | 服务器 & 客户端 |
socketSendTimeoutInSec | int | 转换为毫秒并分配给套接字的 SendTimeout 参数。 | 5 秒 | 服务器 & 客户端 |
socketReceiveBufferSize | int | 分配给套接字的 ReceiveBufferSize 参数。 | 128 KB | 服务器 & 客户端 |
socketSendBufferSize | int | 分配给套接字的 SendBufferSize 参数。 | 128 KB | 服务器 & 客户端 |
方法 SetEventHandlers()
为 TcpChannel
和 TcpClient
的通知事件分配处理程序。
static void SetEventHandlers(TcpChannel tcpChannel)
{
if (tcpChannel == null)
return;
tcpChannel.onSocketNullOrNotConnected += ((tcpChannelSender, e) =>
{
// Event handler
});
// Other event handles assignment for TcpChannel
TcpClient tcpClient = tcpChannel as TcpClient;
if (tcpClient != null)
{
tcpClient.onSocketConnectionFailed += ((tcpClientSender, e) =>
{
// Event handler
});
// Other event handles assignment for TcpClient
}
}
对于服务器,此方法由 onInitConnectionToServer
事件处理程序调用;对于客户端,该方法紧跟在 TcpClient
构造函数之后调用。
调用 SetEventHandlers()
方法后,客户端调用 Connect()
方法,并将服务器的地址和端口作为参数传入。如果服务器正在监听传入连接并准备好接受连接,则在 tcpListener.BeginAcceptSocket()
方法中定义的 evAccept
同步事件中,会创建一个 TcpServer
类型的对象,建立与客户端的连接,并调用 onInitConnectionToServer
事件。如果服务器此时未监听传入连接,客户端仍然可以定期发送连接请求,直到服务器准备好接受为止。为了实现此功能,TcpClient
和 TcpChannel
类型支持可配置的重连机制。此机制通过 TcpClient
构造函数的三个参数进行配置,即 receiveTimeoutInSec
、reconnectionAttempts
和 delayBetweenReconnectionAttemptsInSec
。其描述和默认值在上面的表格中给出。默认情况下,重连机制是禁用的(reconnectionAttempts
为 0)。但是通过适当的配置(例如,在上面的代码片段中),重连尝试将定期进行,包括与服务器的初始连接。重连仅是客户端的功能,就像任何连接发起一样。一旦客户端和服务器之间的连接建立,双方就可以发送(同步或异步)字节数组,接收并处理接收到的字节,使用 onReceived
事件处理程序。
要运行演示,应启动包含 SampleApp.exe 及其适当参数的命令文件 RunServer.cmd、RunClient1.cmd 和 RunClient2.cmd。如果客户端在服务器之前启动,则连接将在服务器启动后的 delayBetweenReconnectionAttemptsInSec
(在示例中为 10 秒)内发生。为了演示重连功能,服务器演示样本包含一个定时器,导致服务器的周期性工作:服务器工作一分钟,然后空闲一分钟。服务器恢复工作后,客户端将在 delayBetweenReconnectionAttemptsInSec
内再次连接到服务器。在服务器空闲期间,客户端会根据其重连配置继续尝试连接到服务器。
结论
本文介绍了一个简化通过 TCP 套接字进行连续数据交换的小型库。只需要很少的用户代码。库中的类型配备了一系列用于诊断和日志记录的通知事件。套接字以同步方式接收数据,并在另一个专用线程中进行处理。
谢谢
非常感谢我的一位杰出的专业人士和朋友 Michael Molotsky,他提供了关于本文主题非常有用的讨论。