65.9K
CodeProject 正在变化。 阅读更多。
Home

强大的 .NET TPL 替代方案

starIconstarIconstarIconstarIconstarIcon

5.00/5 (6投票s)

2024 年 7 月 29 日

MIT

12分钟阅读

viewsIcon

11446

Armat.Threading 是一个基于 .NET TPL 机制的异步代码执行库

引言

异步代码执行机制一直是编程中最复杂领域之一。随着 .NET 框架引入 TPL,并行编程技术变得对更广泛的开发者可用,因此也使 .NET 成为许多人选择的框架。

作为一种通用的异步编程框架,TPL 解决了大多数现实生活中的问题。它可以用于

  • 异步运行用户发起的*操作* - 从而提供流畅的用户体验,
  • 触发*并行*代码执行 - 从而充分利用硬件资源,
  • 构建*相互依赖的任务图* - 从而显著降低代码复杂度,
  • 以及可能还有更多我不想赘述的场景。

虽然上述情况足以满足大多数应用程序的需求,但偶尔我也会遇到标准 .NET 工具难以克服的限制。在我开发 C# 项目的过程中,我选择退一步,为其中一些问题提供一个通用的解决方案。特别是,我需要

  • 一种在*进程内创建多个线程池*的方法,
  • 一种*跨异步方法调用传递用户上下文*的机制,
  • *异步代码执行的日志记录/可追溯性*,
  • 以及最重要的是,*更好地理解 TPL 内部原理*。

本文将深入探讨 Armat.Threading 库,并提供其实际使用示例。此外,它还将阐明 .NET 中异步编程的实现细节,这一领域通常缺乏文档,特别是与 TPL 内部原理相关的部分。

Armat.Threading 库

本文分为两部分。第一部分侧重于 Armat.Threading 库的*使用*,并附带一些*实际示例*,第二部分旨在揭示 .NET 中用于支持 async / await 语法的*异步编程机制*。

使用 Armat.Threading 执行并行代码

Job 类

Armat.Threading.Job 类对应于 TPL 中的 System.Threading.Tasks.Task。它具有与 Task 类相似的接口,并且是 Armat.Threading 库的核心异步执行单元。Armat.Threading.Job<T> 类对应于 TPL 中的 System.Threading.Tasks.Task<T>。它继承自非泛型的 Job 类,并具有与 Task<T> 相似的接口。Job<T> 类从 Job 驱动大部分功能,用于执行最终返回 T 类型结果的异步方法。

以下是使用 Armat.Threading 库*异步执行代码*的一些示例

    // Run a Job with no args and no return value
    await Job.Run(
        () ->
        {
            Console.WriteLine("Running job asynchronously");
        }).ConfigureAwait(false);

    // Run a Job with args but no return value
    await Job.Run(
        (double number) ->
        {
            Console.WriteLine("Running job asynchronously with arg {0}", number);
        }, 0.1248).ConfigureAwait(false);

    // Run a Job with no args and returning T
    T result = await Job<T>.Run(
        () ->
        {
            Console.WriteLine("Running job asynchronously");
            return default(T);
        }).ConfigureAwait(false);

    // Run a Job with args and returning T
    T result = await Job<T>.Run(
        (double number) ->
        {
            Console.WriteLine("Running job asynchronously with arg {0}", number);
            return new T(0.1248));
        }, 0.1248).ConfigureAwait(false);

运行 Job 有多种方式。最简单的方式是使用静态 Run 方法(如上所示)。也可以使用构造函数之一创建 Job 对象实例。以这种方式创建的 Job 必须稍后通过调用 Run 方法的相应重载来执行,如下所示

    public Int32 Sum(Int32[] array)
    {
        // Create the Job
        Job<Int32> asyncJob = new Job<Int32>(SumFn, array);

        // Run asynchronously
        asyncJob.Run();

        // Wait for it to finish and return the result
        return asyncJob.Result;
    }

    private Int32 SumFn(Object? args)
    {
        Int32[] array = (Int32[])args;

        return array.Sum();
    }

