强大的 .NET TPL 替代方案





5.00/5 (6投票s)
Armat.Threading 是一个基于 .NET TPL 机制的异步代码执行库
引言
异步代码执行机制一直是编程中最复杂领域之一。随着 .NET 框架引入 TPL,并行编程技术变得对更广泛的开发者可用,因此也使 .NET 成为许多人选择的框架。
作为一种通用的异步编程框架,TPL 解决了大多数现实生活中的问题。它可以用于
- 异步运行用户发起的*操作* - 从而提供流畅的用户体验,
- 触发*并行*代码执行 - 从而充分利用硬件资源,
- 构建*相互依赖的任务图* - 从而显著降低代码复杂度,
- 以及可能还有更多我不想赘述的场景。
虽然上述情况足以满足大多数应用程序的需求,但偶尔我也会遇到标准 .NET 工具难以克服的限制。在我开发 C# 项目的过程中,我选择退一步,为其中一些问题提供一个通用的解决方案。特别是,我需要
- 一种在*进程内创建多个线程池*的方法,
- 一种*跨异步方法调用传递用户上下文*的机制,
- *异步代码执行的日志记录/可追溯性*,
- 以及最重要的是,*更好地理解 TPL 内部原理*。
本文将深入探讨 Armat.Threading 库,并提供其实际使用示例。此外,它还将阐明 .NET 中异步编程的实现细节,这一领域通常缺乏文档,特别是与 TPL 内部原理相关的部分。
Armat.Threading 库
本文分为两部分。第一部分侧重于 Armat.Threading 库的*使用*,并附带一些*实际示例*,第二部分旨在揭示 .NET 中用于支持 async / await 语法的*异步编程机制*。
使用 Armat.Threading 执行并行代码
Job 类
Armat.Threading.Job
类对应于 TPL 中的 System.Threading.Tasks.Task
。它具有与 Task
类相似的接口,并且是 Armat.Threading
库的核心异步执行单元。Armat.Threading.Job<T>
类对应于 TPL 中的 System.Threading.Tasks.Task<T>
。它继承自非泛型的 Job
类,并具有与 Task<T>
相似的接口。Job<T>
类从 Job
驱动大部分功能,用于执行最终返回 T
类型结果的异步方法。
以下是使用 Armat.Threading
库*异步执行代码*的一些示例
// Run a Job with no args and no return value
await Job.Run(
() ->
{
Console.WriteLine("Running job asynchronously");
}).ConfigureAwait(false);
// Run a Job with args but no return value
await Job.Run(
(double number) ->
{
Console.WriteLine("Running job asynchronously with arg {0}", number);
}, 0.1248).ConfigureAwait(false);
// Run a Job with no args and returning T
T result = await Job<T>.Run(
() ->
{
Console.WriteLine("Running job asynchronously");
return default(T);
}).ConfigureAwait(false);
// Run a Job with args and returning T
T result = await Job<T>.Run(
(double number) ->
{
Console.WriteLine("Running job asynchronously with arg {0}", number);
return new T(0.1248));
}, 0.1248).ConfigureAwait(false);
运行 Job
有多种方式。最简单的方式是使用静态 Run
方法(如上所示)。也可以使用构造函数之一创建 Job
对象实例。以这种方式创建的 Job 必须稍后通过调用 Run
方法的相应重载来执行,如下所示
public Int32 Sum(Int32[] array)
{
// Create the Job
Job<Int32> asyncJob = new Job<Int32>(SumFn, array);
// Run asynchronously
asyncJob.Run();
// Wait for it to finish and return the result
return asyncJob.Result;
}
private Int32 SumFn(Object? args)
{
Int32[] array = (Int32[])args;
return array.Sum();
}
每个 Job
实例都有一个唯一的*自动递增*的 Id
,类型为 Int64
。这些可用于*调试、日志记录或跟踪*目的。选择使用 Int64
标识符是基于以下几点
- 它的大小相对较小,并且不需要堆内存分配
- 生成新标识符(递增
Int64
数字)不应影响性能 - 即使在*长时间运行的应用程序*中也不会溢出。假设进程内创建 Job 的速率为 1
Job/毫秒
,那么需要大约 300,000,000 年才能溢出。
可以通过静态属性 Armat.Threading.Job.Current
来识别*当前运行*的 Job
实例(如果当前代码不在任何 Armat.Threading
线程的上下文中运行,则为 null)。这样,执行代码就可以访问任何 Job
属性(如 AsyncState
、CancellationToken
、Initiator
)来定义自己的行为。
注意:我不会坚持认为公开当前正在运行的 Job
实例并使用静态属性是最好的设计选择。虽然它简化了代码(例如,您不必显式地将 CancellationToken
传递给嵌套方法以支持取消),但它也允许在 Job 上下文中运行的任何方法读取它可能不应该访问的信息。它也可能由于隐式访问数据的方式而使代码变得模糊。无论如何,它解决了许多问题,并为进一步扩展 Armat.Threading
库创造了广泛的机会。我强烈建议使用者尽可能少地使用此访问器,以保持代码的简洁和可读性。
Initiator
属性可用于识别*触发当前 Job* 的 Job
。通过递归遍历发起者 Job,可以导航 Job 层级结构,直到 Root
。
考虑到 Armat.Threading.Job
类的*整个接口*在很大程度上与 .NET CLR 的 System.Threading.Tasks.Task
相同,我将不在此详述。而且,类属性和方法名称似乎也相当具有描述性。
JobScheduler
IJobScheduler(抽象)
Armat.Threading.IJobScheduler
接口定义了 Job 调度程序的行为。下面是 Job 调度程序接口的声明
public interface IJobScheduler : IDisposable
{
// Static property returning the default instance of IJobScheduler.
// This is the one to be used for running the Jobs by default.
static IJobScheduler Default { get; }
// Static property returning the current instance of IJobScheduler to be used for Jobs execution
// If not null, it will used for running Jobs instead of IJobScheduler.Default
static IJobScheduler Current { get; }
// Enqueues a Job in a scheduler.
// To successfully enqueue a Job in a JobScheduler one must have Job.Status = JobStatus.Created (never run before).
void Enqueue(Job job);
// Cancels Job execution in the JobScheduler before it begins.
// The method will fail (will return false) if the Job is already running or is finished.
Boolean Cancel(Job job);
// The property returns number of jobs currently waiting in the queue.
// It may be used to monitor the current load on the JobScheduler.
Int32 PendingJobsCount { get; }
// Makes IJobScheduler.Current to refer to this instance the for the executing thread.
// IJobScheduler.Current is reset to the previous value once the returned JobSchedulerScope is Disposed.
JobSchedulerScope EnterScope();
}
该接口*极简*且*自述*。
使用给定的 IJobScheduler
实例运行异步 Job 的一种方法是使用 IJobScheduler.EnterScope
方法。它允许在*调用方法的范围内*覆盖*默认*的 Job 调度程序。以下示例说明了如何实现这一点
private async Job<Int64> JobExecutionInCustomScheduler(IJobScheduler otherScheduler)
{
// After this line all Jobs will be executed by the otherScheduler (unless overridden by another one)
// This will make IJobScheduler.Current to refer to the otherScheduler
// Disposing the otherScope will result in restoring the previous value of IJobScheduler.Current
using var otherScope = scheduler.EnterScope();
// create the Job
Job<Int32> asyncJob = new Job<Int32>(SumFn, new Int32[] { 1, 2, 3 });
// Run asynchronously
asyncJob.Run();
// async wait for it to finish and return the result
return await asyncJob.ConfigureAwait(false);
}
private Int32 SumFn(Object? args)
{
Int32[] array = (Int32[])args;
return array.Sum();
}
JobSchedulerBase(构建块)
抽象类 JobSchedulerBase
实现 IJobScheduler
接口。它被设计为所有可能的 Job 调度程序实现的*基类*。JobSchedulerBase
提供了设置 IJobScheduler
接口的 Default
属性的方法,并且还为派生类提供了*受保护的* Job
执行 API。
public abstract class JobSchedulerBase : IJobScheduler
{
// Gets or sets the default <code>IJobScheduler</code>. If not set, the <code>JobScheduler.Default</code> is returned.
// Note: To protect setting the default <code>JobScheduler</code> by an arbitrary code, the setter is made protected, thus requiring a public setter in a derived class.
// Note: <code>JobScheduler.Default</code> can be set only once during process lifetime (do this at the application initialization phase if necessary).
public static IJobScheduler Default { get; protected set; }
// updates the status of Job
// can be used in derived scheduler implementations to control Job status (Job.UpdateStatus is internal to Armat.Threading library)
protected Boolean UpdateJobStatus(Job job, JobStatus newStatus, JobStatus prevStatus);
// this method must be used when executing jobs in a scheduler
// in sets the IJobScheduler.Current property to this during the job execution
// can be used in derived scheduler implementations to control Job status (Job.ExecuteProcedure is internal to Armat.Threading library)
protected JobStatus ExecuteJobProcedure(Job job);
// this method must be used when executing job continuations in a scheduler
// in sets the IJobScheduler.Current property to this during the job execution
// can be used in derived scheduler implementations to control Job status (Job.ExecuteJobContinuations is internal to Armat.Threading library)
protected Int32 ExecuteJobContinuations(Job job);
}
注意:提供 JobScheduler 的抽象基类的决定似乎与最佳设计实践相悖。特别是,这使得在不继承 JobSchedulerBase
的情况下提供任何其他 IJobScheduler
实现成为不可能。我选择此解决方案是为了使用 internal
访问修饰符来保护某些 Job
方法,然后通过相应的 JobSchedulerBase
方法使其可访问。这确保了 Job
执行方法*仅在同一程序集中*可访问,从而提供额外的安全层和对如何使用它们的控制。它还保证在 Job
执行期间*设置和重置* IJobScheduler.Current
属性。
JobScheduler(实现)
JobScheduler
类继承自 JobSchedulerBase
,并提供了异步 Job 调度机制的*默认实现*。它管理*线程池*和已提交*Jobs*的*队列*,并提供有关 Job
执行的*一些统计信息*。
通过使用构造函数的相应重载,可以配置 JobScheduler
,为其指定*名称*,使用*特定的最小/最大线程数*,并*限制队列中待处理 Jobs 的大小*。JobScheduler
将*常规*和*长时间运行*的 Job 分别管理在*不同的队列*中,并使用*不同的线程池*执行它们,从而确保不会因长时间运行的 Job 而*阻塞*较小的 Job。
以下是*显式使用* JobScheduler
*运行 Job* 的两个*相同*的示例
public class AsyncExecutor
{
private JobScheduler myScheduler = new("Scheduler of ProcA");
public async void RunInOwnScheduler_Example1()
{
// create a job
Job job = new(ProcA);
// run it within own scheduler
job.Run(myScheduler);
// wait for the job to complete
await job.ConfigureAwait(false);
}
public async void RunInOwnScheduler_Example2()
{
// create a job
Job job = new(ProcA);
// enqueue the job in own scheduler
myScheduler.Enqueue(job);
// wait for the job to complete
await job.ConfigureAwait(false);
}
private async void ProcA()
{
// do something
}
}
JobRuntimeScope
范围的定义
Armat.Threading.JobRuntimeScope
类代表*异步操作的范围*。该范围在 JobRuntimeScope
对象*实例化*时开始,并在其*处理*(通常在异步操作完成后)时结束。它代表*字符串键*和*对象值*的*对*。JobRuntimeScope
对象*可以在异步代码执行期间*(无论异步调用的数量和深度如何)通过*静态访问器*检索。
使用 JobRuntimeScope
可以将*参数传递*给嵌套的异步方法,从而提供有关调用的*上下文信息*。JobRuntimeScope
的一些*好的使用示例*包括
- 识别 Job 的*相关性*,
- *跟踪*异步代码执行,
- 将*上下文信息*传递给嵌套方法。
Armat.Threading.JobRuntimeScope
由以下成员组成
public static JobRuntimeScope Enter(String key, Func<Object> factory)
使用给定的*键*实例化JobRuntimeScope
类型的对象,并使用工厂方法初始化*值*属性。
注意:如果*键*已在当前范围内定义,则会返回现有的JobRuntimeScope
实例,并且*永远不会调用*工厂方法来创建新的实例。public static JobRuntimeScope Enter<T>(String key, Func<T> factory)
表示JobRuntimeScope.Enter
方法的*重载泛型版本*,它可以存储*类型 T 的值*。public static JobRuntimeScope Enter<T>(Func<T> factory)
表示JobRuntimeScope.Enter
方法的*重载泛型版本*,它使用*类型 T* 作为创建范围的*键*。public static JobRuntimeScope EnterNew(String key, Func<Object> factory)
使用给定的*键*实例化JobRuntimeScope
类型的对象,并使用工厂方法初始化*值*属性。
注意:如果*键*已在当前范围内定义,则返回JobRuntimeScope.Null
以指示失败结果,并且*永远不会调用*工厂方法来创建新的实例。public static JobRuntimeScope EnterNew<T>(String key, Func<T> factory)
表示JobRuntimeScope.EnterNew
方法的*重载泛型版本*,它可以存储*类型 T 的值*。public static JobRuntimeScope EnterNew<T>(Func<T> factory)
表示JobRuntimeScope.EnterNew
方法的*重载泛型版本*,它使用*类型 T* 作为创建范围的*键*。public static Object? GetValue(String key)
返回当前范围中*给定键*的*值*。如果*未找到键*的作用域,则返回*null*。public static T? GetValue<T>(String key)
返回当前范围中*给定键*的*值*,如果*未找到键*的作用域,则返回*null*。如果*值*不能赋值给*类型 T*,则会引发异常。public static T? GetValue<T>()
表示JobRuntimeScope.GetValue
方法的*重载泛型版本*,它使用*类型 T* 作为*键*。public void Leave()
通过*移除相应键*来*退出*当前范围。public void Dispose()
如Leave()
方法所述,*退出*当前范围。它被设计为与*using 关键字*一起调用,以确保在*退出方法范围*时进行*正确处理*。public Boolean IsNull
对于*null(无效)范围实例*,返回*true*。在上述EnterNew
方法中,当*无法进入给定范围*时,可以返回 Null 范围。- public String Key { get; }
返回JobRuntimeScope
对象的*键*。 - public Object Value { get; }
返回JobRuntimeScope
对象的*值*。
我将不尝试在此文中描述 JobRuntimeScope
类的*实现细节*,因为它在阅读源代码本身时似乎很清楚。主要思想在于将 JobRuntimeScope
对象*存储在 ThreadLocal 缓存中*,并确保通过在*该线程中*实例化的 Job 来*传播*它。
JobRuntimeScope
的*一些使用示例*作为*单元测试*在此处提供 。以下代码说明了如何使用它
private async Job DoSomething()
{
// run some user scoped operation
await RunUserScopedOperation().ConfigureAwait(false);
// user data is null here because there's JobRuntimeScope object
// has been defined and disposed within RunUserScopedOperation method
UserData? userData = JobRuntimeScope.GetValue<UserData>();
}
private async Job RunUserScopedOperation()
{
// create the scope with some UserData information
// 'using' keyword guarantees to have the scope Disposed when exiting the method
using var scope = JobRuntimeScope.Enter<UserData>(() => new UserData("abc", "123"));
// user data refers to the one declared above
UserData? userData1 = JobRuntimeScope.GetValue<UserData>();
// run any asynchronous operation
// UserData will be accessible in all inner synchronous or asynchronous methods
// irrespective of the thread running the method
await AsyncOperation().ConfigureAwait(false);
// user data remains the same as above
UserData? userData2 = JobRuntimeScope.GetValue<UserData>();
}
private async Job AsyncOperation()
{
// user data remains the same as created in the caller method
UserData? userData3 = JobRuntimeScope.GetValue<UserData>();
// running some asynchronous operations
await Job.Yield();
// user data remains the same as created in the caller method
UserData? userData3 = JobRuntimeScope.GetValue<UserData>();
}
因此,JobRuntimeScope
提供了一种*通用的机制*,用于将*用户定义的上下文信息*传递给在 Job
上下文中运行的代码。
CorrelationIdScope
Armat.Threading.CorrelationIdScope
类是 JobRuntimeScope
的*可能值类型之一*。它生成*自动递增的 64 位 ID*,用于*跨异步操作的相关性*(用于日志记录、跟踪或任何其他需求)。它还提供方便的*工厂方法*,用于使用*新的 CorrelationIdScope 值*实例化 JobRuntimeScope
,如以下示例所示(相应的单元测试*在此处*提供 )
private async Job RunCorrelationIDTest(Int32 testNum)
{
// this will create correlation ID and the appropriate scope
using var scope = CorrelationIdScope.Create();
// any asynchronous code execution
await Job.Yield();
// correlation ID is available here (after async method invocation)
CorrelationIdScope currentCorrId = CorrelationIdScope.Current()!;
Output.WriteLine("RunCorrelationIDTest: Correlation ID for test {0} is {1}",
testNum,
currentCorrId.CorrelationID);
// nested async method invocations
await NestedAsyncMethodCall(testNum, CorrelationIdScope.Current()!.CorrelationID, 1).ConfigureAwait(false);
}
private async Job NestedAsyncMethodCall(Int32 testNum, Int64 expectedCorrID, Int32 depth)
{
// any asynchronous code execution
await Job.Yield();
// correlation ID remains the same as in the caller method above
CorrelationIdScope currentCorrId = CorrelationIdScope.Current()!;
Output.WriteLine("NestedAsyncMethodCall<{0}>: Correlation ID for test {1} is {2}",
depth,
testNum,
currentCorrId.CorrelationID);
// go even deeper
if (depth < 3)
await NestedAsyncMethodCall(testNum, expectedCorrID, depth + 1).ConfigureAwait(false);
}
注意:在*相同的操作*上下文中,*只有一个* CorrelationIdScope
实例将被创建和使用,直到异步操作*完成*。在同一操作的上下文中*每次*调用 CorrelationIdScope.Create()
时,都将返回*已存在的* CorrelationIdScope
实例。因此,它*仅在第一次调用时*生成标识符,并在整个操作期间保持*可访问*。通过这种方式,可以*跟踪*并*轻松过滤*单个操作在应用程序日志文件中的执行。
C# 中的并行代码执行机制
在本节中,我将尝试*阐明 .NET 中 async / await 的实现细节*。它不会非常详细,而是一个*高层次的执行流程分解*。希望借助 Armat.Threading
库的*源代码*,*调试和理解* TPL 背后的*魔法*应该会更容易。
当使用 await 关键字*调用*一个 async 方法时,它会使用*相应的 MethodBuilder* 开始*异步代码执行*。为此,该方法被*分解成段*,这些段由* await 语句*分区,如下所示
在 Armat.Threading
库中,Job
类被*装饰*为 [System.Runtime.CompilerServices.AsyncMethodBuilder(typeof(JobMethodBuilder))]
属性,该属性*指示*用于*返回 Job 类型*的异步方法的*MethodBuilder 类型*。
以下*序列*由 .NET 运行时在*异步方法执行期间*启动
- .NET 运行时会创建一个*方法构建器对象*来*编排方法执行*。请参阅
JobMethodBuilder.Create()
实现以获取参考。 - 接下来,会调用
MethodBuilder.Start(ref TStateMachine stateMachine)
来*开始函数执行*。委托stateMachine.MoveNext()
指向*执行方法的第一个段*。实现必须*确保*在方法完成之前*恢复*调用线程的 ExecutionContext 和 SynchronizationContext。
为此,我*重用了*AsyncTaskMethodBuilder.Start(stateMachine)
,它代表 TPL Task 的*方法构建器*(请参阅void JobMethodBuilder.Start<TStateMachine>(ref TStateMachine stateMachine)
)。
注意:在上图*中*,它对应于调用“*段 1*”委托。它*不*异步运行,而是*阻塞*调用线程直到*第一个 await 语句*。 - 一旦代码执行*到达第一个 await 语句*,就会创建一个
Awaiter
对象。为此,.NET 运行时会调用 await 语句*右侧*对象的GetAwaiter()
方法。Awaiter 对象*旨在*异步调用*延续*(或*下一个段*)。Awaiter 的*职责*也是*捕获*调用线程的 ExecutionContext,并*选择性地*在*延续方法返回*后*恢复*它。
注意:在上图*中*,来自“*段 1*”的 Job(或 Task)的 awaiter 用于*异步触发“段 2”延续*。 - 创建 awaiter 对象后,.NET 运行时会调用*
MethodBuilder.AwaitOnCompleted
*或*MethodBuilder.AwaitUnsafeOnCompleted
*中的一个。Awaiter 会与*适当的 stateMachine* 一起*传递*给该方法。此时,委托stateMachine.MoveNext()
指向*执行方法的下一部分*。Awaiter 的*职责*是*异步调用*该委托(在*线程池的某个线程*上)。
注意:在*段内*存在循环的情况下(如“*段 3*”),*相同的延续方法*会*多次调用*,并具有*适当的堆栈状态*。同样重要的是要注意,*对方法*的分段*分解*并*不*精确地反映*实际情况*。事实上,执行会*一直持续到*到达*下一个 await 语句*,并且*可能跨越*“*段边界*”。例如,在提供的示例中,“*段 3*”的执行可能会*循环回*在 for 循环内执行“*段 2*”中的代码。 - 一旦* async* 方法执行*完成*,.NET 运行时会调用*
MethodBuilder.SetResult
*或*MethodBuilder.SetException
*中的一个,以*报告方法完成结果*。此结果*设置*到由MethodBuilder.Task
属性公开的*结果 Job* 中。
注意:在方法*中间*返回值或抛出异常*会*中断执行,并将*使用 MethodBuilder 的相应设置器*将*适当的值*设置到*结果 Job* 中。
在上述每个步骤中都有*无数的细节*,一次性回忆所有这些细节可能并不可行。理解支撑 TPL 的 .NET 源代码*极具挑战性*;然而,如果您想*深入研究*,探索 Armat.Threading
库中的*几个类*可能是一种*更好的方法*。
摘要
我希望 Armat.Threading
库能*激发*您将其*集成*到您的项目中。我有一个,它对我来说*效果相当好*。请注意,我*几年前*创建了这个库,但直到现在才有机会或时间发布它。我在撰写本文时对其进行了一些*改进*,但它仍然*需要额外的润色*才能成为“*理想*”的版本 :)
历史
- 版本 1:*第一个版本*。
- 版本 2:*修复了文章标签和 GitHub 仓库 URL*。
- 版本 3:*轻微修正了方法构建器执行流程*。