异步输入/输出和 .NET 的异步编程模型






4.43/5 (8投票s)
一篇解释线程如何异步执行 I/O 的文章
引言
与其它处理相比,输入和输出本质上是缓慢的。这种缓慢是由于几个因素造成的
- 由随机存取设备(如硬盘和 CD-ROM)的磁道和扇区寻道时间引起的延迟。
- 由物理设备和系统内存之间缓慢的数据传输速率引起的延迟。
- 使用文件服务器、存储区域网络等进行的网络数据传输延迟。
Process Explorer 和性能计数器等工具可以以速率或实际数量来计算每秒字节数,以此作为评估数据传输的标准。在 Windows 中,当 I/O 是线程同步的,线程会一直等待直到整个 I/O 操作完成。如果比较硬盘的速度和 CPU 的速度,那么这些 I/O 操作本质上是缓慢的,可能足以引起 I/O 冲突。当调度程序不得不执行另一个进程中的另一个线程时,就会发生上下文切换。这是一个开销很大的操作:处理器中的页表需要重置;处理器的缓存需要重置。转换查找缓冲区会发生变化。更重要的是,这些缓存经常用于存储要执行的操作系统代码,而不是从内存位置获取它们。虽然系统代码执行不是线性的,并且会被抢占,但处理器架构决定了算法会计算有多少系统代码可以存储在微处理器的缓存和管道中。开发人员发现有必要将应用程序分解为多线程应用程序。多线程应用程序可以利用双核微处理器并实现可扩展性。例如,如果您打印 Microsoft Word 文档,一个线程会运行后台打印程序,但您可以在打印时编辑文档。这些操作实际上是计算操作与 I/O 操作的重叠。当执行异步计算密集型操作时,它可以使用其他线程执行。当执行异步 I/O 密集型操作时,您有一个 Microsoft 设备驱动程序在为您工作,而不需要任何线程。本文旨在解释线程如何在不等待操作完成的情况下继续执行。也就是说,线程可以执行异步 I/O。
严格来说,线程是一种开销。创建线程并不经济高效:需要分配和初始化一个线程内核对象,因为每个线程都有 1 MB 的地址空间(按需分配)用于其用户模式堆栈,另有 20 KB(或根据 MSDN 为 12 KB)分配给其内核模式堆栈。然后,在创建线程后,Windows 会调用进程中每个 DLL 的一个函数,通知每个 DLL 新线程已创建。销毁线程也是一种开销:进程中的每个 DLL 都会收到线程即将终止的通知,并且需要释放内核对象和堆栈。请记住,当 Windows 停止一个线程的代码执行并开始执行另一个线程的代码时,我们称之为上下文切换(功能的变化)。请记住,上下文切换涉及到进入内核模式。您必须将 CPU 的寄存器保存到当前执行线程的内核对象中。然后,系统必须获取一个自旋锁,确定要调度哪个线程,然后释放自旋锁。要准确计算 CPU 消耗,请使用 Process Explorer。转到“视图”菜单,选择“性能”,然后勾选三个选项:CPU 周期、CSwitch Delta 和上下文切换。这将提供有关哪些线程正在消耗 CPU 的更多信息,以便可以右键单击并暂停它们。请注意,任何操作系统的优势都取决于应用程序软件与系统软件的协同工作和接口程度。准确计算消耗过多 CPU 的线程可以防止这些线程浪费过多内存资源。或者过多的线程会降低系统性能。
CLR 的线程池
为了改善这种情况,CLR 包含了管理其自身线程池的代码。CLR 的线程池就像一组可供应用程序使用的线程。每个进程有一个线程池,并且该线程池被进程中的所有应用程序域共享。当 CLR 初始化时,线程池中没有线程。CLR 内部维护着一个操作请求队列。什么是队列?它有点像一条编织的绳子,存储项目,就像一个等待列表(粗略地说)。当应用程序想要执行异步操作时,您会调用一个方法,该方法会将条目添加到线程池的队列中。线程池的代码将从该队列中提取条目,并将条目分派给一个线程池线程。从 .NET 2.0 开始,工作线程和 I/O 完成线程的默认最大数量分别为 25 和 1000。一个设计良好的应用程序不应该需要每处理器 25 个线程。1000 个 I/O 完成线程相当于无限。如果 CLR 内部代码将条目分派给线程池线程时没有线程,则会创建一个线程。虽然创建线程会有性能损失,但当线程池线程完成任务时,这种损失会被抵消:线程不会被销毁,而是被返回到线程池,在那里它会空闲等待响应另一个请求。由于线程没有被销毁,性能损失就更小了。CPU 制造商目前使用两种技术:超线程和多核。两者都允许单芯片在 Windows 和应用程序看来像两个或更多的 CPU。CLR 用于管理线程池的代码能够在大约没有足够线程和有过多的线程之间取得一定的平衡。如果应用程序有很多任务,并且 CPU 可用,线程会创建更多的线程。如果应用程序的工作负载减少,线程池线程会自行终止。
在内部,CLR 将其线程分为工作线程或 I/O 线程。当应用程序请求线程池执行异步计算密集型操作(这可能包括初始化 I/O 密集型操作)时,会使用工作线程。I/O 线程用于在异步 I/O 密集型操作完成时通知您的代码。这意味着您正在使用 APM 进行 I/O 请求,例如访问文件、网络服务器、数据库、Web 服务或其他硬件设备。简而言之,线程池通过共享和回收线程来减少开销,从而允许在更细粒度的级别上应用多线程,而不会产生性能损失。进入线程池的最简单方法是调用 ThreadPool.QueueUserWorkItem
,而不是实例化和启动 Thread
对象。
using System;
using System.Threading;
class Test {
static void Main() {
ThreadPool.QueueUserWorkItem (Go);
ThreadPool.QueueUserWorkItem (Go, 123);
Console.ReadLine();
}
static void Go (object data)
{
Console.WriteLine("From the thread pool!" + data);
}
}
输出
From the thread pool!
From the thread pool!123
使用线程池执行异步计算密集型操作
计算密集型操作需要计算或执行计算。计算 Excel 电子表格中的单元格、校对 Word 文档中的语法,都是计算密集型操作的例子。它们不应执行任何同步 I/O 操作,因为所有同步 I/O 操作都会挂起调用线程,同时底层硬件(硬盘、网卡等)执行工作。挂起的线程是未运行但正在使用系统资源的线程。要将异步计算密集型操作排队到线程池,通常需要调用 ThreadPool
类的一个方法:
-
static Boolean QueueUserWorkItem(WaitCallback callBack);
-
static Boolean QueueUserWorkItem(WaitCallback, callBack, Object, state);
-
static Boolean UnSafeQueueUserWorkItem(WaitCallback callBack, Object, state);
这些方法会将一个“工作项”排队到线程池的队列中,然后所有这些方法都会立即返回。工作项是由 callBack 参数标识的一个方法,该方法将被线程池调用。请看以下代码示例:
using System;
using System.Threading;
public static class Program {
public static void Main() {
Console.WriteLine("Main thread: queuing an asynchronous operation");
ThreadPool.QueueUserWorkItem(ComputeBoundOp, 5);
Console.WriteLine("Main thread: Doing other work here...");
Thread.Sleep(10000); // Simulating other work (10 seconds)
Console.ReadLine();
}
// This method's signature must match the WaitCallback delegate
private static void ComputeBoundOp(Object state) {
// This method is executed by a thread pool thread
Console.WriteLine("In ComputeBoundOp: state={0}", state);
Thread.Sleep(1000); // Simulates other work (1 second)
// When this method returns, the thread goes back
// to the pool and waits for another task
}
}
理解异步编程
异步编程基本上是允许代码的一部分在单独的线程上执行。这被称为异步编程模型。要启动异步操作,您需要调用某个 BeginXxx 方法。所有这些方法都会将所需的操作排队,并返回一个 IAsyncResult
对象,该对象标识了待处理的操作。要获取操作的结果,您需要调用相应的 EndXxx 方法,并将 IAsyncResult
对象传递给它。例如,System.IO
命名空间下的 FileStream
类有一个 Read
方法,用于从流中读取数据。为了支持 APM 模型,它还支持 BeginRead
和 EndRead
方法。使用 BeginReadXxx 和 EndReadXxx 方法的模式允许您异步执行方法。请看下面的代码:
using System;
using System.Threading;
using System.IO;
class Test {
static void Main() {
byte[] buffer = new byte[100];
string filename = string.Concat(Environment.SystemDirectory, "\\kernel32.dll");
FileStream fs = new FileStream(filename,
FileMode.Open, FileAccess.Read, FileShare.Read, 1024, FileOptions.Asynchronous);
// make the IAsyncResult call
IAsyncResult result = fs.BeginRead(buffer, 0, buffer.Length, null, null);
// do some work while you wait
// Calling EndRead will block until the Async work is complete
int numBytes = fs.EndRead(result);
fs.Close();
Console.WriteLine("Read {0} Bytes", numBytes);
Console.WriteLine(BitConverter.ToString(buffer));
}
}
输出
Read 100 Bytes
4D-5A-90-00-03-00-00-00-04-00-00-00-FF-FF-00-00-B8-00-00-00-00-00-00-00-40-00-00
-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-0
0-00-00-00-00-00-00-F0-00-00-00-0E-1F-BA-0E-00-B4-09-CD-21-B8-01-4C-CD-21-54-68-
69-73-20-70-72-6F-67-72-61-6D-20-63-61-6E-6E-6F-74-20-62-65
会合模型:等待完成技术
如果我们没有在调用 BeginRead
和 EndRead
之间放置一些代码,下面这段代码将不会有效地使用 APM。APM 的一些价值体现在,当字节正在从文件流中读取时,这段其他代码会执行。当您调用 BeginXxx 然后立即调用 EndXxx 方法时,APM 的使用效率不高,因为调用线程只是休眠以等待操作完成。
using System;
using System.IO;
using System.Threading;
public static class Program {
public static void Main() {
// Open the file indicating asynchronous I/O
FileStream fs = new FileStream(@"C:\windows\system32\autoexec.NT", FileMode.Open,
FileAccess.Read, FileShare.Read, 1024,
FileOptions.Asynchronous);
Byte[] data = new Byte[100];
// Initiate an asynchronous read operation against the FileStream
IAsyncResult ar = fs.BeginRead(data, 0, data.Length, null, null);
// Executing some other code here would be useful...
// Suspend this thread until the asynchronous
// operation completes and get the result
Int32 bytesRead = fs.EndRead(ar);
// No other operations to do, close the file
fs.Close();
// Now, it is OK to access the byte array and show the result.
Console.WriteLine("Number of bytes read={0}", bytesRead);
Console.WriteLine(BitConverter.ToString(data, 0, bytesRead));
}
private static void ReadMultipleFiles(params String[] pathnames) {
AsyncStreamRead[] asrs = new AsyncStreamRead[pathnames.Length];
for (Int32 n = 0; n < pathnames.Length; n++) {
// Open the file indicating asynchronous I/O
Stream stream = new FileStream(pathnames[n], FileMode.Open,
FileAccess.Read, FileShare.Read, 1024,
FileOptions.Asynchronous);
// Initiate an asynchronous read operation against the Stream
asrs[n] = new AsyncStreamRead(stream, 100);
}
// All streams have been opened and all read requests have been
// queued; they are all executing concurrently!
// Now, let's get and display the results
for (Int32 n = 0; n < asrs.Length; n++) {
Byte[] bytesRead = asrs[n].EndRead();
// Now, it is OK to access the byte array and show the result.
Console.WriteLine("Number of bytes read={0}", bytesRead.Length);
Console.WriteLine(BitConverter.ToString(bytesRead));
}
}
private sealed class AsyncStreamRead {
private Stream m_stream;
private IAsyncResult m_ar;
private Byte[] m_data;
public AsyncStreamRead(Stream stream, Int32 numBytes) {
m_stream = stream;
m_data = new Byte[numBytes];
// Initiate an asynchronous read operation against the Stream
m_ar = stream.BeginRead(m_data, 0, numBytes, null, null);
}
public Byte[] EndRead() {
// Suspend this thread until the asynchronous
// operation completes and get the result
Int32 numBytesRead = m_stream.EndRead(m_ar);
// No other operations to do, close the stream
m_stream.Close();
// Resize the array to save space
Array.Resize(ref m_data, numBytesRead);
// Return the bytes
return m_data;
}
}
}
输出
Number of bytes read=100
40-65-63-68-6F-20-6F-66-66-0D-0A-0D-0A-52-45-4D-20-41-55-54-4F-45-58-45-43-2E-42
-41-54-20-69-73-20-6E-6F-74-20-75-73-65-64-20-74-6F-20-69-6E-69-74-69-61-6C-69-7
A-65-20-74-68-65-20-4D-53-2D-44-4F-53-20-65-6E-76-69-72-6F-6E-6D-65-6E-74-2E-0D-
0A-52-45-4D-20-41-55-54-4F-45-58-45-43-2E-4E-54-20-69-73-20
APM 的轮询技术和回调技术
轮询
方法与等待完成技术类似,只是代码会轮询 IAsyncResult
以查看它是否已完成。以下代码提供了一个示例:
using System;
using System.IO;
using System.Threading;
class Test {
static void Main() {
byte[] buffer = new byte[100];
string filename = string.Concat(Environment.SystemDirectory, "\\kernel32.dll");
FileStream fs = new FileStream(filename, FileMode.Open,
FileAccess.Read, FileShare.Read, 1024, FileOptions.Asynchronous);
IAsyncResult result = fs.BeginRead(buffer, 0, buffer.Length, null, null);
//Poll to see if complete
while (!result.IsCompleted)
{
// do some more work if it isn't completed
Thread.Sleep(100);
}
//Finished, so we can call EndRead and it will return without blocking
int numBytes = fs.EndRead(result);
// close the stream
fs.Close();
Console.WriteLine("Read {0} Bytes", numBytes);
Console.WriteLine(BitConverter.ToString(buffer));
}
}
通过调用 BeginRead
返回的 IAsyncResult
对象上的 IsCompleted
属性,我们可以继续执行必要的工作,直到操作完成。回调模型要求我们指定一个回调方法,并包含回调中所需的任何状态。请看以下代码:
static byte[] buffer = new byte[100];
static void TestCallbackAPM()
{
string filename = string.Concat(Environment.SystemDirectory, "\\kernel32.dll");
FileStream fs = new FileStream(filename, FileMode.Open,
FileAccess.Read, FileShare.Read, 1024, FileOptions.Asynchronous);
IAsyncResult result = fs.BeginRead(buffer, 0, buffer.Length,
new AsyncCallback(CompleteRead), fs);
}
在此模型中,我们创建了一个新的 AsyncCallback
委托,指定了一个操作完成时要调用的方法(在另一个线程上)。此外,我们还指定了一个可能需要的对象作为调用的状态。在此示例中,我传入了 stream
对象,因为我需要调用 EndRead
并关闭 stream
。
参考文献和推荐阅读
- CLR via C#,作者:Jeffrey Richter
历史
- 2009 年 3 月 28 日:首次发布