65.9K
CodeProject 正在变化。 阅读更多。
Home

创建使用命名管道的服务器

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.80/5 (31投票s)

2015年1月31日

CPOL

8分钟阅读

viewsIcon

107594

downloadIcon

5065

在本文中,我将介绍如何使用命名管道创建进程间通信库。

引言

在本文中,我将介绍如何使用 C# .Net 4 中的命名管道实现服务器和客户端。
我使用了 NamedPipeServerStreamNamedPipeClientStream,但很快就意识到“服务器”这个名字有点令人困惑。NamedPipeServerStream 一次只能处理一个客户端(请参阅 MSDN 中的 命名管道实例主题),但我需要一个能够处理多个客户端请求的服务器。
我在网上找不到任何适合我需求的示例,因此,我使用 NamedPipeServerStream 和 NamedPipeClientStream 创建了我自己的服务器实现。

背景

正如 Microsoft 定义的那样,命名管道是一种命名的一路或双路管道,用于在管道服务器和一个或多个管道客户端之间进行通信。命名管道的所有实例共享相同的管道名称,但每个实例都有自己的缓冲区和句柄,并为客户端/服务器通信提供单独的通道。实例的使用使得多个管道客户端可以同时使用同一个命名管道。任何进程都可以访问命名管道,但需经过安全检查,这使得命名管道成为相关或不相关进程之间通信的便捷方式。 更多...

下载代码

https://github.com/IfatChitin/Named-Pipes

代码简介

PipeServer 负责创建和维护命名管道流,这些管道是为每个客户端打开的。
InternalPipeServer 是 NamedPipeServerStream 的包装器。
PipeClient 是 NamedPipeClientStream 的包装器。

主要流程

  1. 创建并启动 PipeServer
    • 生成新的管道名称。
    • 创建 InternalPipeServer 的新实例并开始等待客户端连接。
  2. 创建并启动 PipeClient
    • 与 InternalPipeServer 建立连接。
    • InternalPipeServer 触发一个事件,让 PipeServer 知道已建立连接。
      • PipeServer 触发自己的事件,告知外界已连接客户端。然后,它会创建一个新的 InternalPipeServer 实例并启动它,以便它开始等待新的连接,同时第一个实例与第一个客户端进行通信。
    • InternalPipeServer 开始异步读取操作,该操作在客户端发送消息、断开连接或管道关闭时完成。
  3. PipeClient 发送消息 
    • InternalPipeServer 接收到消息的一部分,因为消息比其缓冲区大小长,并启动新的异步读取操作。
    • InternalPipeServer 接收消息的其余部分,将其附加到第一部分,触发一个事件告知 PipeServer 已收到新消息,并启动新的异步读取操作以等待新消息。
      • PipeServer 触发自己的事件,告知外界已收到来自某个客户端的新消息。
  4. PipeClient 断开连接
    • InternalPipeServer 的读取操作在未读取到字节的情况下结束,因此 InternalPipeServer 假定客户端已断开连接。它会触发一个事件告知 PipeServer。
    • PipeServer 触发自己的事件,告知外界某个客户端已断开连接。
  5. PipeServer 停止
    • PipeServer 停止其所有 InternalPipeServer 实例。

使用代码

如果您需要与其他进程通信,请使用附带的代码。
在一个进程中创建一个 PipeServer,在另一个进程中创建一个 PipeClient。然后使用 PipeClient 向服务器发送消息。

InternalPipeServer

InternalPipeServer 构造函数

NamedPipeServerStream 在 InternalPipeServer 的构造函数中创建。
传递给 NamedPipeServerStream 构造函数的参数是

  • pipeName: 要创建的管道的名称。客户端必须熟悉此名称才能连接到管道服务器。
  • PipeDirection.InOut: 管道方向。
  • maxNumberOfServerInstances: 共享同一名称的最大服务器实例数。创建 NamedPipeServerStream 时,如果其创建达到最大数量,将抛出 I/O 异常。
  • PipeTransmissionMode.Message: 管道传输模式。我选择 Message,因为它暴露了一个有用的标志“IsMessageCompleted”,有助于流上的通信实现。为了使用 Message 传输模式,管道方向必须是 InOut。
  • PipeOptions.Asynchronous: 这启用了异步读写操作。