每个 Job 实例都有一个唯一的*自动递增*的 Id,类型为 Int64。这些可用于*调试、日志记录或跟踪*目的。选择使用 Int64 标识符是基于以下几点

  1. 它的大小相对较小,并且不需要堆内存分配
  2. 生成新标识符(递增 Int64 数字)不应影响性能
  3. 即使在*长时间运行的应用程序*中也不会溢出。假设进程内创建 Job 的速率为 1 Job/毫秒,那么需要大约 300,000,000 年才能溢出。

可以通过静态属性 Armat.Threading.Job.Current 来识别*当前运行*的 Job 实例(如果当前代码不在任何 Armat.Threading 线程的上下文中运行,则为 null)。这样,执行代码就可以访问任何 Job 属性(如 AsyncStateCancellationTokenInitiator)来定义自己的行为。

注意:我不会坚持认为公开当前正在运行的 Job 实例并使用静态属性是最好的设计选择。虽然它简化了代码(例如,您不必显式地将 CancellationToken 传递给嵌套方法以支持取消),但它也允许在 Job 上下文中运行的任何方法读取它可能不应该访问的信息。它也可能由于隐式访问数据的方式而使代码变得模糊。无论如何,它解决了许多问题,并为进一步扩展 Armat.Threading 库创造了广泛的机会。我强烈建议使用者尽可能少地使用此访问器,以保持代码的简洁和可读性。

Initiator 属性可用于识别*触发当前 Job* 的 Job。通过递归遍历发起者 Job,可以导航 Job 层级结构,直到 Root

考虑到 Armat.Threading.Job 类的*整个接口*在很大程度上与 .NET CLR 的 System.Threading.Tasks.Task 相同,我将不在此详述。而且,类属性和方法名称似乎也相当具有描述性。

JobScheduler

IJobScheduler(抽象)

Armat.Threading.IJobScheduler 接口定义了 Job 调度程序的行为。下面是 Job 调度程序接口的声明

public interface IJobScheduler : IDisposable
{
    // Static property returning the default instance of IJobScheduler.
    // This is the one to be used for running the Jobs by default.
    static IJobScheduler Default { get; }

    // Static property returning the current instance of IJobScheduler to be used for Jobs execution
    // If not null, it will used for running Jobs instead of IJobScheduler.Default
    static IJobScheduler Current { get; }

    // Enqueues a Job in a scheduler.
    // To successfully enqueue a Job in a JobScheduler one must have Job.Status = JobStatus.Created (never run before).
    void Enqueue(Job job);
    // Cancels Job execution in the JobScheduler before it begins.
    // The method will fail (will return false) if the Job is already running or is finished.
    Boolean Cancel(Job job);
    // The property returns number of jobs currently waiting in the queue.
    // It may be used to monitor the current load on the JobScheduler.
    Int32 PendingJobsCount { get; }

    // Makes IJobScheduler.Current to refer to this instance the for the executing thread.
    // IJobScheduler.Current is reset to the previous value once the returned JobSchedulerScope is Disposed.
    JobSchedulerScope EnterScope();
}

该接口*极简*且*自述*。

使用给定的 IJobScheduler 实例运行异步 Job 的一种方法是使用 IJobScheduler.EnterScope 方法。它允许在*调用方法的范围内*覆盖*默认*的 Job 调度程序。以下示例说明了如何实现这一点

    private async Job<Int64> JobExecutionInCustomScheduler(IJobScheduler otherScheduler)
    {
        // After this line all Jobs will be executed by the otherScheduler (unless overridden by another one)
        // This will make IJobScheduler.Current to refer to the otherScheduler
        // Disposing the otherScope will result in restoring the previous value of IJobScheduler.Current
        using var otherScope = scheduler.EnterScope();

        // create the Job
        Job<Int32> asyncJob = new Job<Int32>(SumFn, new Int32[] { 1, 2, 3 });

        // Run asynchronously
        asyncJob.Run();

        // async wait for it to finish and return the result
        return await asyncJob.ConfigureAwait(false);
    }

    private Int32 SumFn(Object? args)
    {
        Int32[] array = (Int32[])args;

        return array.Sum();
    }

