65.9K
CodeProject 正在变化。 阅读更多。
Home

可等待的并发优先队列

starIconstarIconstarIconstarIconstarIcon

5.00/5 (3投票s)

2017 年 12 月 30 日

CPOL

5分钟阅读

viewsIcon

14326

downloadIcon

1052

可选可等待的、易于使用的并发优先级队列。

引言

一般的优先级队列是一个非常吃力不讨好且庞大的话题。那些了解高级数据结构的人会首先想到通用实现的计算复杂性,而不了解的人则会被“队列”这个词弄糊涂,以为是先进先出 (FIFO) 的意义,但它们比 FIFO 更复杂。

为了立即简化问题,我的目的不是涵盖通用的优先级队列,而是实现具有有限数量的不同强类型优先级级别的优先级队列。限制通用优先级队列的功能,如果体现在实现中,会带来有趣的复杂性降低,更重要的是,会降低入队/出队(enqueue/dequeue)操作的时间复杂性。

这里提出的优先级队列实现将是像带宽管理、命令执行优先级排序和来自不同源事件的基于优先级的合并等场景的绝佳选择。简单地说,就是我们在开发过程中就知道优先级级别的场景。能够在许多线程的重度并发环境中茁壮成长是锦上添花。

这里提出的实现将不适用于经典的通用优先级队列应用程序,这些应用程序中的优先级在代码开发过程中通常是未知的,例如离散事件模拟、Dijkstra 算法、Huffman 编码等。

要求

  • 无锁 (Lock-Free),
  • 高度并发 (Highly Concurrent),
  • 存储项类型通用 (generic in stored item type),
  • 优先级类型通用,但限制为 .net枚举表示的优先级,强类型优先级 (generic in priority type, but constrained to priorities represented by .net enum, strongly typed priority),
  • 在构造时显式定义降序优先级 (explicitly defined descending order of priorities during construction),
  • 能够检测项计数和每个优先级级别的项计数 (ability to detect items count and per priority level items count),
  • 能够出队 - 降序优先级 (ability to dequeue - descending order of priorities),
  • 能够覆盖出队优先级级别 (ability to override dequeue priority level),
  • 潜在的可等待 (potentially awaitable),
  • 潜在的基于优先级的可等待 (potentially priority based awaitable),

这些要求能够产生一个受限制的优先级队列,能够控制每个优先级级别的项计数,从而避免溢出问题。

实现

实现将分为两个类,第一个类PriorityQueueUC将涵盖并发优先级队列,不带可等待功能,第二个类PriorityQueueNotifierUC将继承PriorityQueueUC类并扩展该类以启用对入队数据的等待。将可等待部分移到自己的类中是为了性能考虑。它增加了额外的同步层,在某些情况下是不必要的。

两种实现都需要一个表示优先级的枚举

public enum QueuePriority
{
    Lower,
    Normal,
    Higher
}

两种实现都需要显式定义降序的优先级顺序

之所以必须显式进行此操作,是因为反射不保证在枚举块中以与编码相同的顺序反映项的顺序。
我们不能依赖枚举元素的数值,因为它们可以被覆盖!

构造函数将降序优先级作为IEnumerable<TPriority>来接收,因此我们可以使用数组等。

QueuePriority[] descendingPriorities =
{
    QueuePriority.Higher,
    QueuePriority.Normal,
    QueuePriority.Lower
};

由于继承关系,两个类都实现了这个接口。

public interface IPriorityQueueUC<TPriority, TItem> where TPriority : struct
{
    void Enqueue(TPriority priority, TItem item);
    void Enqueue(TPriority priority, IList<TItem> items);
    void Enqueue(TPriority priority, IEnumerable<TItem> items);
    bool TryDequeu(out TItem item, TPriority? priority = null);
    bool TryDequeu(out TItem item, out TPriority? priority, TPriority? requestedPriority = null);
    int Count(TPriority? requestedPriority = null);
    bool HasItems(TPriority? requestedPriority = null);
}

Enqueue

操作很简单,接收TPriority和项作为TItemILista<TItem>IEnumerable<TItem>,以确保从不同集合高效加载。

TryDequeu (尝试出队)

bool TryDequeu(out TItem item, TPriority? priority = null);
如果成功则返回 true,结果存储在out TItem中,如果第二个参数TPriority?为 null,则使用降序优先级直到找到项或返回 false。
如果第二个参数不为 null,则请求覆盖的优先级。

bool TryDequeu(out TItem item, out TPriority? priority, TPriority? requestedPriority = null);
如果成功则返回 true,结果存储在out TItem中,结果存储在out TPriority?中,并且能够用请求的优先级TPriority覆盖降序优先级。


Count (计数)

Count 操作返回整个集合的项数,或者如果TPriority?被覆盖,则返回特定优先级的计数。
使用此功能会受到并发执行的入队和出队操作的影响,因为它们会影响该值。
请记住,Count 和 HasItems 与 .Net ConcurrentQueue 的 Count 非常相似。

HasItems (是否包含项)
HasItems 根据集合或请求的优先级返回 true 或 false。
使用此功能会受到并发执行的入队和出队操作的影响,因为它们会影响该值。
请记住,Count 和 HasItems 与 .Net ConcurrentQueue 的 Count 非常相似。

然后,当你开始迭代 2(这是构建迭代的开始)时,你可能想要复制测试用例并将它们重新分类到迭代 2。这还允许对测试用例进行粒度跟踪,并允许你说某个测试用例在一个迭代中是准备好的,但在另一个迭代中不是。同样,如何做到这一点取决于你以及你希望如何报告。 “场景”部分提供了更多细节。