/// <summary>
/// Creates a new NamedPipeServerStream 
/// </summary>
public InternalPipeServer(string pipeName, int maxNumberOfServerInstances)
{
    _pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.InOut, maxNumberOfServerInstances, PipeTransmissionMode.Message, PipeOptions.Asynchronous);
    Id = Guid.NewGuid().ToString();
}

Start

BeginWaitForConnection 接收一个回调,在操作完成后调用,以及一个将传递给回调的用户定义对象。在这种情况下,我们发送 null。

/// <summary>
/// This method begins an asynchronous operation to wait for a client to connect.
/// </summary>
public void Start()
{
    try
    {
        _pipeServer.BeginWaitForConnection(WaitForConnectionCallBack, null);
    }
    catch (Exception ex)
    {
        Logger.Error(ex);
        throw;
    }
}

WaitForConnectionCallBack

WaitForConnectionCallBack 在客户端连接后调用,但不仅限于此。服务器关闭时也会调用它。

  • 对于每个 BeginWaitForConnection,必须使用返回的 asyncResult 调用 EndWaitForConnection
  • 为了避免在关闭服务器时调用 EndWaitForConnection 时抛出异常,我们检查 _isStopping 标志,在关闭服务器时将其设置为 True。
  • 此类中的所有操作都是异步的,因此在读取标志之前使用锁。
  • 然而,由于锁定会降低性能,我们在锁定之前检查标志。然后我们会在锁定之后再次检查它,因为它可能在上次检查和锁定之间发生了变化。
/// <summary>
/// This callback is called when the async WaitForConnection operation is completed,
/// whether a connection was made or not. WaitForConnection can be completed when the server disconnects.
/// </summary>
private void WaitForConnectionCallBack(IAsyncResult result)
{
    if (!_isStopping)
    {
        lock (_lockingObject)
        {
            if (!_isStopping)
            {
                // Call EndWaitForConnection to complete the connection operation
                _pipeServer.EndWaitForConnection(result);

                OnConnected();

                BeginRead(new Info());
            }
        }
    }
}

OnConnected

向订阅它的任何侦听器触发 ClientConnectedEvent 事件。

/// <summary>
/// This method fires ConnectedEvent
/// </summary>
private void OnConnected()
{
    if (ClientConnectedEvent != null)
    {
        ClientConnectedEvent(this, new ClientConnectedEventArgs { ClientId = Id });
    }
}

Info(信息)

Info 类保存消息信息。在调用 BeginRead 之前,在 WaitForConnectionCallBack 中创建了一个新的 Info 实例。

private class Info
{
    public readonly byte[] Buffer;
    public readonly StringBuilder StringBuilder;

    public Info()
    {
        Buffer = new byte[BufferSize];
        StringBuilder = new StringBuilder();
    }
}

BeginRead

BeginRead 接收以下参数

  • buffer: 要将数据读取到的缓冲区。
  • offset: 在 buffer 中开始读取的字节偏移量。
  • count: 要读取的最大字节数。
  • callback: 异步读取操作完成后要调用的方法。
  • state: 用户提供的对象,用于区分此特定的异步读取请求与其他请求。

给定的 Info 作为 state 传递给 EndReadCallBack

/// <summary>
/// This method begins an asynchronous read operation.
/// </summary>
private void BeginRead(Info info)
{
    try
    {
        _pipeServer.BeginRead(info.Buffer, 0, BufferSize, EndReadCallBack, info);
    }
    catch (Exception ex)
    {
        Logger.Error(ex);
        throw;
    }
}

EndReadCallBack

EndReadCallBack 是一个非常有趣的方法。

它在 BeginRead 操作完成时调用,而这不一定仅仅是客户端向管道写入消息时,也包括客户端断开连接或服务器关闭时。

  • 调用 EndRead 返回读取的字节数。
  • 如果读取了字节(非零),则将从给定的 info 中提取它们,并且 info 的字符串构建器将附加消息。
    • 如果消息不完整,则通过调用 BeginRead 并使用相同的 info 来启动新的读取操作。
    • 如果消息完整,则触发 MessageReceivedEvent 并使用新的 info 启动新的读取操作。
  • 如果没有读取到字节,则可能意味着客户端已断开连接或服务器已关闭。将触发 ClienticonnectedEvent 并停止 InternalPipeServer。
