使用 Thread.BindHandle 进行异步 I/O






4.25/5 (7投票s)
描述了使用 I/O 完成端口进行异步 I/O 的用法,适用于 .NET。
引言
高性能 I/O 通常是 Windows 开发人员留给 C 和 C++ 程序员的挑战。.NET 框架也提供了高性能 I/O 的形式,即绑定到 I/O 完成端口的线程池。虽然存在相关文档,但互联网上以及 MSDN API 目录中都很难找到。本文解释了如何在 .NET 中使用 I/O 完成端口来实现高性能 I/O。
最终产品是一个类,用于启动一个进程并使用 StdIn、StdOut 和 StdErr 的重定向与该进程通信。该类本身很有用,可以不理解本文任何内容而使用。目的是描述允许高性能 I/O 的 IPC 机制。类似的概念可应用于所有重叠 I/O。
背景
读者应熟悉 .NET 的 Streams 及其异步 I/O 的用法(FileStream 类是一个特别好的例子);通过 P/Invoke 机制在 .NET 中使用 WinAPI;以及了解 .NET 中的线程池。
使用代码
该类提供了以下优势:
- 进程的
StdIn
、StdOut
和StdErr
无限缓冲。Windows 的缓冲区大小通常只有几 KB,这可能导致进程在写入其Std*
句柄时被阻塞。 - 缓冲是分页的,以便在 .NET GC 中进行灵活的内存管理。
启动进程
该类应尽可能易于使用。要启动一个新进程,您的主进程随后可以对其进行监视,请使用以下代码:
Process p = Process.Execute("notepad.exe", false);
将返回一个新的 Process 对象,该对象可用于终止进程、等待其结束以及监视重定向的句柄。第二个参数允许您在继续之前等待进程结束。
如果您将一个进程作为“助手”运行,即一个执行特定任务然后退出的小型进程,您应该使用 C# 的 using 子句来确保无差错的行为(以及底层句柄被释放)。
using (Process p = Process.Execute("cscript /nologo .\\script.vbs", true) {
Console.WriteLine("Exit Code {0}", p.ExitCode);
}
在这种情况下,第二个参数为 true
,表示只有在脚本结束后才会继续执行。该对象仍然有效,允许获取退出代码。进程的任何输出都可以通过 p.StdOut
或 p.StdErr
属性进行解析,通常通过它们提供的 ReadLine()
方法。由于数据在输出时被立即捕获,与您的线程无关,因此即使在程序结束后也可以解析信息。
终止进程
该类仅支持终止由其自身启动的进程。虽然技术上可行(请参阅源代码中提供的单元测试用例 Process_CreateNotepadDispose
),但结束非由主程序以编程方式启动的进程通常是一个不常见的需求。
using (Process p = Process.Execute("notepad.exe", false) {
// Wait one second for the program to end.
bool result = p.Wait(1000);
if (!result) {
p.Terminate(-1);
}
Console.WriteLine("Notepad Exit Code {0}", p.ExitCode);
}
这将启动 notepad.exe 进程,并等待一秒钟使其结束(通常是用户手动关闭程序)。如果在此期间未结束,则该进程将被显式终止,退出代码为 -1(因此 ExitCode
方法通常应返回 -1)。
可以通过 ExitCode
属性获取退出代码。仅当进程已结束时,此值才有效,当以下情况发生时,进程已结束:
-
Wait()
返回; Wait(int timeout)
返回 true;Terminate(int exitcode)
被调用。
异步 I/O
这是本文最有趣的部分的开始,即用于从进程检索输出 StdOut
和 StdErr
,以及在需要时向进程提供输入的机制。这里的技术在实现 Process 类方面具有实际应用,但不仅限于如此简单的示例。虽然本文侧重于接收 StdOut
(和 StdErr
),但对于 StdIn
,概念是相同的。
StdOut
、StdErr
和 StdIn
。管道的写入端通过 CreateProcess()
中的 StartInfo
对象传递给子进程,用于 StdOut
和 StdErr
,第三个管道的读取端用于 StdIn
。读取 I/O 模式
第一个任务是选择如何从流中获取信息。有三种便捷的机制:
- 单个线程,每个文件句柄执行阻塞的
ReadFile()
调用,实现典型的生产者/消费者模式。 - 一个线程处理所有文件句柄,使用重叠 I/O 和
Event
s。 - I/O 完成端口绑定到
ThreadPool
并使用异步回调。
这三种机制将简要介绍,最后一种将详细介绍。实际实现位于 Process.AsyncConsoleReader
类中。该类的设计易于实现模式 1 和 3。
阻塞 I/O
这可以说是实现 I/O 的最简单方法。一个线程(生产者)负责从管道读取数据并将其写入队列。第二个线程(消费者),通常是主线程,从队列中读取数据。每个文件句柄有一个生产者线程,总共三个生产者线程加上主线程。
需要注意的是,对于 Process 类的实现,这可以被认为是一种有效的模式。Windows 方法 CreatePipe()
不支持标志,因此不支持 FILE_FLAG_OVERLAPPED
。
优点
- 这种机制易于理解:
- 它能够以最小的努力移植到多个操作系统。
- 通过创建新线程/对象实例,可以轻松添加额外文件句柄。
缺点
- 线程在使用时需要资源。虽然我们只有三个线程,但这不成问题,但扩展性不实用。
- 存在大量的上下文切换。对于 Windows 上的 x86 处理器,这不太令人担忧,但其他架构效率较低(并且可能与基于 ARM 的架构更相关)。
重叠 I/O 与事件
阻塞 I/O 的缺点是为每个活动文件句柄运行多个线程实例。性能会因上下文切换所需的时间而受到影响,对于大量线程来说,这可能导致所谓的“线程抖动”。即使一个线程已准备好运行,它也可能无法运行。其次,没有策略可以指定哪个线程应该运行,这可能会导致特定文件句柄的 I/O 饥饿,因为 Windows 内核调度程序是为 CPU 性能而不是文件性能设计的,并且是非确定性的。
为了克服这个问题,可以实现一个线程来处理所有文件句柄。当然,性能会有所提高,因为上下文切换减少了,如果一个文件句柄已准备好服务,它可以立即利用 CPU。通过确定测试句柄的顺序,可以实现独立于调度程序的确定性行为(只要调度程序允许 I/O 线程运行)。
此类线程机制的一个示例在开源项目 SerialPortStream
中提供,位于 http://serialportstream.codeplex.com,该项目也由本文作者编写。请参阅 NativeSerialPort_CommOverlappedIo.cs
中的实现。
与阻塞 I/O 相比的优势
- 通过为所有 I/O 使用单个线程,降低了资源开销。
- 确定性行为,独立于内核调度程序。
缺点
- 无法随系统中 CPU 的数量进行扩展。
- 实现正确的代码很困难。
I/O 完成端口和 .NET 线程池
这是目前在 Windows 的 .NET 框架内实现异步 I/O 的最佳机制。它也被设计为 Windows 框架中性能最高的模型,如《Windows 内部机制(第六版)》第 8 章(I/O 完成端口)中所述。
不幸的是,关于如何在 .NET 中使用 I/O 完成端口的文档和代码示例非常少。MSDN 文档也非常有限。此处提供文档。下面将详细介绍其用法和陷阱。
与先前模型的优势
- 与现有的异步流概念(例如
FileStream
类)相似; - 可用于实现您自己的异步流,因为编程模型非常相似;
- 能够很好地随机器 CPU 数量进行扩展;
- 高性能。
- MSDN 文档不足,网络上示例很少(且不正确);
- 内存泄漏可能难以查找;
- 可能发生内存损坏。
I/O 完成端口和 .NET 线程池
我们将更详细地介绍最后一个选项。建议使用 .NET 反射器(如 ILSpy 或 Reflector)来研究 Microsoft 对 SerialPort
或 FileStream
类的实现。它们也在内部实现中使用 I/O 完成端口和 ThreadPools
。
使用 I/O 完成端口有四个步骤。虽然本文与 BeefyCode 的博客相似,但在使用 ReadFile()
和 WriteFile()
API 方面并不兼容。稍后将澄清在使用 Win32 API“发起异步 I/O”时。
创建重叠 I/O
CreateFile()
和 CreateNamedPipe()
方法提供了一个标志选项来指定 FILE_FLAG_OVERLAPPED
。但是,CreatePipe()
方法不允许指定这些标志,因此不能用于重叠 I/O。重叠 I/O 必须在任何类型的重叠 I/O(使用 Events 进行通知或 I/O 完成端口时)中使用。
作为 Process 类的解决方法,从 Dave Hart 创建了一个端口。C# 等效实现可以在下载的代码中找到,作为 Win32.CreatePipeEx()
方法。它使用 CreateNamedPipe()
创建命名管道以进行入站二进制数据,并使用 CreateFile()
API 打开写入端。
下面指定了方法及其原型,改编自 PInvoke.NET。枚举可以在下载源代码的 Native.cs
文件中找到。
internal static class UnsafeNativeMethods {
[DllImport("kernel32.dll", SetLastError = true, CharSet = CharSet.Unicode)]
public static extern SafeFileHandle CreateFile(
string lpFileName,
[MarshalAs(UnmanagedType.U4)] NativeMethods.FileAccess dwDesiredAccess,
[MarshalAs(UnmanagedType.U4)] NativeMethods.FileShare dwShareMode,
ref NativeMethods.SECURITY_ATTRIBUTES lpSecurityAttributes,
[MarshalAs(UnmanagedType.U4)] NativeMethods.CreationDisposition dwCreationDisposition,
[MarshalAs(UnmanagedType.U4)] NativeMethods.FileAttributes dwFlagsAndAttributes,
IntPtr hTemplateFile);
[DllImport("kernel32.dll", SetLastError = true)]
public static extern SafeFileHandle CreateNamedPipe(string lpName,
[MarshalAs(UnmanagedType.U4)] NativeMethods.PipeOpenMode dwOpenMode,
[MarshalAs(UnmanagedType.U4)] NativeMethods.PipeMode dwPipeMode,
uint nMaxInstances, uint nOutBufferSize, uint nInBufferSize,
uint nDefaultTimeOut, ref NativeMethods.SECURITY_ATTRIBUTES lpSecurityAttributes);
}
可以这样创建一个可以使用重叠 I/O 的文件句柄:
SafeFileHandle hFile;
string pipeName = string.Format("\\\\.\\Pipe\\{0:X}.{1},
SafeNativeMethods.GetCurrentProcessId(), Guid.NewGuid().ToString("D"));
hReadFile = UnsafeNativeMethods.CreateNamedPipe(pipeName,
NativeMethods.PipeOpenMode.PIPE_ACCESS_INBOUND |
(NativeMethods.PipeOpenMode)NativeMethods.FileAttributes.FILE_FLAG_OVERLAPPED,
NativeMethods.PipeMode.PIPE_TYPE_BYTE | NativeMethods.PipeMode.PIPE_WAIT,
1, nSize, nSize, 120 * 1000, ref lpPipeAttributes);
if (hReadFile.IsInvalid) return false;
在使用 CreateFile()
API 时,等效操作如下:
hWriteFile = UnsafeNativeMethods.CreateFile(pipeName,
NativeMethods.FileAccess.GENERIC_WRITE,
NativeMethods.FileShare.FILE_SHARE_NONE,
ref lpPipeAttributes,
NativeMethods.CreationDisposition.OPEN_EXISTING,
NativeMethods.FileAttributes.FILE_ATTRIBUTE_NORMAL |
NativeMethods.FileAttributes.FILE_FLAG_OVERLAPPED,
IntPtr.Zero);
重要的是在创建文件句柄时提供 FILE_FLAG_OVERLAPPED
标志作为选项之一。
绑定句柄和 I/O 完成端口
这是本文中最神秘的部分。有关更多信息,请参阅《Windows 内部机制(第六版)》第 8 章“I/O 完成端口”。这与《Windows 内部机制(第五版)》第 7 章相同。
I/O 完成端口是 Windows Executive 公开的一个对象,可以与多个文件句柄关联。也就是说,一个完成端口,多个文件句柄。与 I/O 完成端口关联的任何重叠文件操作都会导致 Windows 向完成端口发送一个完成包。多个线程可以等待同一个完成端口。完成端口提供了并发的优势,因此通常只有n个与 I/O 关联的线程在任何给定时间等待一个完成端口。
.NET 子系统根据系统中的 CPU 线程数指定并发线程数(例如,具有超线程的 4 核 CPU 有 8 个 CPU 线程)。这在创建完成端口时指定。
Windows 函数 CreateIoCompletionPort()
用于创建完成端口以及将文件句柄与完成端口关联。这可以通过 .NET 框架中的 Thread.BindHandle(SafeHandle handle)
方法完成。使用 Rohitab 的 API Monitor,我们可以看到以下内容:
模块 |
API |
返回值 |
clr.dll |
CreateIoCompletionPort ( 0xffffffffffffffff, NULL, 0, 8 ) | 0x0000000000000204 |
KERNELBASE.dll | NtCreateIoCompletion ( 0x00000000010ae528, IO_COMPLETION_ALL_ACCESS, NULL, 8 ) | STATUS_SUCCESS |
clr.dll |
CreateIoCompletionPort ( 0x00000000000001f0, 0x0000000000000204, 8792377772672, 8 ) | 0x0000000000000204 |
KERNELBASE.dll | NtSetInformationFile ( 0x00000000000001f0, 0x00000000010ae600, 0x00000000010ae5f0, 16, FileCompletionInformation ) | STATUS_SUCCESS |
clr.dll |
CreateIoCompletionPort ( 0x00000000000001f4, 0x0000000000000204, 8792377772672, 8 ) | 0x0000000000000204 |
KERNELBASE.dll | NtSetInformationFile ( 0x00000000000001f4, 0x00000000010ae600, 0x00000000010ae5f0, 16, FileCompletionInformation ) | STATUS_SUCCESS |
有两个对 Thread.BindHandle()
的调用,一个使用句柄 0x1f0,第二个使用句柄 0x1f4。可以看到,第一个调用中,.NET 框架调用了两次 CreateIoCompletionPort()
,一次创建完成端口,第二次将句柄绑定到完成端口 0x204。
因此,在创建文件句柄后,应将其绑定到完成端口,如 Process.AsyncConsoleReader.ctor()
中所述。如果忽略此步骤,.NET 框架将无法稍后发出回调以指示异步操作已完成。
public unsafe AsyncConsoleReader(SafeFileHandle streamHandle, string name) : base(name) {
m_StreamHandle = streamHandle;
ThreadPool.BindHandle(streamHandle);
ConsoleAsyncResult ar = new ConsoleAsyncResult(this);
DoReadOperation(ar);
}
需要 unsafe
关键字,因为构造函数会启动使用指针的读取操作。
创建 Overlapped 结构
支持重叠 I/O 的 Windows API 通常有一个参数 LPOVERLAPPED lpOverlapped
。此参数可以通过 Pack()
方法由 .NET Overlapped
类构成。
假设您有一个数组 byte[] buffer
,数据应异步读取到其中。您应该使用 Overlapped.Pack(IOCallback, buffer)
的第二个形式。MSDN 文档明确说明了原因:
运行时会在 I/O 操作的持续时间内固定(pin)缓冲区或缓冲区中指定的缓冲区。如果应用程序域被卸载,运行时会在 I/O 操作完成之前保持内存固定。
因此,未将 I/O 操作正在修改的缓冲区传递给 Pack()
方法是错误的。虽然可以使用 GC 固定缓冲区,但如果应用程序域在结束时结束(例如,您的程序在异步 I/O 操作期间结束),您仍然可能遇到数据损坏。创建 NativeOverlapped
结构的方法更简单且安全。buffer
将在调用 Overlapped.Unpack()
之前保持固定,并且 NativeOverlapped
结构占用的内存将在调用 Overlapped.Free()
之前保持。
在创建 NativeOverlapped
结构之前,需要先创建一个 Overlapped
对象。对于 Process 类的实现,我们不需要使用 Event
s(如果您实现支持 BeginRead()
和 EndRead()
的 Stream
,则可能需要)。应使用 Overlapped
构造函数的最通用形式,该形式允许提供一个类型为 IAsyncResult
的通用对象。
IAsyncResult
是必需的,以允许将通用数据从启动异步 I/O 的调用者传递到回调,回调应实现为静态方法(因此无法访问 this
对象)。举个例子:
private static unsafe void DoReadOperation(ConsoleAsyncResult ar) {
AsyncConsoleReader acr = (AsyncConsoleReader)ar.AsyncState;
// Buffer.EndArray is fixed automatically by the Pack() method.
NativeOverlapped* noverlapped =
new Overlapped(0, 0, IntPtr.Zero, ar).Pack>(ReadCompletionCallback, acr.Buffer.EndArray);
..
}
发起异步 I/O
创建 NativeOverlapped
结构后,可以将其传递给执行异步 I/O 的方法,例如 ReadFile()
函数。为了简化封送,使用了 fixed
关键字,这在 C# 中需要 unsafe
。
ReadFile()
的原型定义为:
[DllImport("kernel32.dll", SetLastError = true)]
public unsafe static extern bool ReadFile(SafeFileHandle hFile, byte* lpBuffer,
uint nNumberOfBytesToRead, IntPtr lpNumberOfBytesRead, NativeOverlapped* lpOverlapped);
这允许高效地使用内存缓冲区,直接写入队列,避免了复制操作。实现方式是始终写入队列中的下一个可用字节,而不是写入特殊缓冲区,然后再复制到队列中。通过实现一个由数组组成的链表队列,我们可以限制 GC 锁定的内存量,并允许 GC 本身进行高效的内存处理。请参阅提供的源代码中的 PagedQueue<>
实现。
然后使用以下命令启动写入队列的读取操作:
private static unsafe void DoReadOperation(ConsoleAsyncResult ar) {
..
bool result;
fixed (byte* pBuf = acr.Buffer.EndArray) {
result = UnsafeNativeMethods.ReadFile(acr.m_StreamHandle, pBuf + acr.Buffer.End,
(uint)acr.Buffer.WriteLength, IntPtr.Zero, noverlapped);
}
if (!result) {
int error = Marshal.GetLastWin32Error();
if (error != 997) {
ReadCompletionCallback((uint)error, 0, noverlapped);
}
}
// else, the callback is still executed
}
如果发生错误且该错误不指示 ERROR_IO_PENDING
(错误代码 997),则应释放 NativeOverlapped
结构。这是通过调用 ReadCompletionCallback
(参见下一节)并提供错误来完成的。这为错误处理提供了一个中心位置。
实现异步 Stream 时,上述所有内容都将在 BeginRead()
方法中完成。
陷阱
需要特别说明的是,没有明显的文档说明 ReadFile()
操作的行为,并且在《Windows 内部机制》中只有一点提示。
ReadFile()
方法在操作是*同步*且成功时返回 true。如果成功,则无需执行任何操作,I/O 完成端口仍会收到 I/O 完成包并执行回调。在这种情况下,错误是释放结构,如 BeefyCode 所述。
第一次实现是在成功时调用 Overlapped.Unpack()
和 Overlapped.Free()
,然后再次调用 ReadFile()
以启动新操作。然而,观察到(至少在 Windows 8 上)在同步读取操作之后,不再会发生回调。
如果您实现自己的 Stream
,则应在 IAsyncResult
中设置适当的字段,以指示操作是同步且成功的。回调仍会被调用。
至于《Windows 内部机制》中的提示,有一个函数 SetFileCompletionNotificationModes()
API 可以改变这种行为。但是,作者没有使用此函数,也没有对其进行测试,因为它似乎是 Windows Vista 及更高版本的新增功能(例如,它不适用于 Windows XP)。其次,《Windows 内部机制》和 MSDN 的文档不匹配。
回调处理
异步 I/O 操作完成后,会排队一个 I/O 完成包。.NET 线程池中与 I/O 完成端口关联的 .NET 线程会收到通知,并执行 Pack()
方法提供的回调。所有相关信息都以错误代码、读取(或写入)的字节数以及 NativeOverlapped
结构的形式提供。
从 NativeOverlapped
结构中,可以使用 Unpack()
方法获取原始 Overlapped
结构。这会解除 Pack()
方法中提供的 buffer
的固定。从 Overlapped 结构中,可以获取 AsyncState
对象,该对象是 IAsyncResult
类型。
如果实现异步 Stream
,Stream
将调用用户回调,允许他们调用 EndRead()
并可能启动对 BeginRead()
的新调用。由于 Process 类完全在内部管理异步 I/O,将其数据作为同步流呈现给外部,因此 Process 类创建的 AsyncState
对象包含启动新读取操作所需的所有必要信息。
对于每个异步操作,确保调用 Overlapped.Unpacked(nativeOverlapped)
然后调用 Overlapped.Free(nativeOverlapped)
非常重要。Microsoft 的 SerialPort
实现通过在回调中调用 Unpacked()
,并在用户调用 EndRead()
时调用 Free()
来做到这一点。这就是为什么 Microsoft 声明如果不调用异步流的 EndRead(),可能会发生内存泄漏。
Process 类中完成回调的代码如下所示:
private static unsafe void ReadCompletionCallback(uint errorCode, uint numBytes,
NativeOverlapped* nativeOverlapped) {
ConsoleAsyncResult ar;
try {
// Unpin the NativeOverlapped structure by unpacking it
ar = (ConsoleAsyncResult)Overlapped.Unpack(nativeOverlapped).AsyncResult;
if (errorCode == 0) {
if (numBytes > 0) {
AsyncConsoleReader cr = (AsyncConsoleReader)ar.AsyncState;
cr.WriteBuffer((int)numBytes);
}
} else {
// 6 - ERROR_INVALID_HANDLE
// 109 - ERROR_BROKEN_PIPE
// 995 - ERROR_OPERATION_ABORTED
if (errorCode == 109 || errorCode == 6 || errorCode == 995) {
cr.m_StreamHandle.Close();
} else {
System.Diagnostics.Trace.WriteLine("ReadCompletionCallback: error " + errorCode + " for " + cr.Name);
}
}
} finally {
Overlapped.Free(nativeOverlapped);
}
if (errorCode == 0) DoReadOperation(ar);
}
我们看到我们总是 Unpack()
该结构并释放与 NativeOverlapped
结构关联的内存。如果没有错误,我们启动一个新的读取操作,本质上是读取直到远程管道关闭,或者直到 I/O 被取消。
观察
在 ReadCompletionCallback() 方法中,有一个对用户事件的隐藏调用,作为 cr.WriteBuffer((int)numBytes) 的一部分。由于调用了用户委托,该用户委托可以阻止进一步的 I/O,因为下一个 DoReadOperation() 仅在用户委托完成后才会发生。
这是 ReadLine(int timeout)
方法在 DataReceived
事件中调用用户委托时遇到的一个初始问题。理论上,委托可以调用 ReadLine(1000)
。如果一行不完整,它应该等待更多数据直到 1000ms 超时到期。但是,如果不再有 I/O,它就无法做到这一点。因此,OnDataReceived 事件在 .NET 线程池中的一个线程上启动事件。如果在事件执行期间有新数据到达,它将被记住,如果仍有数据可用,则会触发新事件。
取消 I/O
Dispose Process 对象会导致调用 CancelIoEx()
,以确保取消所有异步 I/O 操作的关闭。这将导致回调被调用,并附带错误 995 ERROR_OPERATION_ABORTED
。
SafeHandles
SafeHandle
是使用异步 I/O 的关键功能。它可以在句柄被关闭时保护我们(通过使用 hFile.Close()
而不是 CloseHandle()
)。如果句柄当前被 Windows API 使用,由于内部引用计数,它不会立即关闭。如果关闭的句柄传递给 Windows 函数,将引发一个异常,指示句柄已关闭。
更多信息
如正文中提供,可以通过反射学到很多东西:
- ILSpy 用于分析 FileStream 和 SerialStream。
- 《Windows 内部机制(第六版)》第 8 章“I/O 完成端口”。
- MSDN:同步和重叠管道 I/O
- BeefyCode:从托管代码使用重叠 I/O
- MSDN Blog:应用程序域(2003 年 6 月 4 日的评论),指示了完成端口的使用。
- Windows Developer Center:Overlapped.Pack,Overlapped.ctor
后续
以下项目可供后续跟进:
- 如何发送 I/O 包以指示线程应中止,如《Windows 内部机制》中所述?
- 取消 I/O 的最有效方法是什么?AsyncConsoleReader 和 AsyncConsoleWriter 中的
Dispose()
方法调用 CancelIoEx(),这可能已被其他地方关闭(事实上,try { } catch { } 块涵盖了这种情况。 - MS Serial Port 通常检查错误代码 6,
ERROR_INVALID_HANDLE
。
关注点
花了大约两天时间,进行了大量嗅探,最后是偶然的运气,才找到了正确使用信息以及处理 ReadFile()
指示同步成功(返回 true)的情况,以了解为什么不再发生读取操作。
历史
- 2013 年 1 月 7 日:初始版本