C# SocketAsyncEventArgs 高性能套接字代码
使用 .NET SocketAsyncEventArgs 编写 C# Socket 服务器代码

引言
Microsoft 创建了 SocketAsyncEventArgs 类,以帮助您编写可伸缩、高性能的 Socket 服务器代码。 SocketAsyncEventArgs 通过 .NET Socket
类中的异步方法使用 I/O 完成端口。本文介绍了一种编写 Windows 下 TCP/IP 可伸缩、高性能 Socket 代码的成熟方法,即 I/O 完成端口。 还有一个关于 I/O 完成端口 的 Microsoft 页面链接。 SocketAsyncEventArgs 帮助我们以异步工作、引发 Completed
事件、设置缓冲区空间、对象池、拥有状态对象、通过属性访问 Socket 等优势来访问 Socket,同时拥有 I/O 完成端口 的性能特征。非常棒。
本文的目的是帮助您理解使用 SocketAsyncEventArgs 类进行的基本操作。
背景
您可能像我一样,在 Microsoft 的 SocketAsyncEventArgs 类主页上开始了对该主题的研究。该页面上的示例代码给了我启发。但它也很令人困惑。我注意到他们示例代码中的一些问题是
- 似乎 Microsoft 删除了关于 UserToken 属性的示例代码。
UserToken
非常重要,因为如果您需要发布多个接收操作来接收消息,那么您将需要在操作之间存储数据的地方。发送操作也是如此。 - 示例代码中的某些方法名称以及某些变量名称有些令人困惑。
- 他们使用 Semaphore 的原因并没有真正解释。
- Microsoft 页面上关于 SetBuffer 方法的 BufferManager 代码示例展示了如何构建缓冲区。虽然他们构建 BufferManager 的代码大部分是好的,但他们在 SocketAsyncEventArgs 类的示例代码中,在
ProcessReceive
方法里处理 SetBuffer 方法的方式,在狭窄的示例中几乎都能工作。如果您发送一个 10 字节的string
,然后一个 20 字节的string
,它将无法工作。他们的代码将缓冲区设置为从客户端发送的第一条消息的大小。所以,在第一条消息之后,它只会回传前 10 个字节。因此,我们需要一个更好的示例来展示如何在接收操作完成后从缓冲区获取数据、使用数据、将数据放入缓冲区以进行发送操作,以及在发送操作之前和之后调整缓冲区大小。另外,在创建BufferManager
的SocketListener
构造函数中,Microsoft 的示例将opsToPreAlloc
变量包含在totalBytes
的计算中,但没有包含bufferSize
。这是一个错误,导致一半的总缓冲区空间未使用。 - 在对示例代码的解释中,他们说:“例如,如果一个服务器应用程序需要始终保持 15 个 Socket 接受操作处于挂起状态,以支持传入的客户端连接速率,它可以为该目的分配 15 个可重用的 SocketAsyncEventArgs 对象。” 但随后他们的示例仅包含用于接收/发送的可重用 SocketAsyncEventArgs 对象,而不是用于接受操作。接受操作的 SocketAsyncEventArgs 对象将等到接收/发送完成才执行另一个接受操作。相反,我们可以使用池,正如他们在解释中提到的,并更快地发布接受操作。
在 Microsoft 投入资源创建 SocketAsyncEventArgs 类之后,令人惊讶的是他们没有投入资源提供良好易懂的示例代码和解释来帮助我们学习如何使用它。本文旨在填补这一空白,因为该类确实做得很好,并且在您理解它之后非常有帮助。
本文中的代码是在 Visual Studio 2008 中使用 .NET 3.5 开发的。本文假定您对 Windows 中的委托和事件处理有所了解。
关于 SocketAsyncEventArgs 类,Microsoft 的网站称其需要“平台:Windows 7、Windows Vista、Windows XP SP3、Windows Server 2008、Windows Server 2003。(支持的).NET Framework:4、3.5 SP1、3.0 SP1、2.0 SP1。(支持的).NET Framework 客户端配置文件:4、3.5 SP1。”
TCP Socket 基础知识
如果您有 Socket 服务器代码经验,可以跳过本节。
Socket 类似于对一个 端口的引用或“句柄”,它允许我们访问发送到该端口的数据,该端口是内存中的一个保留空间。我们将通过一个“监听”在 TCP 端口上的 Socket 来访问网络数据。对于 Socket 编程新手来说,使用 TCP Socket 服务器的四个主要步骤是。(通常描述为六个部分,但我喜欢将前三个部分组合成一个。)
- 监听服务器上的连接请求
- 创建一个 Socket
- 将该 Socket 绑定到端口
- 使用该 Socket 进行监听
- 接受连接请求
- 通过连接进行接收/发送
- 关闭连接
要进行监听,您需要
客户端也必须做好它的工作。客户端(非服务器)可以使用 Connect
或 ConnectAsync
方法来发起连接请求。客户端机器的 Windows TCP/IP 子系统将自动为客户端机器上的 Socket 分配一个出站端口。它将通过发送一个 SYN 数据包来联系服务器,该数据包地址指向服务器的 IP 地址和端口号。客户端不监听入站连接。它总是发起连接,服务器响应。在客户端发起连接到服务器的监听 Socket 后,服务器的 Windows TCP/IP 子系统将用 SYN, ACK 进行响应。然后客户端机器的 Windows TCP/IP 子系统用 ACK 数据包进行响应。当服务器收到 ACK 时,“握手”完成,连接建立。Windows 将为您处理这些 TCP/IP 协议事务。换句话说,您不必编写 SYN、ACK、PSH、数据包以及 TCP/IP 协议的类似部分。(此处微笑。)
服务器的监听 Socket 可以维护一个连接请求队列,等待被接受。这个队列被称为“积压”(backlog)。监听 Socket 通过“接受”操作将连接信息传递给另一个 Socket,然后获取积压队列中的下一个入站连接,或者如果没有,则等待直到有新的客户端连接请求。
为了在同一端口上有多个连接,服务器的监听 Socket 必须将连接信息传递给另一个接受它的 Socket。接受 Socket不绑定到端口。您发布一个接受操作以将连接从监听 Socket 传递给接受 Socket。接受操作可以在连接建立之前发布,以便监听 Socket 立即将新的连接信息传递给接受 Socket。客户端不需要执行接受操作。
在接受操作完成后,您就可以通过该连接接收或发送数据了。(执行接受操作的同一个 SocketAsyncEventArgs 对象也可以执行接收或发送操作,如果我们发布一个接收或发送操作并在其上设置了缓冲区空间。)在下面的代码设计中,执行接受操作的 SocketAsyncEventArgs 对象将连接信息传递给另一个 SocketAsyncEventArgs 对象来执行接收/发送。“接收”也称为“读取”。“发送”也称为“写入”。(如果我们愿意,我们也可以将接收和发送分成两个单独的 SocketAsyncEventArgs 对象。但这更困难。)
客户端或服务器都可以发起关闭连接的操作。通常,客户端会发起此操作。同样,断开连接的底层 TCP/IP 由 Windows 操作系统处理。可以使用 Close
方法关闭连接,该方法销毁 Socket
并清理其托管和非托管资源。
所以,这就是使用 TCP Socket 服务器的四个主要步骤。为了编写使用它的代码,您还需要了解 TCP 的一些其他知识。
使用 TCP,客户端上的一个发送操作不一定等于服务器上的一个接收操作。客户端上的一个发送操作可能等于服务器上的一个、两个或多个接收操作。反之亦然。这种特殊性可能由缓冲区大小、网络延迟以及操作系统处理 TCP 以提高性能的方式引起。因此,您必须有一些方法来确定 TCP 消息的开始和/或结束位置。处理 TCP 消息的三种可能方法是:
- 在每条消息前加上一个整数,表示消息的长度。
- 所有消息都是固定长度。在发送消息之前,客户端和服务器都必须知道长度。
- 在每条消息后附加一个定界符,以显示其结束位置。在发送消息之前,客户端和服务器都必须知道定界符是什么。
此外,您的通信协议应包括服务器在收到每条消息后是否会向客户端发送响应(发送操作)。该响应是在一条完整的 TCP 消息之后,还是可以在多条消息之后?如果是在一条消息之后,代码可能会更简单。
好的,那么让我们考虑一下服务器在一次接收操作中可能收到的数据的情况:
- 在第一次接收操作中,接收的字节数少于前缀的长度。
- 在前一次或多次接收操作中收到部分前缀后,又收到一部分前缀,但不是全部。
- 在前一次或多次接收操作中收到部分前缀后,又收到剩余的前缀,但没有更多内容。
- 在前一次或多次接收操作中收到部分前缀后,然后收到剩余的前缀,再加上部分消息。
- 在前一次或多次接收操作中收到部分前缀后,然后收到剩余的前缀,再加上整个消息。
- 接收的字节数正好等于前缀的长度,但没有更多内容。
- 在前一次或多次接收操作中正好接收了前缀长度的字节数后,然后接收部分消息。
- 在前一次或多次接收操作中正好接收了前缀长度的字节数后,然后接收整个消息。
- 接收的字节数包含前缀和部分消息,但不包含整个消息。
- 在前一次或多次接收操作中收到前缀和部分消息后,又收到另一部分消息,但不是全部。
- 在前一次或多次接收操作中收到前缀和部分消息后,然后收到剩余的整个消息。
- 在第一次接收操作中,接收的字节数包含前缀和整个消息。
最后一个实际上是最常见的情况。但以上所有情况都可能发生并且确实会发生。如果客户端和服务器的缓冲区大小都大于消息,那么在同一台机器上运行客户端和服务器,甚至在局域网内运行时,上述情况可能不会发生。但是,当数据通过多台机器传输时,Internet 上的 TCP 更不可预测。因此,您的代码需要考虑所有这些可能性。
代码简介
接受操作。在此应用程序中,执行接受操作的 Socket 可以通过 SocketAsyncEventArgs 对象中的 AcceptSocket 属性访问。在 Microsoft 的 AcceptSocket 页面上,它说:“如果未在调用 Socket.AcceptAsync 方法之前提供(设置为 null
),则会自动创建一个新的 Socket。”这正是我们在下面的代码中所做的。 Socket.AcceptAsync 方法将为每个新连接创建一个新的 Socket
对象。根据 Socket.AcceptAsync 页面,该“新 Socket 将以与当前 Socket 相同的AddressFamily、SocketType 和 ProtocolType 构造”。我发现在 .NET 3.5 中, Socket.AcceptAsync 方法 还会复制 LingerState
和 NoDelay
的设置,尽管 Socket.AcceptAsync 页面并未说明这一点。不确定其他版本的 .NET 是否如此。
我们可以有一个这些 SocketAsyncEventArgs 对象的池来处理接受操作。在此池中,您不需要为服务器维护的每个连接一个对象,因为接受操作完成后,Socket 的引用会很快传递给另一个 SocketAsyncEventArgs 对象。在接受操作的池中放入大量 SocketAsyncEventArgs 对象似乎并没有帮助。再次重复以供澄清,执行接受操作的 Socket 可以通过我们为接受操作创建的 SocketAsyncEventArgs 对象池中的 SocketAsyncEventArgs 对象中的 AcceptSocket 属性进行访问。在我们从执行接受操作的 SocketAsyncEventArgs 对象将 Socket 对象的引用传递给执行发送/接收操作的 SocketAsyncEventArgs 对象之后,您将通过执行发送/接收操作的 SocketAsyncEventArgs 对象中的 AcceptSocket 属性来访问 Socket。
在 .NET 中,而不是为每个接受操作创建一个新的 Socket 对象,可以选择拥有一个 Socket
对象池并重用 Socket。在有很多客户端快速连接和断开连接的情况下,Socket 池可以在服务器上提供性能提升。(不要尝试在客户端上重用 Socket,除非您在重用时连接到不同的服务器终结点。)如果使用 Socket 池,请使用 Disconnect
或 DisconnectAsync
方法,并带有适当的选项,而不是 Close
方法。
接收/发送操作。在此应用程序中,接收和发送操作通过来自我们为接收/发送操作创建的 SocketAsyncEventArgs 对象池的 SocketAsyncEventArgs 对象来处理。这不是我们刚才检查的接受操作的同一个池。为了提高性能,我们有一个用于执行接收和发送操作的对象池。用于接收/发送操作的 SocketAsyncEventArgs 对象数量至少应等于允许的最大并发连接数。
此代码中的通信协议是什么?
- 客户端的一条消息将对应服务器的一条消息。
- 建立连接后,客户端将首先发送一条消息,然后发布一个接收操作等待服务器的响应。对于客户端发送的每条消息,服务器都会向客户端发送一条消息作为响应。然后,它会发布另一个接收操作并等待来自客户端的下一条消息。在我们的代码中,服务器将在响应客户端之前对数据进行一些更改,以便我们不仅仅是回显客户端发送的数据。这种方法应该有助于您了解缓冲区中发生了什么。
- 我们将使用一个整数前缀来表示每条消息的长度。
注意:下面的代码不足以处理一个计算机在另一个计算机响应之前向另一台计算机发送多条消息的情况。这种情况更复杂,并且为此编写代码会使我们偏离本文和代码的主要目的。
缓冲区:TCP 中的缓冲区是非托管的,也就是说,它们不受 .NET Framework 控制,而是由 Windows 系统控制。因此,缓冲区会被“固定”在内存中的一个位置,从而导致内存碎片,因为 .NET 垃圾回收器无法收集这部分空间。通过将所有缓冲区放在一个内存块中,并反复重用同一空间,可以改善这种情况。请特别注意与缓冲区相关的代码,因为与缓冲区相关的内容似乎是人们遇到的更多困难的领域。
在此代码中,我为发送和接收使用了单独的缓冲区空间。您也可以为发送和接收重用相同的空间,从而只使用一半的内存。不一定是分开两者最好。我这样做只是为了帮助您思考缓冲区。(如果您重用相同的空间,那么您可以从 DataHoldingUserToken
中删除 bufferOffsetReceive
和 bufferOffsetSend
。而改为仅使用 SocketAsyncEventArgs
对象中的 Offset
属性。Offset
属性就是为此目的而设计的。)
由于使用了整数数据类型,缓冲区块的理论最大大小为 2.147 GB。并且在 32 位 Windows 上,您可能确实只需要小于 500 MB。当您使用较大的缓冲区大小和/或大量并发连接时,此限制才应该变得相关。例如,如果您使用 50,000 字节的缓冲区大小,并为发送和接收使用单独的缓冲区,那么每个连接需要 100,000 字节。2.147 GB 除以 100,000 字节 = 21,470,这将是使用此缓冲区块、此缓冲区大小和此设计可以使用的最大连接数。
通用:我在下面的代码中使用了说明性注释,以便无论是在屏幕上还是在打印页面上都能轻松理解它们。有时在代码注释中,我将“SocketAsyncEventArgs
”缩写为“SAEA”。
class Program
{
//This variable determines the number of
//SocketAsyncEventArg objects put in the pool of objects for receive/send.
//The value of this variable also affects the Semaphore.
//This app uses a Semaphore to ensure that the max # of connections
//value does not get exceeded.
//Max # of connections to a socket can be limited by the Windows Operating System
//also.
public const Int32 maxNumberOfConnections = 3000;
//If this port # will not work for you, it's okay to change it.
public const Int32 port = 4444;
//You would want a buffer size larger than 25 probably, unless you know the
//data will almost always be less than 25. It is just 25 in our test app.
public const Int32 testBufferSize = 25;
//This is the maximum number of asynchronous accept operations that can be
//posted simultaneously. This determines the size of the pool of
//SocketAsyncEventArgs objects that do accept operations. Note that this
//is NOT the same as the maximum # of connections.
public const Int32 maxSimultaneousAcceptOps = 10;
//The size of the queue of incoming connections for the listen socket.
public const Int32 backlog = 100;
//For the BufferManager
public const Int32 opsToPreAlloc = 2; // 1 for receive, 1 for send
//allows excess SAEA objects in pool.
public const Int32 excessSaeaObjectsInPool = 1;
//This number must be the same as the value on the client.
//Tells what size the message prefix will be. Don't change this unless
//you change the code, because 4 is the length of 32 bit integer, which
//is what we are using as prefix.
public const Int32 receivePrefixLength = 4;
public const Int32 sendPrefixLength = 4;
public static Int32 mainTransMissionId = 10000;
public static Int32 startingTid; //
public static Int32 mainSessionId = 1000000000;
public static List listOfDataHolders;
// To keep a record of maximum number of simultaneous connections
// that occur while the server is running. This can be limited by operating
// system and hardware. It will not be higher than the value that you set
// for maxNumberOfConnections.
public static Int32 maxSimultaneousClientsThatWereConnected = 0;
static void Main(String[] args)
{
try
{
// Get endpoint for the listener.
IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, port);
WriteInfoToConsole(localEndPoint);
//This object holds a lot of settings that we pass from Main method
//to the SocketListener. In a real app, you might want to read
//these settings from a database or windows registry settings that
//you would create.
SocketListenerSettings theSocketListenerSettings =
new SocketListenerSettings(maxNumberOfConnections,
excessSaeaObjectsInPool, backlog, maxSimultaneousAcceptOps,
receivePrefixLength, testBufferSize, sendPrefixLength, opsToPreAlloc,
localEndPoint);
//instantiate the SocketListener.
SocketListener socketListener = new SocketListener(theSocketListenerSettings);
}
catch (Exception ex)
{
Console.WriteLine("Error: " + ex.Message);
}
}
主要类是 SocketListener
。
class SocketListener
{
//Buffers for sockets are unmanaged by .NET.
//So memory used for buffers gets "pinned", which makes the
//.NET garbage collector work around it, fragmenting the memory.
//Circumvent this problem by putting all buffers together
//in one block in memory. Then we will assign a part of that space
//to each SocketAsyncEventArgs object, and
//reuse that buffer space each time we reuse the SocketAsyncEventArgs object.
//Create a large reusable set of buffers for all socket operations.
BufferManager theBufferManager;
// the socket used to listen for incoming connection requests
Socket listenSocket;
//A Semaphore has two parameters, the initial number of available slots
// and the maximum number of slots. We'll make them the same.
//This Semaphore is used to keep from going over max connection #.
//(It is not about controlling threading really here.)
Semaphore theMaxConnectionsEnforcer;
//an object that we pass in and which has all the settings the listener needs
SocketListenerSettings socketListenerSettings;
PrefixHandler prefixHandler;
MessageHandler messageHandler;
// pool of reusable SocketAsyncEventArgs objects for accept operations
SocketAsyncEventArgsPool poolOfAcceptEventArgs;
// pool of reusable SocketAsyncEventArgs objects for
//receive and send socket operations
SocketAsyncEventArgsPool poolOfRecSendEventArgs;
//_______________________________________________________________________________
// Constructor.
public SocketListener(SocketListenerSettings theSocketListenerSettings)
{
this.socketListenerSettings = theSocketListenerSettings;
this.prefixHandler = new PrefixHandler();
this.messageHandler = new MessageHandler();
//Allocate memory for buffers. We are using a separate buffer space for
//receive and send, instead of sharing the buffer space, like the Microsoft
//example does.
this.theBufferManager = new BufferManager(this.socketListenerSettings.BufferSize
* this.socketListenerSettings.NumberOfSaeaForRecSend
* this.socketListenerSettings.OpsToPreAllocate,
this.socketListenerSettings.BufferSize
* this.socketListenerSettings.OpsToPreAllocate);
this.poolOfRecSendEventArgs = new
SocketAsyncEventArgsPool(this.socketListenerSettings.NumberOfSaeaForRecSend);
this.poolOfAcceptEventArgs = new
SocketAsyncEventArgsPool(this.socketListenerSettings.MaxAcceptOps);
// Create connections count enforcer
this.theMaxConnectionsEnforcer = new
Semaphore(this.socketListenerSettings.MaxConnections,
this.socketListenerSettings.MaxConnections);
//Microsoft's example called these from Main method, which you
//can easily do if you wish.
Init();
StartListen();
}
//____________________________________________________________________________
// initializes the server by preallocating reusable buffers and
// context objects (SocketAsyncEventArgs objects).
//It is NOT mandatory that you preallocate them or reuse them. But, but it is
//done this way to illustrate how the API can
// easily be used to create reusable objects to increase server performance.
internal void Init()
{
// Allocate one large byte buffer block, which all I/O operations will
//use a piece of. This guards against memory fragmentation.
this.theBufferManager.InitBuffer();
// preallocate pool of SocketAsyncEventArgs objects for accept operations
for (Int32 i = 0; i < this.socketListenerSettings.MaxAcceptOps; i++)
{
// add SocketAsyncEventArg to the pool
this.poolOfAcceptEventArgs.Push(
CreateNewSaeaForAccept(poolOfAcceptEventArgs));
}
//The pool that we built ABOVE is for SocketAsyncEventArgs objects that do
// accept operations.
//Now we will build a separate pool for SAEAs objects
//that do receive/send operations. One reason to separate them is that accept
//operations do NOT need a buffer, but receive/send operations do.
//ReceiveAsync and SendAsync require
//a parameter for buffer size in SocketAsyncEventArgs.Buffer.
// So, create pool of SAEA objects for receive/send operations.
SocketAsyncEventArgs eventArgObjectForPool;
Int32 tokenId;
for (Int32 i = 0; i < this.socketListenerSettings.NumberOfSaeaForRecSend; i++)
{
//Allocate the SocketAsyncEventArgs object for this loop,
//to go in its place in the stack which will be the pool
//for receive/send operation context objects.
eventArgObjectForPool = new SocketAsyncEventArgs();
// assign a byte buffer from the buffer block to
//this particular SocketAsyncEventArg object
this.theBufferManager.SetBuffer(eventArgObjectForPool);
tokenId = poolOfRecSendEventArgs.AssignTokenId() + 1000000;
//Attach the SocketAsyncEventArgs object
//to its event handler. Since this SocketAsyncEventArgs object is
//used for both receive and send operations, whenever either of those
//completes, the IO_Completed method will be called.
eventArgObjectForPool.Completed += new
EventHandler(IO_Completed);
//We can store data in the UserToken property of SAEA object.
DataHoldingUserToken theTempReceiveSendUserToken = new
DataHoldingUserToken(eventArgObjectForPool, eventArgObjectForPool.Offset,
eventArgObjectForPool.Offset + this.socketListenerSettings.BufferSize,
this.socketListenerSettings.ReceivePrefixLength,
this.socketListenerSettings.SendPrefixLength, tokenId);
//We'll have an object that we call DataHolder, that we can remove from
//the UserToken when we are finished with it. So, we can hang on to the
//DataHolder, pass it to an app, serialize it, or whatever.
theTempReceiveSendUserToken.CreateNewDataHolder();
eventArgObjectForPool.UserToken = theTempReceiveSendUserToken;
// add this SocketAsyncEventArg object to the pool.
this.poolOfRecSendEventArgs.Push(eventArgObjectForPool);
}
//____________________________________________________________________________
// This method is called when we need to create a new SAEA object to do
//accept operations. The reason to put it in a separate method is so that
//we can easily add more objects to the pool if we need to.
//You can do that if you do NOT use a buffer in the SAEA object that does
//the accept operations.
internal SocketAsyncEventArgs CreateNewSaeaForAccept(SocketAsyncEventArgsPool pool)
{
//Allocate the SocketAsyncEventArgs object.
SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
//SocketAsyncEventArgs.Completed is an event, (the only event,)
//declared in the SocketAsyncEventArgs class.
//See http://msdn.microsoft.com/en-us/library/
// system.net.sockets.socketasynceventargs.completed.aspx.
//An event handler should be attached to the event within
//a SocketAsyncEventArgs instance when an asynchronous socket
//operation is initiated, otherwise the application will not be able
//to determine when the operation completes.
//Attach the event handler, which causes the calling of the
//AcceptEventArg_Completed object when the accept op completes.
acceptEventArg.Completed +=
new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
AcceptOpUserToken theAcceptOpToken = new
AcceptOpUserToken(pool.AssignTokenId() + 10000);
acceptEventArg.UserToken = theAcceptOpToken;
return acceptEventArg;
// accept operations do NOT need a buffer.
//You can see that is true by looking at the
//methods in the .NET Socket class on the Microsoft website. AcceptAsync does
//not require a parameter for buffer size.
}
//____________________________________________________________________________
// This method starts the socket server such that it is listening for
// incoming connection requests.
internal void StartListen()
{
// create the socket which listens for incoming connections
listenSocket = new
Socket(this.socketListenerSettings.LocalEndPoint.AddressFamily,
SocketType.Stream, ProtocolType.Tcp);
//bind it to the port
listenSocket.Bind(this.socketListenerSettings.LocalEndPoint);
// Start the listener with a backlog of however many connections.
//"backlog" means pending connections.
//The backlog number is the number of clients that can wait for a
//SocketAsyncEventArg object that will do an accept operation.
//The listening socket keeps the backlog as a queue. The backlog allows
//for a certain # of excess clients waiting to be connected.
//If the backlog is maxed out, then the client will receive an error when
//trying to connect.
//max # for backlog can be limited by the operating system.
listenSocket.Listen(this.socketListenerSettings.Backlog);
//Server is listening now****
// Calls the method which will post accepts on the listening socket.
//This call just occurs one time from this StartListen method.
//After that the StartAccept method will be called in a loop.
StartAccept();
}
//____________________________________________________________________________
// Begins an operation to accept a connection request from the client
internal void StartAccept()
{
//Get a SocketAsyncEventArgs object to accept the connection.
SocketAsyncEventArgs acceptEventArg;
//Get it from the pool if there is more than one in the pool.
//We could use zero as bottom, but one is a little safer.
if (this.poolOfAcceptEventArgs.Count > 1)
{
try
{
acceptEventArg = this.poolOfAcceptEventArgs.Pop();
}
//or make a new one.
catch
{
acceptEventArg = CreateNewSaeaForAccept(poolOfAcceptEventArgs);
}
}
//or make a new one.
else
{
acceptEventArg = CreateNewSaeaForAccept(poolOfAcceptEventArgs);
}
// Semaphore class is used to control access to a resource or pool of
// resources. Enter the semaphore by calling the WaitOne method, which is
// inherited from the WaitHandle class, and release the semaphore
// by calling the Release method. This is a mechanism to prevent exceeding
// the max # of connections we specified. We'll do this before
// doing AcceptAsync. If maxConnections value has been reached,
// then the thread will pause here until the Semaphore gets released,
// which happens in the CloseClientSocket method.
this.theMaxConnectionsEnforcer.WaitOne();
// Socket.AcceptAsync begins asynchronous operation to accept the connection.
// Note the listening socket will pass info to the SocketAsyncEventArgs
// object that has the Socket that does the accept operation.
// If you do not create a Socket object and put it in the SAEA object
// before calling AcceptAsync and use the AcceptSocket property to get it,
// then a new Socket object will be created for you by .NET.
bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
// Socket.AcceptAsync returns true if the I/O operation is pending, i.e. is
// working asynchronously. The
// SocketAsyncEventArgs.Completed event on the acceptEventArg parameter
// will be raised upon completion of accept op.
// AcceptAsync will call the AcceptEventArg_Completed
// method when it completes, because when we created this SocketAsyncEventArgs
// object before putting it in the pool, we set the event handler to do it.
// AcceptAsync returns false if the I/O operation completed synchronously.
// The SocketAsyncEventArgs.Completed event on the acceptEventArg parameter
// will NOT be raised when AcceptAsync returns false.
if (!willRaiseEvent)
{
// The code in this if (!willRaiseEvent) statement only runs
// when the operation was completed synchronously. It is needed because
// when Socket.AcceptAsync returns false,
// it does NOT raise the SocketAsyncEventArgs.Completed event.
// And we need to call ProcessAccept and pass it the SAEA object.
// This is only when a new connection is being accepted.
// Probably only relevant in the case of a socket error.
ProcessAccept(acceptEventArg);
}
}
//____________________________________________________________________________
// This method is the callback method associated with Socket.AcceptAsync
// operations and is invoked when an async accept operation completes.
//This is only when a new connection is being accepted.
//Notice that Socket.AcceptAsync is returning a value of true, and
//raising the Completed event when the AcceptAsync method completes.
private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
//Any code that you put in this method will NOT be called if
//the operation completes synchronously, which will probably happen when
//there is some kind of socket error. It might be better to put the code
//in the ProcessAccept method.
ProcessAccept(e);
}
//____________________________________________________________________________
//The e parameter passed from the AcceptEventArg_Completed method
//represents the SocketAsyncEventArgs object that did
//the accept operation. in this method we'll do the handoff from it to the
//SocketAsyncEventArgs object that will do receive/send.
private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)
{
// This is when there was an error with the accept op. That should NOT
// be happening often. It could indicate that there is a problem with
// that socket. If there is a problem, then we would have an infinite
// loop here, if we tried to reuse that same socket.
if (acceptEventArgs.SocketError != SocketError.Success)
{
// Loop back to post another accept op. Notice that we are NOT
// passing the SAEA object here.
LoopToStartAccept();
AcceptOpUserToken theAcceptOpToken =
(AcceptOpUserToken)acceptEventArgs.UserToken;
//Let's destroy this socket, since it could be bad.
HandleBadAccept(acceptEventArgs);
//Jump out of the method.
return;
}
//Now that the accept operation completed, we can start another
//accept operation, which will do the same. Notice that we are NOT
//passing the SAEA object here.
LoopToStartAccept();
// Get a SocketAsyncEventArgs object from the pool of receive/send op
//SocketAsyncEventArgs objects
SocketAsyncEventArgs receiveSendEventArgs = this.poolOfRecSendEventArgs.Pop();
//Create sessionId in UserToken.
((DataHoldingUserToken)receiveSendEventArgs.UserToken).CreateSessionId();
//A new socket was created by the AcceptAsync method. The
//SocketAsyncEventArgs object which did the accept operation has that
//socket info in its AcceptSocket property. Now we will give
//a reference for that socket to the SocketAsyncEventArgs
//object which will do receive/send.
receiveSendEventArgs.AcceptSocket = acceptEventArgs.AcceptSocket;
//We have handed off the connection info from the
//accepting socket to the receiving socket. So, now we can
//put the SocketAsyncEventArgs object that did the accept operation
//back in the pool for them. But first we will clear
//the socket info from that object, so it will be
//ready for a new socket when it comes out of the pool.
acceptEventArgs.AcceptSocket = null;
this.poolOfAcceptEventArgs.Push(acceptEventArgs);
StartReceive(receiveSendEventArgs);
}
//____________________________________________________________________________
//LoopToStartAccept method just sends us back to the beginning of the
//StartAccept method, to start the next accept operation on the next
//connection request that this listening socket will pass of to an
//accepting socket. We do NOT actually need this method. You could
//just call StartAccept() in ProcessAccept() where we called LoopToStartAccept().
//This method is just here to help you visualize the program flow.
private void LoopToStartAccept()
{
StartAccept();
}
//____________________________________________________________________________
// Set the receive buffer and post a receive op.
private void StartReceive(SocketAsyncEventArgs receiveSendEventArgs)
{
//Set the buffer for the receive operation.
receiveSendEventArgs.SetBuffer(receiveSendToken.bufferOffsetReceive,
this.socketListenerSettings.BufferSize);
// Post async receive operation on the socket.
bool willRaiseEvent =
receiveSendEventArgs.AcceptSocket.ReceiveAsync(receiveSendEventArgs);
//Socket.ReceiveAsync returns true if the I/O operation is pending. The
//SocketAsyncEventArgs.Completed event on the e parameter will be raised
//upon completion of the operation. So, true will cause the IO_Completed
//method to be called when the receive operation completes.
//That's because of the event handler we created when building
//the pool of SocketAsyncEventArgs objects that perform receive/send.
//It was the line that said
//eventArgObjectForPool.Completed +=
// new EventHandler<SocketAsyncEventArgs>(IO_Completed);
//Socket.ReceiveAsync returns false if I/O operation completed synchronously.
//In that case, the SocketAsyncEventArgs.Completed event on the e parameter
//will not be raised and the e object passed as a parameter may be
//examined immediately after the method call
//returns to retrieve the result of the operation.
// It may be false in the case of a socket error.
if (!willRaiseEvent)
{
//If the op completed synchronously, we need to call ProcessReceive
//method directly. This will probably be used rarely, as you will
//see in testing.
ProcessReceive(receiveSendEventArgs);
}
}
//____________________________________________________________________________
// This method is called whenever a receive or send operation completes.
// Here "e" represents the SocketAsyncEventArgs object associated
//with the completed receive or send operation
void IO_Completed(object sender, SocketAsyncEventArgs e)
{
//Any code that you put in this method will NOT be called if
//the operation completes synchronously, which will probably happen when
//there is some kind of socket error.
// determine which type of operation just
// completed and call the associated handler
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
ProcessReceive(e);
break;
case SocketAsyncOperation.Send:
ProcessSend(e);
break;
default:
//This exception will occur if you code the Completed event of some
//operation to come to this method, by mistake.
throw new ArgumentException("The last operation completed on
the socket was not a receive or send");
}
}
//____________________________________________________________________________
// This method is invoked by the IO_Completed method
// when an asynchronous receive operation completes.
// If the remote host closed the connection, then the socket is closed.
// Otherwise, we process the received data. And if a complete message was
// received, then we do some additional processing, to
// respond to the client.
private void ProcessReceive(SocketAsyncEventArgs receiveSendEventArgs)
{
DataHoldingUserToken receiveSendToken =
(DataHoldingUserToken)receiveSendEventArgs.UserToken;
// If there was a socket error, close the connection. This is NOT a normal
// situation, if you get an error here.
// In the Microsoft example code they had this error situation handled
// at the end of ProcessReceive. Putting it here improves readability
// by reducing nesting some.
if (receiveSendEventArgs.SocketError != SocketError.Success)
{
receiveSendToken.Reset();
CloseClientSocket(receiveSendEventArgs);
//Jump out of the ProcessReceive method.
return;
}
// If no data was received, close the connection. This is a NORMAL
// situation that shows when the client has finished sending data.
if (receiveSendEventArgs.BytesTransferred == 0)
{
receiveSendToken.Reset();
CloseClientSocket(receiveSendEventArgs);
return;
}
//The BytesTransferred property tells us how many bytes
//we need to process.
Int32 remainingBytesToProcess = receiveSendEventArgs.BytesTransferred;
//If we have not got all of the prefix already,
//then we need to work on it here.
if (receiveSendToken.receivedPrefixBytesDoneCount <
this.socketListenerSettings.ReceivePrefixLength)
{
remainingBytesToProcess = prefixHandler.HandlePrefix(receiveSendEventArgs,
receiveSendToken, remainingBytesToProcess);
if (remainingBytesToProcess == 0)
{
// We need to do another receive op, since we do not have
// the message yet, but remainingBytesToProcess == 0.
StartReceive(receiveSendEventArgs);
//Jump out of the method.
return;
}
}
// If we have processed the prefix, we can work on the message now.
// We'll arrive here when we have received enough bytes to read
// the first byte after the prefix.
bool incomingTcpMessageIsReady = messageHandler
.HandleMessage(receiveSendEventArgs,
receiveSendToken, remainingBytesToProcess);
if (incomingTcpMessageIsReady == true)
{
// Pass the DataHolder object to the Mediator here. The data in
// this DataHolder can be used for all kinds of things that an
// intelligent and creative person like you might think of.
receiveSendToken.theMediator.HandleData(receiveSendToken.theDataHolder);
// Create a new DataHolder for next message.
receiveSendToken.CreateNewDataHolder();
//Reset the variables in the UserToken, to be ready for the
//next message that will be received on the socket in this
//SAEA object.
receiveSendToken.Reset();
receiveSendToken.theMediator.PrepareOutgoingData();
StartSend(receiveSendToken.theMediator.GiveBack());
}
else
{
// Since we have NOT gotten enough bytes for the whole message,
// we need to do another receive op. Reset some variables first.
// All of the data that we receive in the next receive op will be
// message. None of it will be prefix. So, we need to move the
// receiveSendToken.receiveMessageOffset to the beginning of the
// receive buffer space for this SAEA.
receiveSendToken.receiveMessageOffset = receiveSendToken.bufferOffsetReceive;
// Do NOT reset receiveSendToken.receivedPrefixBytesDoneCount here.
// Just reset recPrefixBytesDoneThisOp.
receiveSendToken.recPrefixBytesDoneThisOp = 0;
// Since we have not gotten enough bytes for the whole message,
// we need to do another receive op.
StartReceive(receiveSendEventArgs);
}
}
//____________________________________________________________________________
//Post a send op.
private void StartSend(SocketAsyncEventArgs receiveSendEventArgs)
{
DataHoldingUserToken receiveSendToken =
(DataHoldingUserToken)receiveSendEventArgs.UserToken;
//Set the buffer. You can see on Microsoft's page at
//http://msdn.microsoft.com/en-us/library/
// system.net.sockets.socketasynceventargs.setbuffer.aspx
//that there are two overloads. One of the overloads has 3 parameters.
//When setting the buffer, you need 3 parameters the first time you set it,
//which we did in the Init method. The first of the three parameters
//tells what byte array to use as the buffer. After we tell what byte array
//to use we do not need to use the overload with 3 parameters any more.
//(That is the whole reason for using the buffer block. You keep the same
//byte array as buffer always, and keep it all in one block.)
//Now we use the overload with two parameters. We tell
// (1) the offset and
// (2) the number of bytes to use, starting at the offset.
//The number of bytes to send depends on whether the message is larger than
//the buffer or not. If it is larger than the buffer, then we will have
//to post more than one send operation. If it is less than or equal to the
//size of the send buffer, then we can accomplish it in one send op.
if (receiveSendToken.sendBytesRemainingCount
<= this.socketListenerSettings.BufferSize)
{
receiveSendEventArgs.SetBuffer(receiveSendToken.bufferOffsetSend,
receiveSendToken.sendBytesRemainingCount);
//Copy the bytes to the buffer associated with this SAEA object.
Buffer.BlockCopy(receiveSendToken.dataToSend,
receiveSendToken.bytesSentAlreadyCount,
receiveSendEventArgs.Buffer, receiveSendToken.bufferOffsetSend,
receiveSendToken.sendBytesRemainingCount);
}
else
{
//We cannot try to set the buffer any larger than its size.
//So since receiveSendToken.sendBytesRemainingCount > BufferSize, we just
//set it to the maximum size, to send the most data possible.
receiveSendEventArgs.SetBuffer(receiveSendToken.bufferOffsetSend,
this.socketListenerSettings.BufferSize);
//Copy the bytes to the buffer associated with this SAEA object.
Buffer.BlockCopy(receiveSendToken.dataToSend,
receiveSendToken.bytesSentAlreadyCount,
receiveSendEventArgs.Buffer, receiveSendToken.bufferOffsetSend,
this.socketListenerSettings.BufferSize);
//We'll change the value of sendUserToken.sendBytesRemainingCount
//in the ProcessSend method.
}
//post asynchronous send operation
bool willRaiseEvent =
receiveSendEventArgs.AcceptSocket.SendAsync(receiveSendEventArgs);
if (!willRaiseEvent)
{
ProcessSend(receiveSendEventArgs);
}
}
//____________________________________________________________________________
// This method is called by I/O Completed() when an asynchronous send completes.
// If all of the data has been sent, then this method calls StartReceive
//to start another receive op on the socket to read any additional
// data sent from the client. If all of the data has NOT been sent, then it
//calls StartSend to send more data.
private void ProcessSend(SocketAsyncEventArgs receiveSendEventArgs)
{
DataHoldingUserToken receiveSendToken =
(DataHoldingUserToken)receiveSendEventArgs.UserToken;
receiveSendToken.sendBytesRemainingCount =
receiveSendToken.sendBytesRemainingCount
- receiveSendEventArgs.BytesTransferred;
receiveSendToken.bytesSentAlreadyCount +=
receiveSendEventArgs.BytesTransferred;
if (receiveSendEventArgs.SocketError == SocketError.Success)
{
if (receiveSendToken.sendBytesRemainingCount == 0)
{
StartReceive(receiveSendEventArgs);
}
else
{
//If some of the bytes in the message have NOT been sent,
//then we will need to post another send operation.
//So let's loop back to StartSend().
StartSend(receiveSendEventArgs);
}
}
else
{
//If we are in this else-statement, there was a socket error.
//In this example we'll just close the socket if there was a socket error
//when receiving data from the client.
receiveSendToken.Reset();
CloseClientSocket(receiveSendEventArgs);
}
}
//_______________________________________________________________________
// Does the normal destroying of sockets after
// we finish receiving and sending on a connection.
private void CloseClientSocket(SocketAsyncEventArgs e)
{
var receiveSendToken = (e.UserToken as DataHoldingUserToken);
// do a shutdown before you close the socket
try
{
e.AcceptSocket.Shutdown(SocketShutdown.Both);
}
// throws if socket was already closed
catch (Exception)
{
}
//This method closes the socket and releases all resources, both
//managed and unmanaged. It internally calls Dispose.
e.AcceptSocket.Close();
//Make sure the new DataHolder has been created for the next connection.
//If it has, then dataMessageReceived should be null.
if (receiveSendToken.theDataHolder.dataMessageReceived != null)
{
receiveSendToken.CreateNewDataHolder();
}
// Put the SocketAsyncEventArg back into the pool,
// to be used by another client. This
this.poolOfRecSendEventArgs.Push(e);
// decrement the counter keeping track of the total number of clients
//connected to the server, for testing
Interlocked.Decrement(ref this.numberOfAcceptedSockets);
//Release Semaphore so that its connection counter will be decremented.
//This must be done AFTER putting the SocketAsyncEventArg back into the pool,
//or you can run into problems.
this.theMaxConnectionsEnforcer.Release();
}
//____________________________________________________________________________
private void HandleBadAccept(SocketAsyncEventArgs acceptEventArgs)
{
var acceptOpToken = (acceptEventArgs.UserToken as AcceptOpUserToken);
//This method closes the socket and releases all resources, both
//managed and unmanaged. It internally calls Dispose.
acceptEventArgs.AcceptSocket.Close();
//Put the SAEA back in the pool.
poolOfAcceptEventArgs.Push(acceptEventArgs);
}
}
class PrefixHandler
{
public Int32 HandlePrefix(SocketAsyncEventArgs e,
DataHoldingUserToken receiveSendToken,
Int32 remainingBytesToProcess)
{
//receivedPrefixBytesDoneCount tells us how many prefix bytes were
//processed during previous receive ops which contained data for
//this message. Usually there will NOT have been any previous
//receive ops here. So in that case,
//receiveSendToken.receivedPrefixBytesDoneCount would equal 0.
//Create a byte array to put the new prefix in, if we have not
//already done it in a previous loop.
if (receiveSendToken.receivedPrefixBytesDoneCount == 0)
{
receiveSendToken.byteArrayForPrefix = new
Byte[receiveSendToken.receivePrefixLength];
}
//If this next if-statement is true, then we have received at
//least enough bytes to have the prefix. So we can determine the
//length of the message that we are working on.
if (remainingBytesToProcess >= receiveSendToken.receivePrefixLength
- receiveSendToken.receivedPrefixBytesDoneCount)
{
//Now copy that many bytes to byteArrayForPrefix.
//We can use the variable receiveMessageOffset as our main
//index to show which index to get data from in the TCP
//buffer.
Buffer.BlockCopy(e.Buffer, receiveSendToken.receiveMessageOffset
- receiveSendToken.receivePrefixLength
+ receiveSendToken.receivedPrefixBytesDoneCount,
receiveSendToken.byteArrayForPrefix,
receiveSendToken.receivedPrefixBytesDoneCount,
receiveSendToken.receivePrefixLength
- receiveSendToken.receivedPrefixBytesDoneCount);
remainingBytesToProcess = remainingBytesToProcess
- receiveSendToken.receivePrefixLength
+ receiveSendToken.receivedPrefixBytesDoneCount;
receiveSendToken.recPrefixBytesDoneThisOp =
receiveSendToken.receivePrefixLength
- receiveSendToken.receivedPrefixBytesDoneCount;
receiveSendToken.receivedPrefixBytesDoneCount =
receiveSendToken.receivePrefixLength;
receiveSendToken.lengthOfCurrentIncomingMessage =
BitConverter.ToInt32(receiveSendToken.byteArrayForPrefix, 0);
return remainingBytesToProcess;
}
//This next else-statement deals with the situation
//where we have some bytes
//of this prefix in this receive operation, but not all.
else
{
//Write the bytes to the array where we are putting the
//prefix data, to save for the next loop.
Buffer.BlockCopy(e.Buffer, receiveSendToken.receiveMessageOffset
- receiveSendToken.receivePrefixLength
+ receiveSendToken.receivedPrefixBytesDoneCount,
receiveSendToken.byteArrayForPrefix,
receiveSendToken.receivedPrefixBytesDoneCount,
remainingBytesToProcess);
receiveSendToken.recPrefixBytesDoneThisOp = remainingBytesToProcess;
receiveSendToken.receivedPrefixBytesDoneCount += remainingBytesToProcess;
remainingBytesToProcess = 0;
}
// This section is needed when we have received
// an amount of data exactly equal to the amount needed for the prefix,
// but no more. And also needed with the situation where we have received
// less than the amount of data needed for prefix.
if (remainingBytesToProcess == 0)
{
receiveSendToken.receiveMessageOffset =
receiveSendToken.receiveMessageOffset -
receiveSendToken.recPrefixBytesDoneThisOp;
receiveSendToken.recPrefixBytesDoneThisOp = 0;
}
return remainingBytesToProcess;
}
}
class MessageHandler
{
public bool HandleMessage(SocketAsyncEventArgs receiveSendEventArgs,
DataHoldingUserToken receiveSendToken,
Int32 remainingBytesToProcess)
{
bool incomingTcpMessageIsReady = false;
//Create the array where we'll store the complete message,
//if it has not been created on a previous receive op.
if (receiveSendToken.receivedMessageBytesDoneCount == 0)
{
receiveSendToken.theDataHolder.dataMessageReceived =
new Byte[receiveSendToken.lengthOfCurrentIncomingMessage];
}
// Remember there is a receiveSendToken.receivedPrefixBytesDoneCount
// variable, which allowed us to handle the prefix even when it
// requires multiple receive ops. In the same way, we have a
// receiveSendToken.receivedMessageBytesDoneCount variable, which
// helps us handle message data, whether it requires one receive
// operation or many.
if (remainingBytesToProcess + receiveSendToken.receivedMessageBytesDoneCount
== receiveSendToken.lengthOfCurrentIncomingMessage)
{
// If we are inside this if-statement, then we got
// the end of the message. In other words,
// the total number of bytes we received for this message matched the
// message length value that we got from the prefix.
// Write/append the bytes received to the byte array in the
// DataHolder object that we are using to store our data.
Buffer.BlockCopy(receiveSendEventArgs.Buffer,
receiveSendToken.receiveMessageOffset,
receiveSendToken.theDataHolder.dataMessageReceived,
receiveSendToken.receivedMessageBytesDoneCount,
remainingBytesToProcess);
incomingTcpMessageIsReady = true;
}
else
{
// If we are inside this else-statement, then that means that we
// need another receive op. We still haven't got the whole message,
// even though we have examined all the data that was received.
// Not a problem. In SocketListener.ProcessReceive we will just call
// StartReceive to do another receive op to receive more data.
Buffer.BlockCopy(receiveSendEventArgs.Buffer,
receiveSendToken.receiveMessageOffset,
receiveSendToken.theDataHolder.dataMessageReceived,
receiveSendToken.receivedMessageBytesDoneCount,
remainingBytesToProcess);
receiveSendToken.receiveMessageOffset =
receiveSendToken.receiveMessageOffset -
receiveSendToken.recPrefixBytesDoneThisOp;
receiveSendToken.receivedMessageBytesDoneCount += remainingBytesToProcess;
}
return incomingTcpMessageIsReady;
}
}
class BufferManager
{
// This class creates a single large buffer which can be divided up
// and assigned to SocketAsyncEventArgs objects for use with each
// socket I/O operation.
// This enables buffers to be easily reused and guards against
// fragmenting heap memory.
//
//This buffer is a byte array which the Windows TCP buffer can copy its data to.
// the total number of bytes controlled by the buffer pool
Int32 totalBytesInBufferBlock;
// Byte array maintained by the Buffer Manager.
byte[] bufferBlock;
Stack<int> freeIndexPool;
Int32 currentIndex;
Int32 bufferBytesAllocatedForEachSaea;
public BufferManager(Int32 totalBytes, Int32 totalBufferBytesInEachSaeaObject)
{
totalBytesInBufferBlock = totalBytes;
this.currentIndex = 0;
this.bufferBytesAllocatedForEachSaea = totalBufferBytesInEachSaeaObject;
this.freeIndexPool = new Stack<int>();
}
// Allocates buffer space used by the buffer pool
internal void InitBuffer()
{
// Create one large buffer block.
this.bufferBlock = new byte[totalBytesInBufferBlock];
}
// Divide that one large buffer block out to each SocketAsyncEventArg object.
// Assign a buffer space from the buffer block to the
// specified SocketAsyncEventArgs object.
//
// returns true if the buffer was successfully set, else false
internal bool SetBuffer(SocketAsyncEventArgs args)
{
if (this.freeIndexPool.Count > 0)
{
//This if-statement is only true if you have called the FreeBuffer
//method previously, which would put an offset for a buffer space
//back into this stack.
args.SetBuffer(this.bufferBlock, this.freeIndexPool.Pop(),
this.bufferBytesAllocatedForEachSaea);
}
else
{
//Inside this else-statement is the code that is used to set the
//buffer for each SAEA object when the pool of SAEA objects is built
//in the Init method.
if ((totalBytesInBufferBlock - this.bufferBytesAllocatedForEachSaea) <
this.currentIndex)
{
return false;
}
args.SetBuffer(this.bufferBlock, this.currentIndex,
this.bufferBytesAllocatedForEachSaea);
this.currentIndex += this.bufferBytesAllocatedForEachSaea;
}
return true;
}
// Removes the buffer from a SocketAsyncEventArg object. This frees the
// buffer back to the buffer pool. Try NOT to use the FreeBuffer method,
// unless you need to destroy the SAEA object, or maybe in the case
// of some exception handling. Instead, on the server
// keep the same buffer space assigned to one SAEA object for the duration of
// this app's running.
internal void FreeBuffer(SocketAsyncEventArgs args)
{
this.freeIndexPool.Push(args.Offset);
args.SetBuffer(null, 0, 0);
}
}
class DataHoldingUserToken
{
internal Mediator theMediator;
internal DataHolder theDataHolder;
internal readonly Int32 bufferOffsetReceive;
internal readonly Int32 permanentReceiveMessageOffset;
internal readonly Int32 bufferOffsetSend;
private Int32 idOfThisObject;
internal Int32 lengthOfCurrentIncomingMessage;
//receiveMessageOffset is used to mark the byte position where the message
//begins in the receive buffer. This value can sometimes be out of
//bounds for the data stream just received. But, if it is out of bounds, the
//code will not access it.
internal Int32 receiveMessageOffset;
internal Byte[] byteArrayForPrefix;
internal readonly Int32 receivePrefixLength;
internal Int32 receivedPrefixBytesDoneCount = 0;
internal Int32 receivedMessageBytesDoneCount = 0;
//This variable will be needed to calculate the value of the
//receiveMessageOffset variable in one situation. Notice that the
//name is similar but the usage is different from the variable
//receiveSendToken.receivePrefixBytesDone.
internal Int32 recPrefixBytesDoneThisOp = 0;
internal Int32 sendBytesRemainingCount;
internal readonly Int32 sendPrefixLength;
internal Byte[] dataToSend;
internal Int32 bytesSentAlreadyCount;
//The session ID correlates with all the data sent in a connected session.
//It is different from the transmission ID in the DataHolder, which relates
//to one TCP message. A connected session could have many messages, if you
//set up your app to allow it.
private Int32 sessionId;
public DataHoldingUserToken(SocketAsyncEventArgs e, Int32 rOffset, Int32 sOffset,
Int32 receivePrefixLength, Int32 sendPrefixLength, Int32 identifier)
{
this.idOfThisObject = identifier;
//Create a Mediator that has a reference to the SAEA object.
this.theMediator = new Mediator(e);
this.bufferOffsetReceive = rOffset;
this.bufferOffsetSend = sOffset;
this.receivePrefixLength = receivePrefixLength;
this.sendPrefixLength = sendPrefixLength;
this.receiveMessageOffset = rOffset + receivePrefixLength;
this.permanentReceiveMessageOffset = this.receiveMessageOffset;
}
//Let's use an ID for this object during testing, just so we can see what
//is happening better if we want to.
public Int32 TokenId
{
get
{
return this.idOfThisObject;
}
}
internal void CreateNewDataHolder()
{
theDataHolder = new DataHolder();
}
//Used to create sessionId variable in DataHoldingUserToken.
//Called in ProcessAccept().
internal void CreateSessionId()
{
sessionId = Interlocked.Increment(ref Program.mainSessionId);
}
public Int32 SessionId
{
get
{
return this.sessionId;
}
}
public void Reset()
{
this.receivedPrefixBytesDoneCount = 0;
this.receivedMessageBytesDoneCount = 0;
this.recPrefixBytesDoneThisOp = 0;
this.receiveMessageOffset = this.permanentReceiveMessageOffset;
}
}
class Mediator
{
private IncomingDataPreparer theIncomingDataPreparer;
private OutgoingDataPreparer theOutgoingDataPreparer;
private DataHolder theDataHolder;
private SocketAsyncEventArgs saeaObject;
public Mediator(SocketAsyncEventArgs e)
{
this.saeaObject = e;
this.theIncomingDataPreparer = new IncomingDataPreparer(saeaObject);
this.theOutgoingDataPreparer = new OutgoingDataPreparer();
}
internal void HandleData(DataHolder incomingDataHolder)
{
theDataHolder = theIncomingDataPreparer.HandleReceivedData
(incomingDataHolder, this.saeaObject);
}
internal void PrepareOutgoingData()
{
theOutgoingDataPreparer.PrepareOutgoingData(saeaObject, theDataHolder);
}
internal SocketAsyncEventArgs GiveBack()
{
return saeaObject;
}
}
class IncomingDataPreparer
{
private DataHolder theDataHolder;
private SocketAsyncEventArgs theSaeaObject;
public IncomingDataPreparer(SocketAsyncEventArgs e)
{
this.theSaeaObject = e;
}
private Int32 ReceivedTransMissionIdGetter()
{
Int32 receivedTransMissionId =
Interlocked.Increment(ref Program.mainTransMissionId);
return receivedTransMissionId;
}
private EndPoint GetRemoteEndpoint()
{
return this.theSaeaObject.AcceptSocket.RemoteEndPoint;
}
internal DataHolder HandleReceivedData(DataHolder incomingDataHolder,
SocketAsyncEventArgs theSaeaObject)
{
DataHoldingUserToken receiveToken =
(DataHoldingUserToken)theSaeaObject.UserToken;
theDataHolder = incomingDataHolder;
theDataHolder.sessionId = receiveToken.SessionId;
theDataHolder.receivedTransMissionId =
this.ReceivedTransMissionIdGetter();
theDataHolder.remoteEndpoint = this.GetRemoteEndpoint();
this.AddDataHolder();
return theDataHolder;
}
private void AddDataHolder()
{
lock (Program.lockerForList)
{
Program.listOfDataHolders.Add(theDataHolder);
}
}
}
class OutgoingDataPreparer
{
private DataHolder theDataHolder;
internal void PrepareOutgoingData(SocketAsyncEventArgs e,
DataHolder handledDataHolder)
{
DataHoldingUserToken theUserToken = (DataHoldingUserToken)e.UserToken;
theDataHolder = handledDataHolder;
//In this example code, we will send back the receivedTransMissionId,
// followed by the
//message that the client sent to the server. And we must
//prefix it with the length of the message. So we put 3
//things into the array.
// 1) prefix,
// 2) receivedTransMissionId,
// 3) the message that we received from the client, which
// we stored in our DataHolder until we needed it.
//That is our communication protocol. The client must know the protocol.
//Convert the receivedTransMissionId to byte array.
Byte[] idByteArray = BitConverter.GetBytes
(theDataHolder.receivedTransMissionId);
//Determine the length of all the data that we will send back.
Int32 lengthOfCurrentOutgoingMessage = idByteArray.Length
+ theDataHolder.dataMessageReceived.Length;
//So, now we convert the length integer into a byte array.
//Aren't byte arrays wonderful? Maybe you'll dream about byte arrays tonight!
Byte[] arrayOfBytesInPrefix = BitConverter.GetBytes
(lengthOfCurrentOutgoingMessage);
//Create the byte array to send.
theUserToken.dataToSend = new Byte[theUserToken.sendPrefixLength
+ lengthOfCurrentOutgoingMessage];
//Now copy the 3 things to the theUserToken.dataToSend.
Buffer.BlockCopy(arrayOfBytesInPrefix, 0, theUserToken.dataToSend,
0, theUserToken.sendPrefixLength);
Buffer.BlockCopy(idByteArray, 0, theUserToken.dataToSend,
theUserToken.sendPrefixLength, idByteArray.Length);
//The message that the client sent is already in a byte array, in DataHolder.
Buffer.BlockCopy(theDataHolder.dataMessageReceived, 0,
theUserToken.dataToSend, theUserToken.sendPrefixLength
+ idByteArray.Length, theDataHolder.dataMessageReceived.Length);
theUserToken.sendBytesRemainingCount =
theUserToken.sendPrefixLength + lengthOfCurrentOutgoingMessage;
theUserToken.bytesSentAlreadyCount = 0;
}
}
class DataHolder
{
//Remember, if a socket uses a byte array for its buffer, that byte array is
//unmanaged in .NET and can cause memory fragmentation. So, first write to the
//buffer block used by the SAEA object. Then, you can copy that data to another
//byte array, if you need to keep it or work on it, and want to be able to put
//the SAEA object back in the pool quickly, or continue with the data
//transmission quickly.
//DataHolder has this byte array to which you can copy the data.
internal Byte[] dataMessageReceived;
internal Int32 receivedTransMissionId;
internal Int32 sessionId;
//for testing. With a packet analyzer this can help you see specific connections.
internal EndPoint remoteEndpoint;
}
internal sealed class SocketAsyncEventArgsPool
{
//just for assigning an ID so we can watch our objects while testing.
private Int32 nextTokenId = 0;
// Pool of reusable SocketAsyncEventArgs objects.
Stack pool;
// initializes the object pool to the specified size.
// "capacity" = Maximum number of SocketAsyncEventArgs objects
internal SocketAsyncEventArgsPool(Int32 capacity)
{
this.pool = new Stack(capacity);
}
// The number of SocketAsyncEventArgs instances in the pool.
internal Int32 Count
{
get { return this.pool.Count; }
}
internal Int32 AssignTokenId()
{
Int32 tokenId = Interlocked.Increment(ref nextTokenId);
return tokenId;
}
// Removes a SocketAsyncEventArgs instance from the pool.
// returns SocketAsyncEventArgs removed from the pool.
internal SocketAsyncEventArgs Pop()
{
lock (this.pool)
{
return this.pool.Pop();
}
}
// Add a SocketAsyncEventArg instance to the pool.
// "item" = SocketAsyncEventArgs instance to add to the pool.
internal void Push(SocketAsyncEventArgs item)
{
if (item == null)
{
throw new ArgumentNullException("Items added to a
SocketAsyncEventArgsPool cannot be null");
}
lock (this.pool)
{
this.pool.Push(item);
}
}
}
class SocketListenerSettings
{
// the maximum number of connections the sample is designed to handle simultaneously
private Int32 maxConnections;
// this variable allows us to create some extra SAEA objects for the pool,
// if we wish.
private Int32 numberOfSaeaForRecSend;
// max # of pending connections the listener can hold in queue
private Int32 backlog;
// tells us how many objects to put in pool for accept operations
private Int32 maxSimultaneousAcceptOps;
// buffer size to use for each socket receive operation
private Int32 receiveBufferSize;
// length of message prefix for receive ops
private Int32 receivePrefixLength;
// length of message prefix for send ops
private Int32 sendPrefixLength;
// See comments in buffer manager.
private Int32 opsToPreAllocate;
// Endpoint for the listener.
private IPEndPoint localEndPoint;
public SocketListenerSettings(Int32 maxConnections,
Int32 excessSaeaObjectsInPool, Int32 backlog, Int32 maxSimultaneousAcceptOps,
Int32 receivePrefixLength, Int32 receiveBufferSize, Int32 sendPrefixLength,
Int32 opsToPreAlloc, IPEndPoint theLocalEndPoint)
{
this.maxConnections = maxConnections;
this.numberOfSaeaForRecSend = maxConnections + excessSaeaObjectsInPool;
this.backlog = backlog;
this.maxSimultaneousAcceptOps = maxSimultaneousAcceptOps;
this.receivePrefixLength = receivePrefixLength;
this.receiveBufferSize = receiveBufferSize;
this.sendPrefixLength = sendPrefixLength;
this.opsToPreAllocate = opsToPreAlloc;
this.localEndPoint = theLocalEndPoint;
}
public Int32 MaxConnections
{
get
{
return this.maxConnections;
}
}
public Int32 NumberOfSaeaForRecSend
{
get
{
return this.numberOfSaeaForRecSend;
}
}
public Int32 Backlog
{
get
{
return this.backlog;
}
}
public Int32 MaxAcceptOps
{
get
{
return this.maxSimultaneousAcceptOps;
}
}
public Int32 ReceivePrefixLength
{
get
{
return this.receivePrefixLength;
}
}
public Int32 BufferSize
{
get
{
return this.receiveBufferSize;
}
}
public Int32 SendPrefixLength
{
get
{
return this.sendPrefixLength;
}
}
public Int32 OpsToPreAllocate
{
get
{
return this.opsToPreAllocate;
}
}
public IPEndPoint LocalEndPoint
{
get
{
return this.localEndPoint;
}
}
}
服务器应用程序
下载包含代码的 zip 文件后,将其保存到磁盘。为了避免在 Visual Studio 中使用它时出现问题,在解压缩之前,右键单击已保存的 zip 文件并选择“属性”,然后选择“解除阻止”,然后单击“确定”。然后解压缩。如果您不这样做,可能会收到来自 Visual Studio 的安全警告错误。
在第一次运行服务器代码之前,您可能需要更改日志的写入文件夹。默认路径是c:\LogForSaeaTest\,它将在服务器启动时创建(如果不存在)。如果您不想使用默认文件夹,请在第一次运行应用程序之前修改 TestFileWriter
类中的路径。(TestFileWriter
代码未包含在上面的文章中,但包含在源代码中。)对于大多数情况,我没有将服务器应用程序设置为可以从控制台控制 SocketListenerSettings
和其他变量。您需要修改源代码并重新编译才能在测试过程中进行大多数更改。
最好在不同计算机上运行客户端和服务器。如果您尝试在同一台计算机上运行客户端和服务器,请尝试在客户端的“host”变量值中使用计算机名。如果不起作用,请尝试将“localhost”作为客户端“host”变量的值。
在尝试将客户端连接到服务器时,如果收到“连接被主动拒绝”的消息,请检查您的防火墙是否阻止了服务器入站端口的传输。您可能需要在本地网络上允许该端口的入站传输。如果您有阻止客户端出站传输的防火墙,那么您也需要更改该防火墙的设置。
客户端应用程序
客户端应用程序中的许多代码与服务器应用程序非常相似。服务器代码已完全注释。因此,我并不总是完全注释客户端代码。如果在客户端中发现不理解且未注释的代码,请查看服务器应用程序的类似部分以获取代码注释。
客户端应用程序不是一个普通的客户端,而是一个旨在测试服务器的应用程序。它被设置为可以向服务器提供任意数量的连接。客户端应用程序被设置为在测试开始前为所有这些连接构建所有消息。它为每个客户端发送不同的消息。所有消息都在测试开始前放入内存,因此消息创建不会在测试期间拖累客户端应用程序。如果您选择让 3000 个连接每个连接发送 50000 条消息,那么就是 1.5 亿条消息。这可能太多了,无法放入内存。如果您想进行这样的长时间测试,那么在客户端应用程序中,将 runLongTest
改为 true
。在这种情况下,它不会在测试开始前为每个客户端创建单独的消息数组,而是会为每个客户端反复发送相同的消息数组。这样,消息就可以适合内存。(如果您进行这样的长时间测试,也请将服务器上的 runLongTest
设置为 true
。这将阻止服务器应用程序在测试结束时将接收到的数据写入字典。否则,您可能会用完服务器的内存。)
客户端应用程序设置为您可以在控制台执行以下操作:
- 在查找主机机器网络地址的方法中,输入机器名或 IP 地址,
- 根据您选择的主机网络地址获取方法,输入正确的主机机器名或 IP 地址字符串,
- 输入主机应用程序的端口号,或接受默认值,
- 为日志文件指定一个文件夹名称,或接受默认值,
- 指定缓冲区大小,或接受先前的值,
- 指定要尝试的客户端连接数,或接受先前的值,
- 指示每个连接要发送的 TCP 消息数,或接受先前的值。
在可下载的源代码中,通过写入日志和控制台,有大量的可视化能力。您可以可视化的内容是:
- 程序流程,从方法到方法
- 连接和断开连接
- 从客户端发送到服务器以及从服务器发送到客户端的数据
- 线程,仅在服务器应用程序中(请最后保存线程监视)
理解代码的简单过程
首先,如果您在客户端和/或服务器上使用软件防火墙,请打开这些防火墙以允许应用程序传输到服务器的端口 4444(或您使用的任何端口)。4444 是默认端口。
启动服务器应用程序,并确保在控制台中看到“Server is listening”。然后启动客户端应用程序。您将在控制台中被要求指定主机。使用计算机名,除非服务器有固定 IP 地址,在这种情况下,您可以使用 IP 地址。对于它会询问的其他项目,希望您只需接受默认值。当它显示“Press Enter to begin socket test”时,按 Enter。它应该会很快完成。然后关闭客户端和服务器,以完成日志的写入。您只发送了一条来自一个客户端连接到服务器的消息,以及服务器发回给该客户端连接的响应消息。(如果您遇到问题,请参阅上面的防火墙相关内容。)
现在查看服务器和客户端的日志文件。(如果您将它们打印出来会更容易。)将它们与代码进行比较,并长时间思考。您正在查看一条从一个连接发送的消息以及对该消息的响应。您应该能够从中非常清楚地看到和理解程序流程。
(您可能会发现有用的工具。在测试任何网络应用程序时,如果您使用很棒的免费程序 Wireshark 或类似的工具来查看网络上发生的所有情况,您将学到更多。对于读取在长时间测试运行时由日志记录器生成的非常大的文本文件,请尝试 Cream for Vim。)
现在,对下面的每个测试重复相同的过程。启动服务器,然后启动客户端。客户端将询问您关于缓冲区大小、连接数和消息数。您将在客户端上为这些项目进行一些选择。当您更改连接数时,您就改变了模拟用户的数量。一个连接就像一个在 Internet 或局域网上运行的客户端用户。运行测试。然后,在每次测试后,关闭客户端和服务器应用程序以写入日志。查看日志并确保您理解每次测试后发生的情况。然后进行列表中的下一个测试。
- 测试 1。缓冲区大小保持不变(25),连接数 = 2,消息数 = 2。
- 测试 2。缓冲区大小保持不变(25),连接数 = 3,消息数 = 5。
- 测试 3。客户端缓冲区大小 = 5,连接数 = 3,消息数 = 5。(在此测试中,缓冲区小于带有其前缀的消息。因此,客户端将需要多次发送操作来发送一条消息。并且客户端将需要多次接收操作来接收一条消息。您可以在客户端日志中看到这一点。服务器日志中发生的情况可能有所不同,因为客户端的一个发送操作不一定与服务器的一个接收操作相关联,这是由于 TCP 的工作方式。)
- 测试 4。客户端缓冲区大小 = 3,连接数 = 3,消息数 = 5。(在此测试中,缓冲区甚至小于前缀本身。将需要多次发送操作。)
- 测试 5。将客户端缓冲区大小改回 25,连接数 = 3,消息数 = 5。现在,在服务器的 Program.cs 代码中,将
testBufferSize
从 25 改为 5。编译服务器应用程序。运行测试。您将看到需要多次接收操作才能从客户端接收消息。并且需要多次发送操作才能从服务器发送回客户端。 - 测试 6。在服务器上,将
watchThreads
从false
改为true
,然后编译。在客户端上保持与之前相同的设置,然后再次运行测试。现在,在关闭服务器应用程序后,服务器日志文件将显示有关线程的信息。在记事本或您用来查看日志的任何程序中,搜索短语“New managed thread”。您将看到该短语每次启动新线程时都会出现。通常,应用程序只有 2-4 个托管线程在运行。现在,托管线程号和 Socket 句柄标识符将显示在整个日志文件中。 - 测试 7。在服务器上,将
watchThreads
改为false
,watchProgramFlow
改为false
,maxNumberOfConnections
改为 10,000,然后编译。在客户端上,将watchProgramFlow
改为false
,然后编译。在客户端控制台启动时,将连接数设置为 1000,消息数设置为 500。运行测试。(如果这导致您的客户端应用程序崩溃,请将客户端上的runLongTest
改为true
,然后再次编译。如果您将客户端上的runLongTest
改为true
,那么在开始测试之前,客户端将不会创建一个巨大的消息数组。相反,只会创建一个消息数组,并且该数组将被用于所有连接,反复发送。当客户端上的runLongTest
为false
时,每个连接都有自己唯一的消息数组。) - 测试 8。要运行更大的测试,我建议您也将服务器上的
runLongTest
改为true
。尝试在客户端上设置连接数 = 8000,消息数 = 500000,看看它是否会导致您的服务器应用程序崩溃。在客户端上,有一个tickDelayBeforeNextConn
变量,设置为延迟下一个连接 50000 个刻度(5 毫秒)。您可以玩一下那个变量。如果您一次发送太多新的连接请求,您将使服务器不堪重负。这样做一两次很有趣。服务器可以处理多少连接?这取决于您的硬件、配置和 Windows 版本。在我进行的一些测试中,当在有线局域网(100 MB NIC)上运行 Windows XP Pro 32 位操作系统的旧单处理器 Dell 桌面机上运行服务器时,它可以处理 2000 个连接持续发送/接收消息而没有任何问题。它通常可以很好地处理 3000 个连接。而 4000 个连接有时可能会导致问题。请记住,如果您打开数千个连接,可能会使运行客户端应用程序的机器不堪重负,因为您为每个连接打开一个端口。如果服务器负载很重,它可能会拒绝一些新的连接。但是,客户端应用程序可以重试被拒绝的连接,直到连接成功。
历史
- 2010 年 12 月 13 日:代码版本 1.3。将客户端更改为使用阻塞的
Stack<T>
来以更受控的方式处理新连接的启动。以前,当服务器过载并拒绝大量连接时,客户端应用程序对连接重试的处理不佳。在客户端的DisconnectAsync()
之前添加了Shutdown()
。更改了客户端,使其在完成使用后能够正确关闭 Socket。在 v1.2 的服务器应用程序中,处理坏的接受 Socket 的代码在CloseClientSocket
中。现在它在HandleBadAccept
中处理。该更改消除了在 v1.2 中的CloseClientSocket
方法中在用户令牌强制转换后检查null
的情况。 - 2010 年 10 月 12 日:代码版本 1.2。修复了
ProcessAccept
中的一个 bug,在发生 Socket 错误的情况下,我们需要LoopToStartAccept
并删除坏的 Socket。修复了CloseClientSocket
中的一个 bug,当时我们试图销毁接受操作的 SAEA 中的 Socket,因为 SAEA 的用户令牌无法强制转换为DataHoldingUserToken
类型。修复了客户端代码中的一个 bug——当runlongtest
变量为true
时,客户端仍然尝试将接收到的数据保存在 List<T> 中,这最终会导致其耗尽内存并在具有海量消息的测试中崩溃。改进了一些日志写入代码。清理了客户端测试应用程序代码,我之前只是随便写的,没有考虑可读性。将SocketListener.ProcessReceive
中的几行移到了PrefixHandler
。改进了客户端应用程序中的错误处理。在服务器中,为了可读性,一些代码从Main
方法移到了其他方法。 - 2010 年 9 月 8 日:代码版本 1.1。
ProcessReceive
方法以前太大了。我将其中一部分移到了PrefixHandler
和MessageHandler
类中。将接收缓冲区设置从ProcessSend()
移到StartReceive()
。修复了两个 bug。一个是在接收字节数正好等于前缀长度时。另一个是由用户 mixal11 注意到的。在异常断开连接的情况下,我未能重置 Token 的某些成员。感谢 mixal11。对文章进行了一些小的修改。 - 2010 年 8 月 14 日:修复了文章中的一些拼写错误。更改了文章中的一些格式,并移动了一个段落以求清晰。重写了一些内容。