/// <summary>
/// This callback is called when the BeginRead operation is completed.
/// We can arrive here whether the connection is valid or not
/// </summary>
private void EndReadCallBack(IAsyncResult result)
{
    var readBytes = _pipeServer.EndRead(result);
    if (readBytes > 0)
    {
        var info = (Info)result.AsyncState;

        // Get the read bytes and append them
        info.StringBuilder.Append(Encoding.UTF8.GetString(info.Buffer, 0, readBytes));

        if (!_pipeServer.IsMessageComplete) // Message is not complete, continue reading
        {
            BeginRead(info);
        }
        else // Message is completed
        {
            // Finalize the received string and fire MessageReceivedEvent
            var message = info.StringBuilder.ToString().TrimEnd('\0');

            OnMessageReceived(message);

            // Begin a new reading operation
            BeginRead(new Info());
        }
    }
    else // When no bytes were read, it can mean that the client have been disconnected
    {
        if (!_isStopping)
        {
            lock (_lockingObject)
            {
                if (!_isStopping)
                {
                    OnDisconnected();
                    Stop();
                }
            }
        }
    }
}

OnMessageReceived

/// <summary>
/// This method fires MessageReceivedEvent with the given message
/// </summary>
private void OnMessageReceived(string  message)
{
    if (MessageReceivedEvent != null)
    {
        MessageReceivedEvent(this,
            new MessageReceivedEventArgs
            {
                Message = message
            });
    }
}

OnDisconnected

/// <summary>
/// This method fires DisconnectedEvent 
/// </summary>
private void OnDisconnected()
{
    if (ClientDisconnectedEvent != null)
    {
        ClientDisconnectedEvent(this, new ClientDisconnectedEventArgs { ClientId = Id });
    }
}

停止

/// <summary>
/// This method disconnects, closes and disposes the server
/// </summary>
public void Stop()
{
    _isStopping = true;

    try
    {
        if (_pipeServer.IsConnected)
        {
            _pipeServer.Disconnect();
        }
    }
    catch (Exception ex)
    {
        Logger.Error(ex);
        throw;
    }
    finally
    {
        _pipeServer.Close();
        _pipeServer.Dispose();
    }
}

PipeClient

PipeClient 构造函数

NamedPipeClientStream 在 PipeClient 的构造函数中创建。给定的 serverId 是要连接的管道名称。

public PipeClient(string serverId)
{
    _pipeClient = new NamedPipeClientStream(".", serverId, PipeDirection.InOut, PipeOptions.Asynchronous);
}

Start

在给定的超时时间内发起与服务器的连接。如果在该时间内未连接,将抛出异常。

/// <summary>
/// Starts the client. Connects to the server.
/// </summary>
public void Start()
{
    const int tryConnectTimeout = 5 * 60 * 1000; // 5 minutes
    _pipeClient.Connect(tryConnectTimeout);
}

SendMessage

连接后,客户端就可以开始通过管道发送其消息。
SendMessage 方法返回一个任务,其中包含异步结果或异常。

  • BeginWrite 接收以下参数
    • buffer: 包含要写入当前流的数据的缓冲区。
    • offset: buffer 中开始将字节复制到当前流的零基字节偏移量。
    • count: 要写入的最大字节数。
    • callback: 异步写入操作完成后要调用的方法。
    • state: 用户提供的对象,用于区分此特定的异步写入请求与其他请求。
  • 我们使用的回调是一个匿名函数,它将 EndWriteCallBack 的结果设置为 taskCompletionSource,或者将其设置为异常。
  • taskCompletionSource 的任务是 SendMessage 方法的返回值。
