65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Thread.BindHandle 进行异步 I/O

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.25/5 (7投票s)

2013年1月7日

Ms-PL

17分钟阅读

viewsIcon

34790

downloadIcon

537

描述了使用 I/O 完成端口进行异步 I/O 的用法,适用于 .NET。

引言

高性能 I/O 通常是 Windows 开发人员留给 C 和 C++ 程序员的挑战。.NET 框架也提供了高性能 I/O 的形式,即绑定到 I/O 完成端口的线程池。虽然存在相关文档,但互联网上以及 MSDN API 目录中都很难找到。本文解释了如何在 .NET 中使用 I/O 完成端口来实现高性能 I/O。

最终产品是一个类,用于启动一个进程并使用 StdIn、StdOut 和 StdErr 的重定向与该进程通信。该类本身很有用,可以不理解本文任何内容而使用。目的是描述允许高性能 I/O 的 IPC 机制。类似的概念可应用于所有重叠 I/O。

背景

读者应熟悉 .NET 的 Streams 及其异步 I/O 的用法(FileStream 类是一个特别好的例子);通过 P/Invoke 机制在 .NET 中使用 WinAPI;以及了解 .NET 中的线程池。

使用代码

该类提供了以下优势:

  • 进程的 StdInStdOut StdErr 无限缓冲。Windows 的缓冲区大小通常只有几 KB,这可能导致进程在写入其 Std* 句柄时被阻塞。
  • 缓冲是分页的,以便在 .NET GC 中进行灵活的内存管理。

启动进程

该类应尽可能易于使用。要启动一个新进程,您的主进程随后可以对其进行监视,请使用以下代码:

Process p = Process.Execute("notepad.exe", false);

将返回一个新的 Process 对象,该对象可用于终止进程、等待其结束以及监视重定向的句柄。第二个参数允许您在继续之前等待进程结束。

如果您将一个进程作为“助手”运行,即一个执行特定任务然后退出的小型进程,您应该使用 C# 的 using 子句来确保无差错的行为(以及底层句柄被释放)。