JobSchedulerBase(构建块)

抽象类 JobSchedulerBase 实现 IJobScheduler 接口。它被设计为所有可能的 Job 调度程序实现的*基类*。JobSchedulerBase 提供了设置 IJobScheduler 接口的 Default 属性的方法,并且还为派生类提供了*受保护的* Job 执行 API。

public abstract class JobSchedulerBase : IJobScheduler
{
    // Gets or sets the default <code>IJobScheduler</code>. If not set, the <code>JobScheduler.Default</code> is returned.
    // Note: To protect setting the default <code>JobScheduler</code> by an arbitrary code, the setter is made protected, thus requiring a public setter in a derived class.
    // Note: <code>JobScheduler.Default</code> can be set only once during process lifetime (do this at the application initialization phase if necessary).
    public static IJobScheduler Default { get; protected set; }

    // updates the status of Job
    // can be used in derived scheduler implementations to control Job status (Job.UpdateStatus is internal to Armat.Threading library)
    protected Boolean UpdateJobStatus(Job job, JobStatus newStatus, JobStatus prevStatus);

    // this method must be used when executing jobs in a scheduler
    // in sets the IJobScheduler.Current property to this during the job execution
    // can be used in derived scheduler implementations to control Job status (Job.ExecuteProcedure is internal to Armat.Threading library)
    protected JobStatus ExecuteJobProcedure(Job job);

    // this method must be used when executing job continuations in a scheduler
    // in sets the IJobScheduler.Current property to this during the job execution
    // can be used in derived scheduler implementations to control Job status (Job.ExecuteJobContinuations is internal to Armat.Threading library)
    protected Int32 ExecuteJobContinuations(Job job);
}

注意:提供 JobScheduler 的抽象基类的决定似乎与最佳设计实践相悖。特别是,这使得在不继承 JobSchedulerBase 的情况下提供任何其他 IJobScheduler 实现成为不可能。我选择此解决方案是为了使用 internal 访问修饰符来保护某些 Job 方法,然后通过相应的 JobSchedulerBase 方法使其可访问。这确保了 Job 执行方法*仅在同一程序集中*可访问,从而提供额外的安全层和对如何使用它们的控制。它还保证在 Job 执行期间*设置和重置* IJobScheduler.Current 属性。

JobScheduler(实现)

JobScheduler 类继承自 JobSchedulerBase,并提供了异步 Job 调度机制的*默认实现*。它管理*线程池*和已提交*Jobs*的*队列*,并提供有关 Job 执行的*一些统计信息*。

通过使用构造函数的相应重载,可以配置 JobScheduler,为其指定*名称*,使用*特定的最小/最大线程数*,并*限制队列中待处理 Jobs 的大小*。JobScheduler 将*常规*和*长时间运行*的 Job 分别管理在*不同的队列*中,并使用*不同的线程池*执行它们,从而确保不会因长时间运行的 Job 而*阻塞*较小的 Job。

以下是*显式使用* JobScheduler*运行 Job* 的两个*相同*的示例

public class AsyncExecutor
{
    private JobScheduler myScheduler = new("Scheduler of ProcA");

    public async void RunInOwnScheduler_Example1()
    {
        // create a job
        Job job = new(ProcA);

        // run it within own scheduler
        job.Run(myScheduler);

        // wait for the job to complete
        await job.ConfigureAwait(false);
    }

    public async void RunInOwnScheduler_Example2()
    {
        // create a job
        Job job = new(ProcA);

        // enqueue the job in own scheduler
        myScheduler.Enqueue(job);

        // wait for the job to complete
        await job.ConfigureAwait(false);
    }

    private async void ProcA()
    {
        // do something
    }
}

JobRuntimeScope

范围的定义