public Task<TaskResult> SendMessage(string message)
{
    var taskCompletionSource = new TaskCompletionSource<TaskResult>();

    if (_pipeClient.IsConnected)
    {
        var buffer = Encoding.UTF8.GetBytes(message);
        _pipeClient.BeginWrite(buffer, 0, buffer.Length, asyncResult =>
        {
            try
            {
                taskCompletionSource.SetResult(EndWriteCallBack(asyncResult));
            }
            catch (Exception ex)
            {
                taskCompletionSource.SetException(ex);
            }

        }, null);
    }
    else
    {
        Logger.Error("Cannot send message, pipe is not connected");
        throw new IOException("pipe is not connected");
    }

    return taskCompletionSource.Task;
}

TaskResult

public class TaskResult
    {
        public bool IsSuccess { get; set; }
        public string ErrorMessage { get; set; }
    }

EndWriteCallBack

EndWriteCallBack 调用 EndWrite 并刷新管道。重要的是要使用对应 BeginMethod 返回的结果来调用 EndMethod

/// <summary>
/// This callback is called when the BeginWrite operation is completed.
/// It can be called whether the connection is valid or not.
/// </summary>
/// <param name="asyncResult"></param>
private TaskResult EndWriteCallBack(IAsyncResult asyncResult)
{
    _pipeClient.EndWrite(asyncResult);
    _pipeClient.Flush();

    return new TaskResult { IsSuccess = true };
}

停止

/// <summary>
/// Stops the client. Waits for pipe drain, closes and disposes it.
/// </summary>
public void Stop()
{
    try
    {
        _pipeClient.WaitForPipeDrain();
    }
    finally
    {
        _pipeClient.Close();
        _pipeClient.Dispose();
    }
}

PipeServer

PipeServer 构造函数

  • 创建一个新的 guid,将用作管道名称。
  • 使用 AsyncOperationManager.SynchronizationContext 定义了一个同步上下文,它返回 .NET 框架支持的所有应用程序模型的正确同步上下文。这意味着无论我们在 WPF、WinForms 等中工作,我们都会收到正确的上下文。有关更多信息,请参阅 Gabriel Schenker 的这篇精彩文章
    同步上下文稍后使用。
public PipeServer()
{
    _pipeName = Guid.NewGuid().ToString();
    _synchronizationContext = AsyncOperationManager.SynchronizationContext;
    _servers = new ConcurrentDictionary<string, ICommunicationServer>();
}

Start

public void Start()
{
    StartNamedPipeServer();
}

StartNamedPipeServer

  • 使用管道名称和 MaxNumberOfServerInstances 字段中定义的允许的服务器实例数量创建一个新的 InternalPipeServer
  • 创建的服务器存储在一个线程安全的字典中。其 ID 作为键。
  • 向服务器事件注册处理程序。
  • 调用 Start 以启动服务器。
/// <summary>
/// Starts a new NamedPipeServerStream that waits for connection
/// </summary>
private void StartNamedPipeServer()
{
    var server = new InternalPipeServer(_pipeName, MaxNumberOfServerInstances);
    _servers[server.Id] = server;

    server.ClientConnectedEvent += ClientConnectedHandler;
    server.ClientDisconnectedEvent += ClientDisconnectedHandler;
    server.MessageReceivedEvent += MessageReceivedHandler;
            
    server.Start();
}

ClientConnectedHandler

当 InternalPipeServer 触发 ClientConnectedEvent 时,调用此处理程序。

  • OnClientConnected 让外界了解收到的消息。
  • StartNamedPipeServer 为新的客户端连接做准备。
/// <summary>
/// Handles a client connection. Fires the relevant event and prepares for new connection.
/// </summary>
private void ClientConnectedHandler(object sender, ClientConnectedEventArgs eventArgs)
{
    OnClientConnected(eventArgs);

    StartNamedPipeServer(); // Create a additional server as a preparation for new connection
}

OnClientConnected

此方法可以从工作线程调用。因此,在触发 ClientConnectedEvent 之前,我们必须将线程同步到 UI 线程。如果我们不这样做,PipeServer 侦听器的处理程序也将会在工作线程中调用,然后它们才需要同步才能访问任何 UI 控件(例如,为了显示消息)。
同步是通过调用 _synchronizationContext.Post 来完成的。

    /// <summary>
    /// Fires ClientConnectedEvent in the current thread
    /// </summary>
    /// <param name="eventArgs"></param>
    private void OnClientConnected(ClientConnectedEventArgs eventArgs)
    {
        _synchronizationContext.Post(e => ClientConnectedEvent.SafeInvoke(this, (ClientConnectedEventArgs)e), eventArgs);
    }

