.NET 和 Delphi 通过命名管道实现进程间通信





5.00/5 (4投票s)
.NET 应用程序通过标准 Windows 命名管道调用原生方法
引言
假设我们有使用 Delphi 编写的旧代码,它执行某种复杂的数据处理。该代码是 32 位且非线程安全的,这意味着它一次只能处理一个请求。我们的任务是通过在其周围引入一个 .NET 包装器来扩展现有代码的功能,可能通过 WCF 或 .NET 世界中可用的其他方式公开功能,并在两者之间实现高效的通信机制。我们还需要使 .NET 包装器为 64 位并可能支持并行处理。
后两个要求决定了包装器和 Delphi 部分必须编译成单独的可执行文件,因为共享单个进程边界(例如通过 P/Invoke 进行通信)在这种情况下不是一个选项:32 位 DLL 无法在 64 位进程中运行,并且由于我们的 Delphi 代码不是线程安全的,因此无法在单个进程中实现并行性。
组织进程间通信 (IPC) 的方法有很多种。但是,在本文中,我们将重点介绍标准 Windows 命名管道。附加的解决方案由两个项目组成:.NET 和 Delphi 控制台应用程序。为了模拟数据处理,我们将在 Delphi 项目中实现 4 种算术运算:加法、减法、乘法和除法。.NET 应用程序将用于调用这 4 种运算之一。由于我们专注于命名管道进程间通信方面,因此我们将不涉及安全、自动消息序列化、自动调用分派或支持并行性等其他问题。
消息格式
在当前解决方案中,两个进程通过发送 XML 格式的字符串消息进行通信。这是一个描述乘法 X
和 Y
变量请求的消息 XML 示例
<Message Name="Mult">
<Data Key="X">3</Data>
<Data Key="Y">5</Data>
</Message>
生成的消息将是
<Message Name="Result">
<Data Key="Result">15</Data>
</Message>
如果计算由于某种原因产生异常,消息将如下所示
<Message IsError="1" ErrorCode="EZeroDivide" ErrorText="Floating point division by zero"/>
调用方和被调用方都必须了解消息的格式并同意遵循该协议。也就是说,我们可以描述任何方法,无论有多少个参数,通过管道发送,并期望从通道返回结果,包括在过程中可能发生的任何异常。尽管消息格式比实际需要更冗长,但对于我们的演示目的来说是可以接受的。
消息结构分别封装在 .NET 和 Delphi 项目中的 PipeMessage
和 TPipeMessage
类中。
高层协议
如上所述,解决方案中有两个控制台应用程序。如果我们执行 Delphi 应用程序,它将不会做任何事情,除非我们提供一个带有管道通道名称的命令行参数,如下所示:pipe=<管道通道名称>
。此名称是一个任意字符串,表示我们期望通过其进行通信的通道名称。如果指定了通道名称,Delphi 应用程序将实例化一个管道服务器并开始侦听命令。
另一方面,调用 .NET 应用程序定义了一个名为“interop.demo
”的通道,并尝试通过该通道发送所有请求。发送请求后,如果 Delphi 应用程序已经实例化并正在侦听该通道,它可能会获得结果,否则会收到一个名为 PipeServerNotFoundException
的异常,指示尚不存在此类通道。在处理此异常的后一种情况下,它将尝试实例化 Delphi 可执行文件,将通道名称作为命令行参数传递,并再次重新发出相同的请求。
public sealed class PipeInteropDispatcher
{
private const string c_pipeName = @"interop.demo";
private const string c_pipeServerName = "NamedPipesInteropDelphi.exe";
private static async Task CreatePipeServer(string pipeName)
{
try
{
Process.Start(c_pipeServerName, "pipe=" + pipeName);
await new PipeClient(pipeName).WaitForPipe(10000);
}
catch (Exception ex)
{
throw new InvalidOperationException(string.Format(
"Unable to instantiate Server: {0}", ex.Message), ex);
}
}
public static async Task ProcessRequestAsync(PipeMessage request)
{
var needInstance = false;
var result = default (PipeMessage);
var pipe = new PipeClient(c_pipeName);
try
{
result = await pipe.ProcessRequest(request);
}
catch (PipeServerBrokenPipeException)
{
needInstance = true;
}
catch (PipeServerNotFoundException)
{
needInstance = true;
}
if (needInstance)
{
await CreatePipeServer(c_pipeName);
result = await pipe.ProcessRequest(request);
}
return result;
}
}
有了这些规则,我们获得了以下好处
- 易于调试 - 我们可以从 Delphi 调试器运行 Delphi 应用程序,将正确的管道通道名称指定为命令行参数。
- Delphi 进程可以随时自行终止(例如,在空闲超时时),而不会破坏系统,因为通过管道发出的任何后续请求都会因适当的异常而失败,并且调用进程将重新实例化该应用程序。事实上,我们可以随时从任务管理器强制终止 Delphi 进程,效果相同。
- 我们可以在现有架构之上实现并行功能(未在当前解决方案中呈现),方法是维护一个管道通道名称池,我们可以并行地通过它发送消息。
客户端(.NET 应用)
根据用户的输入,我们将适当的方法名称以及 X
和 Y
变量发送到以下方法
private static async Task<decimal> CalcAsync(string method, decimal x, decimal y)
{
var args = new Dictionary<string, string>
{
{"X", x.ToString(CultureInfo.InvariantCulture)},
{"Y", y.ToString(CultureInfo.InvariantCulture)}
};
var result = await PipeInteropDispatcher.ProcessRequestAsync(new PipeMessage(method, args));
return decimal.Parse(result["Result"]);
}
我们只需创建 Dictionary<string, string>
对象,用输入参数填充它,然后将其交给 PipeInteropDispatcher
进行进一步处理。
PipeClient
类在通过管道通道发送消息方面发挥着核心作用。在内部,它使用标准的 .NET NamedPipeClientStream
类与原生 Windows 命名管道 API 进行通信。为了高效地在通道上传输大型消息,我们以最大 4KB 的块发送它们。因此,任何大于 4KB 的请求都会被分解成一系列较小的消息,这些消息通过通道顺序地流出。
为了协调原始消息大小与对应进程,流数据的第一个 4 字节将包含一个表示该原始消息大小的 Int32
数字。这样,接收消息的进程将从流中读取前 4 字节,并使用该值继续读取数据,直到读取完整个 X 字节的消息。
public sealed class PipeClient
{
private readonly string m_serverName;
private readonly string m_pipeName;
public PipeClient(string pipeName)
: this(".", pipeName) { }
public PipeClient(string serverName, string pipeName)
{
m_serverName = serverName;
m_pipeName = pipeName;
}
private static async Task<bool> CopyStream(Stream src, Stream dst, int bytes, int bufferSize)
{
var buffer = new byte[bufferSize];
int read;
while (bytes > 0 &&
(read = await src.ReadAsync(buffer, 0, Math.Min(buffer.Length, bytes))) > 0)
{
await dst.WriteAsync(buffer, 0, read);
bytes -= read;
}
return bytes == 0;
}
private async Task<string> ProcessRequest(string message)
{
var dataBuffer = Encoding.ASCII.GetBytes(message);
var dataSize = dataBuffer.Length;
if (dataSize == 0)
return null;
using (var pipe = new NamedPipeClientStream(m_serverName, m_pipeName,
PipeDirection.InOut, PipeOptions.Asynchronous | PipeOptions.WriteThrough))
{
try
{
await Task.Run(() => pipe.Connect(500));
}
catch (TimeoutException e)
{
throw new PipeServerNotFoundException(e.Message, e);
}
pipe.ReadMode = PipeTransmissionMode.Message;
const int cBufferSize = 4096;
try
{
var dataSizeBuffer = BitConverter.GetBytes(dataSize);
await pipe.WriteAsync(dataSizeBuffer, 0, dataSizeBuffer.Length);
using (var stream = new MemoryStream(dataBuffer))
await CopyStream(stream, pipe, dataSize, cBufferSize);
await pipe.FlushAsync();
dataSizeBuffer = new byte[sizeof(Int32)];
var bytesRead = await pipe.ReadAsync(dataSizeBuffer, 0, sizeof(Int32));
if (bytesRead <= 0)
throw new PipeServerBrokenPipeException();
dataSize = BitConverter.ToInt32(dataSizeBuffer, 0);
if (dataSize <= 0)
return null;
using (var stream = new MemoryStream(dataSize))
{
if (!await CopyStream(pipe, stream, dataSize, cBufferSize))
throw new PipeServerBrokenPipeException();
var resultBuffer = stream.GetBuffer();
var decoder = Encoding.ASCII.GetDecoder();
var charCount = decoder.GetCharCount(resultBuffer, 0, dataSize, false);
var charResultBuffer = new char[charCount];
decoder.GetChars(resultBuffer, 0, dataSize, charResultBuffer, 0, false);
decoder.Reset();
return new string(charResultBuffer);
}
}
catch (IOException ex)
{
// Console.WriteLine(ex.Message);
// NOTE: This is not reliable, but will do for now
if (ex.Message.Contains("Pipe is broken"))
throw new PipeServerBrokenPipeException();
throw;
}
}
}
public async Task<PipeMessage> ProcessRequest(PipeMessage message)
{
var resultMessage = await ProcessRequest(message.ToString());
var result = new PipeMessage(resultMessage);
if (result.IsError)
throw new PipeServerException(result.ErrorCode, result.ErrorText);
return result;
}
public async Task WaitForPipe(int timeout)
{
await Task.Run(() =>
{
using (var pipe = new NamedPipeClientStream(m_serverName, m_pipeName,
PipeDirection.InOut, PipeOptions.Asynchronous | PipeOptions.WriteThrough))
pipe.Connect(timeout);
});
}
}
服务器端 (Delphi 应用)
以下是服务器端用于支持命名管道通信的类列表
TNamedPipeServer
。此类封装了与原生 Windows 命名管道 API 的底层交互。TPipeMessage
。表示 XML 消息的对象包装器。TCalcPipeServer
。继承自TNamedPipeServer
并重写其DispatchMessage
方法,将请求传递给调度程序。TCalcRequestDispatcher
。封装方法调度功能,将传入的 XML 消息转换为实际的方法调用,并将结果打包回生成的 XML 消息。TCalcProcessor
。一个帮助类,包含业务逻辑的实际实现(在本例中为 4 种算术运算)。
当 Delphi 应用程序启动时,它将从命令行读取管道通道名称,并实例化一个管道服务器,将该通道名称传递给它。管道服务器实例将创建通道并开始侦听传入的请求,直到用户按下 ENTER
或应用程序以其他方式关闭。
收到来自通道的消息后,管道服务器会将其交给一个 DispatchMessage
虚拟方法,该方法被 TCalcPipeServer
重写
procedure TCalcPipeServer.DispatchMessage(ARequestStream, AResponseStream: TStream);
var
lData: string;
begin
SetLength(lData, ARequestStream.Size);
ARequestStream.Read(lData[1], Length(lData));
lData := RequestDispatcher.ProcessRequest(lData);
AResponseStream.Write(lData[1], Length(lData));
end;
在这种情况下,RequestDispatcher
由 TCalcRequestDispatcher
类表示,它将 XML 请求转换为实际的方法调用,并将结果作为 XML 字符串返回
function TCalcRequestDispatcher.InternalProcessRequest(const ARequest: string): string;
var
lRequest: TPipeMessage;
X, Y, lResult: Double;
begin
Result := '';
lRequest := TPipeMessage.Create(ARequest);
try
if lRequest.Name = 'Add' then
begin
X := StrToFloat(lRequest.Data['X']);
Y := StrToFloat(lRequest.Data['Y']);
lResult := TCalcProcessor.Add(X, Y);
Result := TPipeMessage.MakeMessage('Result', ['Result'], [FloatToStr(lResult)]);
end
else if lRequest.Name = 'Subtract' then
begin
X := StrToFloat(lRequest.Data['X']);
Y := StrToFloat(lRequest.Data['Y']);
lResult := TCalcProcessor.Subtract(X, Y);
Result := TPipeMessage.MakeMessage('Result', ['Result'], [FloatToStr(lResult)]);
end
else if lRequest.Name = 'Mult' then
begin
X := StrToFloat(lRequest.Data['X']);
Y := StrToFloat(lRequest.Data['Y']);
lResult := TCalcProcessor.Mult(X, Y);
Result := TPipeMessage.MakeMessage('Result', ['Result'], [FloatToStr(lResult)]);
end
else if lRequest.Name = 'Div' then
begin
X := StrToFloat(lRequest.Data['X']);
Y := StrToFloat(lRequest.Data['Y']);
lResult := TCalcProcessor.Divide(X, Y);
Result := TPipeMessage.MakeMessage('Result', ['Result'], [FloatToStr(lResult)]);
end
else if lRequest.Name = 'ExitProcess' then
begin
ExitProcess(0);
end
else if lRequest.Name = 'Ping' then
begin
// do nothing;
end
else
raise Exception.CreateFmt('Unknown request: %s', [lRequest.Name]);
finally
lRequest.Free;
end;
end;
管道服务器实现
TNamedPipeServer
类在其构造函数中接受一个表示管道通道名称参数的字符串参数,并在内部创建一个 Listener Thread
、一个 Message Queue
和一个 Worker Thread
。
监听器由 TPipeListenerThread
类表示。它的主要工作是不断监视管道通道的传入请求,并将它们发送到消息队列。后者由 TThreadQueue
类表示,这是一个具有阻塞行为的线程安全队列实现,这意味着如果队列大小达到最大消息数,推送线程将被阻塞,如果队列中没有元素,弹出线程将被阻塞。最后,工作线程由 TPipeWorkerThread
类表示,它不断从队列中提取消息并将其交给消息处理程序。
消息处理程序由 TPipeMessageHandler
类表示,并实现流读/写逻辑。消息处理程序使用给定管道通道的句柄并开始从中读取。一旦读取消息,它就会按照协议规则将其传递给调度程序(我们已经描述过),并将最终结果写回到流中。消息流的源代码如下所示
procedure TPipeMessageHandler.Execute;
var
lReadStream, lWriteStream: TMemoryStream;
lBytesRead, lBytesWritten: DWORD;
lSuccess: BOOL;
lDataSize, lBufferSize: Integer;
lBuffer: PChar;
begin
lBufferSize := FServer.BufferSize;
GetMem(lBuffer, lBufferSize);
lReadStream := TMemoryStream.Create;
try
try
lSuccess := ReadFile(fPipeHandle, lDataSize, SizeOf(lDataSize), lBytesRead, nil);
if not lSuccess then
begin
if GetLastError = ERROR_BROKEN_PIPE then
Exit
else
RaiseLastOSError;
end;
while lDataSize > 0 do
begin
lSuccess := ReadFile(fPipeHandle, lBuffer^, lBufferSize, lBytesRead, nil);
if not lSuccess and (GetLastError <> ERROR_MORE_DATA) then
RaiseLastOSError;
if lBytesRead > 0 then
lReadStream.Write(lBuffer^, lBytesRead);
Dec(lDataSize, lBytesRead);
end;
if not lSuccess then
RaiseLastOSError;
if lReadStream.Size > 0 then
begin
lReadStream.Position := 0;
lWriteStream := TMemoryStream.Create;
try
FServer.DispatchMessage(lReadStream, lWriteStream);
lReadStream.Position := 0;
lReadStream.SetSize(0);
lWriteStream.Position := 0;
lDataSize := lWriteStream.Size;
lSuccess := WriteFile(fPipeHandle, lDataSize, SizeOf(lDataSize), lBytesWritten, nil);
if not lSuccess then
RaiseLastOSError;
while lDataSize > 0 do
begin
lBytesRead := lWriteStream.Read(lBuffer^, lBufferSize);
Dec(lDataSize, lBytesRead);
lSuccess := WriteFile(fPipeHandle, lBuffer^, lBytesRead, lBytesWritten, nil);
if not lSuccess then
RaiseLastOSError;
end;
finally
lWriteStream.Free;
end;
end;
except
on E: EOSError do
begin
if not (E.ErrorCode in [ERROR_NO_DATA, ERROR_BROKEN_PIPE]) then
raise;
end;
end;
finally
FreeMem(lBuffer);
lReadStream.Free;
FlushFileBuffers(FPipeHandle);
DisconnectNamedPipe(FPipeHandle);
CloseHandle(FPipeHandle);
end;
end;
摘要
在本文中,我试图提出一种可能的解决方案,用于在 .NET Framework 和可能具有某些限制(例如线程安全或无法在 64 位进程中运行)的旧项目之间实现进程间通信。有时,能够优雅地将旧代码库迁移到 .NET Framework 而不引入重大副作用并使过渡尽可能顺畅是非常重要的。基于命名管道的进程间通信提供了一种非常高效的双向协议,可能有助于实现这些目标。