Armat.Threading.JobRuntimeScope 类代表*异步操作的范围*。该范围在 JobRuntimeScope 对象*实例化*时开始,并在其*处理*(通常在异步操作完成后)时结束。它代表*字符串键*和*对象值*的*对*。JobRuntimeScope 对象*可以在异步代码执行期间*(无论异步调用的数量和深度如何)通过*静态访问器*检索。

使用 JobRuntimeScope 可以将*参数传递*给嵌套的异步方法,从而提供有关调用的*上下文信息*。JobRuntimeScope 的一些*好的使用示例*包括

  • 识别 Job 的*相关性*,
  • *跟踪*异步代码执行,
  • 将*上下文信息*传递给嵌套方法。

Armat.Threading.JobRuntimeScope 由以下成员组成

  • public static JobRuntimeScope Enter(String key, Func<Object> factory)
    使用给定的*键*实例化 JobRuntimeScope 类型的对象,并使用工厂方法初始化*值*属性。
    注意:如果*键*已在当前范围内定义,则会返回现有的 JobRuntimeScope 实例,并且*永远不会调用*工厂方法来创建新的实例。
  • public static JobRuntimeScope Enter<T>(String key, Func<T> factory)
    表示 JobRuntimeScope.Enter 方法的*重载泛型版本*,它可以存储*类型 T 的值*。
  • public static JobRuntimeScope Enter<T>(Func<T> factory)
    表示 JobRuntimeScope.Enter 方法的*重载泛型版本*,它使用*类型 T* 作为创建范围的*键*。
  • public static JobRuntimeScope EnterNew(String key, Func<Object> factory)
    使用给定的*键*实例化 JobRuntimeScope 类型的对象,并使用工厂方法初始化*值*属性。
    注意:如果*键*已在当前范围内定义,则返回 JobRuntimeScope.Null 以指示失败结果,并且*永远不会调用*工厂方法来创建新的实例。
  • public static JobRuntimeScope EnterNew<T>(String key, Func<T> factory)
    表示 JobRuntimeScope.EnterNew 方法的*重载泛型版本*,它可以存储*类型 T 的值*。
  • public static JobRuntimeScope EnterNew<T>(Func<T> factory)
    表示 JobRuntimeScope.EnterNew 方法的*重载泛型版本*,它使用*类型 T* 作为创建范围的*键*。
  • public static Object? GetValue(String key)
    返回当前范围中*给定键*的*值*。如果*未找到键*的作用域,则返回*null*。
  • public static T? GetValue<T>(String key)
    返回当前范围中*给定键*的*值*,如果*未找到键*的作用域,则返回*null*。如果*值*不能赋值给*类型 T*,则会引发异常。
  • public static T? GetValue<T>()
    表示 JobRuntimeScope.GetValue 方法的*重载泛型版本*,它使用*类型 T* 作为*键*。
  • public void Leave()
    通过*移除相应键*来*退出*当前范围。
  • public void Dispose()
    Leave() 方法所述,*退出*当前范围。它被设计为与*using 关键字*一起调用,以确保在*退出方法范围*时进行*正确处理*。
  • public Boolean IsNull
    对于*null(无效)范围实例*,返回*true*。在上述 EnterNew 方法中,当*无法进入给定范围*时,可以返回 Null 范围。
  • public String Key { get; }
    返回 JobRuntimeScope 对象的*键*。
  • public Object Value { get; }
    返回 JobRuntimeScope 对象的*值*。

我将不尝试在此文中描述 JobRuntimeScope 类的*实现细节*,因为它在阅读源代码本身时似乎很清楚。主要思想在于将 JobRuntimeScope 对象*存储在 ThreadLocal 缓存中*,并确保通过在*该线程中*实例化的 Job 来*传播*它。

