65.9K
CodeProject 正在变化。 阅读更多。
Home

C# .NET 4.5 中的多线程(第 2 部分)

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.96/5 (51投票s)

2016年9月26日

CPOL

14分钟阅读

viewsIcon

83136

downloadIcon

1404

最新线程技术和模式的高级概述,附带示例。

引言

第一部分确立了一些关于多线程如何提高性能的基本原理,以及多线程设计元素中性能的得失。在第二部分中,我的目标是介绍最新的线程技术以及它们的一些高级用法。这些只是入门技巧,并非旨在深入探讨任何单一的关注领域或技术。

背景

如果您不熟悉基本的线程概念,请回顾 第 1 部分

在阅读本文之前,您应该熟悉

  • 集合
  • 基本多线程
  • 线程切片
  • 线程成本
  • 设计模式

对象的编织历史

尽管线程和线程安全需要不同的工具,但恰当的线程管理可以说是确保线程性能优化的最关键因素。第一个提供线程管理的 .NET 1.1 对象是 ThreadPool

与许多技术一样,构建在已完成工作的基础上使我们能够做得更多,这在多线程方面绝对是如此。ThreadPool 和线程管理确实是大多数工作的基础,因此本文将重点关注线程管理。

请不要误会我的意思。还有许多其他很棒的 .NET 1.1 线程对象,包括

  • Mutex/Semaphore
  • 显示器
  • ManualResetEvent/AutomaticResetEvent

当您进行高级线程优化时,这些都是有价值的工具,但出于两个主要原因,我不会在本文中重点介绍它们:

  1. 这超出了 80% 用例的范围。
  2. 大多数现代线程技术都会为您处理其用途。

快进到 .NET 4.0 出现,我们通过引入任务并行库 (TPL) 和并行 LINQ (PLINQ) 获得了线程方面的飞跃。虽然这些为线程功能和开发便利性带来了大量新增功能,但您可以将这些库真正归结为引入了 Task。为了尽可能简单地描述 Task,它是一个已自动分配到 ThreadPoolThread

让系统管理线程应该是默认行为,但使用 ThreadPool,您仍然需要显式创建和分配资源。在 Task 中固有地处理好这一点,只是使我们本应做的事情更容易。我经常开玩笑说,我作为程序员的工作就是尽量少写代码,但这些默认设置也是为了帮助您避免“弄巧成拙”。

抛开玩笑不说,虽然 TPL 的变化不仅仅是为了少写代码,但 PLINQ 确实只是为了少写代码。

下面是一个 PLINQ 的例子

Parallel.For(
    0,
    1000,
    new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount },
    (i) => { /* Do work */ });

这会创建 1,000 个 Task 对象,这可以用一个简单的 for 循环完成;但是,您需要以线程安全的方式管理索引,并且还需要自己添加许多标准的线程控件。注意到 ParallelOptions 参数了吗?它会将并发线程的数量限制为 CPU 核心数 - 这是一个很好的通用建议。

ParallelOptions 是现代线程技术利用旧技术的一个很好的例子。并发线程执行限制是通过信号量实现的。所以这部分解释了为什么我现在不专门介绍信号量和其他 .NET 1.1 线程对象的使用。

除了 TPL 和 PLINQ 之外,.NET 4.0 还引入了 IObservable。虽然在标准的 .NET 库中没有直接支持 IObservable,但可以通过 NuGet 轻松使用响应式扩展 (Rx)。响应式扩展还包含许多可以提高多线程工作开发和管理便利性的功能。IObservable 实际上也利用了 TPL。

接着 .NET 4.5 引入了异步任务协议 (ATP),每个人都在热议它;然而,类似于 Task 是建立在旧的 ThreadPool 管理之上的,ATP 实际上是建立在现有的延续 Task 能力之上的。

通过延续 Task 提高效率的好处是,您不必因为等待后续进程完成而让线程占用分配的切片时间。此外,回到线程同步上下文并调用原始线程或创建更多线程来处理延续比创建更多线程更有效。

