65.9K
CodeProject 正在变化。 阅读更多。
Home

如何使用 SocketAsyncEventArgs 类

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.86/5 (39投票s)

2008 年 1 月 14 日

CPOL

3分钟阅读

viewsIcon

709857

downloadIcon

20024

关于如何使用 SocketAsyncEventArgs 类的文章。

引言

我一直在寻找用于客户端套接字的高性能代码。之前,我基于 Socket 类的传统异步编程模型方法(BeginSendBeginReceive 等)编写了一段代码。但它没有满足我需要的性能要求。然后,我发现了基于事件的异步操作的新模型(参见 MSDN 杂志 2007 年 9 月刊的“使用 .NET Framework 3.5 连接”)。

背景

异步编程模型 (APM) 广泛应用于 I/O 密集型应用中,以实现高性能,因为它减少了阻塞线程。自从第一个版本以来,APM 已经在 .NET Framework 中实现,并且从那时起一直在改进,使用 C# 3.0 中的 lambda 表达式等新技术。专门针对套接字编程,APM 的新模型提供了更简单的编码,更不用说性能优势了。它致力于使用 SocketAsyncEventArgs 类来保持 I/O 操作之间的上下文,这减少了对象分配和垃圾回收的工作。

SocketAsyncEventArgs 类在 .NET Framework 2.0 SP1 中也可用,并且本文中的代码是使用 Microsoft Visual Studio .NET 2008 编写的。

Using the Code

为了开始使用 SocketAsyncEventArgs 类,我研究了 MSDN 中的示例,但是缺少一些东西:AsyncUserToken 类。我理解该类应该公开一个 Socket 属性,该属性对应于用于执行 I/O 操作的套接字。但是如何在 accept 操作结束之前保存从客户端接收到的数据呢?由于 UserToken 是一个 Object,它可以接受任何东西,所以我创建了一个 Token 类来跟踪 accept 操作。下面显示的是修改后的方法,用于将 Token 类的实例用作 UserToken

// Process the accept for the socket listener.
private void ProcessAccept(SocketAsyncEventArgs e)
{
    Socket s = e.AcceptSocket;
    if (s.Connected)
    {
        try
        {
            SocketAsyncEventArgs readEventArgs = this.readWritePool.Pop();
            if (readEventArgs != null)
            {
                // Get the socket for the accepted client connection and put it into the 
                // ReadEventArg object user token.
                readEventArgs.UserToken = new Token(s, this.bufferSize);

                Interlocked.Increment(ref this.numConnectedSockets);
                Console.WriteLine("Client connection accepted. 
			There are {0} clients connected to the server",
                    this.numConnectedSockets);

                if (!s.ReceiveAsync(readEventArgs))
                {
                    this.ProcessReceive(readEventArgs);
                }
            }
            else
            {
                Console.WriteLine("There are no more available sockets to allocate.");
            }
        }
        catch (SocketException ex)
        {
            Token token = e.UserToken as Token;
            Console.WriteLine("Error when processing data received from {0}:\r\n{1}", 
			token.Connection.RemoteEndPoint, ex.ToString());
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }

        // Accept the next connection request.
        this.StartAccept(e);
    }
}

// This method is invoked when an asynchronous receive operation completes. 
// If the remote host closed the connection, then the socket is closed.
// If data was received then the data is echoed back to the client.
private void ProcessReceive(SocketAsyncEventArgs e)
{
    // Check if the remote host closed the connection.
    if (e.BytesTransferred > 0)
    {
        if (e.SocketError == SocketError.Success)
        {
            Token token = e.UserToken as Token;
            token.SetData(e);

            Socket s = token.Connection;
            if (s.Available == 0)
            {
                // Set return buffer.
                token.ProcessData(e);
                if (!s.SendAsync(e))
                {
                    // Set the buffer to send back to the client.
                    this.ProcessSend(e);
                }
            }
            else if (!s.ReceiveAsync(e))
            {
                // Read the next block of data sent by client.
                this.ProcessReceive(e);
            }
        }
        else
        {
            this.ProcessError(e);
        }
    }
    else
    {
        this.CloseClientSocket(e);
    }
}

// This method is invoked when an asynchronous send operation completes.
// The method issues another receive on the socket to read any additional 
// data sent from the client.
private void ProcessSend(SocketAsyncEventArgs e)
{
    if (e.SocketError == SocketError.Success)
    {
        // Done echoing data back to the client.
        Token token = e.UserToken as Token;
        if (!token.Connection.ReceiveAsync(e))
        {
            // Read the next block of data send from the client.
            this.ProcessReceive(e);
        }
    }
    else
    {
        this.ProcessError(e);
    }
}

