65.9K
CodeProject 正在变化。 阅读更多。
Home

IPC 与命名管道

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.85/5 (15投票s)

2015年1月7日

CPOL

4分钟阅读

viewsIcon

48066

downloadIcon

2028

本文档介绍了一个使用命名管道实现的简单消息传递层。

引言

本文介绍了一个消息传递库,该库可用于在同一网络上运行的两个.NET应用程序之间发送消息。该库允许在客户端和服务器应用程序之间发送简单的string

该库在其内部实现中使用了命名管道。有关命名管道的更多信息,请访问MSDN页面。

我还使用了以下CodeProject文章作为开发库的起点:

背景

我最近遇到了一个场景,需要通知一个应用程序有一个升级正在等待开始。我需要一个简单的机制来向应用程序发出信号。一旦收到信号,应用程序就会安全关闭,从而允许升级开始。

在开发此库之前,我研究了多种IPC解决方案。我选择了命名管道,因为这是我能找到的最轻量级的IPC方法。

Using the Code

该库易于使用,有两个主要入口点:PipeServerPipeClient。以下代码片段是从源代码中包含的示例应用程序复制的。

var pipeServer = new PipeServer("demo", PipeDirection.InOut);
pipeServer.MessageReceived += (s, o) => pipeServer.Send(o.Message); 
pipeServer.Start();

var pipeClient = new PipeClient("demo", PipeDirection.InOut);
pipeClient.MessageReceived += (s, o) => Console.WriteLine("Client Received: value: {0}", o.Message);
pipeClient.Connect();

示例应用程序演示了如何使用该库构建一个简单的回显服务器。MessageReceived事件处理程序仅回显从客户端接收到的任何消息。

PipeServer

Start

start方法调用BeginWaitingForConnection并传递一个state对象。Start方法被重载,允许客户端提供一个取消令牌。

public void Start(CancellationToken token)
{
    if (this.disposed)
    {
       throw new ObjectDisposedException(typeof(PipeServer).Name);
    }

    var state = new PipeServerState(this.ServerStream, token);
    this.ServerStream.BeginWaitForConnection(this.ConnectionCallback, state);
}

停止

stop方法简单地调用内部CancelllationTokenSourceCancel方法。调用Cancel会设置令牌的IsCancelationRequested属性,从而优雅地终止服务器。

public void Stop()
{
    if (this.disposed)
    {
        throw new ObjectDisposedException(typeof(PipeServer).Name);
    }

    this.cancellationTokenSource.Cancel();
}

发送

send方法首先将提供的string转换为字节数组。然后使用PipeStreamBeginWrite方法将字节写入stream

public void Send(string value)
{
    if (this.disposed)
    {
        throw new ObjectDisposedException(typeof(PipeClient).Name);
    }

    byte[] buffer = Encoding.UTF8.GetBytes(value);
    this.ServerStream.BeginWrite(buffer, 0, buffer.Length, this.SendCallback, this.ServerStream);
}

ReadCallback

当在传入流上接收到数据时,将调用ReadCallback方法。首先从stream读取接收到的字节,将其解码回string并存储在state对象中。

如果设置了IsMessageComplete属性,则表示客户端已完成当前消息的发送。将调用MessageReceived事件,并清空Message缓冲区。

如果服务器尚未停止(由取消令牌指示)且客户端仍处于连接状态,则服务器将继续读取数据,否则服务器将开始等待下一个连接。

private void ReadCallback(IAsyncResult ar)
{
    var pipeState = (PipeServerState)ar.AsyncState;

    int received = pipeState.PipeServer.EndRead(ar);
    string stringData = Encoding.UTF8.GetString(pipeState.Buffer, 0, received);
    pipeState.Message.Append(stringData);
    if (pipeState.PipeServer.IsMessageComplete)
    {
        this.OnMessageReceived(new MessageReceivedEventArgs(stringData));
        pipeState.Message.Clear();
    }

    if (!(this.cancellationToken.IsCancellationRequested || 
             pipeState.ExternalCancellationToken.IsCancellationRequested))
    {
        if (pipeState.PipeServer.IsConnected)
        {
            pipeState.PipeServer.BeginRead(pipeState.Buffer, 0, 255, this.ReadCallback, pipeState);
        }
        else
        {
            pipeState.PipeServer.BeginWaitForConnection(this.ConnectionCallback, pipeState);
        }
    }
}

