65.9K
CodeProject 正在变化。 阅读更多。
Home

.NET 和 Delphi 通过命名管道实现进程间通信

starIconstarIconstarIconstarIconstarIcon

5.00/5 (4投票s)

2014年12月12日

CPOL

7分钟阅读

viewsIcon

31347

downloadIcon

1495

.NET 应用程序通过标准 Windows 命名管道调用原生方法

引言

假设我们有使用 Delphi 编写的旧代码,它执行某种复杂的数据处理。该代码是 32 位且非线程安全的,这意味着它一次只能处理一个请求。我们的任务是通过在其周围引入一个 .NET 包装器来扩展现有代码的功能,可能通过 WCF 或 .NET 世界中可用的其他方式公开功能,并在两者之间实现高效的通信机制。我们还需要使 .NET 包装器为 64 位并可能支持并行处理。

后两个要求决定了包装器和 Delphi 部分必须编译成单独的可执行文件,因为共享单个进程边界(例如通过 P/Invoke 进行通信)在这种情况下不是一个选项:32 位 DLL 无法在 64 位进程中运行,并且由于我们的 Delphi 代码不是线程安全的,因此无法在单个进程中实现并行性。

组织进程间通信 (IPC) 的方法有很多种。但是,在本文中,我们将重点介绍标准 Windows 命名管道。附加的解决方案由两个项目组成:.NET 和 Delphi 控制台应用程序。为了模拟数据处理,我们将在 Delphi 项目中实现 4 种算术运算:加法、减法、乘法和除法。.NET 应用程序将用于调用这 4 种运算之一。由于我们专注于命名管道进程间通信方面,因此我们将不涉及安全、自动消息序列化、自动调用分派或支持并行性等其他问题。

消息格式

在当前解决方案中,两个进程通过发送 XML 格式的字符串消息进行通信。这是一个描述乘法 XY 变量请求的消息 XML 示例

<Message Name="Mult">
  <Data Key="X">3</Data>
  <Data Key="Y">5</Data>
</Message>

生成的消息将是

<Message Name="Result">
  <Data Key="Result">15</Data>
</Message>

如果计算由于某种原因产生异常,消息将如下所示

<Message IsError="1" ErrorCode="EZeroDivide" ErrorText="Floating point division by zero"/>

调用方和被调用方都必须了解消息的格式并同意遵循该协议。也就是说,我们可以描述任何方法,无论有多少个参数,通过管道发送,并期望从通道返回结果,包括在过程中可能发生的任何异常。尽管消息格式比实际需要更冗长,但对于我们的演示目的来说是可以接受的。

消息结构分别封装在 .NET 和 Delphi 项目中的 PipeMessageTPipeMessage 类中。

高层协议

如上所述,解决方案中有两个控制台应用程序。如果我们执行 Delphi 应用程序,它将不会做任何事情,除非我们提供一个带有管道通道名称的命令行参数,如下所示:pipe=<管道通道名称>。此名称是一个任意字符串,表示我们期望通过其进行通信的通道名称。如果指定了通道名称,Delphi 应用程序将实例化一个管道服务器并开始侦听命令。

另一方面,调用 .NET 应用程序定义了一个名为“interop.demo”的通道,并尝试通过该通道发送所有请求。发送请求后,如果 Delphi 应用程序已经实例化并正在侦听该通道,它可能会获得结果,否则会收到一个名为 PipeServerNotFoundException 的异常,指示尚不存在此类通道。在处理此异常的后一种情况下,它将尝试实例化 Delphi 可执行文件,将通道名称作为命令行参数传递,并再次重新发出相同的请求。

public sealed class PipeInteropDispatcher
{
  private const string c_pipeName = @"interop.demo";
  private const string c_pipeServerName = "NamedPipesInteropDelphi.exe";

  private static async Task CreatePipeServer(string pipeName)
  {
    try
    {
      Process.Start(c_pipeServerName, "pipe=" + pipeName);
      await new PipeClient(pipeName).WaitForPipe(10000);
    }
    catch (Exception ex)
    {
      throw new InvalidOperationException(string.Format(
        "Unable to instantiate Server: {0}", ex.Message), ex);
    }
  }

  public static async Task ProcessRequestAsync(PipeMessage request)
  {
    var needInstance = false;
    var result = default (PipeMessage);
    var pipe = new PipeClient(c_pipeName);

    try
    {
      result = await pipe.ProcessRequest(request);
    }
    catch (PipeServerBrokenPipeException)
    {
      needInstance = true;
    }
    catch (PipeServerNotFoundException)
    {
      needInstance = true;
    }

    if (needInstance)
    {
      await CreatePipeServer(c_pipeName);
      result = await pipe.ProcessRequest(request);
    }

    return result;
  }
}