百分之六十的时间,它一直有效

不幸的是,ATP 并非万能药,很多缺乏线程经验的人却这样认为。要有效地进行多线程处理,仍然有很多东西需要学习,而不仅仅是 ATP。

无论可用的技术多么出色,您都必须确保您的代码是线程安全的。如果您不确定正在使用的某个特定项是否线程安全,那么它很可能不是。在对其进行任何其他操作之前,请仔细检查对象的线程安全性。

例如,List{T} 不是线程安全的。这意味着如果您在多个线程上操作集合,它可能不会如您所愿。丢失的元素、应该被删除的元素、错误的计数等。运行下面的示例程序将进行演示。

class Program
{
    static void Main(string[] args)
    {
        ListThreadSafety();

        Console.WriteLine("Press any key to exit...");
        Console.ReadKey();
    }

    static void ListThreadSafety()
    {
        int count = 1000000;

        var list = new List<int>(count);
    
        Parallel.For(0, count, (i) => list.Add(i));

        Console.WriteLine($"The total count should be {count}.  The list count is {list.Count}.");
    }
}

Add 不是原子操作 - 原子操作是指可以被视为单个操作的操作。在底层,它通过一系列步骤操作其他字段和属性将元素添加到集合中。这因此意味着它容易发生竞态条件。

竞态条件是取决于线程执行顺序的非确定性结果。类似于代码错误同时具有编译器错误和逻辑错误,竞态条件也可能来自逻辑错误。例如,如果您创建两个线程,它们依赖于按创建顺序完成,即使这两个线程内部的一切都已正确同步。

无论 async/await 多么强大,它们都无法防止竞态条件。这意味着您必须特别注意您正在处理什么以及如何处理它。

有什么问题?为什么不让所有东西都线程安全呢?嗯,线程同步的成本很高,所以除非需要,否则不要使用它,这让我们来到效率……

将未进行多线程处理的内容使用 async 进行更新,不一定会使其有实质性改进。这在很大程度上取决于线程正在做什么类型的工作以及它运行的环境。

当您的计算机承担繁重工作时,async 的好处最大。繁重计算、文件 IO 等。原因是,在 CPU 使用率高的情况下,您能限制浪费线程切片的时间越多,就能完成更多的繁重任务。如果 CPU 空闲,等待外部调用返回,那么过多的线程切片实际上不会对性能产生太大影响。

这里的经验是,对于您已经多线程处理的任务,如果您想切换到 async/await,请确保在花费开发时间和冒着 bug 的风险来更新代码之前有可行的好处。同时请记住,对于执行简单任务,通常更有效率的是单线程完成任务,而不是承担多线程的开销。

并发集合

System.Collections.Concurrent 命名空间在 .NET 4.0 中添加(正如您可能猜到的),它们是针对多线程操作进行了优化的线程安全集合。有五个并发集合:

不那么明显的是,这些集合针对生产者-消费者模式进行了优化。生产者-消费者与一般多线程之间的非常重要的区别在于,生产者-消费者旨在支持同时向同一个集合添加和从中取出。

我个人在一般多线程中使用 ConcurrentBag{T}ConcurrentDictionary{TKey,TValue},这样我就不必维护一个同步根对象,但对于性能关键的多线程场景,我默认不推荐这样做。在这种情况下,您应该研究针对特定实现优化代码。

为了证明使用生产者-消费者模式的合理性,生产和消费都需要是一项昂贵的工作。这里的昂贵仅仅意味着值得为创建线程付出额外的成本和开销。外部调用、数据库操作、文件 IO 等。