JobRuntimeScope 的*一些使用示例*作为*单元测试*在此处提供 。以下代码说明了如何使用它

    private async Job DoSomething()
    {
        // run some user scoped operation
        await RunUserScopedOperation().ConfigureAwait(false);

        // user data is null here because there's JobRuntimeScope object 
        //   has been defined and disposed within RunUserScopedOperation method
        UserData? userData = JobRuntimeScope.GetValue<UserData>();
    }
    private async Job RunUserScopedOperation()
    {
        // create the scope with some UserData information
        // 'using' keyword guarantees to have the scope Disposed when exiting the method
        using var scope = JobRuntimeScope.Enter<UserData>(() => new UserData("abc", "123"));

        // user data refers to the one declared above
        UserData? userData1 = JobRuntimeScope.GetValue<UserData>();

        // run any asynchronous operation
        // UserData will be accessible in all inner synchronous or asynchronous methods
        //   irrespective of the thread running the method
        await AsyncOperation().ConfigureAwait(false);

        // user data remains the same as above
        UserData? userData2 = JobRuntimeScope.GetValue<UserData>();
    }
    private async Job AsyncOperation()
    {
        // user data remains the same as created in the caller method
        UserData? userData3 = JobRuntimeScope.GetValue<UserData>();

        // running some asynchronous operations
        await Job.Yield();

        // user data remains the same as created in the caller method
        UserData? userData3 = JobRuntimeScope.GetValue<UserData>();
    }

因此,JobRuntimeScope 提供了一种*通用的机制*,用于将*用户定义的上下文信息*传递给在 Job 上下文中运行的代码。

CorrelationIdScope