有了这些规则,我们获得了以下好处

  1. 易于调试 - 我们可以从 Delphi 调试器运行 Delphi 应用程序,将正确的管道通道名称指定为命令行参数。
  2. Delphi 进程可以随时自行终止(例如,在空闲超时时),而不会破坏系统,因为通过管道发出的任何后续请求都会因适当的异常而失败,并且调用进程将重新实例化该应用程序。事实上,我们可以随时从任务管理器强制终止 Delphi 进程,效果相同。
  3. 我们可以在现有架构之上实现并行功能(未在当前解决方案中呈现),方法是维护一个管道通道名称池,我们可以并行地通过它发送消息。

客户端(.NET 应用)

根据用户的输入,我们将适当的方法名称以及 XY 变量发送到以下方法

private static async Task<decimal> CalcAsync(string method, decimal x, decimal y)
{
  var args = new Dictionary<string, string>
  {
    {"X", x.ToString(CultureInfo.InvariantCulture)},
    {"Y", y.ToString(CultureInfo.InvariantCulture)}
  };
  var result = await PipeInteropDispatcher.ProcessRequestAsync(new PipeMessage(method, args));
  return decimal.Parse(result["Result"]);
}

我们只需创建 Dictionary<string, string> 对象,用输入参数填充它,然后将其交给 PipeInteropDispatcher 进行进一步处理。

PipeClient 类在通过管道通道发送消息方面发挥着核心作用。在内部,它使用标准的 .NET NamedPipeClientStream 类与原生 Windows 命名管道 API 进行通信。为了高效地在通道上传输大型消息,我们以最大 4KB 的块发送它们。因此,任何大于 4KB 的请求都会被分解成一系列较小的消息,这些消息通过通道顺序地流出。

为了协调原始消息大小与对应进程,流数据的第一个 4 字节将包含一个表示该原始消息大小的 Int32 数字。这样,接收消息的进程将从流中读取前 4 字节,并使用该值继续读取数据,直到读取完整个 X 字节的消息。

public sealed class PipeClient
{
  private readonly string m_serverName;

  private readonly string m_pipeName;

  public PipeClient(string pipeName)
    : this(".", pipeName) { }

  public PipeClient(string serverName, string pipeName)
  {
    m_serverName = serverName;
    m_pipeName = pipeName;
  }

  private static async Task<bool> CopyStream(Stream src, Stream dst, int bytes, int bufferSize)
  {
    var buffer = new byte[bufferSize];
    int read;
    while (bytes > 0 && 
    (read = await src.ReadAsync(buffer, 0, Math.Min(buffer.Length, bytes))) > 0)
    {
      await dst.WriteAsync(buffer, 0, read);
      bytes -= read;
    }
    return bytes == 0;
  }

  private async Task<string> ProcessRequest(string message)
  {
    var dataBuffer = Encoding.ASCII.GetBytes(message);
    var dataSize = dataBuffer.Length;

    if (dataSize == 0)
      return null;

    using (var pipe = new NamedPipeClientStream(m_serverName, m_pipeName, 
      PipeDirection.InOut, PipeOptions.Asynchronous | PipeOptions.WriteThrough))
    {
      try
      {
        await Task.Run(() => pipe.Connect(500));
      }
      catch (TimeoutException e)
      {
        throw new PipeServerNotFoundException(e.Message, e);
      }

      pipe.ReadMode = PipeTransmissionMode.Message;

      const int cBufferSize = 4096;

      try
      {
        var dataSizeBuffer = BitConverter.GetBytes(dataSize);
        await pipe.WriteAsync(dataSizeBuffer, 0, dataSizeBuffer.Length);
        using (var stream = new MemoryStream(dataBuffer))
          await CopyStream(stream, pipe, dataSize, cBufferSize);
        await pipe.FlushAsync();

        dataSizeBuffer = new byte[sizeof(Int32)];
        var bytesRead = await pipe.ReadAsync(dataSizeBuffer, 0, sizeof(Int32));
        if (bytesRead <= 0)
          throw new PipeServerBrokenPipeException();

        dataSize = BitConverter.ToInt32(dataSizeBuffer, 0);
        if (dataSize <= 0)
          return null;

        using (var stream = new MemoryStream(dataSize))
        {
          if (!await CopyStream(pipe, stream, dataSize, cBufferSize))
            throw new PipeServerBrokenPipeException();
          var resultBuffer = stream.GetBuffer();
          var decoder = Encoding.ASCII.GetDecoder();
          var charCount = decoder.GetCharCount(resultBuffer, 0, dataSize, false);
          var charResultBuffer = new char[charCount];
          decoder.GetChars(resultBuffer, 0, dataSize, charResultBuffer, 0, false);
          decoder.Reset();
          return new string(charResultBuffer);
        }
      }
      catch (IOException ex)
      {
        // Console.WriteLine(ex.Message);
        // NOTE: This is not reliable, but will do for now
        if (ex.Message.Contains("Pipe is broken"))
          throw new PipeServerBrokenPipeException();
        throw;
      }
    }
  }

