c# 异步套接字服务器






4.55/5 (21投票s)
如何在 c# 中编写异步套接字服务器
引言
我最近需要一个用于 .NET 项目的进程间通信机制。该项目由多个服务器和客户端组成,包括 ASP.NET、Windows Forms 和控制台应用程序。考虑到各种可能性,我最终决定使用原始套接字,而不是 .NET 中许多预构建的机制,例如命名管道、NetTcpClient 和 Azure Service Bus。
本文中的服务器基于 System.Net.Sockets 类的异步方法。这些方法允许您支持大量套接字客户端,并且服务器唯一的阻塞机制是客户端连接时。阻塞时间可以忽略不计,并且服务器实际上将作为多线程套接字服务器运行。
背景
原始套接字的优点是您可以完全控制通信层,并且在处理不同数据类型时具有很大的灵活性。您甚至可以通过套接字发送序列化的 CLR 对象,尽管我在这里不会深入探讨。该项目将向您展示如何在套接字之间发送文本。
使用代码
要使用该代码,您需要实例化 Server 类,并运行 Start() 方法
Server myServer = new Server();
myServer.Start();
如果您计划在 Windows 窗体中托管服务器,我建议使用 BackgroundWorker,因为套接字方法(特别是 ManualResentEvent)将阻塞 GUI 线程。
Server 类:
using System.Net.Sockets;
public class Server
{
private static Socket listener;
public static ManualResetEvent allDone = new ManualResetEvent(false);
public const int _bufferSize = 1024;
public const int _port = 50000;
public static bool _isRunning = true;
class StateObject
{
public Socket workSocket = null;
public byte[] buffer = new byte[bufferSize];
public StringBuilder sb = new StringBuilder();
}
// Returns the string between str1 and str2
static string Between(string str, string str1, string str2)
{
int i1 = 0, i2 = 0;
string rtn = "";
i1 = str.IndexOf(str1, StringComparison.InvariantCultureIgnoreCase);
if (i1 > -1)
{
i2 = str.IndexOf(str2, i1 + 1, StringComparison.InvariantCultureIgnoreCase);
if (i2 > -1)
{
rtn = str.Substring(i1 + str1.Length, i2 - i1 - str1.Length);
}
}
return rtn;
}
// Checks if the socket is connected
static bool IsSocketConnected(Socket s)
{
return !((s.Poll(1000, SelectMode.SelectRead) && (s.Available == 0)) || !s.Connected);
}
// Insert all the other methods here.
}
ManualResetEvent
是一个 .NET 类,它在您的套接字服务器中实现事件。我们需要此对象来通知我们的代码何时要释放阻塞操作。您可以试验 bufferSize
以满足您的需求。如果您有可预测大小的消息,请将 bufferSize 设置为消息大小(以字节为单位)。port
是要侦听的 TCP 端口。请注意不要使用为其他应用程序保留的端口。如果要能够正常停止服务器,则需要实现一些机制将 _isRunning
设置为 false。这通常可以通过使用 BackgroundWorker 来完成,您可以在其中将 _isRunning 替换为 myWorker.CancellationPending。我包含 _isRunning 的原因是给您一个关于处理取消的方向,并向您展示侦听器可以正常停止。
Between()
和 IsSocketConnected()
是辅助方法。
现在到方法。首先,Start() 方法
public void Start()
{
IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
IPEndPoint localEP = new IPEndPoint(IPAddress.Any, _port);
listener = new Socket(localEP.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
listener.Bind(localEP);
while (_IsRunning)
{
allDone.Reset();
listener.Listen(10);
listener.BeginAccept(new AsyncCallback(acceptCallback), listener);
bool isRequest = allDone.WaitOne(new TimeSpan(12, 0, 0)); // Blocks for 12 hours
if (!isRequest)
{
allDone.Set();
// Do some work here every 12 hours
}
}
listener.Close();
}
此方法启动侦听器套接字,并开始等待客户端连接。此项目中的主要模式是使用异步委托。当状态在调用方中更改时,异步委托是异步调用的函数。isRequest
告诉您 WaitOne
是否由于客户端连接而退出,或者由于超时已到达。
如果您有大量客户端同时连接,请考虑增加 Listen() 方法的队列参数。
现在是下一个方法,acceptCallback
。此方法由 listener.BeginAccept 异步调用。当该方法完成时,侦听器将立即侦听新客户端。
static void acceptCallback(IAsyncResult ar)
{
// Get the listener that handles the client request.
Socket listener = (Socket)ar.AsyncState;
if (listener != null)
{
Socket handler = listener.EndAccept(ar);
// Signal main thread to continue
allDone.Set();
// Create state
StateObject state = new StateObject();
state.workSocket = handler;
handler.BeginReceive(state.buffer, 0, _bufferSize, 0, new AsyncCallback(readCallback), state);
}
}
acceptCallback
派生另一个异步委托:readCallback
。此方法将从套接字读取实际数据。我为发送和接收数据创建了自己的协议,该协议与 _bufferSize 无关。发送到服务器的每个字符串都必须被 <!--SOCKET--> 和 <!--ENDSOCKET--> 包裹。同样,客户端在接收来自服务器的响应时,必须解包响应,该响应由 <!--RESPONSE--> 和 <!--ENDRESPONSE--> 包裹
static void readCallback(IAsyncResult ar)
{
StateObject state = (StateObject)ar.AsyncState;
Socket handler = state.workSocket;
if (!IsSocketConnected(handler))
{
handler.Close();
return;
}
int read = handler.EndReceive(ar);
// Data was read from the client socket.
if (read > 0)
{
state.sb.Append(Encoding.UTF8.GetString(state.buffer, 0, read));
if (state.sb.ToString().Contains("<!--ENDSOCKET-->"))
{
string toSend = "";
string cmd = ts.Strings.Between(state.sb.ToString(), "<!--SOCKET-->", "<!--ENDSOCKET-->");
switch (cmd)
{
case "Hi!":
toSend = "How are you?";
break;
case "Milky Way?":
toSend = "No I am not.";
break;
}
toSend = "<!--RESPONSE-->" + toSend + "<!--ENDRESPONSE-->";
byte[] bytesToSend = Encoding.UTF8.GetBytes(toSend);
handler.BeginSend(bytesToSend, 0, bytesToSend.Length, SocketFlags.None
, new AsyncCallback(sendCallback), state);
}
else
{
handler.BeginReceive(state.buffer, 0, _bufferSize, 0
, new AsyncCallback(readCallback), state);
}
}
else
{
handler.Close();
}
}
readCallback
将派生另一个方法,sendCallback
,它将向客户端发送响应。如果客户端未关闭连接,sendCallback
将向套接字发出信号,以侦听更多数据。
static void sendCallback(IAsyncResult ar)
{
StateObject state = (StateObject)ar.AsyncState;
Socket handler = state.workSocket;
handler.EndSend(ar);
StateObject newstate = new StateObject();
newstate.workSocket = handler;
handler.BeginReceive(newstate.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(readCallback), newstate);
}
我将留给读者来编写套接字客户端。套接字客户端应使用与异步回调相同的编程模式。希望您喜欢这篇文章,并希望您能成为一名优秀的套接字程序员!
关注点
我正在生产环境中使用此代码,其中套接字服务器是一个自由文本搜索引擎。SQL Server 缺乏对自由文本搜索的支持(您可以使用自由文本索引,但它们很慢且昂贵)。套接字服务器将大量文本数据加载到 iEnumerables 中,并使用 Linq 搜索文本。当搜索数百万行 unicode 文本数据时,来自套接字服务器的响应以毫秒为单位。我们还使用了三个分布式 Sphinx 服务器 (www.sphinxsearch.com)。套接字服务器充当 Sphinx 服务器的缓存。如果您需要一个快速的自由文本搜索引擎,我强烈推荐 Sphinx。
历史
First version.