65.9K
CodeProject 正在变化。 阅读更多。
Home

现成的 TCP 套接字

starIconstarIconstarIconstarIconstarIcon

5.00/5 (42投票s)

2012 年 7 月 5 日

CPOL

9分钟阅读

viewsIcon

135862

downloadIcon

8249

用于方便使用 TCP 套接字的包装器。

引言

在现代软件系统中,数据传输通常使用客户端和服务器都已知类型的序列化对象进行。然而,在许多情况下,仍然使用连续的字节流和原始 TCP 套接字(例如,出于性能或兼容性考虑)。我最近遇到了这种用于传输金融信息、医疗数据和“软”传感器输出的通信方式。尽管 TCP 套接字有详细的文档,但使用它们需要编写繁琐且有时棘手的代码。本文介绍了一个 TCP 套接字包装器,它允许开发人员将其用作“黑盒”,而无需处理实现细节。

该包装器提供以下功能:

  • 实现 TCP 客户端(连接发起者)和服务器(连接接受者)
  • 同步(阻塞)和异步(非阻塞)数据发送
  • 为每个套接字在独立的专用线程中处理接收到的数据
  • 处理连续数据流
  • 通知重要事件、错误和异常
  • 发送方识别慢速接收者
  • 等待服务器启动的可能性
  • 连接丢失时自动可配置的重连尝试

上述大部分功能将在本文中详细讨论。

代码描述

包装器本身位于程序集 IL.TcpCommunicationLib.dll 中。它包含一个实现大部分功能的基类 TcpChannel,以及两个派生类,即 TcpServerTcpClient。程序集 IL.WorkerThreadLib.dll 提供了一个用于多线程的支持类 WorkerThread(该类型可独立使用,与通信目的无关,可用于专用线程中的任何循环处理)。要建立通信,开发人员需要提供服务器和客户端的本地 IP 地址(或者可以自动获取),用于接收数据处理和通知的回调方法(可选),以及:

  • 对于服务器:调用 static 方法 TcpServer.StartAcceptSubscribersOnPort() 以在给定端口上开始监听传入连接
  • 对于客户端:创建 TcpClient 类型的对象,并调用其 Connect() 方法以发起与服务器的连接

使用包装器类进行通信的过程如下:服务器和客户端都提供/自动获取各自的本地 IP 地址。服务器通过调用 static 方法 TcpServer.StartAcceptSubscribersOnPort() 开始在指定端口上监听传入连接。客户端调用 Connect() 方法,提供服务器正在监听的 IP 地址和端口。服务器接受客户端的呼叫,并实际与客户端建立连接,自动为此连接分配一个与监听端口不同的端口。建立连接后,服务器和客户端都会在内部调用 Receive() 方法以接收对方发送的字节数组。 TcpChannel.Receive() 同步接收套接字上的数据。这样做是为了确保严格的字节顺序处理,并避免框架线程池使用过多的线程。 Receive() 方法提供连续监听传入字节流的功能,并将接收到的字节放入线程安全的队列中进行处理。接收到的字节在 WorkerThread 类型对象的专用线程中处理。 Receive() 为读取和处理线程提供回调委托。处理线程的回调方法依次调用用户在 TcpClient(并向下到 TcpChannel)构造函数中提供的 onReceived 事件(对于客户端),或者在 AcceptBegin() 中的回调(对于服务器)。

在数据交换期间,类型为 EventHandler<TcpChannelReceivedEventArgs>onReceived 事件会由处理线程定期调用,以解析接收到的字节。事件处理程序应由用户实现。其第一个参数为 TcpChannel 类型,第二个参数为 TcpChannelReceivedEventArgs 类型,包含接收到的字节块。由于 onReceived 始终从同一个处理线程调用,因此其处理程序是线程安全的。

连续接收的字节应被解析以获取所需数据,通常是已知类型对象的形式。传入流可以通过分隔符或固定字段长度分割成特定的记录(对象)。给定的传入 string 片段可能在开头和结尾包含不完整的记录。为了应对这种情况,TcpChannel 类提供了一个 UnparsedBytes 属性,类型为 byte[],用于保存最后一个完整记录和当前接收到的数据块末尾之间的字节。这些字节(如果存在)将放在下一个接收到的数据块之前,在 TcpChannelReceivedEventArgs.BtsReceived 属性中,以确保该属性始终以新记录开头。用户提供的 onReceived 事件处理程序负责在当前数据块解析结束时将最后一个(不完整)记录的前导字节填充到 TcpChannel.UnparsedBytes 属性中。最好将未解析的字节保留在 TcpChannel 对象本身中,而不是保留在调用者对象中,因为调用者对象可能包含多个 TcpChannel 对象,强制使用额外的(同步)字典。

