65.9K
CodeProject 正在变化。 阅读更多。
Home

线程的简洁概述

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.97/5 (56投票s)

2018年8月2日

CPOL

35分钟阅读

viewsIcon

68991

downloadIcon

484

讨论了各种线程方法,包括锁、互斥体、信号量、并发集合、工作队列、线程、PLINQ、TPL、异常处理和取消令牌

目录

引言

最近有人要求我提供关于 C# 中如何使用线程的培训。我以为会是一次相当简单的讨论,结果却变成了对不同线程方法的详细分析,以及取消任务和异常处理等重要主题。所以,就别指望我用我 30 多年的经验来判断一个项目的范围了,即使那个“项目”是一篇文章!这里的新之处在于,虽然这些内容以前都讨论过,但我希望这篇文章能提供一些有趣的见解和覆盖面,这些内容可能以前都没有在一篇文章中涵盖过。线程、任务、取消、异常等等是一个庞大的主题,关于它已经写了整本书。这篇文章提供了一个不完整但希望能做到简洁的概述。有很多例子,重点是你在 90% 的时间里需要知道的基本知识,而不是在剩下的 10% 的时间里可能需要的所有其他无关紧要的东西。

什么是线程?

我想我应该先简单描述一下什么是线程,尽管说实话,我假设你actually知道什么是线程。但是,如果有人问你“什么是线程?”,你是否能够提供一个简洁而正确的定义?维基百科11将线程定义为:“……执行线程是调度程序可以独立管理的最小程序指令序列,调度程序通常是操作系统的一部分。”

所以,首先,“线程”这个词实际上是“执行线程”的缩写。其次,它包含了调度程序在“程序指令序列”之间切换的概念。这个《Windows 内部机制,第 5 版》12的示例章节,是对 Windows 中调度程序的详细技术描述(请注意,此示例章节写于 2009 年,而《Windows 内部机制》现已出到第 7 版)。

什么是纤程?

再次,根据维基百科13:“……纤程使用协作式多任务处理,而线程使用抢占式多任务处理。”

协作式多任务处理与抢占式多任务处理

协作式多任务处理:“是一种计算机多任务处理的风格,在这种风格中,操作系统从不启动从正在运行的进程到另一个进程的上下文切换。相反,进程会定期自愿让出控制权,或者在空闲或逻辑阻塞时让出控制权,以便能够并发运行多个应用程序。”14

抢占式多任务处理:“……涉及使用中断机制来挂起当前执行的进程,并调用调度程序来确定下一个应执行的进程。因此,所有进程在任何给定时间都会获得一定的 CPU 时间。”15

CPU密集型线程与 I/O 密集型线程