using (Process p = Process.Execute("cscript /nologo .\\script.vbs", true) {
  Console.WriteLine("Exit Code {0}", p.ExitCode);
}

在这种情况下,第二个参数为 true,表示只有在脚本结束后才会继续执行。该对象仍然有效,允许获取退出代码。进程的任何输出都可以通过 p.StdOut p.StdErr 属性进行解析,通常通过它们提供的 ReadLine() 方法。由于数据在输出时被立即捕获,与您的线程无关,因此即使在程序结束后也可以解析信息。

终止进程

该类仅支持终止由其自身启动的进程。虽然技术上可行(请参阅源代码中提供的单元测试用例 Process_CreateNotepadDispose),但结束非由主程序以编程方式启动的进程通常是一个不常见的需求。

using (Process p = Process.Execute("notepad.exe", false) {
  // Wait one second for the program to end.
  bool result = p.Wait(1000);
  if (!result) {
    p.Terminate(-1);
  }
  Console.WriteLine("Notepad Exit Code {0}", p.ExitCode);
}

这将启动 notepad.exe 进程,并等待一秒钟使其结束(通常是用户手动关闭程序)。如果在此期间未结束,则该进程将被显式终止,退出代码为 -1(因此 ExitCode 方法通常应返回 -1)。

可以通过 ExitCode 属性获取退出代码。仅当进程已结束时,此值才有效,当以下情况发生时,进程已结束:

  • Wait() 返回;
  • Wait(int timeout) 返回 true;
  • Terminate(int exitcode) 被调用。

异步 I/O

这是本文最有趣的部分的开始,即用于从进程检索输出 StdOut StdErr ,以及在需要时向进程提供输入的机制。这里的技术在实现 Process 类方面具有实际应用,但不仅限于如此简单的示例。虽然本文侧重于接收 StdOut (和 StdErr),但对于 StdIn,概念是相同的。

进程与 Process 类之间的通信通过管道进行。创建三个管道,一个用于 StdOutStdErr StdIn。管道的写入端通过 CreateProcess() 中的 StartInfo 对象传递给子进程,用于 StdOut StdErr,第三个管道的读取端用于 StdIn

读取 I/O 模式

第一个任务是选择如何从流中获取信息。有三种便捷的机制:

  1. 单个线程,每个文件句柄执行阻塞的 ReadFile() 调用,实现典型的生产者/消费者模式。
  2. 一个线程处理所有文件句柄,使用重叠 I/O 和 Events。
  3. I/O 完成端口绑定到 ThreadPool 并使用异步回调。

这三种机制将简要介绍,最后一种将详细介绍。实际实现位于 Process.AsyncConsoleReader 类中。该类的设计易于实现模式 1 和 3。

阻塞 I/O

这可以说是实现 I/O 的最简单方法。一个线程(生产者)负责从管道读取数据并将其写入队列。第二个线程(消费者),通常是主线程,从队列中读取数据。每个文件句柄有一个生产者线程,总共三个生产者线程加上主线程。

需要注意的是,对于 Process 类的实现,这可以被认为是一种有效的模式。Windows 方法 CreatePipe() 不支持标志,因此不支持 FILE_FLAG_OVERLAPPED

优点

  • 这种机制易于理解:
  • 它能够以最小的努力移植到多个操作系统。
  • 通过创建新线程/对象实例,可以轻松添加额外文件句柄。

缺点

  • 线程在使用时需要资源。虽然我们只有三个线程,但这不成问题,但扩展性不实用。
  • 存在大量的上下文切换。对于 Windows 上的 x86 处理器,这不太令人担忧,但其他架构效率较低(并且可能与基于 ARM 的架构更相关)。

重叠 I/O 与事件

阻塞 I/O 的缺点是为每个活动文件句柄运行多个线程实例。性能会因上下文切换所需的时间而受到影响,对于大量线程来说,这可能导致所谓的“线程抖动”。即使一个线程已准备好运行,它也可能无法运行。其次,没有策略可以指定哪个线程应该运行,这可能会导致特定文件句柄的 I/O 饥饿,因为 Windows 内核调度程序是为 CPU 性能而不是文件性能设计的,并且是非确定性的。

为了克服这个问题,可以实现一个线程来处理所有文件句柄。当然,性能会有所提高,因为上下文切换减少了,如果一个文件句柄已准备好服务,它可以立即利用 CPU。通过确定测试句柄的顺序,可以实现独立于调度程序的确定性行为(只要调度程序允许 I/O 线程运行)。

此类线程机制的一个示例在开源项目 SerialPortStream 中提供,位于 http://serialportstream.codeplex.com,该项目也由本文作者编写。请参阅 NativeSerialPort_CommOverlappedIo.cs 中的实现。

与阻塞 I/O 相比的优势

  • 通过为所有 I/O 使用单个线程,降低了资源开销。
  • 确定性行为,独立于内核调度程序。

缺点

  • 无法随系统中 CPU 的数量进行扩展。
  • 实现正确的代码很困难。

I/O 完成端口和 .NET 线程池

这是目前在 Windows 的 .NET 框架内实现异步 I/O 的最佳机制。它也被设计为 Windows 框架中性能最高的模型,如《Windows 内部机制(第六版)》第 8 章(I/O 完成端口)中所述。

不幸的是,关于如何在 .NET 中使用 I/O 完成端口的文档和代码示例非常少。MSDN 文档也非常有限。此处提供文档。下面将详细介绍其用法和陷阱。

与先前模型的优势

  • 与现有的异步流概念(例如 FileStream 类)相似;
  • 可用于实现您自己的异步流,因为编程模型非常相似;
  • 能够很好地随机器 CPU 数量进行扩展;
  • 高性能。
缺点
  • MSDN 文档不足,网络上示例很少(且不正确);
  • 内存泄漏可能难以查找;
  • 可能发生内存损坏。

I/O 完成端口和 .NET 线程池

我们将更详细地介绍最后一个选项。建议使用 .NET 反射器(如 ILSpy 或 Reflector)来研究 Microsoft 对 SerialPort FileStream 类的实现。它们也在内部实现中使用 I/O 完成端口和 ThreadPools

使用 I/O 完成端口有四个步骤。虽然本文与 BeefyCode 的博客相似,但在使用 ReadFile() WriteFile() API 方面并不兼容。稍后将澄清在使用 Win32 API“发起异步 I/O”时。

创建重叠 I/O

CreateFile() CreateNamedPipe() 方法提供了一个标志选项来指定 FILE_FLAG_OVERLAPPED。但是,CreatePipe() 方法不允许指定这些标志,因此不能用于重叠 I/O。重叠 I/O 必须在任何类型的重叠 I/O(使用 Events 进行通知或 I/O 完成端口时)中使用。

作为 Process 类的解决方法,从 Dave Hart 创建了一个端口。C# 等效实现可以在下载的代码中找到,作为 Win32.CreatePipeEx() 方法。它使用 CreateNamedPipe() 创建命名管道以进行入站二进制数据,并使用 CreateFile() API 打开写入端。

下面指定了方法及其原型,改编自 PInvoke.NET。枚举可以在下载源代码的 Native.cs 文件中找到。

internal static class UnsafeNativeMethods {
  [DllImport("kernel32.dll", SetLastError = true, CharSet = CharSet.Unicode)]
  public static extern SafeFileHandle CreateFile(
    string lpFileName,
    [MarshalAs(UnmanagedType.U4)] NativeMethods.FileAccess dwDesiredAccess,
    [MarshalAs(UnmanagedType.U4)] NativeMethods.FileShare dwShareMode,
    ref NativeMethods.SECURITY_ATTRIBUTES lpSecurityAttributes,
    [MarshalAs(UnmanagedType.U4)] NativeMethods.CreationDisposition dwCreationDisposition,
    [MarshalAs(UnmanagedType.U4)] NativeMethods.FileAttributes dwFlagsAndAttributes,
    IntPtr hTemplateFile);
  [DllImport("kernel32.dll", SetLastError = true)]
  public static extern SafeFileHandle CreateNamedPipe(string lpName, 
    [MarshalAs(UnmanagedType.U4)] NativeMethods.PipeOpenMode dwOpenMode,
    [MarshalAs(UnmanagedType.U4)] NativeMethods.PipeMode dwPipeMode, 
    uint nMaxInstances, uint nOutBufferSize, uint nInBufferSize,
    uint nDefaultTimeOut, ref NativeMethods.SECURITY_ATTRIBUTES lpSecurityAttributes);
}

可以这样创建一个可以使用重叠 I/O 的文件句柄:

SafeFileHandle hFile;

string pipeName = string.Format("\\\\.\\Pipe\\{0:X}.{1},
  SafeNativeMethods.GetCurrentProcessId(), Guid.NewGuid().ToString("D"));

hReadFile = UnsafeNativeMethods.CreateNamedPipe(pipeName, 
  NativeMethods.PipeOpenMode.PIPE_ACCESS_INBOUND | 
    (NativeMethods.PipeOpenMode)NativeMethods.FileAttributes.FILE_FLAG_OVERLAPPED,
  NativeMethods.PipeMode.PIPE_TYPE_BYTE | NativeMethods.PipeMode.PIPE_WAIT,
  1, nSize, nSize, 120 * 1000, ref lpPipeAttributes);

if (hReadFile.IsInvalid) return false;

在使用 CreateFile() API 时,等效操作如下:

hWriteFile = UnsafeNativeMethods.CreateFile(pipeName, 
  NativeMethods.FileAccess.GENERIC_WRITE,
  NativeMethods.FileShare.FILE_SHARE_NONE, 
  ref lpPipeAttributes, 
  NativeMethods.CreationDisposition.OPEN_EXISTING,
  NativeMethods.FileAttributes.FILE_ATTRIBUTE_NORMAL | 
    NativeMethods.FileAttributes.FILE_FLAG_OVERLAPPED, 
  IntPtr.Zero);

重要的是在创建文件句柄时提供 FILE_FLAG_OVERLAPPED 标志作为选项之一。

绑定句柄和 I/O 完成端口

这是本文中最神秘的部分。有关更多信息,请参阅《Windows 内部机制(第六版)》第 8 章“I/O 完成端口”。这与《Windows 内部机制(第五版)》第 7 章相同。

I/O 完成端口是 Windows Executive 公开的一个对象,可以与多个文件句柄关联。也就是说,一个完成端口,多个文件句柄。与 I/O 完成端口关联的任何重叠文件操作都会导致 Windows 向完成端口发送一个完成包。多个线程可以等待同一个完成端口。完成端口提供了并发的优势,因此通常只有n个与 I/O 关联的线程在任何给定时间等待一个完成端口。

.NET 子系统根据系统中的 CPU 线程数指定并发线程数(例如,具有超线程的 4 核 CPU 有 8 个 CPU 线程)。这在创建完成端口时指定。

Windows 函数 CreateIoCompletionPort() 用于创建完成端口以及将文件句柄与完成端口关联。这可以通过 .NET 框架中的 Thread.BindHandle(SafeHandle handle) 方法完成。使用 Rohitab 的 API Monitor,我们可以看到以下内容:

模块
API
返回值
clr.dll
CreateIoCompletionPort ( 0xffffffffffffffff, NULL, 0, 8 ) 0x0000000000000204
KERNELBASE.dll NtCreateIoCompletion ( 0x00000000010ae528, IO_COMPLETION_ALL_ACCESS, NULL, 8 ) STATUS_SUCCESS
clr.dll
CreateIoCompletionPort ( 0x00000000000001f0, 0x0000000000000204, 8792377772672, 8 ) 0x0000000000000204
KERNELBASE.dll NtSetInformationFile ( 0x00000000000001f0, 0x00000000010ae600, 0x00000000010ae5f0, 16, FileCompletionInformation ) STATUS_SUCCESS
clr.dll
CreateIoCompletionPort ( 0x00000000000001f4, 0x0000000000000204, 8792377772672, 8 ) 0x0000000000000204
KERNELBASE.dll NtSetInformationFile ( 0x00000000000001f4, 0x00000000010ae600, 0x00000000010ae5f0, 16, FileCompletionInformation ) STATUS_SUCCESS

有两个对 Thread.BindHandle() 的调用,一个使用句柄 0x1f0,第二个使用句柄 0x1f4。可以看到,第一个调用中,.NET 框架调用了两次 CreateIoCompletionPort() ,一次创建完成端口,第二次将句柄绑定到完成端口 0x204。

因此,在创建文件句柄后,应将其绑定到完成端口,如 Process.AsyncConsoleReader.ctor() 中所述。如果忽略此步骤,.NET 框架将无法稍后发出回调以指示异步操作已完成。

public unsafe AsyncConsoleReader(SafeFileHandle streamHandle, string name) : base(name) {
  m_StreamHandle = streamHandle;
  ThreadPool.BindHandle(streamHandle);

  ConsoleAsyncResult ar = new ConsoleAsyncResult(this);
  DoReadOperation(ar);
}

需要 unsafe 关键字,因为构造函数会启动使用指针的读取操作。

创建 Overlapped 结构

支持重叠 I/O 的 Windows API 通常有一个参数 LPOVERLAPPED lpOverlapped。此参数可以通过 Pack() 方法由 .NET Overlapped 类构成。

假设您有一个数组 byte[] buffer,数据应异步读取到其中。您应该使用 Overlapped.Pack(IOCallback, buffer) 的第二个形式。MSDN 文档明确说明了原因:

运行时会在 I/O 操作的持续时间内固定(pin)缓冲区或缓冲区中指定的缓冲区。如果应用程序域被卸载,运行时会在 I/O 操作完成之前保持内存固定。

因此,未将 I/O 操作正在修改的缓冲区传递给 Pack() 方法是错误的。虽然可以使用 GC 固定缓冲区,但如果应用程序域在结束时结束(例如,您的程序在异步 I/O 操作期间结束),您仍然可能遇到数据损坏。创建 NativeOverlapped 结构的方法更简单且安全。buffer 将在调用 Overlapped.Unpack() 之前保持固定,并且 NativeOverlapped 结构占用的内存将在调用 Overlapped.Free() 之前保持。

在创建 NativeOverlapped 结构之前,需要先创建一个 Overlapped 对象。对于 Process 类的实现,我们不需要使用 Events(如果您实现支持 BeginRead() EndRead() Stream ,则可能需要)。应使用 Overlapped 构造函数的最通用形式,该形式允许提供一个类型为 IAsyncResult 的通用对象。

IAsyncResult 是必需的,以允许将通用数据从启动异步 I/O 的调用者传递到回调,回调应实现为静态方法(因此无法访问 this 对象)。

举个例子:

private static unsafe void DoReadOperation(ConsoleAsyncResult ar) {
  AsyncConsoleReader acr = (AsyncConsoleReader)ar.AsyncState;
  // Buffer.EndArray is fixed automatically by the Pack() method.
  NativeOverlapped* noverlapped =

    new Overlapped(0, 0, IntPtr.Zero, ar).Pack>(ReadCompletionCallback, acr.Buffer.EndArray);
  ..
}

发起异步 I/O

创建 NativeOverlapped 结构后,可以将其传递给执行异步 I/O 的方法,例如 ReadFile() 函数。为了简化封送,使用了 fixed 关键字,这在 C# 中需要 unsafe

ReadFile() 的原型定义为:

[DllImport("kernel32.dll", SetLastError = true)]
public unsafe static extern bool ReadFile(SafeFileHandle hFile, byte* lpBuffer,
  uint nNumberOfBytesToRead, IntPtr lpNumberOfBytesRead, NativeOverlapped* lpOverlapped);

这允许高效地使用内存缓冲区,直接写入队列,避免了复制操作。实现方式是始终写入队列中的下一个可用字节,而不是写入特殊缓冲区,然后再复制到队列中。通过实现一个由数组组成的链表队列,我们可以限制 GC 锁定的内存量,并允许 GC 本身进行高效的内存处理。请参阅提供的源代码中的 PagedQueue<> 实现。

然后使用以下命令启动写入队列的读取操作:

private static unsafe void DoReadOperation(ConsoleAsyncResult ar) {
  ..

  bool result;
  fixed (byte* pBuf = acr.Buffer.EndArray) {
    result = UnsafeNativeMethods.ReadFile(acr.m_StreamHandle, pBuf + acr.Buffer.End, 
      (uint)acr.Buffer.WriteLength, IntPtr.Zero, noverlapped);
  }
 
  if (!result) {
    int error = Marshal.GetLastWin32Error();
    if (error != 997) {
      ReadCompletionCallback((uint)error, 0, noverlapped);
    }
  }
  // else, the callback is still executed
}

如果发生错误且该错误不指示 ERROR_IO_PENDING(错误代码 997),则应释放 NativeOverlapped 结构。这是通过调用 ReadCompletionCallback (参见下一节)并提供错误来完成的。这为错误处理提供了一个中心位置。

实现异步 Stream 时,上述所有内容都将在 BeginRead() 方法中完成。

陷阱

需要特别说明的是,没有明显的文档说明 ReadFile() 操作的行为,并且在《Windows 内部机制》中只有一点提示。

ReadFile() 方法在操作是*同步*且成功时返回 true。如果成功,则无需执行任何操作,I/O 完成端口仍会收到 I/O 完成包并执行回调。在这种情况下,错误是释放结构,如 BeefyCode 所述。

第一次实现是在成功时调用 Overlapped.Unpack() Overlapped.Free() ,然后再次调用 ReadFile() 以启动新操作。然而,观察到(至少在 Windows 8 上)在同步读取操作之后,不再会发生回调。

如果您实现自己的 Stream ,则应在 IAsyncResult 中设置适当的字段,以指示操作是同步且成功的。回调仍会被调用。

至于《Windows 内部机制》中的提示,有一个函数 SetFileCompletionNotificationModes() API 可以改变这种行为。但是,作者没有使用此函数,也没有对其进行测试,因为它似乎是 Windows Vista 及更高版本的新增功能(例如,它不适用于 Windows XP)。其次,《Windows 内部机制》和 MSDN 的文档不匹配。

回调处理

异步 I/O 操作完成后,会排队一个 I/O 完成包。.NET 线程池中与 I/O 完成端口关联的 .NET 线程会收到通知,并执行 Pack() 方法提供的回调。所有相关信息都以错误代码、读取(或写入)的字节数以及 NativeOverlapped 结构的形式提供。

NativeOverlapped 结构中,可以使用 Unpack() 方法获取原始 Overlapped 结构。这会解除 Pack() 方法中提供的 buffer 的固定。从 Overlapped 结构中,可以获取 AsyncState 对象,该对象是 IAsyncResult 类型。

如果实现异步 Stream Stream 将调用用户回调,允许他们调用 EndRead() 并可能启动对 BeginRead() 的新调用。由于 Process 类完全在内部管理异步 I/O,将其数据作为同步流呈现给外部,因此 Process 类创建的 AsyncState 对象包含启动新读取操作所需的所有必要信息。

对于每个异步操作,确保调用 Overlapped.Unpacked(nativeOverlapped) 然后调用 Overlapped.Free(nativeOverlapped) 非常重要。Microsoft 的 SerialPort 实现通过在回调中调用 Unpacked() ,并在用户调用 EndRead() 时调用 Free() 来做到这一点。这就是为什么 Microsoft 声明如果不调用异步流的 EndRead(),可能会发生内存泄漏。

Process 类中完成回调的代码如下所示:

private static unsafe void ReadCompletionCallback(uint errorCode, uint numBytes, 
    NativeOverlapped* nativeOverlapped) {
  ConsoleAsyncResult ar;
  try {
    // Unpin the NativeOverlapped structure by unpacking it
    ar = (ConsoleAsyncResult)Overlapped.Unpack(nativeOverlapped).AsyncResult;
 
    if (errorCode == 0) {
      if (numBytes > 0) {
        AsyncConsoleReader cr = (AsyncConsoleReader)ar.AsyncState;
        cr.WriteBuffer((int)numBytes);
      }
    } else {
      //   6 - ERROR_INVALID_HANDLE
      // 109 - ERROR_BROKEN_PIPE
      // 995 - ERROR_OPERATION_ABORTED
      if (errorCode == 109 || errorCode == 6 || errorCode == 995) {
        cr.m_StreamHandle.Close();
      } else {
        System.Diagnostics.Trace.WriteLine("ReadCompletionCallback: error " + errorCode + " for " + cr.Name);
      }
    }
  } finally {
    Overlapped.Free(nativeOverlapped);
  }
  if (errorCode == 0) DoReadOperation(ar);
}

我们看到我们总是 Unpack() 该结构并释放与 NativeOverlapped 结构关联的内存。如果没有错误,我们启动一个新的读取操作,本质上是读取直到远程管道关闭,或者直到 I/O 被取消。

观察

在 ReadCompletionCallback() 方法中,有一个对用户事件的隐藏调用,作为 cr.WriteBuffer((int)numBytes) 的一部分。由于调用了用户委托,该用户委托可以阻止进一步的 I/O,因为下一个 DoReadOperation() 仅在用户委托完成后才会发生。

这是 ReadLine(int timeout) 方法在 DataReceived 事件中调用用户委托时遇到的一个初始问题。理论上,委托可以调用 ReadLine(1000)。如果一行不完整,它应该等待更多数据直到 1000ms 超时到期。但是,如果不再有 I/O,它就无法做到这一点。因此,OnDataReceived 事件在 .NET 线程池中的一个线程上启动事件。如果在事件执行期间有新数据到达,它将被记住,如果仍有数据可用,则会触发新事件。

取消 I/O

Dispose Process 对象会导致调用 CancelIoEx() ,以确保取消所有异步 I/O 操作的关闭。这将导致回调被调用,并附带错误 995 ERROR_OPERATION_ABORTED

SafeHandles

SafeHandle 是使用异步 I/O 的关键功能。它可以在句柄被关闭时保护我们(通过使用 hFile.Close() 而不是 CloseHandle() )。如果句柄当前被 Windows API 使用,由于内部引用计数,它不会立即关闭。如果关闭的句柄传递给 Windows 函数,将引发一个异常,指示句柄已关闭。

更多信息

如正文中提供,可以通过反射学到很多东西:

后续

以下项目可供后续跟进:

  1. 如何发送 I/O 包以指示线程应中止,如《Windows 内部机制》中所述?
  2. 取消 I/O 的最有效方法是什么?AsyncConsoleReader 和 AsyncConsoleWriter 中的 Dispose() 方法调用 CancelIoEx(),这可能已被其他地方关闭(事实上,try { } catch { } 块涵盖了这种情况。
    1. MS Serial Port 通常检查错误代码 6,ERROR_INVALID_HANDLE

关注点

花了大约两天时间,进行了大量嗅探,最后是偶然的运气,才找到了正确使用信息以及处理 ReadFile() 指示同步成功(返回 true)的情况,以了解为什么不再发生读取操作。

历史

  • 2013 年 1 月 7 日:初始版本
© . All rights reserved.