  public async Task<PipeMessage> ProcessRequest(PipeMessage message)
  {
    var resultMessage = await ProcessRequest(message.ToString());
    var result = new PipeMessage(resultMessage);
    if (result.IsError)
      throw new PipeServerException(result.ErrorCode, result.ErrorText);
    return result;
  }

  public async Task WaitForPipe(int timeout)
  {
    await Task.Run(() =>
    {
      using (var pipe = new NamedPipeClientStream(m_serverName, m_pipeName, 
        PipeDirection.InOut, PipeOptions.Asynchronous | PipeOptions.WriteThrough))
        pipe.Connect(timeout);
    });
  }
}

服务器端 (Delphi 应用)

以下是服务器端用于支持命名管道通信的类列表

  1. TNamedPipeServer。此类封装了与原生 Windows 命名管道 API 的底层交互。
  2. TPipeMessage。表示 XML 消息的对象包装器。
  3. TCalcPipeServer。继承自 TNamedPipeServer 并重写其 DispatchMessage 方法,将请求传递给调度程序。
  4. TCalcRequestDispatcher。封装方法调度功能,将传入的 XML 消息转换为实际的方法调用,并将结果打包回生成的 XML 消息。
  5. TCalcProcessor。一个帮助类,包含业务逻辑的实际实现(在本例中为 4 种算术运算)。

当 Delphi 应用程序启动时,它将从命令行读取管道通道名称,并实例化一个管道服务器,将该通道名称传递给它。管道服务器实例将创建通道并开始侦听传入的请求,直到用户按下 ENTER 或应用程序以其他方式关闭。

收到来自通道的消息后,管道服务器会将其交给一个 DispatchMessage 虚拟方法,该方法被 TCalcPipeServer 重写

procedure TCalcPipeServer.DispatchMessage(ARequestStream, AResponseStream: TStream);
var
  lData: string;
begin
  SetLength(lData, ARequestStream.Size);
  ARequestStream.Read(lData[1], Length(lData));
  lData := RequestDispatcher.ProcessRequest(lData);
  AResponseStream.Write(lData[1], Length(lData));
end;

在这种情况下,RequestDispatcherTCalcRequestDispatcher 类表示,它将 XML 请求转换为实际的方法调用,并将结果作为 XML 字符串返回

function TCalcRequestDispatcher.InternalProcessRequest(const ARequest: string): string;
var
  lRequest: TPipeMessage;
  X, Y, lResult: Double;
begin
  Result := '';
  lRequest := TPipeMessage.Create(ARequest);
  try
    if lRequest.Name = 'Add' then
    begin
      X := StrToFloat(lRequest.Data['X']);
      Y := StrToFloat(lRequest.Data['Y']);
      lResult := TCalcProcessor.Add(X, Y);
      Result := TPipeMessage.MakeMessage('Result', ['Result'], [FloatToStr(lResult)]);
    end
    else if lRequest.Name = 'Subtract' then
    begin
      X := StrToFloat(lRequest.Data['X']);
      Y := StrToFloat(lRequest.Data['Y']);
      lResult := TCalcProcessor.Subtract(X, Y);
      Result := TPipeMessage.MakeMessage('Result', ['Result'], [FloatToStr(lResult)]);
    end
    else if lRequest.Name = 'Mult' then
    begin
      X := StrToFloat(lRequest.Data['X']);
      Y := StrToFloat(lRequest.Data['Y']);
      lResult := TCalcProcessor.Mult(X, Y);
      Result := TPipeMessage.MakeMessage('Result', ['Result'], [FloatToStr(lResult)]);
    end
    else if lRequest.Name = 'Div' then
    begin
      X := StrToFloat(lRequest.Data['X']);
      Y := StrToFloat(lRequest.Data['Y']);
      lResult := TCalcProcessor.Divide(X, Y);
      Result := TPipeMessage.MakeMessage('Result', ['Result'], [FloatToStr(lResult)]);
    end
    else if lRequest.Name = 'ExitProcess' then
    begin
      ExitProcess(0);
    end
    else if lRequest.Name = 'Ping' then
    begin
      // do nothing;
    end
    else
      raise Exception.CreateFmt('Unknown request: %s', [lRequest.Name]);
  finally
    lRequest.Free;
  end;
