65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 .NET Framework 进行异步套接字通信

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.64/5 (23投票s)

2003年9月1日

6分钟阅读

viewsIcon

208287

downloadIcon

5457

一篇关于使用 .NET Framework 进行异步套接字通信的文章。

引言

我最近在编写一个基于服务器的组件,该组件碰巧需要 .NET Framework 中的 TCP/IP 套接字通信。我发现 .NET Framework 增强了,并且在某些方面,改进了 MFC 框架提供的现有套接字类。MFC 类在其设计的框架(MFC 和文档中心范例)中运行良好。MFC 框架非常适合 GUI 应用程序,并且在该领域表现出色,但在服务器应用程序等其他领域则表现不佳。CAscyncSocket 能够合理地执行异步套接字操作,但仍然与 MFC 框架及其附带的臃肿程序绑定。.NET Framework 的套接字和网络类应运而生。在我看来,这些类是 C# 中套接字 API 的绝佳实现。它们逻辑上封装了使用 TcpClient、TcpListener 和 IpEndpoint 类的通信。这些类分别负责套接字连接的双向通信、监听新的套接字连接请求以及封装 TCP/IP 通信的终结点。然而,这仍然不足以创建可重用的异步套接字库,因为要实现稳健的异步通信,需要某种形式的线程。在我看来,.NET Framework 的线程类比以前更容易使用。线程类包含许多线程 API 函数,这些函数被封装为静态函数。构造也非常直接。这使我们进入一个相关主题:线程池。线程池可用于有效地管理多个线程。如果您的应用程序有几项需要多线程的小任务,通过使用 ThreadPool 类,您可以利用系统控制的线程。

使用代码

//add the two threads for Receive and transmit
ThreadPool.QueueUserWorkItem(new WaitCallback(ReceiveThreadEntryPoint));
ThreadPool.QueueUserWorkItem(new WaitCallback(SendThreadEntryPoint));

上面的代码说明了如何将线程添加到 ThreadPool 类。它创建了两个用于发送和接收数据的线程,并使用 QueueUserWorkItem 将它们添加到 ThreadPool 中。WaitCallback 对象是 ThreadPool 类的委托对象 - 它将线程函数封装为回调。系统将排队每个线程并调用回调 - 所有这些都无需进一步交互。下面显示了 QueueUserWorkItem 函数。

public static bool QueueUserWorkItem(
   WaitCallback callBack,
   object state
);
参数
  • callBack

    一个 WaitCallback,表示当线程池中的线程拾取工作项时要调用的委托。

  • 状态

    从线程池服务时传递给委托的对象。

顾名思义,.NET Framework 的 Socket 类封装了 Berkeley 套接字实现,并且是可获得的最低级别的套接字访问。下面的代码显示了如何使用 Socket 类连接到远程主机。

// find an endpoint using DNS
IPAddress addrHost = Dns.Resolve("ftp.microsoft.com").AddressList[0];

// create an endpoint
IPEndPoint IPHost = new IPEndPoint(addrHost, FTP_PORT);

// create a TCP socket
Socket s = new Socket(AddressFamily.InterNetwork, 
               SocketType.Stream, ProtocolType.Tcp );

// connect to host described by endpoint
s.Connect(IPHost);

上面的代码将创建一个解析为 Microsoft 公共 FTP 站点的 TCP/IP 终结点。然后,它将创建一个用于流式通信的 TCP/IP 套接字。最后,调用 Connect 方法以连接到先前创建的终结点。

让我们看一下 Socket 构造函数。

public Socket(
  AddressFamily addressFamily,
  SocketType socketType,
  ProtocolType protocolType
);
参数
  • addressFamily
  • socketType
  • protocolType

[来自 MSDN]

AddressFamily 成员指定 Socket 实例用于解析地址的寻址方案。例如,InterNetwork 指示在 Socket 连接到终结点时预期使用 IPv4 地址。SocketType 成员指定 Socket 类实例表示的套接字类型。SocketType 会隐式指示将在 AddressFamily 中使用哪个 ProtocolType。例如,当 SocketType 为 Dgram 时,ProtocolType 始终为 UDP。当 SocketType 为 Stream 时,ProtocolType 始终为 TCP。ProtocolType 指定 Socket 类支持的协议。ProtocolType 枚举由 Socket 类使用,用于向 Windows 套接字 API 指示套接字的请求协议。计算机上必须存在所请求协议的低级驱动程序软件才能创建 Socket。

异步的最后一块拼图在于 Socket 函数 BeginRead、BeginWrite、EndRead 和 EndWrite。加上一些线程同步的技巧,我们就拥有了异步套接字管理器所需的所有工具。