我还修改了代码以显示您可以在哪里操作侦听器接收到的消息。在示例中,我在 Token 类中创建了 ProcessData 方法,以将接收到的消息回显到客户端。

为了控制侦听器的生命周期,使用了 Mutex 类的实例。基于原始 Init 方法的 Start 方法创建互斥锁,相应的 Stop 方法释放互斥锁。这些方法适用于将套接字服务器实现为 Windows 服务。

// Starts the server such that it is listening
// for incoming connection requests.
internal void Start(Int32 port)
{
    // Get host related information.
    IPAddress[] addressList = 
          Dns.GetHostEntry(Environment.MachineName).AddressList;
    // Get endpoint for the listener.
    IPEndPoint localEndPoint = 
          new IPEndPoint(addressList[addressList.Length - 1], port);

    // Create the socket which listens for incoming connections.
    this.listenSocket = new Socket(localEndPoint.AddressFamily, 
                        SocketType.Stream, ProtocolType.Tcp);

    if (localEndPoint.AddressFamily == AddressFamily.InterNetworkV6)
    {
        // Set dual-mode (IPv4 & IPv6) for the socket listener.
        // 27 is equivalent to IPV6_V6ONLY socket
        // option in the winsock snippet below,
        // based on http://blogs.msdn.com/wndp/archive/2006/10/24/
        //   creating-ip-agnostic-applications-part-2-dual-mode-sockets.aspx
        this.listenSocket.SetSocketOption(SocketOptionLevel.IPv6, 
                                         (SocketOptionName)27, false);
        this.listenSocket.Bind(new IPEndPoint(IPAddress.IPv6Any, 
                               localEndPoint.Port));
    }
    else
    {
        // Associate the socket with the local endpoint.
        this.listenSocket.Bind(localEndPoint);
    }

    // Start the server.
    this.listenSocket.Listen(this.numConnections);

    // Post accepts on the listening socket.
    this.StartAccept(null);

    mutex.WaitOne();
}

// Stop the server.
internal void Stop()
{
    mutex.ReleaseMutex();
}

现在我们有了一个套接字服务器,下一步是使用 SocketAsyncEventArgs 类创建一个套接字客户端。尽管 MSDN 说该类是专门为网络服务器应用程序设计的,但是在使用此 APM 的客户端代码中没有任何限制。下面是 SocketClien 类,以这种方式编写

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;

namespace SocketAsyncClient
{
    // Implements the connection logic for the socket client.
    internal sealed class SocketClient : IDisposable
    {
        // Constants for socket operations.
        private const Int32 ReceiveOperation = 1, SendOperation = 0;

        // The socket used to send/receive messages.
        private Socket clientSocket;

        // Flag for connected socket.
        private Boolean connected = false;

        // Listener endpoint.
        private IPEndPoint hostEndPoint;

        // Signals a connection.
        private static AutoResetEvent autoConnectEvent = 
                              new AutoResetEvent(false); 

        // Signals the send/receive operation.
        private static AutoResetEvent[] 
                autoSendReceiveEvents = new AutoResetEvent[]
        {
            new AutoResetEvent(false),
            new AutoResetEvent(false)
        };

        // Create an uninitialized client instance.
        // To start the send/receive processing call the
        // Connect method followed by SendReceive method.
        internal SocketClient(String hostName, Int32 port)
        {
            // Get host related information.
            IPHostEntry host = Dns.GetHostEntry(hostName);

            // Address of the host.
            IPAddress[] addressList = host.AddressList;

            // Instantiates the endpoint and socket.
            hostEndPoint = 
              new IPEndPoint(addressList[addressList.Length - 1], port);
            clientSocket = new Socket(hostEndPoint.AddressFamily, 
                               SocketType.Stream, ProtocolType.Tcp);
        }

        // Connect to the host.
        internal void Connect()
        {
            SocketAsyncEventArgs connectArgs = new SocketAsyncEventArgs();

            connectArgs.UserToken = clientSocket;
            connectArgs.RemoteEndPoint = hostEndPoint;
            connectArgs.Completed += 
               new EventHandler<socketasynceventargs />(OnConnect);

            clientSocket.ConnectAsync(connectArgs);
            autoConnectEvent.WaitOne();

            SocketError errorCode = connectArgs.SocketError;
            if (errorCode != SocketError.Success)
            {
                throw new SocketException((Int32)errorCode);
            }
        }

