创建使用命名管道的服务器






4.80/5 (31投票s)
在本文中,我将介绍如何使用命名管道创建进程间通信库。
引言
在本文中,我将介绍如何使用 C# .Net 4 中的命名管道实现服务器和客户端。
我使用了 NamedPipeServerStream 和 NamedPipeClientStream,但很快就意识到“服务器”这个名字有点令人困惑。NamedPipeServerStream 一次只能处理一个客户端(请参阅 MSDN 中的 命名管道实例主题),但我需要一个能够处理多个客户端请求的服务器。
我在网上找不到任何适合我需求的示例,因此,我使用 NamedPipeServerStream 和 NamedPipeClientStream 创建了我自己的服务器实现。
背景
正如 Microsoft 定义的那样,命名管道是一种命名的一路或双路管道,用于在管道服务器和一个或多个管道客户端之间进行通信。命名管道的所有实例共享相同的管道名称,但每个实例都有自己的缓冲区和句柄,并为客户端/服务器通信提供单独的通道。实例的使用使得多个管道客户端可以同时使用同一个命名管道。任何进程都可以访问命名管道,但需经过安全检查,这使得命名管道成为相关或不相关进程之间通信的便捷方式。 更多...
下载代码
https://github.com/IfatChitin/Named-Pipes
代码简介
PipeServer 负责创建和维护命名管道流,这些管道是为每个客户端打开的。
InternalPipeServer 是 NamedPipeServerStream 的包装器。
PipeClient 是 NamedPipeClientStream 的包装器。
主要流程
- 创建并启动 PipeServer
- 生成新的管道名称。
- 创建 InternalPipeServer 的新实例并开始等待客户端连接。
- 创建并启动 PipeClient
- 与 InternalPipeServer 建立连接。
- InternalPipeServer 触发一个事件,让 PipeServer 知道已建立连接。
- PipeServer 触发自己的事件,告知外界已连接客户端。然后,它会创建一个新的 InternalPipeServer 实例并启动它,以便它开始等待新的连接,同时第一个实例与第一个客户端进行通信。
- InternalPipeServer 开始异步读取操作,该操作在客户端发送消息、断开连接或管道关闭时完成。
- PipeClient 发送消息
- InternalPipeServer 接收到消息的一部分,因为消息比其缓冲区大小长,并启动新的异步读取操作。
- InternalPipeServer 接收消息的其余部分,将其附加到第一部分,触发一个事件告知 PipeServer 已收到新消息,并启动新的异步读取操作以等待新消息。
- PipeServer 触发自己的事件,告知外界已收到来自某个客户端的新消息。
- PipeClient 断开连接
- InternalPipeServer 的读取操作在未读取到字节的情况下结束,因此 InternalPipeServer 假定客户端已断开连接。它会触发一个事件告知 PipeServer。
- PipeServer 触发自己的事件,告知外界某个客户端已断开连接。
- PipeServer 停止
- PipeServer 停止其所有 InternalPipeServer 实例。
使用代码
如果您需要与其他进程通信,请使用附带的代码。
在一个进程中创建一个 PipeServer,在另一个进程中创建一个 PipeClient。然后使用 PipeClient 向服务器发送消息。
InternalPipeServer
InternalPipeServer 构造函数
NamedPipeServerStream 在 InternalPipeServer 的构造函数中创建。
传递给 NamedPipeServerStream 构造函数的参数是
pipeName
: 要创建的管道的名称。客户端必须熟悉此名称才能连接到管道服务器。PipeDirection.InOut
: 管道方向。maxNumberOfServerInstances
: 共享同一名称的最大服务器实例数。创建 NamedPipeServerStream 时,如果其创建达到最大数量,将抛出 I/O 异常。PipeTransmissionMode.Message
: 管道传输模式。我选择 Message,因为它暴露了一个有用的标志“IsMessageCompleted”,有助于流上的通信实现。为了使用 Message 传输模式,管道方向必须是 InOut。PipeOptions.Asynchronous
: 这启用了异步读写操作。
/// <summary>
/// Creates a new NamedPipeServerStream
/// </summary>
public InternalPipeServer(string pipeName, int maxNumberOfServerInstances)
{
_pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.InOut, maxNumberOfServerInstances, PipeTransmissionMode.Message, PipeOptions.Asynchronous);
Id = Guid.NewGuid().ToString();
}
Start
BeginWaitForConnection 接收一个回调,在操作完成后调用,以及一个将传递给回调的用户定义对象。在这种情况下,我们发送 null。
/// <summary>
/// This method begins an asynchronous operation to wait for a client to connect.
/// </summary>
public void Start()
{
try
{
_pipeServer.BeginWaitForConnection(WaitForConnectionCallBack, null);
}
catch (Exception ex)
{
Logger.Error(ex);
throw;
}
}
WaitForConnectionCallBack
WaitForConnectionCallBack
在客户端连接后调用,但不仅限于此。服务器关闭时也会调用它。
- 对于每个
BeginWaitForConnection
,必须使用返回的 asyncResult 调用EndWaitForConnection
。 - 为了避免在关闭服务器时调用
EndWaitForConnection
时抛出异常,我们检查 _isStopping 标志,在关闭服务器时将其设置为 True。 - 此类中的所有操作都是异步的,因此在读取标志之前使用锁。
- 然而,由于锁定会降低性能,我们在锁定之前检查标志。然后我们会在锁定之后再次检查它,因为它可能在上次检查和锁定之间发生了变化。
/// <summary>
/// This callback is called when the async WaitForConnection operation is completed,
/// whether a connection was made or not. WaitForConnection can be completed when the server disconnects.
/// </summary>
private void WaitForConnectionCallBack(IAsyncResult result)
{
if (!_isStopping)
{
lock (_lockingObject)
{
if (!_isStopping)
{
// Call EndWaitForConnection to complete the connection operation
_pipeServer.EndWaitForConnection(result);
OnConnected();
BeginRead(new Info());
}
}
}
}
OnConnected
向订阅它的任何侦听器触发 ClientConnectedEvent 事件。
/// <summary>
/// This method fires ConnectedEvent
/// </summary>
private void OnConnected()
{
if (ClientConnectedEvent != null)
{
ClientConnectedEvent(this, new ClientConnectedEventArgs { ClientId = Id });
}
}
Info(信息)
Info
类保存消息信息。在调用 BeginRead
之前,在 WaitForConnectionCallBack
中创建了一个新的 Info 实例。
private class Info
{
public readonly byte[] Buffer;
public readonly StringBuilder StringBuilder;
public Info()
{
Buffer = new byte[BufferSize];
StringBuilder = new StringBuilder();
}
}
BeginRead
BeginRead
接收以下参数
buffer:
要将数据读取到的缓冲区。offset:
在 buffer 中开始读取的字节偏移量。count:
要读取的最大字节数。callback:
异步读取操作完成后要调用的方法。state:
用户提供的对象,用于区分此特定的异步读取请求与其他请求。
给定的 Info
作为 state 传递给 EndReadCallBack
。
/// <summary>
/// This method begins an asynchronous read operation.
/// </summary>
private void BeginRead(Info info)
{
try
{
_pipeServer.BeginRead(info.Buffer, 0, BufferSize, EndReadCallBack, info);
}
catch (Exception ex)
{
Logger.Error(ex);
throw;
}
}
EndReadCallBack
EndReadCallBack
是一个非常有趣的方法。
它在 BeginRead
操作完成时调用,而这不一定仅仅是客户端向管道写入消息时,也包括客户端断开连接或服务器关闭时。
- 调用
EndRead
返回读取的字节数。 - 如果读取了字节(非零),则将从给定的 info 中提取它们,并且 info 的字符串构建器将附加消息。
- 如果消息不完整,则通过调用
BeginRead
并使用相同的 info 来启动新的读取操作。 - 如果消息完整,则触发
MessageReceivedEvent
并使用新的 info 启动新的读取操作。
- 如果消息不完整,则通过调用
- 如果没有读取到字节,则可能意味着客户端已断开连接或服务器已关闭。将触发
ClienticonnectedEvent
并停止 InternalPipeServer。
/// <summary>
/// This callback is called when the BeginRead operation is completed.
/// We can arrive here whether the connection is valid or not
/// </summary>
private void EndReadCallBack(IAsyncResult result)
{
var readBytes = _pipeServer.EndRead(result);
if (readBytes > 0)
{
var info = (Info)result.AsyncState;
// Get the read bytes and append them
info.StringBuilder.Append(Encoding.UTF8.GetString(info.Buffer, 0, readBytes));
if (!_pipeServer.IsMessageComplete) // Message is not complete, continue reading
{
BeginRead(info);
}
else // Message is completed
{
// Finalize the received string and fire MessageReceivedEvent
var message = info.StringBuilder.ToString().TrimEnd('\0');
OnMessageReceived(message);
// Begin a new reading operation
BeginRead(new Info());
}
}
else // When no bytes were read, it can mean that the client have been disconnected
{
if (!_isStopping)
{
lock (_lockingObject)
{
if (!_isStopping)
{
OnDisconnected();
Stop();
}
}
}
}
}
OnMessageReceived
/// <summary>
/// This method fires MessageReceivedEvent with the given message
/// </summary>
private void OnMessageReceived(string message)
{
if (MessageReceivedEvent != null)
{
MessageReceivedEvent(this,
new MessageReceivedEventArgs
{
Message = message
});
}
}
OnDisconnected
/// <summary>
/// This method fires DisconnectedEvent
/// </summary>
private void OnDisconnected()
{
if (ClientDisconnectedEvent != null)
{
ClientDisconnectedEvent(this, new ClientDisconnectedEventArgs { ClientId = Id });
}
}
停止
/// <summary>
/// This method disconnects, closes and disposes the server
/// </summary>
public void Stop()
{
_isStopping = true;
try
{
if (_pipeServer.IsConnected)
{
_pipeServer.Disconnect();
}
}
catch (Exception ex)
{
Logger.Error(ex);
throw;
}
finally
{
_pipeServer.Close();
_pipeServer.Dispose();
}
}
PipeClient
PipeClient 构造函数
NamedPipeClientStream 在 PipeClient 的构造函数中创建。给定的 serverId 是要连接的管道名称。
public PipeClient(string serverId)
{
_pipeClient = new NamedPipeClientStream(".", serverId, PipeDirection.InOut, PipeOptions.Asynchronous);
}
Start
在给定的超时时间内发起与服务器的连接。如果在该时间内未连接,将抛出异常。
/// <summary>
/// Starts the client. Connects to the server.
/// </summary>
public void Start()
{
const int tryConnectTimeout = 5 * 60 * 1000; // 5 minutes
_pipeClient.Connect(tryConnectTimeout);
}
SendMessage
连接后,客户端就可以开始通过管道发送其消息。SendMessage
方法返回一个任务,其中包含异步结果或异常。
BeginWrite
接收以下参数buffer:
包含要写入当前流的数据的缓冲区。offset:
buffer 中开始将字节复制到当前流的零基字节偏移量。count:
要写入的最大字节数。callback:
异步写入操作完成后要调用的方法。state:
用户提供的对象,用于区分此特定的异步写入请求与其他请求。
- 我们使用的回调是一个匿名函数,它将
EndWriteCallBack
的结果设置为 taskCompletionSource,或者将其设置为异常。 - taskCompletionSource 的任务是 SendMessage 方法的返回值。
public Task<TaskResult> SendMessage(string message)
{
var taskCompletionSource = new TaskCompletionSource<TaskResult>();
if (_pipeClient.IsConnected)
{
var buffer = Encoding.UTF8.GetBytes(message);
_pipeClient.BeginWrite(buffer, 0, buffer.Length, asyncResult =>
{
try
{
taskCompletionSource.SetResult(EndWriteCallBack(asyncResult));
}
catch (Exception ex)
{
taskCompletionSource.SetException(ex);
}
}, null);
}
else
{
Logger.Error("Cannot send message, pipe is not connected");
throw new IOException("pipe is not connected");
}
return taskCompletionSource.Task;
}
TaskResult
public class TaskResult
{
public bool IsSuccess { get; set; }
public string ErrorMessage { get; set; }
}
EndWriteCallBack
EndWriteCallBack
调用 EndWrite
并刷新管道。重要的是要使用对应 BeginMethod
返回的结果来调用 EndMethod
。
/// <summary>
/// This callback is called when the BeginWrite operation is completed.
/// It can be called whether the connection is valid or not.
/// </summary>
/// <param name="asyncResult"></param>
private TaskResult EndWriteCallBack(IAsyncResult asyncResult)
{
_pipeClient.EndWrite(asyncResult);
_pipeClient.Flush();
return new TaskResult { IsSuccess = true };
}
停止
/// <summary>
/// Stops the client. Waits for pipe drain, closes and disposes it.
/// </summary>
public void Stop()
{
try
{
_pipeClient.WaitForPipeDrain();
}
finally
{
_pipeClient.Close();
_pipeClient.Dispose();
}
}
PipeServer
PipeServer 构造函数
- 创建一个新的 guid,将用作管道名称。
- 使用
AsyncOperationManager.SynchronizationContext
定义了一个同步上下文,它返回 .NET 框架支持的所有应用程序模型的正确同步上下文。这意味着无论我们在 WPF、WinForms 等中工作,我们都会收到正确的上下文。有关更多信息,请参阅 Gabriel Schenker 的这篇精彩文章。
同步上下文稍后使用。
public PipeServer()
{
_pipeName = Guid.NewGuid().ToString();
_synchronizationContext = AsyncOperationManager.SynchronizationContext;
_servers = new ConcurrentDictionary<string, ICommunicationServer>();
}
Start
public void Start()
{
StartNamedPipeServer();
}
StartNamedPipeServer
- 使用管道名称和
MaxNumberOfServerInstances
字段中定义的允许的服务器实例数量创建一个新的InternalPipeServer
。 - 创建的服务器存储在一个线程安全的字典中。其 ID 作为键。
- 向服务器事件注册处理程序。
- 调用
Start
以启动服务器。
/// <summary>
/// Starts a new NamedPipeServerStream that waits for connection
/// </summary>
private void StartNamedPipeServer()
{
var server = new InternalPipeServer(_pipeName, MaxNumberOfServerInstances);
_servers[server.Id] = server;
server.ClientConnectedEvent += ClientConnectedHandler;
server.ClientDisconnectedEvent += ClientDisconnectedHandler;
server.MessageReceivedEvent += MessageReceivedHandler;
server.Start();
}
ClientConnectedHandler
当 InternalPipeServer 触发 ClientConnectedEvent 时,调用此处理程序。
- OnClientConnected 让外界了解收到的消息。
- StartNamedPipeServer 为新的客户端连接做准备。
/// <summary>
/// Handles a client connection. Fires the relevant event and prepares for new connection.
/// </summary>
private void ClientConnectedHandler(object sender, ClientConnectedEventArgs eventArgs)
{
OnClientConnected(eventArgs);
StartNamedPipeServer(); // Create a additional server as a preparation for new connection
}
OnClientConnected
此方法可以从工作线程调用。因此,在触发 ClientConnectedEvent 之前,我们必须将线程同步到 UI 线程。如果我们不这样做,PipeServer 侦听器的处理程序也将会在工作线程中调用,然后它们才需要同步才能访问任何 UI 控件(例如,为了显示消息)。
同步是通过调用 _synchronizationContext.Post
来完成的。
/// <summary>
/// Fires ClientConnectedEvent in the current thread
/// </summary>
/// <param name="eventArgs"></param>
private void OnClientConnected(ClientConnectedEventArgs eventArgs)
{
_synchronizationContext.Post(e => ClientConnectedEvent.SafeInvoke(this, (ClientConnectedEventArgs)e), eventArgs);
}
SafeInvoke
我们使用了一个非常好的模式,该模式在这个关于这个伟大问题的伟大回答中进行了说明。
/// <summary>
/// This method is a safe way to fire an event in a multithreaded process.
/// Since there is a tiny chance that the event becomes null after the null check but before the invocation,
/// we use this extension where the event is passed as an argument.
/// Why is this helpful? MulticastDelagates are immutable, so if you first assign a variable, null check against the variable and invoke through it,
/// you are safe
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="event"></param>
/// <param name="sender"></param>
/// <param name="eventArgs"></param>
public static void SafeInvoke<T>(this EventHandler<T> @event, object sender, T eventArgs) where T : EventArgs
{
if (@event != null)
{
@event(sender, eventArgs);
}
}
MessageReceivedHandler
/// <summary>
/// Handles a message that is received from the client. Fires the relevant event.
/// </summary>
private void MessageReceivedHandler(object sender, MessageReceivedEventArgs eventArgs)
{
OnMessageReceived(eventArgs);
}
OnMessageReceived
/// <summary>
/// Fires MessageReceivedEvent in the current thread
/// </summary>
/// <param name="eventArgs"></param>
private void OnMessageReceived(MessageReceivedEventArgs eventArgs)
{
_synchronizationContext.Post(e => MessageReceivedEvent.SafeInvoke(this, (MessageReceivedEventArgs)e), eventArgs);
}
ClientDisconnectedHandler
/// <summary>
/// Hanldes a client disconnection. Fires the relevant event ans removes its server from the pool
/// </summary>
private void ClientDisconnectedHandler(object sender, ClientDisconnectedEventArgs eventArgs)
{
OnClientDisconnected(eventArgs);
StopNamedPipeServer(eventArgs.ClientId);
}
OnClientDisconnected
/// <summary>
/// Fires ClientDisconnectedEvent in the current thread
/// </summary>
/// <param name="eventArgs"></param>
private void OnClientDisconnected(ClientDisconnectedEventArgs eventArgs)
{
_synchronizationContext.Post(e => ClientDisconnectedEvent.SafeInvoke(this, (ClientDisconnectedEventArgs)e), eventArgs);
}
StopNamedPipeServer
/// <summary>
/// Stops the server that belongs to the given id
/// </summary>
/// <param name="id"></param>
private void StopNamedPipeServer(string id)
{
UnregisterFromServerEvents(_servers[id]);
_servers[id].Stop();
_servers.Remove(id);
}
UnregisterFromServerEvents
/// <summary>
/// Unregisters from the given server's events
/// </summary>
/// <param name="server"></param>
private void UnregisterFromServerEvents(ICommunicationServer server)
{
server.ClientConnectedEvent -= ClientConnectedHandler;
server.ClientDisconnectedEvent -= ClientDisconnectedHandler;
server.MessageReceivedEvent -= MessageReceivedHandler;
}
停止
public void Stop()
{
foreach (var server in _servers.Values)
{
try
{
UnregisterFromServerEvents(server);
server.Stop();
}
catch (Exception)
{
Logger.Error("Fialed to stop server");
}
}
_servers.Clear();
}
用法
附带的解决方案包括演示库服务器用法的单元测试。
关注点
从事客户端服务器实现的工作非常有启发性。作为我研究的一部分,我有机会阅读了一些非常有趣的在线文章,我在本文中引用了它们。
在多线程环境中进行调试既具挑战性又令人耳目一新。
我想感谢...
我想感谢我的同事们和那些帮助我提供了想法、代码审查和深刻见解的杰出软件工程师:Avishay Ben Shabtai、Niv Lederer 和 Amit Bezalel。
谢谢大家!