一个用于客户端/服务器套接字系统的完整库






4.81/5 (42投票s)
我的文章介绍了一个任何人都可以用来创建套接字通信的库。此外,它还解释了该库是如何开发的。
引言
在我职业生涯中,我用 C++、Java、C# 开发了各种不同目的和使用不同技术的套接字服务器。在过去的 3 年里,我开发了一个客户端/服务器套接字系统,用于将消息从多个客户端分发到多个服务器。我想通过这次经历分享我的学习心得,并编写一个库来创建你自己的消息系统。
该库公开了一组 abstract
类,你可以快速实现它们来创建自己的消息系统。这样,你就不需要深入了解该库,也不需要修改它。该库使用 I/O 完成端口和多线程池来实现高性能服务器。该库将客户端和服务器公开为带有头部的消息。
背景
为了让服务器能够处理不同的客户端并在接收时执行其他操作,接收是异步的。调用 BeginReceive
方法以启动接收,并传递一个回调方法和一个 SocketObjectState
作为异步接收的状态对象。然后,调用 EndReceive
方法来获取接收到的字节。当服务器接收到消息时,它会触发一个事件。该事件在一个不同的线程上触发,以避免阻塞服务器。ThreadPool
系统类就是为此目的而设计的。
当服务器接收消息时,系统套接字会使用一个缓冲区,并通过多次读取来接收长消息,因为它可以将多个小消息接收到同一个缓冲区中,这些消息可能同时发送。因此,AbstractMessage
公开了一个 static
方法 TryReadMessage
,用于处理所有情况。在单元测试项目中,你可以看到对该方法的单元测试。
术语“socket client handler”用于表示处理系统套接字的类。该库包含一个抽象处理器,它可以管理作为 SSL 流的套接字。
Using the Code
要创建自己的消息系统,你需要实现以下 abstract
类列表:
AbstractMessageHeader
定义每条消息的消息头。AbstractMessage
定义消息。AbstractTcpSocketClientHandler
定义套接字客户端处理器。AbstractSocketServer
定义套接字服务器。AbstractSocketClient
定义套接字客户端。
对 AbstractMessageHeader
的实现应定义头部是如何组成的(字节到字节),以及其在缓冲区中读写头部的相关方法。AbstractMessage
的实现应定义所使用的消息头类型,如下面的代码所示。
public class BasicMessage : AbstractMessage
{
....
....
protected override AbstractMessageHeader GetMessageHeaderInstance()
{
return new BasicHeader();
}
}
其他实现非常简单,如果你不需要做特殊的事情。从 AbstractTcpSocketClient
实现的类定义了所使用的消息类。
/// <summary>
/// Basic Socket Client Handler. Implements the AbstractTcpSocketClientHandler class.
/// The message class is BasicMessage.
/// </summary>
class BasicSocketClientHandler : AbstractTcpSocketClientHandler
{
/// <summary>
/// The constructor for SSL connection.
/// </summary>
/// <param name="handler">The socket client handler</param>
/// <param name="stream">The ssl stream</param>
public BasicSocketClientHandler(Socket handler, SslStream stream)
: base(handler, stream)
{
}
/// <summary>
/// The constructor for not SSL connection.
/// </summary>
/// <param name="handler">The socket client handler</param>
public BasicSocketClientHandler(Socket handler)
: base(handler, null)
{
}
/// <summary>
/// Return a BasicMessage empty instance.
/// </summary>
/// <returns>The BasicMessage instance</returns>
protected override AbstractMessage GetMessageInstance()
{
return new BasicMessage();
}
}
最后,你的自定义客户端和服务器类都定义了所使用的套接字客户端处理器类。
protected override AbstractTcpSocketClientHandler GetHandler
(System.Net.Sockets.Socket handler, System.Net.Security.SslStream stream)
{
return new BasicClientServerLib.SocketHandler.BasicSocketClientHandler(handler, stream);
}
该解决方案包含一个 BasicClientServerLib
项目,提供了一个所有 abstract
类的简单实现和一个简单的测试应用程序。
关注点
上面对该库的解释由以下主要组件组成:
- 消息 + 消息头
- 套接字客户端处理器
- 服务器
- 客户端
你可以使用证书初始化服务器。这样,通信就是 SSL 通信,客户端必须提供客户端证书才能连接到服务器。如果你想验证客户端证书(或服务器证书),你需要编写一段代码,就像你想从 Windows 存储库而不是文件系统中获取证书一样。
当调用接收回调时,你可以在缓冲区中读到一个完整的消息,或多个完整的消息,或消息的一部分。套接字客户端处理器调用 TryReadMessage
方法来尝试从缓冲区读取消息。如果读取到一个完整的消息,就会触发一个事件,然后处理器会检查缓冲区中是否还有其他消息。如果缓冲区不包含完整的消息,则不完整的消息会保存在 state
对象中,并在下一次回调时,新的缓冲区将添加到之前的缓冲区中。你可以在 TryReadMessage
方法中看到这部分代码。
/// <summary>
/// Callback for asynchronous receiving.
/// </summary>
/// <param name="ar">The socket state object for receiving data</param>
private static void readCallback(IAsyncResult ar)
{
SocketStateObject state = (SocketStateObject)ar.AsyncState;
AbstractTcpSocketClientHandler handler = state.workHandler;
try
{
// Read data from the client socket.
int read = handler.EndReceive(ar);
Trace.WriteLine(string.Format("Receive {0} bytes", read));
// Data was read from the client socket.
if (read > 0)
{
// Fire event for incoming message
handler.OnReceivingMessage(handler);
while (true)
{
AbstractMessage message = AbstractMessage.TryReadMessage
(handler.GetMessageInstance(), state, read);
// Fire event for received message
if (message != null)
{
ReceiveMessageStateObject rcvObj = new ReceiveMessageStateObject()
{ Handler = handler, Message = message };
handler.OnReceiveMessage(rcvObj);
}
if (state.pendingBuffer == null)
{
break;
}
read = 0;
}
handler.socket.BeginReceive(state, new AsyncCallback(readCallback));
}
else
{
handler.Close();
}
}
catch (MessageException mex)
{
....
....
....
}
....
....
....
}
因此,SocketStateObject
不仅用于以异步方式读取传入消息,还用于存储处理不同情况所需的必要信息。具体来说,属性“message
”用于保存不完整的消息,而 pendingBuffer
用于包含第一条消息后的剩余数据。
internal class SocketStateObject
{
public AbstractTcpSocketClientHandler workHandler = null;
public const int BufferSize = 8192;
public byte[] buffer = new byte[BufferSize];
public AbstractMessage message = null;
public byte[] pendingBuffer = null;
}
/// <summary>
/// Try to read a message from the buffer.
/// </summary>
/// <param name="message">The destination message</param>
/// <param name="state">The state object</param>
/// <param name="byteRead">The umber of bytes in the input buffer</param>
/// <returns>The message read, otherwise false.</returns>
internal static AbstractMessage TryReadMessage
(AbstractMessage message, SocketStateObject state, int byteRead)
{
AbstractMessage messageRead = null;
int moreMessage = 0;
byte[] buffer = state.buffer; // Get buffer
if (state.pendingBuffer != null) //Check for pending data and merge it
{
buffer = new byte[byteRead + state.pendingBuffer.Length];
Array.Copy(state.pendingBuffer, 0, buffer, 0, state.pendingBuffer.Length);
Array.Copy(state.buffer, 0, buffer, state.pendingBuffer.Length, byteRead);
byteRead = buffer.Length;
}
state.pendingBuffer = null;
if (state.message == null)
{
state.message = message;
moreMessage = state.message.ReadFirstMessage(buffer, byteRead);
Trace.WriteLine(string.Format("Receive 1st package MessageUID {0} ClientUID {1}",
state.message.MessageUID, state.message.ClientUID));
}
else
{
moreMessage = state.message.AppendBuffer(buffer, byteRead);
Trace.WriteLine(string.Format("Receive more package MessageUID {0} ClientUID {1}",
state.message.MessageUID, state.message.ClientUID));
}
if (state.message.IsComplete())
{
Trace.WriteLine(string.Format("Receive complete message {0} len {1}",
state.message.MessageUID, state.message.MessageLength));
messageRead = state.message;
Trace.WriteLine(string.Format
("Prepare to receive a new message. moreMessage = {0}", moreMessage));
state.message = null;
if (moreMessage > 0)
{
state.pendingBuffer = new byte[byteRead - moreMessage];
Array.Copy(buffer, moreMessage, state.pendingBuffer, 0, state.pendingBuffer.Length);
Trace.WriteLine(string.Format
("Copy {0} bytes to pending buffer", state.pendingBuffer.Length));
}
}
return messageRead;
}
新版本 (V3) 的更改
根据一些建议,我在客户端类中添加了连接事件,并在抽象库模型中添加了两个新类,以便提供一个使用 ReceiveAsync
方法而不是 Begin
/End Receive
的版本。
新类是:
TcpSocketAsync
– 继承自TcpSocket
AbstractAsyncTcpSocketClientHandler
– 继承自AbstractTcpSocketClientHandler
此外,我还添加了一个名为 AsyncClientServerLib
的新测试库,它使用新类实现了一个测试库。
添加新功能 (V4)
我已为该库添加了另外两项功能:
- 使用异步方法发送消息 (
SendAsync
) - 使用队列接收消息
第一项功能在服务器端非常有用,可以避免传输过程中可能发生的阻塞。但是,会使用线程事件来阻止新的传输,直到前一次传输完成。
第二个新功能可能在客户端更有用,以避免过多的线程并按队列排序所有传入的消息。为了实现此功能,我使用了 .NET 4.0 中的一个特性 (BlockingCollection
),因此此版本不能与之前的 .NET 版本一起使用。