支持多个客户端的 C# 命名管道库






4.93/5 (10投票s)
一个易于理解且同时支持多个命名管道客户端的 C# 命名管道库
引言
命名管道是 Windows 中进程间通信 (IPC) 的一个很棒的工具。虽然这个话题已经在许多教程和论坛中被讨论过,但我没有找到一个易于理解并且同时支持多个命名管道客户端的例子。在本文中,我将从一个简单的例子开始,解释基本原理 (Demo1
)。然后,它被扩展到类 NamedPipeClient
、NamedPipeServerInstance
和 NamedPipeServer
。最后,这些类的用法在 Demo2
中展示,供您参考。
背景
首先,让我们检查 Demo1.cs
,看看一个简单的命名管道通信是如何工作的。虽然这个演示在技术上是“线程间通信”,但它可以很容易地适应到实际的进程间通信场景。
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace NamedPipeDemo
{
class Demo1
{
private static string pipeName = "Demo1Pipe";
public static void Run()
{
Task.Run(() => Server());
Task.Delay(300).Wait();
Client();
}
static void Server()
{
using (var server = new NamedPipeServerStream(pipeName))
{
server.WaitForConnection();
var reader = new StreamReader(server);
var writer = new StreamWriter(server);
var received = reader.ReadLine();
Console.WriteLine("Received from client: " + received);
var toSend = "Hello, client.";
writer.WriteLine(toSend);
writer.Flush();
}
}
static void Client()
{
using (var client = new NamedPipeClientStream(pipeName))
{
client.Connect(100);
var writer = new StreamWriter(client);
var request = "Hello, server.";
writer.WriteLine(request);
writer.Flush();
var reader = new StreamReader(client);
var response = reader.ReadLine();
Console.WriteLine("Response from server: " + response);
}
}
}
}
像许多其他应用程序一样,命名管道使用客户端-服务器模型。要使其工作,我们需要先启动服务器。在 Demo1
中,服务器通过 Task.Run(() => Server())
启动,并在主线程之外的一个新线程中运行。如果您查看方法 Server
的内部,您会发现启动一个新的命名管道服务器是多么容易。服务器启动后,它将等待新的客户端连接。服务器线程将被阻塞,直到有新的客户端连接到服务器。现在服务器已经启动并主动等待传入的客户端连接,我们可以启动我们的客户端并将其连接到服务器。接下来发生的是一个典型的客户端服务器通信:客户端向服务器发送一个请求 -> 服务器读取请求并将响应发送回客户端 -> 客户端读取响应。
详细说明
在本节中,我们将把上面的例子扩展到一个可重用的库,它支持多个客户端。首先,让我们看一下 NamedPipeClient.cs
。基本上,它包装了前一节中的方法 Client
,并添加了一些基本的错误处理。
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace NamedPipeLib
{
public class NamedPipeClient : IDisposable
{
private NamedPipeClientStream client;
private StreamReader reader;
private StreamWriter writer;
public NamedPipeClient(string pipeName) : this(pipeName, 100) { }
public NamedPipeClient(string pipeName, int timeOut)
{
client = new NamedPipeClientStream(pipeName);
client.Connect(timeOut);
reader = new StreamReader(client);
writer = new StreamWriter(client);
}
public void Dispose()
{
writer.Dispose();
reader.Dispose();
client.Dispose();
}
public string SendRequest(string request)
{
if (request != null)
{
try
{
writer.WriteLine(request);
writer.Flush();
return reader.ReadLine();
}
catch (Exception ex)
{
return string.Format("{0}\r\nDetails:\r\n{1}", "Error on server communication.", ex.Message);
}
}
else
{
return "Error. Null request.";
}
}
}
}
接下来,让我们检查 PipeMsgEventArgs.cs
。这里没有什么特别之处。它只是用两个属性 Request
和 Response
扩展了 EventArgs
。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace NamedPipeLib
{
public class PipeMsgEventArgs : EventArgs
{
public string Request { get; set; }
public string Response { get; set; }
public PipeMsgEventArgs()
{
}
public PipeMsgEventArgs(string request)
{
this.Request = request;
}
}
}
NamedPipeServerInstance
值得详细解释。与前一节中的示例一样,这里也有一个 NamedPipeServerStream
服务器。此外,我们还有一个 bool disposeFlag
,它记录服务器是否已被释放,一个 Task TaskCommunication
,它处理与客户端的通信,一个 EventHandler newServerInstanceEvent
,当客户端连接到此服务器时将被调用,以及一个 EventHandler<PipeMsgEventArgs> newRequestEvent
,当客户端向服务器发送一个新的请求时将被调用。
在 NamedPipeServerInstance
的构造函数中,我们使用更多参数初始化服务器,使其成为异步的。我们调用 server.BeginWaitForConnection()
,而不是像 Demo1
那样调用 server.WaitForConnection()
。这将不会阻塞线程,并且可以更好地利用我们有限的线程资源。当一个客户端连接到我们的服务器时,方法 OnConnected
将被调用。
方法 OnConnected
可能会在新客户端连接或服务器释放时被调用。因此,有必要检查 disposeFlag
。如果是新的客户端连接,我们将调用 EndWaitForConnection
,调用 newServerInstanceEvent
,并开始我们与客户端的通信。
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace NamedPipeLib
{
class NamedPipeServerInstance : IDisposable
{
private NamedPipeServerStream server;
private bool disposeFlag = false;
public Task TaskCommunication { get; private set; }
public event EventHandler newServerInstanceEvent = delegate { };
public event EventHandler<PipeMsgEventArgs> newRequestEvent = delegate { };
public NamedPipeServerInstance(string pipeName, int maxNumberOfServerInstances)
{
server = new NamedPipeServerStream(pipeName, PipeDirection.InOut, maxNumberOfServerInstances, PipeTransmissionMode.Message, PipeOptions.Asynchronous);
var asyncResult = server.BeginWaitForConnection(OnConnected, null);
}
public void Dispose()
{
disposeFlag = true;
server.Dispose();
}
private void OnConnected(IAsyncResult result)
{
/// This method might be invoked either on new client connection
/// or on dispose. Thus, it is necessary to check disposeFlag.
if (!disposeFlag)
{
server.EndWaitForConnection(result);
newServerInstanceEvent.Invoke(this, EventArgs.Empty);
TaskCommunication = Task.Factory.StartNew(Communication);
}
}
private void Communication()
{
using (var reader = new StreamReader(server))
{
while (!reader.EndOfStream)
{
var request = reader.ReadLine();
if (request != null)
{
var msgEventArgs = new PipeMsgEventArgs(request);
newRequestEvent.Invoke(this, msgEventArgs);
var response = msgEventArgs.Response + Environment.NewLine;
var bytes = Encoding.UTF8.GetBytes(response);
server.Write(bytes, 0, bytes.Count());
}
}
}
}
}
}
值得注意的是,单个 NamedPipeServerInstance
(更准确地说,是单个 NamedPipeServerStream
)只能处理一个客户端连接。为了支持多个客户端连接,我们需要多个 NamedPipeServerInstance
,这就是 NamedPipeServer
发挥作用的地方。
NamedPipeServer
通过方法 NewServerInstance
和 CleanServers
创建和释放 NamedPipeServerInstance
。方法 NewServerInstance
订阅 NamedPipeServerInstance
的 newServerInstanceEvent
。当一个新的客户端连接到 NamedPipeServerInstance
时,newServerInstanceEvent
将被调用,并将调用方法 NewServerInstance
,只要现有服务器的数量不超过 maxNumberOfServerInstances
,该方法就会创建一个新的 NamedPipeServerInstance
。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace NamedPipeLib
{
public class NamedPipeServer
{
private readonly string pipeName;
private readonly int maxNumberOfServerInstances;
private List<NamedPipeServerInstance> servers = new List<NamedPipeServerInstance>();
public event EventHandler<PipeMsgEventArgs> newRequestEvent = delegate { };
public NamedPipeServer(string pipeName) : this(pipeName, 20, 4) { }
public NamedPipeServer(string pipeName, int maxNumberOfServerInstances, int initialNumberOfServerInstances)
{
this.pipeName = pipeName;
this.maxNumberOfServerInstances = maxNumberOfServerInstances;
for (int i = 0; i < initialNumberOfServerInstances; i++)
{
NewServerInstance();
}
}
public void Dispose()
{
CleanServers(true);
}
private void NewServerInstance()
{
// Start a new server instance only when the number of server instances
// is smaller than maxNumberOfServerInstances
if (servers.Count < maxNumberOfServerInstances)
{
var server = new NamedPipeServerInstance(pipeName, maxNumberOfServerInstances);
server.newServerInstanceEvent += (s, e) => NewServerInstance();
server.newRequestEvent += (s, e) => newRequestEvent.Invoke(s, e);
servers.Add(server);
}
// Run clean servers anyway
CleanServers(false);
}
/// <summary>
/// A routine to clean NamedPipeServerInstances. When disposeAll is true,
/// it will dispose all server instances. Otherwise, it will only dispose
/// the instances that are completed, canceled, or faulted.
/// PS: disposeAll is true only for this.Dispose()
/// </summary>
/// <param name="disposeAll"></param>
private void CleanServers(bool disposeAll)
{
if (disposeAll)
{
foreach (var server in servers)
{
server.Dispose();
}
}
else
{
for (int i = servers.Count - 1; i >= 0; i--)
{
if (servers[i] == null)
{
servers.RemoveAt(i);
}
else if (servers[i].TaskCommunication != null &&
(servers[i].TaskCommunication.Status == TaskStatus.RanToCompletion ||
servers[i].TaskCommunication.Status == TaskStatus.Canceled ||
servers[i].TaskCommunication.Status == TaskStatus.Faulted))
{
servers[i].Dispose();
servers.RemoveAt(i);
}
}
}
}
}
}
使用代码
与 Demo1
类似,Demo2
也从一个运行服务器的 Task
开始。一个订阅服务器的 newRequestEvent
的内联函数负责处理来自客户端的传入请求。该内联函数会将 "Echo. " 和请求连接起来作为响应。服务器创建后,我们并行启动 8 个客户端。每个客户端将向服务器发送三个不同的请求,并且响应将被记录到控制台。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NamedPipeLib;
namespace NamedPipeDemo
{
class Demo2
{
private static string pipeName = "Demo2Pipe";
public static void Run()
{
Task.Run(() => Server());
Task.Delay(300).Wait();
var clients = new List<string>()
{
"Client 1",
"Client 2",
"Client 3",
"Client 4",
"Client 5",
"Client 6",
"Client 7",
"Client 8"
};
Parallel.ForEach(clients, (c) => Client(c));
}
static void Server()
{
var server = new NamedPipeServer(pipeName);
server.newRequestEvent += (s, e) => e.Response = "Echo. " + e.Request;
Task.Delay(10000).Wait();
server.Dispose();
}
static void Client(string clientName)
{
using (var client = new NamedPipeClient(pipeName))
{
var request = clientName + " Request a";
var response = client.SendRequest(request);
Console.WriteLine(response);
Task.Delay(100).Wait();
var request1 = clientName + " Request b";
var response1 = client.SendRequest(request1);
Console.WriteLine(response1);
Task.Delay(100).Wait();
var request2 = clientName + " Request c";
var response2 = client.SendRequest(request2);
Console.WriteLine(response2);
}
}
}
}
关注点
我发现编写这样一个库很有趣,特别是管理命名管道服务器实例的方式,这样服务器就可以支持多个客户端。希望您也喜欢它。
历史
2017 年 7 月 30 日。版本 1.0。