65.9K
CodeProject 正在变化。 阅读更多。
Home

c# 异步套接字服务器

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.55/5 (21投票s)

2014年3月17日

CPOL

4分钟阅读

viewsIcon

98748

如何在 c# 中编写异步套接字服务器

引言

我最近需要一个用于 .NET 项目的进程间通信机制。该项目由多个服务器和客户端组成,包括 ASP.NET、Windows Forms 和控制台应用程序。考虑到各种可能性,我最终决定使用原始套接字,而不是 .NET 中许多预构建的机制,例如命名管道、NetTcpClient 和 Azure Service Bus。

本文中的服务器基于 System.Net.Sockets 类的异步方法。这些方法允许您支持大量套接字客户端,并且服务器唯一的阻塞机制是客户端连接时。阻塞时间可以忽略不计,并且服务器实际上将作为多线程套接字服务器运行。

背景

原始套接字的优点是您可以完全控制通信层,并且在处理不同数据类型时具有很大的灵活性。您甚至可以通过套接字发送序列化的 CLR 对象,尽管我在这里不会深入探讨。该项目将向您展示如何在套接字之间发送文本。

使用代码

要使用该代码,您需要实例化 Server 类,并运行 Start() 方法

Server myServer = new Server();
myServer.Start();

如果您计划在 Windows 窗体中托管服务器,我建议使用 BackgroundWorker,因为套接字方法(特别是 ManualResentEvent)将阻塞 GUI 线程。

Server 类:

using System.Net.Sockets;

public class Server
{
    private static Socket listener;
    public static ManualResetEvent allDone = new ManualResetEvent(false);
    public const int _bufferSize = 1024;
    public const int _port = 50000;
    public static bool _isRunning = true;

    class StateObject
    {
        public Socket workSocket = null;
        public byte[] buffer = new byte[bufferSize];
        public StringBuilder sb = new StringBuilder();
    }

    // Returns the string between str1 and str2
    static string Between(string str, string str1, string str2)
    {
        int i1 = 0, i2 = 0;
        string rtn = "";

        i1 = str.IndexOf(str1, StringComparison.InvariantCultureIgnoreCase);
        if (i1 > -1)
        {
            i2 = str.IndexOf(str2, i1 + 1, StringComparison.InvariantCultureIgnoreCase);
            if (i2 > -1)
            {
                rtn = str.Substring(i1 + str1.Length, i2 - i1 - str1.Length);
            }
        }
        return rtn;
    }

    // Checks if the socket is connected
    static bool IsSocketConnected(Socket s)
    {
        return !((s.Poll(1000, SelectMode.SelectRead) && (s.Available == 0)) || !s.Connected);
    }

    // Insert all the other methods here.
}  

ManualResetEvent 是一个 .NET 类,它在您的套接字服务器中实现事件。我们需要此对象来通知我们的代码何时要释放阻塞操作。您可以试验 bufferSize 以满足您的需求。如果您有可预测大小的消息,请将 bufferSize 设置为消息大小(以字节为单位)。port 是要侦听的 TCP 端口。请注意不要使用为其他应用程序保留的端口。如果要能够正常停止服务器,则需要实现一些机制将 _isRunning 设置为 false。这通常可以通过使用 BackgroundWorker 来完成,您可以在其中将 _isRunning 替换为 myWorker.CancellationPending。我包含 _isRunning 的原因是给您一个关于处理取消的方向,并向您展示侦听器可以正常停止。

Between()IsSocketConnected() 是辅助方法。

现在到方法。首先,Start() 方法