end;

管道服务器实现

TNamedPipeServer 类在其构造函数中接受一个表示管道通道名称参数的字符串参数,并在内部创建一个 Listener Thread、一个 Message Queue 和一个 Worker Thread

监听器由 TPipeListenerThread 类表示。它的主要工作是不断监视管道通道的传入请求,并将它们发送到消息队列。后者由 TThreadQueue 类表示,这是一个具有阻塞行为的线程安全队列实现,这意味着如果队列大小达到最大消息数,推送线程将被阻塞,如果队列中没有元素,弹出线程将被阻塞。最后,工作线程由 TPipeWorkerThread 类表示,它不断从队列中提取消息并将其交给消息处理程序。

消息处理程序由 TPipeMessageHandler 类表示,并实现流读/写逻辑。消息处理程序使用给定管道通道的句柄并开始从中读取。一旦读取消息,它就会按照协议规则将其传递给调度程序(我们已经描述过),并将最终结果写回到流中。消息流的源代码如下所示

procedure TPipeMessageHandler.Execute;
var
  lReadStream, lWriteStream: TMemoryStream;
  lBytesRead, lBytesWritten: DWORD;
  lSuccess: BOOL;
  lDataSize, lBufferSize: Integer;
  lBuffer: PChar;
begin
  lBufferSize := FServer.BufferSize;
  GetMem(lBuffer, lBufferSize);
  lReadStream := TMemoryStream.Create;
  try
    try
      lSuccess := ReadFile(fPipeHandle, lDataSize, SizeOf(lDataSize), lBytesRead, nil);

      if not lSuccess then
      begin
        if GetLastError = ERROR_BROKEN_PIPE then
          Exit
        else
          RaiseLastOSError;
      end;

      while lDataSize > 0 do
      begin
        lSuccess := ReadFile(fPipeHandle, lBuffer^, lBufferSize, lBytesRead, nil);

        if not lSuccess and (GetLastError <> ERROR_MORE_DATA) then
          RaiseLastOSError;

        if lBytesRead > 0 then
          lReadStream.Write(lBuffer^, lBytesRead);

        Dec(lDataSize, lBytesRead);
      end;

      if not lSuccess then
        RaiseLastOSError;

      if lReadStream.Size > 0 then
      begin
        lReadStream.Position := 0;
        lWriteStream := TMemoryStream.Create;
        try
          FServer.DispatchMessage(lReadStream, lWriteStream);
          lReadStream.Position := 0;
          lReadStream.SetSize(0);
          lWriteStream.Position := 0;

          lDataSize := lWriteStream.Size;
          lSuccess := WriteFile(fPipeHandle, lDataSize, SizeOf(lDataSize), lBytesWritten, nil);

          if not lSuccess then
            RaiseLastOSError;

          while lDataSize > 0 do
          begin
            lBytesRead := lWriteStream.Read(lBuffer^, lBufferSize);
            Dec(lDataSize, lBytesRead);
            lSuccess := WriteFile(fPipeHandle, lBuffer^, lBytesRead, lBytesWritten, nil);

            if not lSuccess then
              RaiseLastOSError;
          end;
        finally
          lWriteStream.Free;
        end;
      end;
    except
      on E: EOSError do
      begin
        if not (E.ErrorCode in [ERROR_NO_DATA, ERROR_BROKEN_PIPE]) then
          raise;
      end;
    end;
  finally
    FreeMem(lBuffer);
    lReadStream.Free;
    FlushFileBuffers(FPipeHandle);
    DisconnectNamedPipe(FPipeHandle);
    CloseHandle(FPipeHandle);
  end;
end;

摘要

在本文中,我试图提出一种可能的解决方案,用于在 .NET Framework 和可能具有某些限制(例如线程安全或无法在 64 位进程中运行)的旧项目之间实现进程间通信。有时,能够优雅地将旧代码库迁移到 .NET Framework 而不引入重大副作用并使过渡尽可能顺畅是非常重要的。基于命名管道的进程间通信提供了一种非常高效的双向协议,可能有助于实现这些目标。

© . All rights reserved.