要正确实现生产者-消费者模式,需要创建线程以同时进行生产和消费,如下所示:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class Program
    {
        static void Main(string[] args)
        {
            ProducerConsumerExample();

            Console.WriteLine("Press any key to exit...");
            Console.ReadKey();
        }

        static void ProducerConsumerExample()
        {
            var count = 100;

            var productionTasks = new ConcurrentBag<Task>();
            var consumptionTasks = new ConcurrentBag<Task>();

            // Creating blocking collection with bounding to free up threads for consumption
            var blockingCollection = new BlockingCollection<int>(10);

            var consumerMain = new TaskFactory().StartNew(() =>
            {
                while (true)
                {
                    try
                    {
                        // For indeterminable operations use cancellation token
                        var consumed = blockingCollection.Take();

                        consumptionTasks.Add(new TaskFactory().StartNew(() =>
                        {
                            // Expensive operation, i.e. calling an API based on data
                            Thread.Sleep(100);
                            Console.WriteLine($"Consumed {consumed} from blocking collection.");
                        }, TaskCreationOptions.LongRunning));
                    }
                    catch (InvalidOperationException)
                    {
                        // Done
                        Console.WriteLine($"Completed consumption task generation.");
                        break;
                    }
                };
            }, TaskCreationOptions.LongRunning);

            Parallel.For(0, count, (i) =>
            {
                productionTasks.Add(new TaskFactory().StartNew(() =>
                {
                    // Expensive operation, i.e. database retrieval
                    Thread.Sleep(100);                  
                    blockingCollection.Add(i);
                    Console.WriteLine($"Added {i} to blocking collection.");
                }, TaskCreationOptions.LongRunning));
            });

            // If tasks are determinable like this example...
            Task.WaitAll(productionTasks.ToArray());
            blockingCollection.CompleteAdding();

            // Wait on the main consumer to finish adding tasks, then wait on tasks...
            consumerMain.Wait();
            Task.WaitAll(consumptionTasks.ToArray());
        }
    }        
}

在这个特定的例子中,消费者以一种简单的方式结束——通过异常。Take() 是一个阻塞调用,如果(或正在进行)在 BlockingCollection{T} 已完成时调用,它将抛出 InvalidOperationException。如果需要,还有更优雅的方法来通过 CancellationToken 或许多其他线程机制来终止处理。

WPF 模式

WPF 实现的一个主要目标是让所有繁重的工作都脱离 Dispatcher 线程。虽然这看起来很简单,但复杂之处在于如何将工作结果协调回 Dispatcher

下面是实现此目的的基本示例:

示例 1 - async/await

public DelegateCommand CheckAsyncCommand { get; set; }

CheckAsyncCommand = new DelegateCommand(async () => await CheckAsync());

public async Task CheckAsync()
{
    await new TaskFactory().StartNew(() =>
    {
        if (Thread.CurrentThread.ManagedThreadId == 
                 Application.Current.Dispatcher.Thread.ManagedThreadId)
        {
            throw new InvalidOperationException();
        }

        Thread.Sleep(2000);
    }, TaskCreationOptions.LongRunning);

    IncrementCount();
}

示例 2 - ContinueWith

public DelegateCommand CheckContinuationCommand { get; set; }

CheckContinuationCommand = new DelegateCommand(() => CheckContinuation());

