如何使用 SocketAsyncEventArgs 类






4.86/5 (39投票s)
关于如何使用 SocketAsyncEventArgs 类的文章。
引言
我一直在寻找用于客户端套接字的高性能代码。之前,我基于 Socket
类的传统异步编程模型方法(BeginSend
、BeginReceive
等)编写了一段代码。但它没有满足我需要的性能要求。然后,我发现了基于事件的异步操作的新模型(参见 MSDN 杂志 2007 年 9 月刊的“使用 .NET Framework 3.5 连接”)。
背景
异步编程模型 (APM) 广泛应用于 I/O 密集型应用中,以实现高性能,因为它减少了阻塞线程。自从第一个版本以来,APM 已经在 .NET Framework 中实现,并且从那时起一直在改进,使用 C# 3.0 中的 lambda 表达式等新技术。专门针对套接字编程,APM 的新模型提供了更简单的编码,更不用说性能优势了。它致力于使用 SocketAsyncEventArgs
类来保持 I/O 操作之间的上下文,这减少了对象分配和垃圾回收的工作。
SocketAsyncEventArgs
类在 .NET Framework 2.0 SP1 中也可用,并且本文中的代码是使用 Microsoft Visual Studio .NET 2008 编写的。
Using the Code
为了开始使用 SocketAsyncEventArgs
类,我研究了 MSDN 中的示例,但是缺少一些东西:AsyncUserToken
类。我理解该类应该公开一个 Socket
属性,该属性对应于用于执行 I/O 操作的套接字。但是如何在 accept 操作结束之前保存从客户端接收到的数据呢?由于 UserToken
是一个 Object
,它可以接受任何东西,所以我创建了一个 Token
类来跟踪 accept 操作。下面显示的是修改后的方法,用于将 Token
类的实例用作 UserToken
。
// Process the accept for the socket listener.
private void ProcessAccept(SocketAsyncEventArgs e)
{
Socket s = e.AcceptSocket;
if (s.Connected)
{
try
{
SocketAsyncEventArgs readEventArgs = this.readWritePool.Pop();
if (readEventArgs != null)
{
// Get the socket for the accepted client connection and put it into the
// ReadEventArg object user token.
readEventArgs.UserToken = new Token(s, this.bufferSize);
Interlocked.Increment(ref this.numConnectedSockets);
Console.WriteLine("Client connection accepted.
There are {0} clients connected to the server",
this.numConnectedSockets);
if (!s.ReceiveAsync(readEventArgs))
{
this.ProcessReceive(readEventArgs);
}
}
else
{
Console.WriteLine("There are no more available sockets to allocate.");
}
}
catch (SocketException ex)
{
Token token = e.UserToken as Token;
Console.WriteLine("Error when processing data received from {0}:\r\n{1}",
token.Connection.RemoteEndPoint, ex.ToString());
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
// Accept the next connection request.
this.StartAccept(e);
}
}
// This method is invoked when an asynchronous receive operation completes.
// If the remote host closed the connection, then the socket is closed.
// If data was received then the data is echoed back to the client.
private void ProcessReceive(SocketAsyncEventArgs e)
{
// Check if the remote host closed the connection.
if (e.BytesTransferred > 0)
{
if (e.SocketError == SocketError.Success)
{
Token token = e.UserToken as Token;
token.SetData(e);
Socket s = token.Connection;
if (s.Available == 0)
{
// Set return buffer.
token.ProcessData(e);
if (!s.SendAsync(e))
{
// Set the buffer to send back to the client.
this.ProcessSend(e);
}
}
else if (!s.ReceiveAsync(e))
{
// Read the next block of data sent by client.
this.ProcessReceive(e);
}
}
else
{
this.ProcessError(e);
}
}
else
{
this.CloseClientSocket(e);
}
}
// This method is invoked when an asynchronous send operation completes.
// The method issues another receive on the socket to read any additional
// data sent from the client.
private void ProcessSend(SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
// Done echoing data back to the client.
Token token = e.UserToken as Token;
if (!token.Connection.ReceiveAsync(e))
{
// Read the next block of data send from the client.
this.ProcessReceive(e);
}
}
else
{
this.ProcessError(e);
}
}
我还修改了代码以显示您可以在哪里操作侦听器接收到的消息。在示例中,我在 Token
类中创建了 ProcessData
方法,以将接收到的消息回显到客户端。
为了控制侦听器的生命周期,使用了 Mutex
类的实例。基于原始 Init
方法的 Start
方法创建互斥锁,相应的 Stop
方法释放互斥锁。这些方法适用于将套接字服务器实现为 Windows 服务。
// Starts the server such that it is listening
// for incoming connection requests.
internal void Start(Int32 port)
{
// Get host related information.
IPAddress[] addressList =
Dns.GetHostEntry(Environment.MachineName).AddressList;
// Get endpoint for the listener.
IPEndPoint localEndPoint =
new IPEndPoint(addressList[addressList.Length - 1], port);
// Create the socket which listens for incoming connections.
this.listenSocket = new Socket(localEndPoint.AddressFamily,
SocketType.Stream, ProtocolType.Tcp);
if (localEndPoint.AddressFamily == AddressFamily.InterNetworkV6)
{
// Set dual-mode (IPv4 & IPv6) for the socket listener.
// 27 is equivalent to IPV6_V6ONLY socket
// option in the winsock snippet below,
// based on http://blogs.msdn.com/wndp/archive/2006/10/24/
// creating-ip-agnostic-applications-part-2-dual-mode-sockets.aspx
this.listenSocket.SetSocketOption(SocketOptionLevel.IPv6,
(SocketOptionName)27, false);
this.listenSocket.Bind(new IPEndPoint(IPAddress.IPv6Any,
localEndPoint.Port));
}
else
{
// Associate the socket with the local endpoint.
this.listenSocket.Bind(localEndPoint);
}
// Start the server.
this.listenSocket.Listen(this.numConnections);
// Post accepts on the listening socket.
this.StartAccept(null);
mutex.WaitOne();
}
// Stop the server.
internal void Stop()
{
mutex.ReleaseMutex();
}
现在我们有了一个套接字服务器,下一步是使用 SocketAsyncEventArgs
类创建一个套接字客户端。尽管 MSDN 说该类是专门为网络服务器应用程序设计的,但是在使用此 APM 的客户端代码中没有任何限制。下面是 SocketClien
类,以这种方式编写
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace SocketAsyncClient
{
// Implements the connection logic for the socket client.
internal sealed class SocketClient : IDisposable
{
// Constants for socket operations.
private const Int32 ReceiveOperation = 1, SendOperation = 0;
// The socket used to send/receive messages.
private Socket clientSocket;
// Flag for connected socket.
private Boolean connected = false;
// Listener endpoint.
private IPEndPoint hostEndPoint;
// Signals a connection.
private static AutoResetEvent autoConnectEvent =
new AutoResetEvent(false);
// Signals the send/receive operation.
private static AutoResetEvent[]
autoSendReceiveEvents = new AutoResetEvent[]
{
new AutoResetEvent(false),
new AutoResetEvent(false)
};
// Create an uninitialized client instance.
// To start the send/receive processing call the
// Connect method followed by SendReceive method.
internal SocketClient(String hostName, Int32 port)
{
// Get host related information.
IPHostEntry host = Dns.GetHostEntry(hostName);
// Address of the host.
IPAddress[] addressList = host.AddressList;
// Instantiates the endpoint and socket.
hostEndPoint =
new IPEndPoint(addressList[addressList.Length - 1], port);
clientSocket = new Socket(hostEndPoint.AddressFamily,
SocketType.Stream, ProtocolType.Tcp);
}
// Connect to the host.
internal void Connect()
{
SocketAsyncEventArgs connectArgs = new SocketAsyncEventArgs();
connectArgs.UserToken = clientSocket;
connectArgs.RemoteEndPoint = hostEndPoint;
connectArgs.Completed +=
new EventHandler<socketasynceventargs />(OnConnect);
clientSocket.ConnectAsync(connectArgs);
autoConnectEvent.WaitOne();
SocketError errorCode = connectArgs.SocketError;
if (errorCode != SocketError.Success)
{
throw new SocketException((Int32)errorCode);
}
}
/// Disconnect from the host.
internal void Disconnect()
{
clientSocket.Disconnect(false);
}
// Calback for connect operation
private void OnConnect(object sender, SocketAsyncEventArgs e)
{
// Signals the end of connection.
autoConnectEvent.Set();
// Set the flag for socket connected.
connected = (e.SocketError == SocketError.Success);
}
// Calback for receive operation
private void OnReceive(object sender, SocketAsyncEventArgs e)
{
// Signals the end of receive.
autoSendReceiveEvents[SendOperation].Set();
}
// Calback for send operation
private void OnSend(object sender, SocketAsyncEventArgs e)
{
// Signals the end of send.
autoSendReceiveEvents[ReceiveOperation].Set();
if (e.SocketError == SocketError.Success)
{
if (e.LastOperation == SocketAsyncOperation.Send)
{
// Prepare receiving.
Socket s = e.UserToken as Socket;
byte[] receiveBuffer = new byte[255];
e.SetBuffer(receiveBuffer, 0, receiveBuffer.Length);
e.Completed +=
new EventHandler<socketasynceventargs />(OnReceive);
s.ReceiveAsync(e);
}
}
else
{
ProcessError(e);
}
}
// Close socket in case of failure and throws
// a SockeException according to the SocketError.
private void ProcessError(SocketAsyncEventArgs e)
{
Socket s = e.UserToken as Socket;
if (s.Connected)
{
// close the socket associated with the client
try
{
s.Shutdown(SocketShutdown.Both);
}
catch (Exception)
{
// throws if client process has already closed
}
finally
{
if (s.Connected)
{
s.Close();
}
}
}
// Throw the SocketException
throw new SocketException((Int32)e.SocketError);
}
// Exchange a message with the host.
internal String SendReceive(String message)
{
if (connected)
{
// Create a buffer to send.
Byte[] sendBuffer = Encoding.ASCII.GetBytes(message);
// Prepare arguments for send/receive operation.
SocketAsyncEventArgs completeArgs = new SocketAsyncEventArgs();
completeArgs.SetBuffer(sendBuffer, 0, sendBuffer.Length);
completeArgs.UserToken = clientSocket;
completeArgs.RemoteEndPoint = hostEndPoint;
completeArgs.Completed +=
new EventHandler<socketasynceventargs />(OnSend);
// Start sending asynchronously.
clientSocket.SendAsync(completeArgs);
// Wait for the send/receive completed.
AutoResetEvent.WaitAll(autoSendReceiveEvents);
// Return data from SocketAsyncEventArgs buffer.
return Encoding.ASCII.GetString(completeArgs.Buffer,
completeArgs.Offset, completeArgs.BytesTransferred);
}
else
{
throw new SocketException((Int32)SocketError.NotConnected);
}
}
#region IDisposable Members
// Disposes the instance of SocketClient.
public void Dispose()
{
autoConnectEvent.Close();
autoSendReceiveEvents[SendOperation].Close();
autoSendReceiveEvents[ReceiveOperation].Close();
if (clientSocket.Connected)
{
clientSocket.Close();
}
}
#endregion
}
}
关注点
我有一个在集群环境中运行的套接字服务器的经验。在这种情况下,您不能使用主机地址列表中的第一个条目。相反,您应该使用最后一个地址,如 Start
方法中所示。这里介绍的另一种技术是如何为 IP6 地址族设置双模式,如果您想在默认启用 IP6 的 Windows Vista 或 Windows Server 2008 中运行服务器,这将很有帮助。
这两个程序都使用命令行参数来运行。在客户端示例中,如果服务器和客户端都在 Windows 域之外的计算机上运行,则应将“localhost”告知为服务器的名称,而不是计算机名。
历史
- 2008 年 1 月 15 日 - 发布原始版本
- 2010 年 9 月 28 日 - 更新文章和服务器示例