SafeInvoke

我们使用了一个非常好的模式,该模式在这个关于这个伟大问题的伟大回答中进行了说明。

/// <summary>
/// This method is a safe way to fire an event in a multithreaded process. 
/// Since there is a tiny chance that the event becomes null after the null check but before the invocation, 
/// we use this extension where the event is passed as an argument.
/// Why is this helpful? MulticastDelagates are immutable, so if you first assign a variable, null check against the variable and invoke through it, 
/// you are safe
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="event"></param>
/// <param name="sender"></param>
/// <param name="eventArgs"></param>
public static void SafeInvoke<T>(this EventHandler<T> @event, object sender, T eventArgs) where T : EventArgs
{
    if (@event != null)
    {
        @event(sender, eventArgs);
    }
}

MessageReceivedHandler

/// <summary>
/// Handles a message that is received from the client. Fires the relevant event.
/// </summary>
private void MessageReceivedHandler(object sender, MessageReceivedEventArgs eventArgs)
{
    OnMessageReceived(eventArgs);
}

OnMessageReceived

/// <summary>
/// Fires MessageReceivedEvent in the current thread
/// </summary>
/// <param name="eventArgs"></param>
private void OnMessageReceived(MessageReceivedEventArgs eventArgs)
{
    _synchronizationContext.Post(e => MessageReceivedEvent.SafeInvoke(this, (MessageReceivedEventArgs)e), eventArgs);
}

ClientDisconnectedHandler

/// <summary>
/// Hanldes a client disconnection. Fires the relevant event ans removes its server from the pool
/// </summary>
private void ClientDisconnectedHandler(object sender, ClientDisconnectedEventArgs eventArgs)
{
    OnClientDisconnected(eventArgs);

    StopNamedPipeServer(eventArgs.ClientId);
}

OnClientDisconnected

/// <summary>
/// Fires ClientDisconnectedEvent in the current thread
/// </summary>
/// <param name="eventArgs"></param>
private void OnClientDisconnected(ClientDisconnectedEventArgs eventArgs)
{
    _synchronizationContext.Post(e => ClientDisconnectedEvent.SafeInvoke(this, (ClientDisconnectedEventArgs)e), eventArgs);
}

StopNamedPipeServer

/// <summary>
/// Stops the server that belongs to the given id
/// </summary>
/// <param name="id"></param>
private void StopNamedPipeServer(string id)
{
    UnregisterFromServerEvents(_servers[id]);
    _servers[id].Stop();
    _servers.Remove(id);
}

UnregisterFromServerEvents

/// <summary>
/// Unregisters from the given server's events
/// </summary>
/// <param name="server"></param>
private void UnregisterFromServerEvents(ICommunicationServer server)
{
    server.ClientConnectedEvent -= ClientConnectedHandler;
    server.ClientDisconnectedEvent -= ClientDisconnectedHandler;
    server.MessageReceivedEvent -= MessageReceivedHandler;
}

停止

public void Stop()
{
    foreach (var server in _servers.Values)
    {
        try
        {
            UnregisterFromServerEvents(server);
            server.Stop();
        }
        catch (Exception)
        {
            Logger.Error("Fialed to stop server");
        }
    }

    _servers.Clear();
}

用法

附带的解决方案包括演示库服务器用法的单元测试。

关注点

从事客户端服务器实现的工作非常有启发性。作为我研究的一部分,我有机会阅读了一些非常有趣的在线文章,我在本文中引用了它们。
在多线程环境中进行调试既具挑战性又令人耳目一新。

我想感谢...

我想感谢我的同事们和那些帮助我提供了想法、代码审查和深刻见解的杰出软件工程师Avishay Ben Shabtai、Niv LedererAmit Bezalel
谢谢大家!

© . All rights reserved.