事件和诊断

严格来说,onReceived 事件的处理程序是 TcpChannel 用户唯一必须实现的强制性回调。但是套接字包装器类型提供了一系列关于其内部各种事件的通知,为调用者提供相关数据。这些通知可以通过类型为 EventHandler<TcpChannelNotifyEventArgs> 的事件获得。它们可用于状态切换、诊断和日志记录。除了 TcpChannel 通知事件外,TcpServer 还可以使用 static onServerNotifies 来报告发生在 TcpServer.static 方法中(在连接建立之前)的事件。由调用者实现的 onServerNotifies 事件处理程序是 TcpServer.StartAcceptSubscribersOnPort() 静态方法的第四个可选参数。 TcpServer 可能提供一个更特殊的事件 onInitConnectionToServer。当服务器接受客户端的传入连接请求,创建适当的 TcpServer 对象(事件调用中的第一个“sender”参数),并为此连接分配套接字时,它的委托会被调用。在其处理程序中,服务器可以向客户端发送一些连接确认消息。 onInitConnectionToServerTcpServer.StartAcceptSubscribersOnPort() 静态方法的第三个可选参数。

对于发送方来说,识别“慢速接收者”的情况通常很重要,即套接字执行 send 操作所需的时间超过套接字的 SendTimeout 参数。例如,这可能由于网络带宽不够宽或接收方接收数据太慢而发生。 TcpChannel 类型可以诊断同步(通过分析 onSyncSendException 事件引发的异常)和异步(onAsyncSendSendingTimeoutExceeded 事件)发送模式下的这种情况。

代码示例

小型应用程序 SampleApp.exe 说明了套接字包装器类型的使用。根据其参数,该应用程序可以充当服务器或客户端。以下代码片段展示了其 Main() 方法的一部分。

// Set local host for all channels (both server and client) in the machine
TcpChannel.LocalHost = localHost;
onReceived = new TcpChannelEventHandler<tcpchannelreceivedeventargs>((tcpChannelSender, e) =>
    {
        if (e.AreBytesAvailable)
        {
            // Some processing of received bytes
            if (tcpChannelSender != null)
                tcpChannelSender.Send(/* bytes or string to send */));
        }
    });

switch (role)
{
    case Role.Server:
        onInitConnectionToServer = new TcpChannelEventHandler<EventArgs>((tcpServerSender, e) =>
            {
                SetEventHandlers(tcpServerSender);
                tcpServerSender.Send(/* bytes or string to send */);
            });
        onServerNotifies = new TcpChannelEventHandler<TcpChannelNotifyEventArgs>(
            (tcpServerSender, e) =>
            {
                // Some notification processing, e.g. logging
            });
        TcpServer.StartAcceptSubscribersOnPort(localPort, onReceived,
                            onInitConnectionToServer, onServerNotifies);
        isListening = true;
        break;

    case Role.Client:
        TcpClient tcpClient = new TcpClient(onReceived, 
                                            localPort.ToString(),// id
                                            15,                  // receiveTimeoutInSec
                                             9,                  // reconnectionAttempts
                                            10);               // delayBetweenReconnectionAttemptsInSec  
        SetEventHandlers(tcpClient);                                            
        tcpClient.Connect(remoteHost, remotePort);
        break;
}

我们来讨论上面的代码。服务器和客户端调用应用程序都需要实现 onReceived 事件的处理程序。服务器还实现了 onInitConnectionToServeronServerNotifies 事件的处理程序。服务器调用 static 方法 TcpServer.StartAcceptSubscribersOnPort() 以在 localPort 上开始监听来自客户端的传入连接请求。当建立连接后,onInitConnectionToServer 事件会被调用,并将新创建的 TcpServer 对象作为发送方(第一个参数)。客户端首先通过调用 TcpClient 公共构造函数来创建一个 TcpClient 对象。为了提供一定程度的灵活性,static 方法 TcpServer.StartAcceptSubscribersOnPort()TcpClient 类型的构造函数有几个参数,但其中大多数都有默认值。下表提供了有关参数的信息:

参数 类型 描述 默认值 相关
onReceived

TcpChannelEventHandler <TcpChannelReceivedEventArgs>

由调用者实现的事件。接收到数据块时调用。 不适用 服务器 & 客户端
id 字符串 用于标识给定 TcpChannel 对象的参数。 null 服务器 & 客户端
receiveTimeoutInSec int 如果在该时间间隔(秒)内 TcpClient 未收到传入数据,则认为通道已关闭。 15 秒 客户端
reconnectionAttempts int TcpClient 在认为通道已关闭后将尝试的最大连续重连次数。 0 客户端

delayBetweenReconnectionAttemptsInSec

int 两次连续重连尝试之间的时间间隔(秒)。 15 秒 客户端
socketReceiveTimeoutInSec int 转换为毫秒并分配给套接字的 ReceiveTimeout 参数。 15 秒 服务器 & 客户端
socketSendTimeoutInSec int 转换为毫秒并分配给套接字的 SendTimeout 参数。 5 秒 服务器 & 客户端
socketReceiveBufferSize int 分配给套接字的 ReceiveBufferSize 参数。 128 KB 服务器 & 客户端
socketSendBufferSize int 分配给套接字的 SendBufferSize 参数。 128 KB 服务器 & 客户端

方法 SetEventHandlers()TcpChannelTcpClient 的通知事件分配处理程序。

static void SetEventHandlers(TcpChannel tcpChannel)
{
    if (tcpChannel == null)
        return;

    tcpChannel.onSocketNullOrNotConnected += ((tcpChannelSender, e) => 
        { 
            // Event handler    
        });

    // Other event handles assignment for TcpChannel

    TcpClient tcpClient = tcpChannel as TcpClient;
    if (tcpClient != null)
    {
        tcpClient.onSocketConnectionFailed += ((tcpClientSender, e) =>
            {
                // Event handler    
            });

        // Other event handles assignment for TcpClient
    }
}

对于服务器,此方法由 onInitConnectionToServer 事件处理程序调用;对于客户端,该方法紧跟在 TcpClient 构造函数之后调用。

调用 SetEventHandlers() 方法后,客户端调用 Connect() 方法,并将服务器的地址和端口作为参数传入。如果服务器正在监听传入连接并准备好接受连接,则在 tcpListener.BeginAcceptSocket() 方法中定义的 evAccept 同步事件中,会创建一个 TcpServer 类型的对象,建立与客户端的连接,并调用 onInitConnectionToServer 事件。如果服务器此时未监听传入连接,客户端仍然可以定期发送连接请求,直到服务器准备好接受为止。为了实现此功能,TcpClientTcpChannel 类型支持可配置的重连机制。此机制通过 TcpClient 构造函数的三个参数进行配置,即 receiveTimeoutInSecreconnectionAttemptsdelayBetweenReconnectionAttemptsInSec。其描述和默认值在上面的表格中给出。默认情况下,重连机制是禁用的(reconnectionAttempts 为 0)。但是通过适当的配置(例如,在上面的代码片段中),重连尝试将定期进行,包括与服务器的初始连接。重连仅是客户端的功能,就像任何连接发起一样。一旦客户端和服务器之间的连接建立,双方就可以发送(同步或异步)字节数组,接收并处理接收到的字节,使用 onReceived 事件处理程序。

要运行演示,应启动包含 SampleApp.exe 及其适当参数的命令文件 RunServer.cmdRunClient1.cmdRunClient2.cmd。如果客户端在服务器之前启动,则连接将在服务器启动后的 delayBetweenReconnectionAttemptsInSec(在示例中为 10 秒)内发生。为了演示重连功能,服务器演示样本包含一个定时器,导致服务器的周期性工作:服务器工作一分钟,然后空闲一分钟。服务器恢复工作后,客户端将在 delayBetweenReconnectionAttemptsInSec 内再次连接到服务器。在服务器空闲期间,客户端会根据其重连配置继续尝试连接到服务器。

结论

本文介绍了一个简化通过 TCP 套接字进行连续数据交换的小型库。只需要很少的用户代码。库中的类型配备了一系列用于诊断和日志记录的通知事件。套接字以同步方式接收数据,并在另一个专用线程中进行处理。

谢谢

非常感谢我的一位杰出的专业人士和朋友 Michael Molotsky,他提供了关于本文主题非常有用的讨论。

© . All rights reserved.