        /// Disconnect from the host.
        internal void Disconnect()
        {
            clientSocket.Disconnect(false);
        }

        // Calback for connect operation
        private void OnConnect(object sender, SocketAsyncEventArgs e)
        {
            // Signals the end of connection.
            autoConnectEvent.Set();

            // Set the flag for socket connected.
            connected = (e.SocketError == SocketError.Success);
        }

        // Calback for receive operation
        private void OnReceive(object sender, SocketAsyncEventArgs e)
        {
            // Signals the end of receive.
            autoSendReceiveEvents[SendOperation].Set();
        }

        // Calback for send operation
        private void OnSend(object sender, SocketAsyncEventArgs e)
        {
            // Signals the end of send.
            autoSendReceiveEvents[ReceiveOperation].Set();

            if (e.SocketError == SocketError.Success)
            {
                if (e.LastOperation == SocketAsyncOperation.Send)
                {
                    // Prepare receiving.
                    Socket s = e.UserToken as Socket;

                    byte[] receiveBuffer = new byte[255];
                    e.SetBuffer(receiveBuffer, 0, receiveBuffer.Length);
                    e.Completed += 
                      new EventHandler<socketasynceventargs />(OnReceive);
                    s.ReceiveAsync(e);
                }
            }
            else
            {
                ProcessError(e);
            }
        }

        // Close socket in case of failure and throws
        // a SockeException according to the SocketError.
        private void ProcessError(SocketAsyncEventArgs e)
        {
            Socket s = e.UserToken as Socket;
            if (s.Connected)
            {
                // close the socket associated with the client
                try
                {
                    s.Shutdown(SocketShutdown.Both);
                }
                catch (Exception)
                {
                    // throws if client process has already closed
                }
                finally
                {
                    if (s.Connected)
                    {
                        s.Close();
                    }
                }
            }

            // Throw the SocketException
            throw new SocketException((Int32)e.SocketError);
        }

        // Exchange a message with the host.
        internal String SendReceive(String message)
        {
            if (connected)
            {
                // Create a buffer to send.
                Byte[] sendBuffer = Encoding.ASCII.GetBytes(message);

                // Prepare arguments for send/receive operation.
                SocketAsyncEventArgs completeArgs = new SocketAsyncEventArgs();
                completeArgs.SetBuffer(sendBuffer, 0, sendBuffer.Length);
                completeArgs.UserToken = clientSocket;
                completeArgs.RemoteEndPoint = hostEndPoint;
                completeArgs.Completed += 
                  new EventHandler<socketasynceventargs />(OnSend);

                // Start sending asynchronously.
                clientSocket.SendAsync(completeArgs);

                // Wait for the send/receive completed.
                AutoResetEvent.WaitAll(autoSendReceiveEvents);

                // Return data from SocketAsyncEventArgs buffer.
                return Encoding.ASCII.GetString(completeArgs.Buffer, 
                       completeArgs.Offset, completeArgs.BytesTransferred);
            }
            else
            {
                throw new SocketException((Int32)SocketError.NotConnected);
            }
        }

        #region IDisposable Members

        // Disposes the instance of SocketClient.
        public void Dispose()
        {
            autoConnectEvent.Close();
            autoSendReceiveEvents[SendOperation].Close();
            autoSendReceiveEvents[ReceiveOperation].Close();
            if (clientSocket.Connected)
            {
                clientSocket.Close();
            }
        }

        #endregion
    }
}

关注点

我有一个在集群环境中运行的套接字服务器的经验。在这种情况下,您不能使用主机地址列表中的第一个条目。相反,您应该使用最后一个地址,如 Start 方法中所示。这里介绍的另一种技术是如何为 IP6 地址族设置双模式,如果您想在默认启用 IP6 的 Windows Vista 或 Windows Server 2008 中运行服务器,这将很有帮助。

这两个程序都使用命令行参数来运行。在客户端示例中,如果服务器和客户端都在 Windows 域之外的计算机上运行,则应将“localhost”告知为服务器的名称,而不是计算机名。

历史

  • 2008 年 1 月 15 日 - 发布原始版本
  • 2010 年 9 月 28 日 - 更新文章和服务器示例
© . All rights reserved.