总的来说,有两种类型的线程或任务。CPU 密集型任务(或线程)指的是 100% 在本地 CPU 上执行的代码并且该代码不等待任何其他东西。这里“任何其他东西”指的是各种事情:

  • 硬件中断(通常您不再需要处理这个层面的事情了)
  • 远程服务器请求的完成(非常常见——例如,从数据库请求数据和调用 Web API)
  • 等待另一个线程(相当常见)
  • 等待 I/O 完成事件(通常指将数据作为流写入硬件设备,或将数据作为流从硬件设备读取,例如串行端口流类18

I/O 密集型线程或任务则相反——虽然它可能进行一些处理,但最有可能花费大部分时间等待其他某事发出信号,表明数据已准备好进行处理。“其他某事”就是上面的列表。

构思一个好的线程示例

当然,第一个问题是构思一个好的例子,其中工作可以分解成独立的块,并且需要合理的计算时间,这样你才能真正看到不同方法的区别。我选择了蛮力“判断数字是否为素数”的算法。我真的指的是蛮力

static bool IsPrime(int n)
{
  bool ret = true;
  for (int i = 2; i <= n / 2 && ret; ret = n % i++ != 0);
  return ret;
}

是的,实际上 for 循环中没有内容——它以 ; 结束,因为当 ret 变为 false 时,循环就终止了。

显然,你永远不会这样写素数计算(一个更好的方法是埃拉托斯特尼筛法16),但这个算法的优点是:

  • 速度慢
  • 为每个数字返回 true/false

这在撰写关于线程细微差别的文章时是一个优势!

计时算法

为了计时,我写了一个简单的“测量此函数需要多长时间”的程序

static void DurationOf(Func<int> action, string section)
{
  var start = DateTime.Now;
  int numPrimes = action();
  var stop = DateTime.Now;

  lock (locker)
  {
    Console.WriteLine(section);
    Console.WriteLine("Number of primes is : " + numPrimes);
    Console.WriteLine("Total seconds = " + (stop - start).TotalSeconds);
  }
}

这个程序相当针对本文中的示例。

现在我们来谈谈锁。请注意上面代码中的 lock 语句。lock 确保每个 Console.WriteLine 不会被另一个也写入控制台的线程“中断”。lock 语句需要一个所有线程都可以访问的 object,它充当线程之间的同步对象。通常,同步对象是一个根级别的 Object

static object locker = new object();

历史上,锁的概念被称为“关键部分”,意味着一次只有一个线程可以进入该代码。原生 Windows 程序员应该熟悉 CRITICAL_SECTION4 结构和相关方法。

锁可能会很危险

  • 如果锁中的操作耗时很长,你的线程将会因为等待另一个线程释放锁而降低性能。
  • 很容易造成死锁,即线程 A 等待锁被释放,而线程 B 当前在锁内,却在等待线程 A 完成某项任务。

lock 语句的主体永远不应包含任何等待线程执行某些工作的内容。但是,锁在进行简单同步时很有用,特别是调试输出或同步对“事物”的访问(如硬件端口)时。否则,如果你的代码中存在 lock,那可能是一个很大的危险信号。

蛮力算法的辅助方法

这个方法被用来演示线程的各种蛮力算法变体,所以一次实现它很有用。

static int NumPrimes(int start, int end)
{
  int numPrimes = 0;
  for (int i = start; i < end; numPrimes += IsPrime(i++) ? 1 : 0);
  return numPrimes;
}

是的,又一个什么都不做循环,工作在 for 循环的迭代器部分完成。

蛮力算法

在这里,我们想找出 2 到 500,000 之间有多少个素数。我选择 500,000 作为上限,因为它在我机器上需要大约 35 秒来确定有 41,538 个素数。

调用算法

DurationOf(BruteForce, "Brute force:");

实现

static int BruteForce()
{
  int numPrimes = NumPrimes(2, MAX);

  return numPrimes;
}

结果

Brute force:
Number of primes is : 41538
Total seconds = 30.1119874

带线程的蛮力算法

让我们尝试通过将工作分解成块,并为每个线程(最多到处理器数量)分配一块来优化它。我们将平均分配这些块。精明的读者会意识到这并非最优,但我们将说明原因,并讨论多线程的一个重要方面——确保你的工作负载是均衡的!

请记住,我接下来要展示的内容是相当老派的!

调用算法

DurationOf(ThreadedBruteForce, "Threaded brute force:");

实现

首先,设置线程

static int ThreadedBruteForce()
{
  List<(Thread thread, int threadNum, int start, int end)> threads = 
                 new List<(Thread thread, int threadNum, int start, int end)>();

  int numProcs = Environment.ProcessorCount;

  for (int i = 0; i < numProcs; i++)
  {
    int start = Math.Max(1, i * (MAX / numProcs)) + 1;
    int end = (i + 1) * (MAX / numProcs);
    var thread = new Thread(new ParameterizedThreadStart(BruteForceThread));
    thread.IsBackground = true;
    threads.Add((thread, i, start, end));
  }

  totalNumPrimes = 0;
  threads.ForEach(t => t.thread.Start((t.threadNum, t.start, t.end)));
  threads.ForEach(t => t.thread.Join());

  return totalNumPrimes;
}

工作线程

static void BruteForceThread(object parms)
{
  (int threadNum, int start, int end) parm = (ValueTuple<int, int, int>)parms;
  DurationOf(() =>
  {
    int numPrimes = NumPrimes(parm.start, parm.end);
    Interlocked.Add(ref totalNumPrimes, numPrimes);
    return numPrimes;
  }, $"Thread {parm.threadNum} processing {parm.start} to {parm.end}");
}

结果

Thread 0 processing 2 to 125000
Number of primes is : 11734
Total seconds = 3.8519907
Thread 1 processing 125001 to 250000
Number of primes is : 10310
Total seconds = 9.0879819
Thread 2 processing 250001 to 375000
Number of primes is : 9860
Total seconds = 12.963975
Thread 3 processing 375001 to 500000
Number of primes is : 9634
Total seconds = 16.4079704
Threaded brute force:
Number of primes is : 41538
Total seconds = 16.4119713

好的,很棒,我们将一个 35 秒的过程缩短到了 16 秒。但请注意,工作负载并未均衡分配。线程花费的时间不同。原因显而易见——我们要判断的数字越大,需要执行的除法就越多。所以对于较低范围内的数字,线程完成得更快

Thread 0 processing 2 to 125000
Number of primes is : 11734
Total seconds = 3.8519907

vs

Thread 3 processing 375001 to 500000
Number of primes is : 9634
Total seconds = 16.4079704

线程 Join

请注意,一旦线程启动,就有这条语句

threads.ForEach(t => t.thread.Join());

在这里,Join 方法会暂停执行,直到调用 Join 的线程完成。

请注意

  • 这可能导致当前线程无限期地暂停其操作。
  • 你不能在 UI 线程上这样做,因为 UI 将不再响应用户操作。

后台线程

请注意这条语句

thread.IsBackground = true;

指示线程是后台线程可以确保在应用程序退出时它会被终止。

如果你有一个不是后台线程的线程,并且你关闭了应用程序或以其他方式终止了它,该线程将继续作为进程存在(并运行)。

线程参数

请注意这条语句

var thread = new Thread(new ParameterizedThreadStart(BruteForceThread));

在这里,我们设置了实现线程以接受参数的方法。签名必须是[methodName](object parameters),这要求你在启动线程时将对象强制转换为正在传递的相同类型。

启动线程(我使用值元组作为参数)

threads.ForEach(t => t.thread.Start((t.threadNum, t.start, t.end)));

在实现线程的方法中强制转换对象(在此情况下为值元组)

(int threadNum, int start, int end) parm = (ValueTuple<int, int, int>)parms;

使用 Lambda 表达式避免强制转换 Object 参数

你可以使用 lambda 表达式来避免强制转换,它为当前参数值提供了闭包,如下所示:

for (int i = 0; i < numProcs; i++)
{
  int j = i;
  int start = Math.Max(1, i * (MAX / numProcs)) + 1;
  int end = (i + 1) * (MAX / numProcs);
  var thread = new Thread(() => BruteForceThread(j, start, end));
  thread.IsBackground = true;
  threads.Add((thread, i, start, end));
}

请注意,int j = i; 是闭包所必需的——否则,线程号(不要与线程 ID 混淆)始终是处理器数量(在我的示例中为 4,通常是)。

同样请注意,这样做是有效的,但并非必需

var thread = new Thread(new ThreadStart(() => BruteForceThread(j, start, end)));

为什么?因为 ThreadStart 被定义为一个委托:public delegate void ThreadStart();,所以 lambda 表达式是完全有效的。

实现线程的方法现在可以在方法签名中接受参数,从而避免了强制转换。

static void BruteForceThread(int threadNum, int start, int end)

使用 ThreadStart 委托会消耗大量时间!

这两行代码不等价

var thread = new Thread(() => BruteForceThread(j, start, end));

vs.

var thread = new Thread(new ThreadStart(() => BruteForceThread(j, start, end)));

请注意第二个版本中的时间差异

Thread 0 processing 2 to 125000
Number of primes is : 11734
Total seconds = 4.3607559
Thread 1 processing 125001 to 250000
Number of primes is : 10310
Total seconds = 10.5416041
Thread 2 processing 250001 to 375000
Number of primes is : 9860
Total seconds = 14.9788427
Thread 3 processing 375001 to 500000
Number of primes is : 9634
Total seconds = 19.2458287
Threaded brute force:
Number of primes is : 41538
Total seconds = 19.2458287

大约

  • 线程 0 耗时增加 0.5 秒
  • 线程 1 和 2 耗时增加 1.5 秒
  • 线程 3 耗时增加近 3 秒

这不仅仅是异常,这是一个持续可重复的差异。我没有找到任何关于为什么会这样讨论。

平衡蛮力算法的线程

请注意,工作线程的平衡性不佳——工作负载看起来是均衡分配的,因为我们将每 1/n(n 为处理器数量)的工作分配给每个线程,但由于工作的性质,CPU 核心的利用率并不高——数字范围较小的线程比数字范围较大的线程完成得快得多。

主动索取工作 vs. 指示线程做什么工作

关于你的线程是否优化的一个线索是:你是告诉你的线程做什么工作,还是你的线程主动索取工作?在上面的带线程的蛮力算法中,我们是告诉线程它应该做什么工作。在接下来的迭代中,线程将在工作可用时主动索取工作。

调用算法

DurationOf(ThreadedGetNextWorkItemBruteForce, "Threaded get next work item brute force:");

实现

static int ThreadedGetNextWorkItemBruteForce()
{
  List<(Thread thread, int threadNum)> threads = new List<(Thread thread, int threadNum)>();
  int numProcs = Environment.ProcessorCount;

  for (int i = 0; i < numProcs; i++)
  {
    var thread = new Thread(new ParameterizedThreadStart(NextWorkItemBruteForceThread));
    thread.IsBackground = true;
    threads.Add((thread, i));
  }

  totalNumPrimes = 0;
  nextNumber = 1;
  threads.ForEach(t => t.thread.Start(t.threadNum));
  threads.ForEach(t => t.thread.Join());

  return totalNumPrimes;
}

工作线程

static void NextWorkItemBruteForceThread(object parms)
{
  int threadNum = (int)parms;
  DurationOf(() =>
  {
    int numPrimes = 0;
    int n;

    while ((n = Interlocked.Increment(ref nextNumber)) < MAX)
    {
      if (IsPrime(n))
      {
        ++numPrimes;
      }
    }

    Interlocked.Add(ref totalNumPrimes, numPrimes);

    return numPrimes;
  }, $"Thread: {threadNum}");
}

结果

Thread: 3
Number of primes is : 10446
Total seconds = 13.2079996
Thread: 2
Number of primes is : 10378
Total seconds = 13.2079996
Thread: 0
Number of primes is : 10437
Total seconds = 13.2079996
Thread: 1
Number of primes is : 10277
Total seconds = 13.2079996
Threaded get next work item brute force:
Number of primes is : 41538
Total seconds = 13.2079996

请注意,现在每个线程的实际运行时间完全相同(这实际上是该特定测试运行的巧合,它们确实会略有不同)。另外,请注意我们现在有效地利用了线程——我们将处理时间缩短了 6 秒,因为每个核心都得到了充分利用,并且核心利用率要高得多。

Interlocked 和 Atomic

请注意这两行

while ((n = Interlocked.Increment(ref nextNumber)) < MAX)
...
Interlocked.Add(ref totalNumPrimes, numPrimes);

这个工作线程索取工作的方式是简单地获取下一个要处理的数字——没有排队工作、信号量或其他复杂性。但是,为了获取下一个数字并更新总数,每个线程都必须确保它暂时阻止其他线程。否则,另一个线程可能会获得完全相同的数字,或者总数可能会同时更新,从而导致计数不正确。

.NET 有一个 Interlocked 类(在此处阅读更多内容 ),它确保操作是原子执行的,这意味着即使在更改多个字节的数据时,该操作也被视为一个单一的、同步的更改。本质上,它就像在操作周围写了一个 lock 语句,但它性能更高,因为生成的 IL 代码(以及最终的汇编代码)可以利用 CPU 指令来交换内存中的值,因为你正在做一些非常具体的事情。使用 lock 语句,编译器不知道你实际上在做什么,也无法为你优化代码。事实上,Interlocked 实现的方法与实际的 Intel CPU 指令1非常相似,这些指令是原子的,因此可以锁定。

更多“现代”的线程工作方式

如今,初级程序员可能甚至不会被教导有关 Thread 类的内容,而是被教导一种或多种启动线程的其他方法。然而,了解底层的 Thread 类很重要,因为在某些情况下,你绝对希望自己管理线程,而不是让 .NET Framework 为你管理线程,特别是与 .NET 的 ThreadPool 交互的线程(稍后会详细介绍)。以下是最常见的方法:

  • AsParallel - PLINQ(并行 LINQ)的 Enumerable 扩展
  • Task.Run - 强制异步方法
  • Task.Factory.StartNew8 - 等同于 Task.Run,但带有一些默认值
    • 没有取消令牌
    • 子任务无法附加
    • 默认任务调度程序
  • await - 调用具有续程的异步方法
  • QueueUserWorkItem - 排队工作
  • BackgroundWorker

大多数这些方法都使用 .NET 的 ThreadPool,除了“await”语法可能有所不同,在使用该语法时,你可以更精确地控制任务的设置方式。每种方法都有其细微之处,涉及:

  • 异常处理
  • 取消令牌
  • 进度报告
  • ThreadPool 的最佳实践

一个人确实需要理解这些细微差别以及它们如何影响性能和错误处理。

AsParallel

AsParallel 是并行 LINQ (PLINQ) 库中的几种查询扩展方法之一。我不会详细介绍 ParallelEnumerable 类中的各种方法,而是只看 AsParallel

调用算法

DurationOf(AsParallelGetNextWorkItemBruteForce, "AsParallel get next work item brute force:");

实现

static int AsParallelGetNextWorkItemBruteForce()
{
  int totalNumPrimes = 0;

  var nums = Enumerable.Range(2, MAX);
  nums.AsParallel().ForAll(n =>
  {
    if (IsPrime(n))
    {
      Interlocked.Increment(ref totalNumPrimes);
    }
  });

  return totalNumPrimes;
}

注意代码 nums.AsParallel().ForAll,它尝试对可枚举对象中的每个项,并行执行 ForAll 中声明的操作。

结果(不要与上面的结果进行比较,我目前在一个似乎比我的笔记本电脑原生运行更快的 VM 上)

AsParallel get next work item brute force:
Number of primes is : 41538
Total seconds = 11.7537395

与平衡的蛮力线程计时进行比较

Thread: 1
Number of primes is : 10326
Total seconds = 11.7124344
Thread: 2
Number of primes is : 10517
Total seconds = 11.7124344
Thread: 0
Number of primes is : 10502
Total seconds = 11.7124344
Thread: 3
Number of primes is : 10193
Total seconds = 11.7134212
Threaded get next work item brute force:
Number of primes is : 41538
Total seconds = 11.7304613

所以这很酷——计时基本相同。

请记住,AsParallel.ForAll 是一个同步操作——在所有数字都处理完之后,代码执行才会继续。虽然处理本身是异步的,但你的应用程序会等待 ForAll 完成。将 AsParallel 与可等待的任务(接下来讨论)混合使用毫无意义。

Task.Run

Task.Run 是实现任务化异步模式3(TAP)以快速创建工作线程的方法之一。这是所谓的“计算密集型”任务的一个例子,意思是任务是由 CPU 执行的,而不是程序等待设备(如指纹读取器)或连接(例如到数据库)返回结果。理解计算密集型和 I/O 密集型任务2之间的区别非常重要,特别是与由 ThreadPool 管理的线程有关,这些线程不应长时间阻塞。

调用算法

DurationOf(TaskRunGetNextWorkItemBruteForce, "Task.Run get next work item brute force:");

实现

static int TaskRunGetNextWorkItemBruteForce()
{
  int numProcs = Environment.ProcessorCount;
  totalNumPrimes = 0;
  nextNumber = 1;
  List<Task> tasks = new List<Task>();

  for (int i = 0; i < numProcs; i++)
  {
    var task = Task.Run(() => NextWorkItemBruteForceThread(i));
    tasks.Add(task);
  }

  Task.WaitAll(tasks.ToArray());

  return totalNumPrimes;
}

请注意 Task.WaitAll,它类似于线程的 Join——当前线程会阻塞直到所有任务完成。与 Join 一样,请谨慎使用!另外请注意,我们可以调用与平衡蛮力线程设置例程中执行计算的 NextWorkItemBruteForceThread(i) 相同的代码。

结果(再次不要与前面的结果进行比较,我目前正在另一台机器上运行)——这些结果与 AsParallel 和平衡的蛮力算法非常相似。

Thread: 4
Number of primes is : 10369
Total seconds = 11.9374993
Thread: 4
Number of primes is : 10351
Total seconds = 11.9374993
Thread: 4
Number of primes is : 10293
Total seconds = 11.9374993
Thread: 4
Number of primes is : 10525
Total seconds = 11.9374993
Task.Run get next work item brute force:
Number of primes is : 41538
Total seconds = 11.953198

使用 await

这是一个非常简单的例子,其中上述代码经过轻微修改以使用 await 关键字。使用 await(以及 async 关键字)具有以下效果:

  1. 该方法启动异步过程。
  2. 然后它立即返回给调用者。
  3. 当异步过程完成后,代码执行将继续执行 await 语句之后的代码。

这里有几个“陷阱”。首先,了解续程的执行上下文很有用。对于在应用程序线程(运行 UI 的线程)上调用 await 的 WinForm 应用程序,执行会被编组到应用程序(UI)线程上继续执行。如果你运行的是非 UI 应用程序,续程可能发生在分配给异步方法的同一线程上,也可能发生在新的线程上!此行为由 SynchronizationContext5,6 控制,并且讨论此类超出了本文的范围。有一系列很好的 Code Project 文章讨论了该主题:第一部分第二部分第三部分

其次,我认为“返回给调用者然后稍后继续”的心理上的弯路很难理解。原因在于,执行调用 async 方法的方法无法清楚地知道该方法会立即返回。这就是为什么对这些方法名的指导建议是在其后附加“Async”。

第三,当存在嵌套的 await 调用时,心理上的弯路会变得更加困难。

相反,Task 类提供了非常实用的功能,特别是关于返回结果以及在任务执行过程中发生异常时将异常传递回来的功能。

首先,非常简单的示例的实现

static int TaskAwaitGetNextWorkItemBruteForce()
{
  int numProcs = Environment.ProcessorCount;
  totalNumPrimes = 0;
  nextNumber = 1;
  List<Task> tasks = new List<Task>();

  for (int i = 0; i < numProcs; i++)
  {
    var task = DoWorkAsync(i);
    tasks.Add(task);
  }

  Task.WaitAll(tasks.ToArray());
  
  return totalNumPrimes;
}

static async Task DoWorkAsync(int threadNum)
{
  await Task.Run(() => NextWorkItemBruteForceThread(threadNum));
}

在这里,DoWorkAsync 启动工作,await 返回给调用者 TaskAwaitGetNextWorkItemBruteForce。这个示例过于简化,因为它没有续程。另外请注意 DoWorkAsync 返回一个 Task,但没有显式的 return myTask; 语句。async 方法可以返回 Task<TResult>Taskvoid,或(C# 7)实现公共 GetAwaiter 方法的类型7

接下来,让我们看一个更有用的例子——在这里,你会看到我们可以移除 NextWorkItemBruteForceThread 使用的全局变量,而是将算法的结果返回到 TResult 泛型类型中。这还消除了 Interlocked 调用:Interlocked.Add(ref totalNumPrimes, numPrimes),极大地改进了线程的封装性。相反,我们将通过对每个异步任务找到的素数求和来计算素数的总数。

算法实现

static int AwaitableBruteForceAlgorithm(object parms)
{
  int threadNum = (int)parms;
  int numPrimes = 0;

  DurationOf(() =>
  {
    int n;

    while ((n = Interlocked.Increment(ref nextNumber)) < MAX)
    {
      if (IsPrime(n))
      {
        ++numPrimes;
      }
    }

    return numPrimes;
  }, $"Thread: {threadNum}");

  return numPrimes;
}

实现

static int TaskAwaitGetNextWorkItemBruteForceWithReturn()
{
  int numProcs = Environment.ProcessorCount;
  nextNumber = 1;
  List<Task<int>> tasks = new List<Task<int>>();

  for (int i = 0; i < numProcs; i++)
  {
    var task = DoWorkWithReturnAsync(i);
    tasks.Add(task);
  }

  Task.WaitAll(tasks.ToArray());

  return tasks.Sum(t => t.Result);
}

static async Task<int> DoWorkWithReturnAsync(int threadNum)
{
  return await Task.Run(() => AwaitableBruteForceAlgorithm(threadNum));
}

请注意,这里的 DoWorkWithReturnAsync 方法现在需要一个 return 关键字!另外,我们不再使用全局的 totalNumPrimes,因为一旦所有任务完成,它们的总结果将被求和并返回。

让我们通过添加一个指示线程已完成的续程来使这个示例更有用一些,看看会发生什么。

实现

static int TaskAwaitGetNextWorkItemBruteForceWithReturnAndContinuation()
{
  int numProcs = Environment.ProcessorCount;
  nextNumber = 1;
  List<Task<int>> tasks = new List<Task<int>>();

  for (int i = 0; i < numProcs; i++)
  {
   Console.WriteLine("Starting thread " + i + " at " + 
                    (DateTime.Now - start).TotalMilliseconds + " ms");
   var task = DoWorkWithReturnAsyncAndContinuation(i);
    tasks.Add(task);
  }

  Task.WaitAll(tasks.ToArray());

    return tasks.Sum(t => t.Result);
}

static async Task<int> DoWorkWithReturnAsyncAndContinuation(int threadNum)
{
  var t = await Task.Run(() => AwaitableBruteForceAlgorithm(threadNum));

  lock (locker)
  {
    Console.WriteLine("Thread number " + threadNum + " finished.");
  }

  return t;
}

结果

Starting thread 0 at 0 ms
Starting thread 1 at 0 ms
Starting thread 2 at 0 ms
Starting thread 3 at 0 ms
Thread: 3
Number of primes is : 10431
Total seconds = 13.4916976
Thread number 3 finished.
Thread: 2
Number of primes is : 10205
Total seconds = 13.4916976
Thread number 2 finished.
Thread: 0
Number of primes is : 10442
Total seconds = 13.4916976
Thread number 0 finished.
Thread: 1
Number of primes is : 10460
Total seconds = 13.4916976
Thread number 1 finished.
await Task.Run get next work item with return brute force and continuation:
Number of primes is : 41538
Total seconds = 13.5073256

所以这段代码应该让你觉得不直观

static async Task<int> DoWorkWithReturnAsyncAndContinuation(int threadNum)
{
  var t = await Task.Run(() => AwaitableBruteForceAlgorithm(threadNum));

  lock (locker)
  {
    Console.WriteLine("Thread number " + threadNum + " finished.");
  }

  return t;
}

await 应该返回给调用者。它确实返回了——我们知道这一点,因为我们注意到任务是同时创建的——开始时间之间没有延迟。但是方法返回什么?它返回一个 Task<int>,而 return 语句是在续程之后。它不是续程的一部分,而当你仔细想想,续程本身不可能返回任何东西!此外,当在续程内部使用 t 时,它是我们等待的方法的返回(一个 int)。所以如果我们想输出素数的数量:我们会写

Console.WriteLine("Count = " + t);

而不是 t.Result。但在接收 Task<int> 的调用者中,我们必须使用 t.Result。哇。我只能说哇。是的,如果你的任务创建了另一个任务,则外部返回类型为 Task<Task>。如果是三层嵌套任务,则 return 类型将是 Task<Task<Task>>>。看到了我说的“脑力扭曲”了吗?

控制台输出(您的体验可能有所不同)

Starting thread 0 at 0 ms
Starting thread 1 at 6.3902 ms
Starting thread 2 at 6.3902 ms
Starting thread 3 at 6.3902 ms
Thread: 0
Number of primes is : 10428
Total seconds = 13.3906041
Thread number 0 finished.
Count = 10428
Thread: 2
Number of primes is : 10270
Total seconds = 13.3916069
Thread: 3
Number of primes is : 10382
Total seconds = 13.3916069
Thread number 3 finished.
Count = 10382
Thread: 1
Number of primes is : 10458
Total seconds = 13.3916069
Thread number 1 finished.
Count = 10458
Thread number 2 finished.
Count = 10270
await Task.Run get next work item with return brute force and continuation:
Number of primes is : 41538
Total seconds = 13.4070216

在 WinForm 应用程序中使用 await

在上面的控制台应用程序中,DurationOf 是一个阻塞线程的方法,它等待所有任务完成。这在 WinForm 应用程序中是完全不合适的

  1. 阻塞主 UI 线程会阻止应用程序的消息泵处理 Windows 消息。
  2. 从非 UI 线程调用 Invoke 来执行 UI 更新,会使用应用程序的消息泵将调用编组到主 UI 线程。
  3. 当主 UI 线程被阻塞时,消息不会被处理,并且在阻塞的 UI 线程和请求 Invoke 的线程之间会发生死锁。
  4. 调用 BeginInvoke 是异步执行 UI 线程,因此不应发生死锁。

但是

  1. 关键在于,希望使用 await 和 task而无需调用 Invoke 或 BeginInvoke
  2. await应该将控制权返回给 UI 线程
  3. 我们唯一需要使用 InvokeBeginInvoke 的情况是当我们更新非 UI 线程中的 UI 时,这种情况应尽量避免。
  4. 最重要的是,主 UI 线程不应被阻塞!

为实现这一点,还需要特别注意异步方法的嵌套方式,以便 await 立即返回到期望的调用者。我认为这是一个重要的观点——使用 asyncawait 要求你的方法调用层次结构具有正确的结构来处理立即返回给调用者。

任务初始化

我们将尽快启动线程,但这不能阻塞 UI 线程

public Form1()
{
  InitializeComponent();
  Shown += OnFormShown;
}

private async void OnFormShown(object sender, EventArgs e)
{
  List<Task<int>> tasks = TaskAwaitGetNextWorkItemBruteForceWithReturnAndContinuation();
  await Task.WhenAll(tasks);
  int numPrimes = tasks.Sum(t => t.Result);
  tbOutput.AppendLine("Number of primes is : " + numPrimes);
}

OnFormShown 中,启动所有任务的方法(TaskAwaitGetNextWorkItemBruteForceWithReturnAndContinuation)返回这些任务。然后我们等待这些任务完成 Task.WhenAll(tasks)。我们await 这个,它会立即退出 OnFormShown 方法——我们的 UI 线程没有被阻塞!请注意 Task.WhenAllTask.WaitAll 之间的区别!一旦任务完成,续程就会执行,对每个任务的结果进行求和,并显示找到的素数的总数。由于同步上下文是 WinForm 应用程序,续程本身会被编组回主 UI 线程,所以当我们向 tbOutput 添加行时,我们不需要自己使用 InvokeBeginInvoke 进行编组。

创建每个任务

protected List<Task<int>> TaskAwaitGetNextWorkItemBruteForceWithReturnAndContinuation()
{
  int numProcs = Environment.ProcessorCount;
  nextNumber = 1;
  List<Task<int>> tasks = new List<Task<int>>();
  DateTime start = DateTime.Now;

  for (int i = 0; i < numProcs; i++)
  {
    tbOutput.AppendLine("Starting thread " + i + " at " + 
                       (DateTime.Now - start).TotalMilliseconds + " ms");
    var task = DoWorkWithReturnAsyncAndContinuation(i);
    tasks.Add(task);
  }

  return tasks;
}

请注意,此方法实际上并未创建任务。它而是调用 DoWorkWithReturnAsyncAndContinuation。原因是让 DoWorkWithReturnAsyncAndContinuation 中的 await 返回给调用者,即上面的代码,以便创建下一个线程。我们甚至无法编写带有嵌入式 Task.Run() 的方法。

即使我们可以,await 也会立即返回给调用者。当任务完成时,续程将执行 for 循环的下一次迭代并启动下一个线程。由于没有其他线程在工作,所有工作都将由第一个线程完成!

相反,是 DoWorkWithReturnAsyncAndContinuation 方法实际上会创建任务。

protected async Task<int> DoWorkWithReturnAsyncAndContinuation(int threadNum)
{
  DateTime start = DateTime.Now;
  var t = await Task.Run(() => BruteForceAlgorithm(threadNum)); //.ConfigureAwait(true);

  DateTime stop = DateTime.Now;

  tbOutput.AppendLine("Continuation: Thread number " + threadNum + " finished.");
  tbOutput.AppendLine("Total seconds = " + (stop - start).TotalSeconds);
  tbOutput.AppendLine("Continuation: Count = " + t);

  return t;
}

由于 Task.Run 是从应用程序的 UI 线程调用的,因此续程在“捕获的上下文”——UI 线程上执行。这等同于添加 .ConfigureAwait(true)。如果我们使用 .ConfigureAwait(false),续程可能在捕获的上下文线程上执行,也可能不在。在这种情况下,UI 输出必须使用 InvokeBeginInvoke 进行编组。另外值得指出的是,我们不需要 lock UI 输出以确保行以正确的顺序写入。因为我们在捕获的上下文(UI 线程)上继续执行,并且只有一个 UI 线程,所以我们可以保证所有三个 AppendLine 调用是连续发生的,无论是否有其他任务在此期间完成。该任务的续程将被阻塞,直到我们的续程退出。

信号量和排队工作

线程如何获取工作的关键方面是线程如何获取工作。上述示例都使用简单的机制来获取下一个要测试“素数”的整数。

while ((n = Interlocked.Increment(ref nextNumber)) < MAX)

在现实世界中,这通常是不现实的。线程通常被实例化

  1. 异步执行某项任务,并在任务完成后,线程退出。
  2. 等待队列中出现工作,然后进行处理,处理完后,再次进入休眠状态。

第二种形式,等待工作出现,通常涉及使用信号量(来自铁路信号9)来指示工作已准备好。这里产生的复杂性是在应用程序不再需要线程执行工作时终止线程。信号量可以与“旧式”线程以及创建线程的更现代技术一起使用。我们将在以下示例中使用信号量与 Task 类。这些示例还引入了 System.Collections.Concurrent 命名空间中的 ConcurrentQueue 类。代码如下:

static void UsingSemaphores()
{
  Semaphore sem = new Semaphore(0, Int32.MaxValue);
  int numProcs = Environment.ProcessorCount;
  var queue = new ConcurrentQueue<int>();
  int numPrimes = 0;

  for (int i = 0; i < numProcs; i++)
  {
    Task.Run(() =>
    {
      while (true)
      {
        sem.WaitOne();

        if (queue.TryDequeue(out int n))
        {
          if( IsPrime(n))
          {
            Interlocked.Increment(ref numPrimes);
          }
        }
      }
    });
  }

  DurationOf(() =>
  {
    Enumerable.Range(2, MAX).ForEach(n =>
    {
      queue.Enqueue(n);
      sem.Release();
    });

    while (!queue.IsEmpty) Thread.Sleep(1);

    return numPrimes;
  }, "Threads using semaphores");
}

上述方法有三个部分。我将它们组合成一个方法的原因是方便闭包。第一部分,实例化任务,有两条重要线:

...
sem.WaitOne();

if (queue.TryDequeue(out int n))
...

在这里,线程被挂起,直到信号量发出工作就绪的信号。一旦收到信号,就从队列中移除工作(在本例中是下一个要处理的数字)。在 ConcurrentQueue 中,TryDequeue 是唯一可用的出队方法,它确保没有其他线程删除了队列项。有信号量的情况下,只有一个线程被释放,所以这不是问题,但如果出队工作之间没有同步,则会是个问题。

接下来,工作被入队。可以一次释放一个信号量,或者在入队工作后,给信号量一个释放计数。在示例中,我选择了“一次一个”的方法,因为我觉得这更典型。

Enumerable.Range(2, MAX).ForEach(n =>
{
  queue.Enqueue(n);
  sem.Release();
});

第三部分等待队列变为空——这是非常糟糕的做法,但我们想讨论为什么它错了,所以这是一个教学示例。

while (!queue.IsEmpty) Thread.Sleep(1);

理想情况下,让线程休眠,同时我们循环等待某事发生,这是一个非常糟糕的主意。相反,任务的完成应该使用信号量或任务的“wait”调用来信号化,但现在我们使用这段代码。

与可等待的 async 方法相比,使用信号量处理线程的复杂性之一是,虽然你可以轻松地将工作入队供线程提取,但你并不知道线程何时完成了工作。例如:

请注意,第二次计算素数的数量少了 4。这是因为,虽然队列为空,表明所有工作都已完成,但线程实际上尚未完成处理!

如前所述,另一个问题是告诉等待工作的线程,它应该终止,因为应用程序不再需要它了。我们可以通过入队一个 0 来作为一个简易标志来指示线程应终止。但我们必须为我们创建的线程数这样做!首先,我们跟踪任务:

...
List<Task> tasks = new List<Task>();

for (int i = 0; i < numProcs; i++)
{
  tasks.Add(Task.Run(() =>
  ...

其次,当一个 0 被出队时,任务退出。

...
if (queue.TryDequeue(out int n))
{
  if (n == 0)
  {
    break;
  }
...

第三,我们为每个任务入队“0”作为工作,并释放该计数对应的信号量——这是有效的,因为线程会退出,所以每个线程将恰好收到 1 个“0”来指示它应该终止。

for (int i = 0; i < numProcs; i++)
{
  queue.Enqueue(0);
}

sem.Release(numProcs);

最后,我们等待线程完成。

Task.WaitAll(tasks.ToArray());

现在我们已经成功地基于何时完成了工作而不是何时队列中没有要进行的工作来编写代码。这是一个非常重要的区别。另外请注意,我们也可以使用纯线程和 Thread.Join 来编写非常相似的代码。

互斥体、锁和信号量

理解 Mutex10 类和 lock 语句之间的区别也很重要。虽然互斥体(互斥)可能与锁语句相同,但有几个重要区别:

  • 互斥体可以设置超时
  • 互斥体可以跨应用程序边界

这与 lock 语句不同,lock 语句不能跨应用程序边界,也不支持超时。与信号量相反,互斥体强制执行线程标识,这意味着互斥体只能由获取它的线程释放。使用信号量,任何线程都可以 Release 信号量,以便当前或不同的线程从 WaitOne 调用中释放并继续执行。我们可以使用互斥体而不是 lock 语句

static void DurationOf(Func<int> action, string section)
{
  var start = DateTime.Now;
  int numPrimes = action();
  var stop = DateTime.Now;

  lock (locker)
  {
    Console.WriteLine(section);
    Console.WriteLine("Number of primes is : " + numPrimes);
    Console.WriteLine("Total seconds = " + (stop - start).TotalSeconds);
  }
}

更改将如下所示:

...
static Mutex mutex = new Mutex();
...
static void DurationOf(Func<int> action, string section)
{
  var start = DateTime.Now;
  int numPrimes = action();
  var stop = DateTime.Now;

  mutex.WaitOne();
  Console.WriteLine(section);
  Console.WriteLine("Number of primes is : " + numPrimes);
  Console.WriteLine("Total seconds = " + (stop - start).TotalSeconds);
  mutex.ReleaseMutex();
}

互斥体用于确保同一代码的互斥,而信号量通常用于等待工作并通过不同线程同时处理工作。此外,正如你从上面的代码中所看到的,它演示了一个非常简单的互斥,lock 语句更优越,因为它不需要实例化一个全局互斥体。

异常处理

虽然我很想声称我的线程从不抛出异常,但现实情况是必须处理异常。

线程类的异常处理

为了好玩,让我们修改蛮力方法,如果数字是素数就抛出异常!

static void NextWorkItemBruteForceThreadThrowsException(object parms)
{
  int threadNum = (int)parms;
  DurationOf(() =>
  {
    int n;

    while ((n = Interlocked.Increment(ref nextNumber)) < MAX)
    {
      if (IsPrime(n))
      {
        throw new Exception("Number is prime: " + n);
      }
    }

    return 0;
  }, $"Thread: {threadNum}");
}

在我们的控制台测试中运行此程序,我们会得到一团糟

但我们想要做的是提供一种更优雅地处理异常的方法。实现 UnhandledException

Thread.GetDomain().UnhandledException += (sndr, exargs) =>
{
  Console.WriteLine("Thread: " + (exargs.ExceptionObject as Exception)?.Message);
};

AppDomain.CurrentDomain.UnhandledException += (sndr, exargs) =>
{
  Console.WriteLine("AppDomain: " + (exargs.ExceptionObject as Exception)?.Message);
};

并不是我们真正想要的,因为控制台仍然会记录整个堆栈跟踪,更重要的是,线程中的未捕获异常将终止应用程序,这可能不是我们想要的。更好的解决方案是将线程方法包装在 try-catch 块中并提供异常处理器。

static void SafeThread(Action action)
{
  try
  {
    action();
  }
  catch (Exception ex)
  {
    Console.WriteLine("Exception: " + ex.Message);
  }
}

static void ThreadExceptionExample()
{
  List<(Thread thread, int threadNum)> threads = new List<(Thread thread, int threadNum)>();
  int numProcs = Environment.ProcessorCount;

  for (int i = 0; i < numProcs; i++)
  {
    // We changed the signature of NextWorkItemBruteForceThreadThrowsException slightly:
    // And we're no longer using a parameterized thread start because we have closure on i.
    var thread = new Thread(new ThreadStart(() => SafeThread(() => 
                            NextWorkItemBruteForceThreadThrowsException(i))));
    thread.IsBackground = true;
    threads.Add((thread, i));
  }

  totalNumPrimes = 0;
  nextNumber = 1;
  threads.ForEach(t => t.thread.Start());
  threads.ForEach(t => t.thread.Join());
}

现在我们得到

请注意,控制台应用程序也没有突然终止。

使用 Task 进行异常处理

这是设置运行会抛出异常的任务的代码,当数字为素数时

static int TaskAwaitGetNextWorkItemBruteForceThrowsException()
{
  int numProcs = Environment.ProcessorCount;
  totalNumPrimes = 0;
  nextNumber = 1;
  List<Task> tasks = new List<Task>();

  for (int i = 0; i < numProcs; i++)
  {
    var task = DoWorkAsyncThrowsException(i);
    tasks.Add(task);
  }

  Task.WaitAll(tasks.ToArray());

  return totalNumPrimes;
}

static async Task DoWorkAsyncThrowsException(int threadNum)
{
  await Task.Run(() => NextWorkItemBruteForceThreadThrowsException(threadNum));
}

这会导致通常的控制台日志混乱,并且控制台应用程序终止。

但是,如果我们这样做:

static async Task DoWorkAsyncThrowsException(int threadNum)
{
  try
  {
     await Task.Run(() => NextWorkItemBruteForceThreadThrowsException(threadNum));
  }
  catch (Exception ex)
  {
    Console.WriteLine(ex.Message);
  }
}

我们现在可以处理异常,应用程序不会终止。

如果你使用 Task.Wait 方法之一,被等待任务抛出的异常实际上非常复杂。在此处阅读更多内容“异常处理(任务并行库)”。稍微修改我们的示例:

static int TaskAwaitGetNextWorkItemBruteForceThrowsException()
{
  int numProcs = Environment.ProcessorCount;
  totalNumPrimes = 0;
  nextNumber = 1;
  List<Task> tasks = new List<Task>();

  for (int i = 0; i < numProcs; i++)
  {
    var task = DoWorkAsyncThrowsException(i);
    tasks.Add(task);
  }

  try
  {
    Task.WaitAll(tasks.ToArray());
  }
  catch (AggregateException ex)
  {
    Console.WriteLine(ex.Message);

    tasks.ForEachWithIndex((t, i) =>
    {
      Console.WriteLine("Task " + i);
      Console.WriteLine("Is canceled: " + t.IsCanceled);
      Console.WriteLine("Is completed: " + t.IsCompleted);
      Console.WriteLine("Is faulted: " + t.IsFaulted);
    });
  }

  return totalNumPrimes;
}

static async Task DoWorkAsyncThrowsException(int threadNum)
{
  await Task.Run(() => NextWorkItemBruteForceThreadThrowsException(threadNum));
}

请注意现在在将 try-catch 块移到 Wait 调用之后看到的输出:

请勿使用 async void!

声明为“async void”的方法没有 Task 对象——“task”中抛出的异常将直接在 SynchronizationContext 上引发。在控制台应用程序中,这意味着我们无法捕获异常,它由 AppDomain.CurrentDomain.UnhandledException 处理。这是一个例子:

static async void ThrowExceptionAsync()
{
  await Task.Run(() => NextWorkItemBruteForceThreadThrowsException(0));
}

static void AsyncVoidExceptionTest()
{
  try
  {
    ThrowExceptionAsync();
  }
  catch (Exception ex)
  {
    Console.WriteLine("AsyncVoidExceptionTest: " + ex.Message);
  }
}

请注意输出:

任务异常由 AppDomain 处理程序处理!将异常处理与此代码进行对比:

static async Task<int> ThrowExceptionAsync()
{
  await Task.Run(() => NextWorkItemBruteForceThreadThrowsException(0));

  return 0;
}

static async Task<int> AsyncVoidExceptionTest()
{
  try
  {
    await ThrowExceptionAsync();
  }
  catch (Exception ex)
  {
    Console.WriteLine("AsyncVoidExceptionTest: " + ex.Message);
  }

  return 0;
}

以及输出:

在这里,异常由我们的外部 await 处理。让人(无意中)困惑的是,当一个方法被声明为非 voidasync 方法时,它必须被 await。否则

但是 await 一个任务意味着该方法必须返回一个 Task 或被声明为 async void,这正是我们试图避免的!这可能导致父方法级联重构为 await 并具有 async Task<T> 的签名,一直到 Main,现在它可以被声明为 async void!如果你发现自己这样做,那么你在 async 方法的结构上做错了。在这个测试案例中,我通过在 Main 中调用来停止了级联:

Task.Run(() => AsyncVoidExceptionTest());

相反,我们可以这样做:

static async Task<int> Main(string[] args)
{
  await AsyncVoidExceptionTest();
  return 0;
}

但不能这样做:

static async void Main(string[] args)

这会导致编译器错误:“error CS4009: A void or int returning entry point cannot be async”(一个返回 void 或 int 的入口点不能是 async)。

重点是,避免(再次无意中)async void,但要仔细查看你的 async 任务是如何调用的,这样你就不会发现自己必须重构调用层次结构中的每个方法签名,一直到 Main

可等待的线程 - 使用 TaskCompletionSource 的混合方法

如果你想使用你自己创建的线程,而不是依赖于线程池,但仍然想利用 await 线程的能力怎么办?这可以通过 TaskCompletionSource 类来实现。此类包装了一个 Task 实例,你可以像使用 Task.Run 或类似的创建任务方法一样使用它。在下面的示例中,我们为每个线程设置一个 TaskCompletionSource,并将 TaskCompletionSource 实例传递给该线程。

static int HybridAwaitableThread()
{
  List<(Thread thread, int threadNum)> threads = new List<(Thread thread, int threadNum)>();
  List<TaskCompletionSource<int>> tasks = new List<TaskCompletionSource<int>>();
  int numProcs = Environment.ProcessorCount;

  for (int i = 0; i < numProcs; i++)
  {
    var thread = new Thread(new ParameterizedThreadStart(HybridThread));
    thread.IsBackground = true;
    threads.Add((thread, i));
    tasks.Add(new TaskCompletionSource<int>());
  }

  nextNumber = 1;
  threads.ForEachWithIndex((t, idx) => t.thread.Start((t.threadNum, tasks[idx])));
  Task.WaitAll(tasks.Select(t=>t.Task).ToArray());

  return tasks.Sum(t=>t.Task.Result);
}

现在,虽然上面的代码使用阻塞的 Task.WaitAll 来进行计时,但在不同的场景(例如,Windows 应用程序)中,调用 await Task.WhenAll(tasks.Select(t=>t.Task)) 以便等待线程完成不是阻塞的,这是完全有效的。

请注意,这一行:

threads.ForEachWithIndex((t, idx) => t.thread.Start((t.threadNum, tasks[idx])));

传递一个包含 TaskCompletionSource 实例的元组。在线程实现中,当线程完成时,我们使用 SetResult,“信号化”任务已完成。

static void HybridThread(object parms)
{
  (int threadNum, TaskCompletionSource<int> tcs) parm = 
                      (ValueTuple<int, TaskCompletionSource<int>>)parms;

  DurationOf(() =>
  {
    int numPrimes = 0;
    int n;

    while ((n = Interlocked.Increment(ref nextNumber)) < MAX)
    {
      if (IsPrime(n))
      {
        ++numPrimes;
      }
    }

    parm.tcs.SetResult(numPrimes);
    return numPrimes;
  }, $"Thread: {parm.threadNum}");
}

这里关键的一行是 parm.tcs.SetResult(numPrimes);,它完成了任务。

取消线程和任务

取消线程或任务是一个复杂的主题16,17,其本质在此描述。首先要确定的是你要取消什么:

  1. CPU 密集型线程,它不等待任何互斥体、信号量或其他“延迟”
  2. I/O 密集型线程,它等待异步 I/O 操作的完成
  3. 等待互斥体或信号量释放它的线程

这三种方法都可以利用 CancellationTokenSource 类,但令牌的使用方式根据上述三个选项而有所不同。

  1. CPU 密集型工作:通常轮询取消令牌。你可以选择:
    1. 操作取消:清理并退出线程
    2. 对象取消:设置一个实现 Cancel(或类似)方法的对象,以及对象内部取消工作的机制
  2. I/O 密集型工作:这需要注册一个回调,该回调会进而取消异步 I/O 操作。
  3. 互斥体/信号量:这需要设置一个传递给互斥体或信号量的等待句柄。在等待状态下取消线程会释放线程并允许其终止。

正确使用 OperationCanceledException 很重要,这样你的代码才能与库代码良好交互,反之亦然。要理解的一个重要事情是,取消是协作式的——请求取消并不意味着令牌的监听者必须实际停止。

最后,如果实现任务的代码库不观察 CancellationToken,则无法取消任务!如果你的代码实现了任务,请仔细考虑你是否应该自己观察取消令牌!

对于以下示例,假设我们希望将计算素数的处理时间限制为 1 秒,使用我们的蛮力算法。首先,我们应该为每个操作创建取消令牌。如果执行对象取消,一个令牌可以取消所有必需的对象。在这里,我们将重点关注取消操作,而不是对象。

取消线程

不要通过 thread.Abort() 杀死任务来取消任务。你冒着内存泄漏、死锁和其他恶劣影响的风险,因为你在不给线程清理机会的情况下终止它。相反,在这里,当我们实例化线程时,我们将一个取消令牌传递给每个线程进行检查。

static int CancelThreads()
{
  List<(Thread thread, int threadNum, CancellationTokenSource cts)> threads = 
    new List<(Thread thread, int threadNum, CancellationTokenSource cts)>();
  int numProcs = Environment.ProcessorCount;

  for (int i = 0; i < numProcs; i++)
  {
    var thread = new Thread(new ParameterizedThreadStart(CancellableThread));
    var cts = new CancellationTokenSource();
    thread.IsBackground = true;
    threads.Add((thread, i, cts));
  }

  totalNumPrimes = 0;
  nextNumber = 1;
  threads.ForEach(t => t.thread.Start((t.threadNum, t.cts)));

  // After 1 second, cancel our threads
  threads.ForEach(t => t.cts.CancelAfter(1000));
  threads.ForEach(t => t.thread.Join());

  return totalNumPrimes;
}

然后我们在 1 秒后请求取消每个线程。工作线程在每次迭代中检查其令牌。

static void CancellableThread(object parms)
{
  (int threadNum, CancellationTokenSource cts) parm = (ValueTuple<int, CancellationTokenSource>)parms;

  DurationOf(() =>
  {
    int numPrimes = 0;
    int n;

    while ((n = Interlocked.Increment(ref nextNumber)) < MAX)
    {
      if (parm.cts.IsCancellationRequested)
      {
        break;
      }

      if (IsPrime(n))
      {
        ++numPrimes;
      }
    }

    Interlocked.Add(ref totalNumPrimes, numPrimes);
    return numPrimes;
  }, $"Thread: {parm.threadNum}");
}

在上面的屏幕截图中,请注意线程并不完全在 1 秒时终止——蛮力算法在每次迭代之间需要一些时间!

取消任务

使用任务,我们可以优雅地退出任务:

  • 优雅地
  • 通过调用取消令牌的 ThrowIfCancellationRequested 方法
  • 通过自己抛出 OperationCanceledException

让我们看看区别。以下是所有三个取消示例通用的设置:

static int CancelTasks()
{
  int numProcs = Environment.ProcessorCount;
  totalNumPrimes = 0;
  nextNumber = 1;
  List<(Task task, CancellationTokenSource cts)> tasks = 
                        new List<(Task, CancellationTokenSource)>();
  DateTime start = DateTime.Now;

  for (int i = 0; i < numProcs; i++)
  {
    Console.WriteLine("Starting thread " + i + " at " + 
                     (DateTime.Now - start).TotalMilliseconds + " ms");
    var cts = new CancellationTokenSource();
    var task = Task.Run(() => CancellableTask(i, cts), cts.Token);
    tasks.Add((task, cts));
  }

  tasks.ForEach(t => t.cts.CancelAfter(1000));

  try
  {
    Task.WaitAll(tasks.Select(t => t.task).ToArray());
  }
  catch (AggregateException ex)
  {
    Console.WriteLine(ex.Message);

    tasks.ForEachWithIndex((t, i) =>
    {
      Console.WriteLine("Task " + i);
      Console.WriteLine("Is canceled: " + t.task.IsCanceled);
      Console.WriteLine("Is completed: " + t.task.IsCompleted);
      Console.WriteLine("Is faulted: " + t.task.IsFaulted);
    });
  }

  return totalNumPrimes;
}

优雅地取消任务

工作任务的实现

static void CancellableTask(int threadNum, CancellationTokenSource cts)
{
  DurationOf(() =>
  {
    int numPrimes = 0;
    int n;

    while ((n = Interlocked.Increment(ref nextNumber)) < MAX)
    {
      if (cts.IsCancellationRequested)
      {
        // Graceful exit
        break;
      }

      if (IsPrime(n))
      {
        ++numPrimes;
      }
    }

    Interlocked.Add(ref totalNumPrimes, numPrimes);
    return numPrimes;
  }, $"Thread: {threadNum}");
}

请注意取消任务需要多长时间!我们请求在 1 秒后取消任务,但实际上任务需要 4 秒才能取消。正如预期的那样,进行优雅退出,没有抛出异常。

调用令牌的 ThrowIfCancellationRequested

实现的区别在于,不是

if (cts.IsCancellationRequested)
{
  // Graceful exit
  break;
}

我们这样做:

cts.Token.ThrowIfCancellationRequested();

在这个例子中,请注意抛出了一个异常,并且每个任务都处于“已取消”状态并且“已完成”状态。

请注意设置中的这一非常重要的一行:

var task = Task.Run(() => CancellableTask(i, cts), cts.Token);

如果我们在 Task.Run 中不设置取消令牌,那么任务的 IsCanceled 标志将不会被设置,异常将被视为故障!

抛出我们自己的 OperationCanceledException

实现再次略有不同

if (cts.IsCancellationRequested)
{
  throw new OperationCanceledException();
}

请注意,这里的任务的 IsCanceled 标志为 false,但 IsFaulted 标志为 true

取消正在等待信号量的线程

最后,让我们回到我们使用信号量来指示工作已入队的线程示例,看看如何取消线程。在这个例子中,我将特意不入队任何工作,这样我们就可以确定信号量是由于取消而释放的,而不是处理了一些工作。和以前一样,这是一个“内联”方法——请注意:

  • 和以前一样,我们将取消令牌作为参数传递给 Task.Run
  • 这里有新东西——我们使用的是 SemaphoreSlim 而不是 Semaphore,因为 SemaphoreSlim 支持一个接受取消令牌作为参数的 Wait 方法。这就是在发出取消请求时如何释放信号量。

这是实例化和执行任务的代码,然后在 1 秒后取消它们:

static void CancellableSemaphores()
{
  SemaphoreSlim sem = new SemaphoreSlim(0, Int32.MaxValue);
  int numProcs = Environment.ProcessorCount;
  var queue = new ConcurrentQueue<int>();
  int numPrimes = 0;
  List<(Task task, CancellationTokenSource cts)> tasks = 
                       new List<(Task, CancellationTokenSource)>();

  for (int i = 0; i < numProcs; i++)
  {
    var cts = new CancellationTokenSource();

    tasks.Add((Task.Run(() =>
    {
      while (true)
      {
        sem.Wait(cts.Token);

        if (queue.TryDequeue(out int n))
        {
          if (n == 0)
          {
            break;
          }

          if (IsPrime(n))
          {
            Interlocked.Increment(ref numPrimes);
          }
        }
      }
    }, cts.Token), cts));
  }

  DurationOf(() =>
  {
    // Don't enqueue anything. We want the thread to wait and 
    // be released by the cancellation token.
    tasks.ForEach(t => t.cts.CancelAfter(1000));

    try
    {
      Task.WaitAll(tasks.Select(t => t.task).ToArray());
    }
    catch (AggregateException ex)
    {
      Console.WriteLine(ex.Message);

      tasks.ForEachWithIndex((t, i) =>
      {
        Console.WriteLine("Task " + i);
        Console.WriteLine("Is canceled: " + t.task.IsCanceled);
        Console.WriteLine("Is completed: " + t.task.IsCompleted);
        Console.WriteLine("Is faulted: " + t.task.IsFaulted);
      });
    }

    return numPrimes;
  }, "Threads using cancellable semaphores");
}

请注意信号量是如何作为取消请求的结果而被释放的。这段代码,正如上面的截图所示,将在任务上抛出异常,该异常会在 Task.WaitAll 调用中被捕获。而不是

sem.Wait(cts.Token);

我们可以在任务中捕获异常并优雅退出。

try
{
  sem.Wait(cts.Token);
}
catch (OperationCanceledException)
{
  break;
}

但是,我们不知道任务被取消了。我们唯一知道任务被取消的方式是使用某种外部机制,甚至可能是一个指示任务取消状态的返回值。对外人来说,上面的代码看起来就像任务完成了它的所有工作。

我们也可以编写这段有些冗余的代码:

try
{
  sem.Wait(cts.Token);
}
catch (OperationCanceledException)
{
  cts.Token.ThrowIfCancellationRequested();
}

在所有情况下,AggregateException.Exceptions[] 都包含 TaskCanceledException 异常类型的实例。

ThreadPool 类和 CPU 密集型工作

对于 CPU 密集型工作(也就是说,每个线程永远不会处于等待状态),创建比核心数更多的线程没有好处。

线程数多于核心数

例如,当我测试 4 个线程(不是任务,而是实际的线程,不使用 .NET ThreadPool)的性能时,因为我的系统上有 4 个 CPU,大约需要 8 秒。

如果我将线程数增加到 20

处理时间接近 9 秒——长了 2 秒!在线程之间进行所有上下文切换的开销相当明显。

任务数多于核心数

对于任务,如果我创建的任务数量等于核心数,它们几乎同时开始(时间以秒为单位):

现在看看当我在 4 核系统上尝试启动 20 个 CPU 密集型任务时会发生什么:

同样,时间是以为单位的。任务 0-3 按预期开始。然后有 4 秒的延迟,下一个任务才开始,再 5 秒后是下一个任务。到那时,前 4 个任务已经完成了所有工作,剩余的任务创建时没有工作要做,所以它们很快就终止了。前四个任务之后的延迟时间是 ThreadPool 调度程序的结果,当任务数量超过核心数时,它会延迟启动更多 CPU 密集型任务。有各种各样的关于 ThreadPool 行为的文章,特别是当任务数 > CPU 核心数时,后续任务会延迟 500 毫秒。显然,这里有比这更多的延迟!深入研究 ThreadPool 类确实超出了本文的范围——这里的重点是,如果你使用 ThreadPool 类(并且要小心所有与 ThreadPool 类相关的各种创建线程的方法),你需要了解你创建了多少线程、它们在做什么以及它们会做多久。

结论

本文实际上涵盖了很多内容:

  • “老式”线程创建
  • 线程 Join
  • 后台线程
  • 使用线程参数
  • 平衡和优化线程(主动索取,而不是指示)
  • Interlocked 和 Atomic 操作
  • AsParallel
  • Task.RUn
  • 使用 await
  • 排队工作
  • 信号量、互斥体和锁
  • 异常处理
  • 混合方法
  • 取消线程和任务
  • 简要了解 ThreadPool

其他阅读材料

脚注

1 - Intel CPU LOCK 指令支持 ADD、ADC、AND、BTC、BTR、BTS、CMPXCHG、CMPXCH8B、DEC、INC、NEG、NOT、OR、SBB、SUB、XOR、XADD 和 XCHG。

2 - https://stackoverflow.com/questions/868568/what-do-the-terms-cpu-bound-and-i-o-bound-mean

3 - https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/implementing-the-task-based-asynchronous-pattern

4 - https://msdn.microsoft.com/en-us/library/windows/desktop/ms686908(v=vs.85).aspx

5 - https://msdn.microsoft.com/en-us/library/system.threading.synchronizationcontext(v=vs.110).aspx

6 - https://msdn.microsoft.com/magazine/gg598924.aspx

7 - https://docs.microsoft.com/en-us/dotnet/csharp/programming-guide/concepts/async/async-return-types

8 - https://blogs.msdn.microsoft.com/pfxteam/2011/10/24/task-run-vs-task-factory-startnew/

9 - https://en.wikipedia.org/wiki/Railway_semaphore_signal

10 - https://msdn.microsoft.com/en-us/library/system.threading.mutex(v=vs.110).aspx

11 - https://en.wikipedia.org/wiki/Thread_(computing)

12 - https://www.microsoftpressstore.com/articles/article.aspx?p=2233328&seqNum=7

13 - https://en.wikipedia.org/wiki/Fiber_(computer_science)

14 - https://en.wikipedia.org/wiki/Cooperative_multitasking

15 - https://en.wikipedia.org/wiki/Preemption_(computing)#PREEMPTIVE

16 - https://docs.microsoft.com/en-us/dotnet/standard/threading/cancellation-in-managed-threads

17 - https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/task-cancellation

18 - http://www.sparxeng.com/blog/software/must-use-net-system-io-ports-serialport

历史

  • 2018年8月2日:初始版本
© . All rights reserved.