PipeClient

连接

connect方法简单地建立与PipeServer的连接,并开始读取从服务器接收到的第一条消息。一个有趣的细节是,您只能在建立连接后设置ClientStreamReadMode

public void Connect(int timeout = 1000)
{
    if (this.disposed)
    {
        throw new ObjectDisposedException(typeof(PipeClient).Name);
    }

    this.ClientStream.Connect(timeout);
    this.ClientStream.ReadMode = PipeTransmissionMode.Message;

    var clientState = new PipeClientState(this.ClientStream);
    this.ClientStream.BeginRead(
        clientState.Buffer,
        0,
        clientState.Buffer.Length, 
        this.ReadCallback, 
        clientState);
}

发送

PipeClientsend方法与PipeServer非常相似。提供的string被转换为字节数组,然后写入stream

public void Send(string value)
{
    if (this.disposed)
    {
        throw new ObjectDisposedException(typeof(PipeClient).Name);
    }

    byte[] buffer = Encoding.UTF8.GetBytes(value);
    this.ClientStream.BeginWrite(buffer, 0, buffer.Length, this.SendCallback, this.ClientStream);
}

ReadCallback

ReadCallback方法与PipeServer.ReadCallback方法再次相似,只是没有处理取消的额外复杂性。

private void ReadCallback(IAsyncResult ar)
{
    var pipeState = (PipeClientState)ar.AsyncState;
    int received = pipeState.PipeClient.EndRead(ar);
    string stringData = Encoding.UTF8.GetString(pipeState.Buffer, 0, received);
    pipeState.Message.Append(stringData);
    if (pipeState.PipeClient.IsMessageComplete)
    {
        this.OnMessageReceived(new MessageReceivedEventArgs(pipeState.Message.ToString()));
        pipeState.Message.Clear();
    }

    if (pipeState.PipeClient.IsConnected)
    {
        pipeState.PipeClient.BeginRead(pipeState.Buffer, 0, 255, this.ReadCallback, pipeState);
    }
}

关注点

NamedPipes vs AnonymousPipes

System.IO.Pipes命名空间包含AnonymousPipesNamedPipes的托管API。MSDN页面MSDN页面说明如下:

匿名管道

引用

匿名管道是单向的,不能在网络上使用。它们只支持单个服务器实例。匿名管道可用于线程之间或父子进程之间通信,其中管道句柄可以在创建子进程时轻松传递给子进程。

命名管道

引用

命名管道提供管道服务器和多个管道客户端之间的进程间通信。命名管道可以是单向的或双向的。它们支持基于消息的通信,并允许多个客户端使用相同的管道名称同时连接到服务器进程。

我基于命名管道构建了此库,因为我希望该库支持双向通信。此外,匿名管道的父子模型不适合我的场景。

PipeTansmissionMode

命名管道提供两种传输模式:字节模式和消息模式。

  • 在字节模式下,消息作为连续的字节流在客户端和服务器之间传输。
  • 在消息模式下,客户端和服务器以离散的单元发送和接收数据。通过设置IsMessageComplete属性来指示消息的结束。

在这两种模式下,一侧的写入并不总是会在另一侧产生相同大小的读取。这意味着客户端应用程序和服务器应用程序在任何给定时刻都不知道正在读取或写入管道的字节数。

在字节模式下,可以通过搜索并标记消息结束来识别完整消息的结束。这需要定义一个应用程序级别的协议,这会给库增加不必要的复杂性。

在消息模式下,可以通过读取IsMessageComplete属性来识别消息的结束。通过调用ReadEndRead来设置IsMessageComplete属性为true

源代码

如果您想查看库的源代码和演示应用程序,可以在我的Bit Bucket网站上找到代码。

git 存储库地址如下:

历史

日期 变更
07/01/2015 首次发布
© . All rights reserved.