public void CheckContinuation()
{
    new TaskFactory().StartNew(() =>
    {
        if (Thread.CurrentThread.ManagedThreadId == 
                Application.Current.Dispatcher.Thread.ManagedThreadId)
        {
            throw new InvalidOperationException();
        }

        Thread.Sleep(2000);
    }, TaskCreationOptions.LongRunning).ContinueWith((task) => 
    IncrementCount(), TaskScheduler.FromCurrentSynchronizationContext());

示例 3 - Dispatcher.Invoke

public DelegateCommand CheckInvokeCommand { get; set; }

CheckInvokeCommand = new DelegateCommand(() => CheckInvoke());

public void CheckInvoke()
{
    new TaskFactory().StartNew(() =>
    {
        if (Thread.CurrentThread.ManagedThreadId == 
              Application.Current.Dispatcher.Thread.ManagedThreadId)
        {
            throw new InvalidOperationException();
        }

        Thread.Sleep(2000);

        Application.Current.Dispatcher.Invoke(() => IncrementCount());
    }, TaskCreationOptions.LongRunning);
}

示例 4 - Observable

private Subject<object> _subject = new Subject<object>();

_subject.ObserveOnDispatcher().Subscribe(_ => IncrementCount());
CheckObservableCommand = new DelegateCommand(() => CheckObservable());

public void CheckObservable()
{
    Observable.Start(() =>
    {
        if (Thread.CurrentThread.ManagedThreadId == 
           Application.Current.Dispatcher.Thread.ManagedThreadId)
        {
            throw new InvalidOperationException();
        }

        Thread.Sleep(2000);

        _subject.OnNext("Finished.");
    }, NewThreadScheduler.Default);
}

理解这些示例的第一个要点是 DelegateCommand。它是 Prism 对 ICommand 接口的实现。然后,这些命令绑定到应用程序中的 Button。因此,DelegateCommand 将在 Dispatcher 上执行。

如果您不将命令中的工作转交给新线程,该命令将仅在 Dispatcher 上执行。如果您关心 UI 的响应性,这是不好的……

在演示项目中,有一个 ProgressBar,其 IsIndeterminate 已设置为 true。Dispatcher 上的任何工作都会随后导致进度条暂停。目的是在演示中轻松清晰地识别 Dispatcher 何时被使用。

您可以通过“运行同步”Button 来验证命令在 Dispatcher 上的执行会冻结进度条。其他使用上述示例的命令将在新线程上执行其工作,因此不应显着冻结进度条。

注意:运行线程命令时,进度条可能仍会闪烁,因为创建新 Task 的工作是在 Dispatcher 上进行的。

注意:大多数创建托管线程的机制不能保证它们实际上会在新线程上运行。如果您想确保它在新线程上运行,请使用相应的创建选项。

async/await 实现应该是默认标准,因为它相对简单易实现,并且成功地释放了 Dispatcher。这种方法的缺点也是它的简单性。对于自然地挂接复杂执行或通知,确实没有太多空间。当然,您可以构建它们,但这正是其他示例的意义所在。

ContinueWith 方法的好处是,您可以将 Task 组合成一种策略模式。对链中的任何单个 Task 都可以进行直接控制/影响。这种方法的缺点是增加了复杂性,迫使所有内容都变成 Task

Dispatcher.Invoke 很简单,可以处理大多数情况,并且可以从任何线程调用。相比之下,在不明确确保我们在 Dispatcher 上的情况下调用 Increment() 时,假设调用始终来自 Dispatcher。通过命令绑定,这是默认预期,尽管可能需要从其他后台进程进行调用,而这可以处理。缺点是 Invoke 调用成本更高。

Observable 是我最喜欢的之一,因为它有大量的丰富功能支持;然而,它需要更多的“管道”连接,并增加了代码的复杂性。当我需要复杂执行时,我倾向于使用这种方法而不是 ContinueWith;然而,这引入了另一个并非每个人都可能熟悉的.NET 概念。

Reactive Extensions

响应式扩展 (Rx) 是一个支持观察者模式异步实现的库。它有大量的有用功能,并且是作为单子构建的,因此您可以像 IEnumerableIQueryable 一样链接功能。

观察者模式有两个基本参与者:主题和观察者。主题是数据的来源,观察者是监视该数据的所有人。

重要的是不要将观察者模式与发布-订阅模式混淆。后者更像是一种架构模式,可以通过利用观察者模式来构建。Rx 将此稍微弄得有些混乱,因为它使用了一些混合术语(例如,您调用 Subscribe 来监视 IObservable{T})。

在 Rx 中您能观察到的任何东西都实现了 IObservable{T} 接口。主题由您猜对了,ISubject{T} 接口支持。ISubject{T} 的开箱即用实现也实现了 IObservable{T}。因此,您正在推送和监听同一个源对象。

Rx 利用 TPL,因此您会立即看到一些类似的功能,例如 IObservable{T} 是可等待的。同样,就像 Task 有一个 TaskScheduler 一样,Rx 有 IScheduler,允许您轻松影响线程的管理方式。如果您注意到 WPF Observable 示例中的内容,甚至还有一个方便的扩展 IObservable{T}.ObserveOnDispatcher(),可以更轻松地加入 Dispatcher

我真正喜欢使用 IObservable 实现的模式之一是抽象化线程细节,例如:

private Subject<object> _mySubject = new Subject<object>();

public IObservable<object> MySubject
{
    get
    {
        return _mySubject.ObserveOn(NewThreadScheduler.Default);
    }
}

如果您正在创建一个应该在新线程上启动的 Task,则需要自己启动 Task 才能确保这种情况发生。问题是,有时让消费者启动 Task 是最优的,例如当您想附加一个延续 Task 时。虽然这当然可以用 Task 来完成,但这是一种更简单的方法。

以下是一些其他非常有用的功能……

  • Observable.Merge:将多个 IObservable{T} 源合并到一个 IObservable{T} 中。
  • Observable.Timer:在指定持续时间后触发单个事件。
  • Observable.Interval:在指定间隔后触发事件。
  • IObservable{T}.Where:谓词,用于确定 IObservable 是否应被触发。
  • IObservable{T}.Throttle:仅在 IObservable 在指定持续时间内停止更改后触发。
  • IObservable{T}.Take(int count):仅触发 IObservable 指定的次数。
  • IObservable{T}.TakeUntil(IObservable{T} source):直到指定的源触发 IObservable
  • IObservable{T}.StartWith(T initialValue):在订阅时,使 IObservable 使用提供的初始值触发。

通过所有这些构建 IObservable{T} 的方法,您可以以多线程方式 Subscribe 它们,并获得开箱即用的功能。这就是我希望在现代 .NET 线程处理方法中讨论 Rx 的原因。

我希望通过这些功能以异步方式观察数据能引发各种潜在用途。不幸的是,我不会将本文变成关于响应式扩展的文章,因此我不会深入介绍这些功能。这里的重点是 Rx 库是异步和多线程编程的另一个强大工具箱。

提示与技巧

每个创建的线程都应该将其逻辑包装在 try {} catch {} 中。我并不是说您应该吞下抛出的 Exception,但在线程内部,未处理的异常很可能要么使您的整个应用程序崩溃,要么使线程静默终止——两者都很糟糕。您需要确保以可预测的方式处理线程内的任何 Exception

注意:PLINQ 循环中的异常将抛出 AggregrateException

任何时候创建 Task,都不能保证它会实际创建一个新线程。如果您想尝试推动新线程的创建,请确保使用 TaskCreationOptions.LongRunning

PLINQ 中停止/取消正在运行的 Task 的方式与通常的工作方式并不直观。通常,任何已生成的 Task 实际上都会运行完成。如果您需要考虑这种行为,请进行充分的测试/调查。

创建前台线程的唯一方法是使用 Thread

Mutex/Semaphore 是机器级别的对象。如果您想协调同一台计算机上多个应用程序之间的资源访问,这就是一种实现方式。

BackgroundWorker 不如 Task。除非您受限于目标 .NET 2.0 或 .NET 3.5,否则不要使用它。

使用 IObservable{T}.Select 选择一个常见的 Type(可能是一个状态),以便您可以使用 Observable.Merge 处理不同的源 Type

默认情况下,使用 CompositeDisposable 来存储来自任何可观察对象的订阅,并相应地 Dispose 它,以帮助确保您的订阅不是内存泄漏的来源。

lock 由线程拥有,因此不要在多个地方使用同一个对象作为 lock,这可能导致意外的通行。

关注点

这些技巧和窍门是我能想到的基本功能。稍后我会尝试添加更多。如果您需要解决问题的技巧/窍门,请随时留下反馈,我会告知您我是否有任何内容。

历史

  • 2016-09-30:更新了代码文件 - 更新了生产者-消费者代码片段 x2
  • 2016-09-28:更新了生产者-消费者示例代码片段 - 修复了有缺陷的完成测试检查
  • 2016-09-27:编辑了文章 - 将背景改为前台
  • 2016-09-26:初始版本
© . All rights reserved.