自定义任务计划程序:将您的任务工作项排队以在您想要的时间运行






4.59/5 (9投票s)
利用.NET任务框架中一些鲜为人知的领域,根据您自己的条件调度任务执行。
引言
.NET中的任务框架和TAP编程模式是为您的应用程序添加并发执行的强大工具。它们通常比线程更容易使用,如果您知道如何使用它们,它们至少能提供同样强大的功能。然而,其中一些最好的功能并没有得到很好的文档。其中一个功能是TaskScheduler
,它控制着您的任务何时执行。
我重新审视这个项目,并使用任务框架和内置线程池,结果陷入了困境。令我震惊的是,.NET(在我的系统上)为CPU密集型**操作分配了2047个潜在线程。我不确定它们是否真的被创建了,或者是否只是占位符,但即便如此,一个理想设计的应用程序应该拥有与核心数量完全相同的CPU密集型操作(或者通常是核心数量减一,因为主核心通常负责管理所有其他内容,可能还有一个UI)。底线是,如果您有占用CPU的操作,那么运行比您拥有的核心数量更多的操作并不能提高任何性能。鉴于此,我获得的每个进程可用线程的.NET值似乎是过高的。事实上,线程越多,调度程序的工作就越困难,这意味着您运行的线程数量超过CPU可以同时运行的数量时,性能会下降。这其中的教训是,创建尽可能少的线程,但不能更少。
一旦我无法从运行更多操作中获得额外性能,我宁愿将这些操作排队。内置的ThreadPool
并不真正关心这一点,并且尝试设置SetMaxThreads()
来改变这一点会影响整个进程,还有其他问题。使用我在上一篇文章中概述的技术是可行的,但它有点复杂,即使抽象化了,如今在.NET中,使用Task框架才是王道。因此,我需要一些东西来允许我使用Tasks,但要以一种我可以控制同时运行多少个任务的方式。最终,我偶然发现了TaskScheduler
,甚至找到了一些示例代码,用于通过子类化它来改变行为,但除此之外,它的文档并不是很完善。在这里,我将向您介绍这种技术。
** CPU密集型操作是指占用处理器,使其进行大量计算的操作。与此相对的是I/O密集型操作,它大部分时间都在等待外部设备交互,例如硬盘的磁盘读取或网络控制器的传入数据包。这种区别很重要,因为根据您与多少设备通信,有充分的理由让I/O密集型操作多于核心数量。I/O密集型操作不占用CPU周期(除了微小的开销使其工作)。
概念化这个混乱的局面
正如我所说,ThreadPool
会预先分配大量的线程(或者至少是它们的占位符),并且基本上取消了对长时间运行操作的排队,因为通常您会有可用的线程,即使您没有空闲的核心来运行它——或者至少这是默认行为。我的系统上的这种设置让我每个核心每个进程在池中大约有1023个线程。这太荒谬了。即使我的应用程序由于某些我无法理解的原因有那么多长时间运行的CPU密集型任务,我也绝不会希望尝试一次性执行所有这些任务!不,我想要做的是分配一定数量的工作线程,然后将这些工作线程专门用于在它们可用时完成任务,以便任务排队等待完成,即使一次正在完成多个任务。这样做的主要原因之一是性能,但也可以简单地是一个可管理性问题。您同时处理的任务越多,您的应用程序就越难驾驭,资源消耗也越大。这不是我们想要的。
幸运的是,如果您愿意使用TaskFactory.StartNew()
而不是Task.Run()
,Task框架为您提供了自定义调度的方法。为了方便这一点,我们必须将自定义的TaskScheduler
作为参数传递给TaskFactory
的构造函数。但是,首先我们需要一个TaskScheduler实现来传递。Microsoft提供了两个,但都不符合我们的要求。然而,我链接的示例代码非常接近。经过一番努力,我能够相当忠实地重现我之前应用程序的行为,但使用了这种完全不同的范例。
基本上,它提供了任务排队和出队,以及正常执行或在当前线程上执行任务的功能。它还包含一个供调试器使用的成员,用于枚举任务。我们将在本文中实现大部分功能。
一旦我们完成,应用程序的其余部分就很简单了。它只是在单击按钮时生成一个新任务,并将任务链接到一个添加到显示屏上的进度控件。调度程序处理任务何时运行。我们使用匿名委托来链接控件,这比我最初的应用程序简单得多,尽管这是一个特殊的使用情况,并且该应用程序的基础设施支持的不仅仅是调度,而这只是一个“一招鲜”。尽管如此,它仍然是一个有用的“小马”和一个很棒的技巧。
编写这个混乱的程序
用户界面
首先让我们看看用户界面。它的核心在于“Enqueue Work”按钮的点击处理程序。
var wpc = new WorkerProgressControl(_nextTaskId++);
ProgressPanel.SuspendLayout();
ProgressPanel.Controls.Add(wpc);
wpc.Dock = DockStyle.Top;
ProgressPanel.ResumeLayout(true);
new TaskFactory(_taskSched).StartNew(()=> {
// TODO: replace with actual work
for(var i = 0;i<50;++i)
{
Thread.Sleep(100);
BeginInvoke(new Action(()=> {
wpc.Value = i * 2;
}));
}
wpc.Value = 100;
});
_taskSched
是我们的自定义任务调度程序实例。请注意我们是如何在加粗行中将其传递给 TaskFactory
的构造函数的。这允许我们将标准调度行为替换为我们限制并发任务数量的行为。在委托中,包含着我们长时间运行任务执行的“工作”。请注意我们如何使用 BeginInvoke()
在其中定期更新 UI。这是为了以线程安全的方式报告我们的进度。这与上一篇文章中的应用程序的工作方式有点不同,但这次它比上一篇文章中更容易。
我们的WorkerProgressControl
基本上只是一个进度条和一个标签,就像上一篇文章中一样。我们将它们停靠,随着添加更多任务而创建一个列表。与上一个应用程序不同,我们可以通过使用其匿名方法提升功能,让C#编译器管理我们控件的查找。基本上,我们只需从匿名方法内部引用wpc
,它就会像魔法一样工作——就像它一直以来一样。
这里的好处是,不需要进行任何任务调度工作。所有这些都由 _taskSched
处理。我们的 UI 代码不需要关心,这使得它非常简单。用户界面应该只是连接器,而不是更多。让我们把调度逻辑放在它该放的地方!
任务调度器
说到我们的自定义TaskScheduler
,现在就来看看它。我们用它来跟踪当前任务,获取下一个任务,执行任务,并收集各种任务统计信息。其中大部分是无耻地抄袭了我前面提到的微软示例代码,但我对其进行了修改,以收集更多统计信息,并允许您在对象生命周期内更改允许的并发量,而不仅仅是在创建时。让我们深入了解一下,从我们的成员字段开始。
// Indicates whether the current thread is processing work items.
[ThreadStatic]
static bool _currentThreadIsProcessingItems;
// The list of tasks to be executed.
readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
// The maximum concurrency level allowed by this scheduler.
int _maximumTaskCount = Math.Max(1,Environment.ProcessorCount-1);
// Indicates the currently processing work items.
int _pendingTaskCount = 0;
// Indicates the waiting task count.
int _waitingTaskCount = 0;
首先,我们有一个标志,指示当前线程是否正在处理工作项。这个标志是按线程设置的,并在我们尝试“内联”任务以在同一线程上运行时进行检查。否则,它对我们没有多大用处。
其次,我们有一个链表来存储我们的任务。我们使用lock
锁定对它的所有访问。我一开始认为这很糟糕,并准备用更精简的同步机制替换整个东西,但当我阅读了一些资料后,我发现在这种情况下,由于线程访问模式,锁在这个场景中的性能与大多数其他选项一样好或更好,并且比任何非平凡的选项都好。曾几何时,我可能发誓只有在枪口下才会使用它,但最终,我们还是用上了。链表是合适的,因为任务的添加和移除频率相同,并且移除发生在第一个节点。
现在,我们有了最大任务数,它会尝试根据您机器的硬件将其设置为理想值。基本上,它希望使用您的所有核心减一,最小值为一。
最后,我们有 _pendingTaskCount 和 _waitingTaskCount,它们分别跟踪正在运行**的项目数量和正在等待的项目数量。
** 某种程度上。实际上,我们在任务完成运行之前,这个值就会被递减。有一种方法可以准确地跟踪任务的结束,但效率要低得多。将此值视为一个估计值。
现在,让我们来利用这些字段。我们遇到的第一个非平凡成员是QueueTask()
,其实现如下:
// Add the task to the list of tasks to be processed. If there aren't enough
// delegates currently queued or running to process tasks, schedule another.
lock (_tasks)
{
_tasks.AddLast(task);
++_waitingTaskCount;
if (_pendingTaskCount < _maximumTaskCount)
{
++_pendingTaskCount;
_NotifyThreadPoolOfPendingWork();
}
}
它所做的只是锁定列表,添加一个任务并更新我们的统计数据。如果它小于我们允许的任务数量,我们就会增加挂起任务的数量并通知线程池,这就会引出_NotifyThreadPoolOfPendingWork()
。
ThreadPool.UnsafeQueueUserWorkItem((object state) =>
{
// Note that the current thread is now processing work items.
// This is necessary to enable inlining of tasks into this thread.
_currentThreadIsProcessingItems = true;
try
{
// Process all available items in the queue.
while (true)
{
Task item;
lock (_tasks)
{
// When there are no more items to be processed,
// note that we're done processing, and get out.
if (0 == _tasks.Count)
{
--_pendingTaskCount;
break;
}
// Get the next item from the queue
item = _tasks.First.Value;
_tasks.RemoveFirst();
--_waitingTaskCount;
}
// Execute the task we pulled out of the queue
base.TryExecuteTask(item);
}
}
// We're done processing items on the current thread
finally { _currentThreadIsProcessingItems = false; }
}, null);
这更实质一些。首先,我们调用 UnsafeQueueUserWorkItem()
。您可能会想知道为什么我们不调用 QueueUserWorkItem()
,我也曾有过同样的疑问,但后来我找到了这个。我们没有调用 ExecutionContext.Capture()
,这是一个开销很大的调用。缺点是当您的代码在受限环境中运行时,这很重要,因为它可能会提升权限。性能是我能找到的唯一原因,解释了微软示例代码中发现的这个调用。查看他们代码的其余部分,它比我预期的示例实现得更好,所以我坚持使用这个调用。推测性能回报是值得的,尽管对于长时间运行的任务我不会这么认为,或者更重要的是,在这种情况下,使用更传统的方法可能存在我不知道的陷阱。我不能确定,所以讽刺的是,我通过坚持使用 UnsafeQueueUserWorkItem()
来确保安全!
在匿名方法内部,我们必须完成我们的工作,所以我们设置了线程标志,表示我们正在忙于处理,然后我们遍历任务。请注意,我们在循环内部进行了锁定。我们这样做是为了将base.TryExecuteTask(item)
放在锁的外面,这对于性能和防止潜在的死锁非常重要。在锁内部,我们更新统计数据,获取时间,并将其从任务列表中删除。最后,我们退出锁。完成后,我们将线程标志设置回空闲状态。
我们这里不必使用系统线程池。如果您希望它始终阻塞,可以将任务调度为在原始线程甚至同一线程上执行。这一切都取决于您如何实现TaskScheduler
。
接下来,我们有TryExecuteInline()
,它试图在当前线程上执行任务。
// If this thread isn't already processing a task, we don't support inlining
if (!_currentThreadIsProcessingItems) return false;
// If the task was previously queued, remove it from the queue
if (taskWasPreviouslyQueued)
// Try to run the task.
if (TryDequeue(task))
return base.TryExecuteTask(task);
else
return false;
else
return base.TryExecuteTask(task);
首先,我们在这里做的是检查我们是否正在处理,因为要使这工作,我们必须正在处理,否则我们根本不会从这个线程获得机会。如果我们正在处理,那么我们查看任务是否已在队列中。如果是,我们立即移除并运行它,否则如果它不在队列中,我们返回 false。最后,如果它以前没有排队,我们只需运行任务。移除是为了避免任务运行两次。
TryDequeue()
只是尝试删除一个项。
lock (_tasks)
{
if (_tasks.Remove(task))
{
--_waitingTaskCount;
return true;
}
return false;
}
除了锁定列表和删除之外,我们做的唯一其他事情是更新等待统计数据。
最后,我们有 GetScheduledTasks()
bool lockTaken = false;
try
{
Monitor.TryEnter(_tasks, ref lockTaken);
if (lockTaken) return _tasks;
else throw new NotSupportedException();
}
finally
{
if (lockTaken) Monitor.Exit(_tasks);
}
关于这个方法,它是为调试器设计的,我们不能在调试器线程上阻塞,所以我们只是尝试进入锁,如果不能,就抛出异常。奇怪的是,调试器处理异常比阻塞调用更好,而且方法的文档也明确指出了这一点。
就是这样。实际上,它并不复杂。这只是一个知道要寻找什么的问题。
历史
- 2020年7月18日 - 首次提交