Armat.Threading.CorrelationIdScope 类是 JobRuntimeScope 的*可能值类型之一*。它生成*自动递增的 64 位 ID*,用于*跨异步操作的相关性*(用于日志记录、跟踪或任何其他需求)。它还提供方便的*工厂方法*,用于使用*新的 CorrelationIdScope 值*实例化 JobRuntimeScope,如以下示例所示(相应的单元测试*在此处*提供

    private async Job RunCorrelationIDTest(Int32 testNum)
    {
        // this will create correlation ID and the appropriate scope
        using var scope = CorrelationIdScope.Create();

        // any asynchronous code execution
        await Job.Yield();

        // correlation ID is available here (after async method invocation)
        CorrelationIdScope currentCorrId = CorrelationIdScope.Current()!;
        Output.WriteLine("RunCorrelationIDTest: Correlation ID for test {0} is {1}",
            testNum,
            currentCorrId.CorrelationID);

        // nested async method invocations
        await NestedAsyncMethodCall(testNum, CorrelationIdScope.Current()!.CorrelationID, 1).ConfigureAwait(false);
    }

    private async Job NestedAsyncMethodCall(Int32 testNum, Int64 expectedCorrID, Int32 depth)
    {
        // any asynchronous code execution
        await Job.Yield();

        // correlation ID remains the same as in the caller method above
        CorrelationIdScope currentCorrId = CorrelationIdScope.Current()!;
        Output.WriteLine("NestedAsyncMethodCall<{0}>: Correlation ID for test {1} is {2}",
            depth,
            testNum,
            currentCorrId.CorrelationID);

        // go even deeper
        if (depth < 3)
            await NestedAsyncMethodCall(testNum, expectedCorrID, depth + 1).ConfigureAwait(false);
    }

注意:在*相同的操作*上下文中,*只有一个* CorrelationIdScope 实例将被创建和使用,直到异步操作*完成*。在同一操作的上下文中*每次*调用 CorrelationIdScope.Create() 时,都将返回*已存在的* CorrelationIdScope 实例。因此,它*仅在第一次调用时*生成标识符,并在整个操作期间保持*可访问*。通过这种方式,可以*跟踪*并*轻松过滤*单个操作在应用程序日志文件中的执行。

C# 中的并行代码执行机制

在本节中,我将尝试*阐明 .NET 中 async / await 的实现细节*。它不会非常详细,而是一个*高层次的执行流程分解*。希望借助 Armat.Threading 库的*源代码*,*调试和理解* TPL 背后的*魔法*应该会更容易。

当使用 await 关键字*调用*一个 async 方法时,它会使用*相应的 MethodBuilder* 开始*异步代码执行*。为此,该方法被*分解成段*,这些段由* await 语句*分区,如下所示

Armat.Threading 库中,Job 类被*装饰*为 [System.Runtime.CompilerServices.AsyncMethodBuilder(typeof(JobMethodBuilder))] 属性,该属性*指示*用于*返回 Job 类型*的异步方法的*MethodBuilder 类型*。

以下*序列*由 .NET 运行时在*异步方法执行期间*启动

  1. .NET 运行时会创建一个*方法构建器对象*来*编排方法执行*。请参阅 JobMethodBuilder.Create() 实现以获取参考。
  2. 接下来,会调用 MethodBuilder.Start(ref TStateMachine stateMachine) 来*开始函数执行*。委托 stateMachine.MoveNext() 指向*执行方法的第一个段*。实现必须*确保*在方法完成之前*恢复*调用线程的 ExecutionContextSynchronizationContext
    为此,我*重用了* AsyncTaskMethodBuilder.Start(stateMachine),它代表 TPL Task 的*方法构建器*(请参阅 void JobMethodBuilder.Start<TStateMachine>(ref TStateMachine stateMachine))。
    注意:在上图*中*,它对应于调用“*段 1*”委托。它*不*异步运行,而是*阻塞*调用线程直到*第一个 await 语句*。
  3. 一旦代码执行*到达第一个 await 语句*,就会创建一个 Awaiter 对象。为此,.NET 运行时会调用 await 语句*右侧*对象的 GetAwaiter() 方法。Awaiter 对象*旨在*异步调用*延续*(或*下一个段*)。Awaiter 的*职责*也是*捕获*调用线程的 ExecutionContext,并*选择性地*在*延续方法返回*后*恢复*它。
    注意:在上图*中*,来自“*段 1*”的 Job(或 Task)的 awaiter 用于*异步触发“段 2”延续*。
  4. 创建 awaiter 对象后,.NET 运行时会调用*MethodBuilder.AwaitOnCompleted*或*MethodBuilder.AwaitUnsafeOnCompleted*中的一个。Awaiter 会与*适当的 stateMachine* 一起*传递*给该方法。此时,委托 stateMachine.MoveNext() 指向*执行方法的下一部分*。Awaiter 的*职责*是*异步调用*该委托(在*线程池的某个线程*上)。
    注意:在*段内*存在循环的情况下(如“*段 3*”),*相同的延续方法*会*多次调用*,并具有*适当的堆栈状态*。同样重要的是要注意,*对方法*的分段*分解*并*不*精确地反映*实际情况*。事实上,执行会*一直持续到*到达*下一个 await 语句*,并且*可能跨越*“*段边界*”。例如,在提供的示例中,“*段 3*”的执行可能会*循环回*在 for 循环内执行“*段 2*”中的代码。
  5. 一旦* async* 方法执行*完成*,.NET 运行时会调用*MethodBuilder.SetResult*或*MethodBuilder.SetException*中的一个,以*报告方法完成结果*。此结果*设置*到由 MethodBuilder.Task 属性公开的*结果 Job* 中。
    注意:在方法*中间*返回值或抛出异常*会*中断执行,并将*使用 MethodBuilder 的相应设置器*将*适当的值*设置到*结果 Job* 中。

在上述每个步骤中都有*无数的细节*,一次性回忆所有这些细节可能并不可行。理解支撑 TPL 的 .NET 源代码*极具挑战性*;然而,如果您想*深入研究*,探索 Armat.Threading 库中的*几个类*可能是一种*更好的方法*。

摘要

我希望 Armat.Threading 库能*激发*您将其*集成*到您的项目中。我有一个,它对我来说*效果相当好*。请注意,我*几年前*创建了这个库,但直到现在才有机会或时间发布它。我在撰写本文时对其进行了一些*改进*,但它仍然*需要额外的润色*才能成为“*理想*”的版本 :)

历史

  • 版本 1:*第一个版本*。
  • 版本 2:*修复了文章标签和 GitHub 仓库 URL*。
  • 版本 3:*轻微修正了方法构建器执行流程*。
© . All rights reserved.