IAsyncResult iar;
SocketStateObject so2 = new SocketStateObject(_socket);
iar = _socket.BeginReceive(so2.buffer,
0, SocketStateObject.BUFFER_SIZE, 0,
new AsyncCallback(AsynchReadCallback), so2);

上面的代码片段展示了 Socket 方法 BeginReceive 的用法。让我们更详细地了解一下该方法。

public IAsyncResult BeginReceive(
   byte[] buffer,
   int offset,
   int size,
   SocketFlags socketFlags,
   AsyncCallback callback,
   object state
);
参数
  • buffer

    用于存储接收数据的存储位置。

  • offset

    在 buffer 参数中,要存储接收数据的零基位置。

  • size

    要接收的字节数。

  • socketFlags

    SocketFlags 值的按位组合。

  • callback

    AsyncCallback 委托。

  • 状态

    包含此请求状态信息的对象。

      AsyncCallback 委托值得关注。这就是我们将实现数据处理的地方,因为 BeginXXX 方法仅启动异步操作。我们必须提供一个委托函数来处理异步操作的结果。下面是一个使用 AsynchCallback 委托函数的示例。

      private void AsynchReadCallback(IAsyncResult ar)
      {
          SocketStateObject so = (SocketStateObject)ar.AsyncState;
          Socket s = so.WorkSocket;
      
          try
          {
              // sanity check
              if (s == null || !s.Connected) return;
      
              // call EndRecieve which will return the number of bytes read
              int read = s.EndReceive(ar);
              if (read > 0)
              {
                  // get the buffer as a string
                  string msg = Encoding.ASCII.GetString(so.buffer, 0, read);
                  if (OnReceive != null)
                  {
                      // call delegate to inform client
                      OnReceive(_myID, msg);
                  }
      
                  // and start receiving more
                  s.BeginReceive(so.buffer,
                          0, SocketStateObject.BUFFER_SIZE, 0,
                          new AsyncCallback(AsynchReadCallback), so);
              }
          }
          catch {}
      }
      
      public void AsynchSendCallback(System.IAsyncResult ar)
      {
          SocketStateObject so = (SocketStateObject)ar.AsyncState;
          Socket s = so.WorkSocket;
      
          try
          {
              // sanity check
              if (s == null || !s.Connected) return;
              int send = s.EndSend(ar);
          }
          catch {}
      }

      上面的代码首先使用 ar 参数的 AsynchState 属性获取用户定义的 IAsyncResult 对象。然后获取状态对象中存储的 Socket,并在此 Socket 上调用 EndRecieve 方法。接收处理函数将循环直到没有更多字节可读取,此时它将重新进入主循环。当仍有数据可供接收时,接收处理程序将继续调用 BeginRecieve。发送函数要简单得多,它只会调用 EndSend 并终止。请注意,EndRecieve 和 EndSend 很可能会阻塞。

      让我们开始查看一个简陋的套接字管理器所需的接口。我们显然需要 Send 和 Receive 方法。现在我们来看看它们。下面是一个公共 Send 方法的示例。

      public void SendMessage(string Message)
      {
          // queue it...
          TransmitLock.AcquireWriterLock(-1);
          try
          {
              TransmitQueue.Enqueue(Message);
          }
          catch {}
          finally { TransmitLock.ReleaseWriterLock(); }
      
          // signal that data was sent
          DataReady.Set();
      }

      上面的代码首先使用临界区对象提供线程安全。然后将消息添加到传输队列,并释放临界区对象。最后,引发 DataReady 事件,这是一个 AutoResetEvent 对象,用于通知发送线程数据已准备好进行处理。

      我们需要下一个公共方法是 Connect 方法。请参阅下面的示例。

      public int Connect(string hostName, int serviceport)
      {
          //no need to do anything once connected
          if (_isConnected) return -1;
      
          // resolve...
          IPHostEntry hostEntry = Dns.Resolve(hostName);
          if ( hostEntry != null )
          {
              // create an end-point for the first address...
              IPEndPoint endPoint = new 
                     IPEndPoint(hostEntry.AddressList[0], serviceport);
      
              // connect to host described by endpoint
              _socket.Connect(endPoint); OnConnect(_myID, true);
      
              // spin up the threads...
              //add the two threads for Receive and transmit
              ThreadPool.QueueUserWorkItem
                      (new WaitCallback(ReceiveThreadEntryPoint));
              ThreadPool.QueueUserWorkItem
                      (new WaitCallback(SendThreadEntryPoint));
      
              // set connected flag
              _isConnected = true;
      
              // return this unique id
              return _myID;
          }
          else
          {
              return -1;
          }
      }

      上面的代码结合了我们已经见过的两个代码片段,用于连接 Socket 和向 ThreadPool 添加线程。Disconnect 方法显示在下面。

      public void Disconnect()
      {
          if (_isConnected)
          {
              _isConnected = false;
      
              // signal the threads to end
              StopEvent.Set();
      
              // now kill the socket
              if (_socket != null)
              {
                  _socket.Close();
              }
              _socket = null;
          }
      }

      上面的代码使用 ManualResetEvent 对象 StopEvent 来向工作线程发出信号,使其终止其循环。

      让我们开始查看一些实现细节。我们将从从线程池执行的发送和接收线程开始。请参阅下面的代码示例。

      public void ReceiveThreadEntryPoint(object state)
      {
          try
          {
              // loop forever...
              while( true )
              {
                  WaitHandle[] handles = new WaitHandle[1];
                  handles[0] = StopEvent;
      
                  if ( _socket != null && _socket.Connected )
                  {
                      // not disconnected
                      try
                      {
                          // start the receive operation
                          System.IAsyncResult iar;
                          SocketStateObject so2 = new SocketStateObject(_socket);
                          iar = _socket.BeginReceive(so2.buffer,
                                  0, SocketStateObject.BUFFER_SIZE, 0,
                                  new AsyncCallback(AsynchReadCallback), so2);
      
                          if( WaitHandle.WaitAny(handles) == 0 )
                          {
                              // stop event was signaled, break out of loop
                              break;
                          }
                      }
                      catch {}
                  }
              }
          }
          catch {}
      }
      
      public void SendThreadEntryPoint(object state)
      {
          try
          {
              Queue workQueue = new Queue();
      
              // loop...
              while( true )
              {
                  WaitHandle[] handles = new WaitHandle[2];
                  handles[0] = StopEvent;
                  handles[1] = DataReady;
      
                  if( WaitHandle.WaitAny(handles) == 0 )
                  {
                      // if the stop event was signalled, time to exit the loop
                      break;
                  }
                  else if (_socket != null && _socket.Connected)
                  {
                      // not disconnected
                      // go through the queue...
                      TransmitLock.AcquireWriterLock(-1);
                      try
                      {
                          workQueue.Clear();
                          foreach( string message in _transmitQueue)
                          {
                              workQueue.Enqueue(message);
                          }
                          TransmitQueue.Clear();
                      }
                      catch {}
                      finally
                      {
                          TransmitLock.ReleaseWriterLock();
                      }
      
                      // loop the outbound messages...
                      foreach( string message in workQueue )
                      {
                          SocketStateObject so2 = new SocketStateObject(_socket);
                          byte[] buff = Encoding.ASCII.GetBytes(message);
      
                          // send it...
                          System.IAsyncResult iar;
                          iar = _socket.BeginSend(buff,
                                  0, buff.Length, 0,
                                  new AsyncCallback(AsynchSendCallback), so2);
                      }
                  }
              }
          }
          catch {}
      }

      以上说明了 AsynchSocketManager 类中整个代码中最复杂的部分。但是,您会发现,在分析了前面的代码片段后,它很容易理解。第一个函数是线程池管理的接收线程。该函数一直循环,直到引发 StopEvent,这表示收到了断开连接消息。如果未引发 StopEvent,则接收线程函数会调用 BeginRecieve。此调用将阻塞直到数据准备好接收,这将由前面介绍的异步回调 AsynchReadCallback 处理。下一个函数是由线程池管理的发送线程函数。此函数一直循环,直到引发 StopEvent 或 DataReady 事件。如果引发 StopEvent,则函数退出,表示断开连接。如果 DataReady 事件被引发,函数将删除 TransmitQueue 中当前的所有项,并将它们添加到 workQueue 中,然后通过异步 BeginSend 调用一次发送一个。

      关注点

      异步套接字 I/O 可能很简单,但正确的线程安全和同步至关重要。ThreadPool 对象有助于解决终止线程的一些难题,尤其是当析构函数未被调用时(当主线程意外终止时可能会发生这种情况)。这可能导致线程仍在运行或处于不确定的状态。当我们允许 ThreadPool 管理我们的线程时,它会使我们免受这些影响。在我的下一篇文章中,我将讨论在 ThreadPool 管理的套接字服务器中使用 .NET 套接字类,并使用 I/O 完成端口。

    • © . All rights reserved.