public void Start()
{
    IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
    IPEndPoint localEP = new IPEndPoint(IPAddress.Any, _port);
    listener = new Socket(localEP.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
    listener.Bind(localEP);

    while (_IsRunning)
    {
        allDone.Reset();
        listener.Listen(10);
        listener.BeginAccept(new AsyncCallback(acceptCallback), listener);
        bool isRequest = allDone.WaitOne(new TimeSpan(12, 0, 0));  // Blocks for 12 hours

        if (!isRequest)
        {
            allDone.Set();
            // Do some work here every 12 hours
        }
    }
    listener.Close();
} 

此方法启动侦听器套接字,并开始等待客户端连接。此项目中的主要模式是使用异步委托。当状态在调用方中更改时,异步委托是异步调用的函数。isRequest 告诉您 WaitOne 是否由于客户端连接而退出,或者由于超时已到达。

如果您有大量客户端同时连接,请考虑增加 Listen() 方法的队列参数。

现在是下一个方法,acceptCallback 。此方法由 listener.BeginAccept 异步调用。当该方法完成时,侦听器将立即侦听新客户端。

static void acceptCallback(IAsyncResult ar)
{
    // Get the listener that handles the client request.
    Socket listener = (Socket)ar.AsyncState;

    if (listener != null)
    {
        Socket handler = listener.EndAccept(ar);

        // Signal main thread to continue
        allDone.Set();

        // Create state
        StateObject state = new StateObject();
        state.workSocket = handler;
        handler.BeginReceive(state.buffer, 0, _bufferSize, 0, new AsyncCallback(readCallback), state);
    }
}

acceptCallback 派生另一个异步委托:readCallback。此方法将从套接字读取实际数据。我为发送和接收数据创建了自己的协议,该协议与 _bufferSize 无关。发送到服务器的每个字符串都必须被 <!--SOCKET--> 和 <!--ENDSOCKET--> 包裹。同样,客户端在接收来自服务器的响应时,必须解包响应,该响应由 <!--RESPONSE--> 和 <!--ENDRESPONSE--> 包裹

static void readCallback(IAsyncResult ar)
{
    StateObject state = (StateObject)ar.AsyncState;
    Socket handler = state.workSocket;

    if (!IsSocketConnected(handler)) 
    {
        handler.Close();
        return;
    }

    int read = handler.EndReceive(ar);

    // Data was read from the client socket.
    if (read > 0)
    {
        state.sb.Append(Encoding.UTF8.GetString(state.buffer, 0, read));

        if (state.sb.ToString().Contains("<!--ENDSOCKET-->"))
        {
            string toSend = "";
            string cmd = ts.Strings.Between(state.sb.ToString(), "<!--SOCKET-->", "<!--ENDSOCKET-->");
                    
            switch (cmd)
            {
                case "Hi!":
                    toSend = "How are you?";
                    break;
                case "Milky Way?":
                    toSend = "No I am not.";
                    break;
            }

            toSend = "<!--RESPONSE-->" + toSend + "<!--ENDRESPONSE-->";

            byte[] bytesToSend = Encoding.UTF8.GetBytes(toSend);
            handler.BeginSend(bytesToSend, 0, bytesToSend.Length, SocketFlags.None
                , new AsyncCallback(sendCallback), state);
        }
        else 
        {
            handler.BeginReceive(state.buffer, 0, _bufferSize, 0
                    , new AsyncCallback(readCallback), state);
        }
    }
    else
    {
            handler.Close();
    }
}

readCallback 将派生另一个方法,sendCallback,它将向客户端发送响应。如果客户端未关闭连接,sendCallback 将向套接字发出信号,以侦听更多数据。

static void sendCallback(IAsyncResult ar)
{
    StateObject state = (StateObject)ar.AsyncState;
    Socket handler = state.workSocket;
    handler.EndSend(ar);

    StateObject newstate = new StateObject();
    newstate.workSocket = handler;
    handler.BeginReceive(newstate.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(readCallback), newstate);
}

我将留给读者来编写套接字客户端。套接字客户端应使用与异步回调相同的编程模式。希望您喜欢这篇文章,并希望您能成为一名优秀的套接字程序员!

关注点

我正在生产环境中使用此代码,其中套接字服务器是一个自由文本搜索引擎。SQL Server 缺乏对自由文本搜索的支持(您可以使用自由文本索引,但它们很慢且昂贵)。套接字服务器将大量文本数据加载到 iEnumerables 中,并使用 Linq 搜索文本。当搜索数百万行 unicode 文本数据时,来自套接字服务器的响应以毫秒为单位。我们还使用了三个分布式 Sphinx 服务器 (www.sphinxsearch.com)。套接字服务器充当 Sphinx 服务器的缓存。如果您需要一个快速的自由文本搜索引擎,我强烈推荐 Sphinx。

历史

First version.

© . All rights reserved.