两种版本的类构造都很简单。

public enum QueuePriority { Lower, Normal, Higher }

QueuePriority[] descendingPriorities = { QueuePriority.Higher, QueuePriority.Normal, QueuePriority.Lower };

var pQueue = new PriorityQueueUC<QueuePriority , string>(descendingPriorities);
var pQueueNotifier = new PriorityQueueNotifierUC<QueuePriority, string>(descendingPriorities);

 

可等待的并发优先队列

现在,可等待版本的区别在于它扩展了原始接口IPriorityQueueUC<TPriority, TItem>

public interface IPriorityQueueNotifierUC<TPriority, TItem> : IPriorityQueueUC<TPriority, TItem>
where TPriority: struct
{
       ICompletionUC EnqueuedItemsAsync();
       ICompletionUC EnqueuedItemsAsync(TPriority priority);
}

EnqueuedItemsAsync返回的数据类型是ICompletionUC,一个基于接口的 .Net 可等待对象。
ICompletionUC允许等待优先级队列中的项(按降序排列)或等待特定优先级的项。

EnqueuedItemsAsync操作的结果是信息,表明优先级队列中可能有某个(些)项(按降序排列或特定优先级)。
出队操作不会执行,并且调用不会改变任何内部状态!
等待之后的下一步是 TryDequeue 方法!

如果优先级队列中已经有任何项,EnqueuedItemsAsync() 将返回一个已完成的ICompletionUC可等待对象。
这样,等待将在调用 await 之后立即完成并同步执行代码!
正如情况一样

await Task.CompletedTask;

如果对于请求的TPriority,优先级队列中已经有任何项,EnqueuedItemsAsync(TPriority) 将返回一个已完成的 ICompletionUC 可等待对象。
这样,等待将在调用 await 之后立即完成并同步执行代码!
正如情况一样

await Task.CompletedTask;

EnqueuedItemsAsync 不以任何方式影响优先级队列中项的处理!

 

使用 PriorityQueueUC

在这种情况下,它是构造,然后从相同或不同的线程调用 Enqueue 和 TryDequeue,以及可能调用的 Count 和 HasItems。

 

public enum QueuePriority { Lower, Normal, Higher }
QueuePriority[] descendingPriorities = { QueuePriority.Higher, QueuePriority.Normal, QueuePriority.Lower };
var pQueue = new PriorityQueueUC<QueuePriority, string>(descendingPriorities);
pQueue.Enqueue(QueuePriority.Higher, "An item with higher priority");

 

string dequeued;
if (pQueue.TryDequeu(out dequeued)) Console.WriteLine($"Dequeued: {dequeued}");
string dequeued;
QueuePriority? priority;
if (pQueue.TryDequeu(out dequeued, out priority)) Console.WriteLine($"Dequeued: {dequeued}, Priority: {priority}");
string dequeued;
QueuePriority? priority;
if (pQueue.TryDequeu(out dequeued, out priority, QueuePriority.Lower)) Console.WriteLine($"Dequeued: {dequeued}, Priority: {priority}");

 

使用 PriorityQueueNotifierUC

构造几乎相同。

public enum QueuePriority { Lower, Normal, Higher }
QueuePriority[] descendingPriorities = { QueuePriority.Higher, QueuePriority.Normal, QueuePriority.Lower };
var pQueueNotifier = new PriorityQueueNotifierUC<QueuePriority, string>(descendingPriorities);

Enqueue

pQueueNotifier.Enqueue(QueuePriority.Higher, "An item with higher priority");

 

Task.Run(() => DescendingPriorityConsumer(pQueueNotifier));

private async Task DescendingPriorityConsumer(PriorityQueueNotifierUC<QueuePriority, string> pQueueNotifier)
{
    while (true)
    {
       string dequeued;
       await pQueueNotifier.EnqueuedItemsAsync();
       if (!pQueueNotifier.TryDequeu(out dequeued)) continue;
       Console.WriteLine($"Dequeued: {dequeued}");
    }
}

 

Task.Run(() => DescendingPriorityConsumer2(pQueueNotifier));

private async Task DescendingPriorityConsumer2(PriorityQueueNotifierUC<QueuePriority, string> pQueueNotifier)
{
    while (true)
    {
        string dequeued;
        QueuePriority? priority;
        await pQueueNotifier.EnqueuedItemsAsync();
        if (!pQueueNotifier.TryDequeu(out dequeued, out priority)) continue;
        Console.WriteLine($"Dequeued: {dequeued}, Priority: {priority}");
    }
}

 

Task.Run(() => OverriddenPriorityConsumer(pQueueNotifier));

private async Task OverriddenPriorityConsumer(PriorityQueueNotifierUC<QueuePriority, string> pQueueNotifier)
{
    while (true)
    {
        string dequeued;
        QueuePriority? priority;
        // priority of EnqueuedItemsAsync and TryDequeue must match exactly oherwise we could burn CPU with bussy loop!
        await pQueueNotifier.EnqueuedItemsAsync(QueuePriority.Lower);
        if (!pQueueNotifier.TryDequeu(out dequeued, out priority, QueuePriority.Lower)) continue;
        Console.WriteLine($"Dequeued: {dequeued}, Priority: {priority}");
   }
}

 

 

示例和 GreenSuperGreen 库

本文是基于我的库 GreenSuperGreen 编写的,其源代码可在 github 上找到,并且 .Net 4.6 程序包在 nuget 上。
本文下载源代码的目标是 nuget 包

GreenSuperGreen 库有更多有趣的可探索类,而不仅仅是受限制的优先级队列...

 

© . All rights reserved.