TPL:使用 BlockingCollection 解决生产者和消费者问题






4.71/5 (5投票s)
Task Parallel Library - Blocking Collection入门。
引言
生产者-消费者问题是一个典型的多进程同步问题,其中两个进程共享一个固定大小的共享缓冲区,也称为队列。
假设一个或多个生产者正在生成一系列任务,一个或多个消费者正在消耗生成的任务进行处理。这里的缓冲区或队列是两者的共享区域,生产者将在此添加任务,消费者将在此移除并处理任务。
选择合适的排队集合机制非常重要。
典型问题
- 生产者和消费者线程安全的数据访问
- 生产者将继续向队列添加元素,而不考虑队列容量
- 生产者不知道队列中项的处理状态
- 消费者将运行无限循环来查看队列中是否有可供处理的项
背景
生产者的工作是生成数据并放入队列,同时消费者尝试从队列中移除数据并进一步处理。
可能的解决方案
-
使用线程安全队列集合(请参阅我的文章https://codeproject.org.cn/Articles/1112510/TPL-Producer-Consumer-Pattern-Thread-Safe-Queue-Co)
-
当队列已满时,生产者应丢弃数据或进入睡眠模式
- 当缓冲区为空时,消费者应进入更长时间的睡眠,并在队列中有可供处理的元素时恢复其操作
- 当生产者完成生成元素并且不再向队列添加元素时,应通知消费者结束处理
- (可选)当从队列中移除项时,消费者应向生产者发出通知,以便生产者可以再次开始添加元素(如果队列已满)。
是的,有多种方法可以使用通知、锁定和其他 .Net 支持的旧类来实现上述解决方案。我们可以使用 Blocking Collection 吗?它是 .Net Framework 4.0 及更高版本引入的平滑、简单且经过验证的实现。
使用线程安全队列集合
BlockingCollection<T> 是一个线程安全的集合类,它提供了以下功能:生产者-消费者模式的实现。来自多个线程的元素的并发添加和获取。可选的最大容量。当集合为空或已满时会阻塞的插入和移除操作。
参考: https://msdn.microsoft.com/en-us/library/dd997371(v=vs.110).aspx
由于这是一个线程安全的集合,我们的第一个问题就解决了;这意味着我们不需要担心生产者和消费者都访问共享的队列数据,无论是单个消费者还是多个消费者。
阻塞与有界
您是在考虑BlockingCollection而不是ConcurrentQueue吗?是的,这两者之间的主要区别在于BlockingCollection支持有界和阻塞,而ConcurrentQueue作为内置实现不支持这些。
当队列已满时,生产者应丢弃数据或进入睡眠模式
可以定义有界并通过设置集合可以容纳的最大元素容量。
如果此值设置为 2786,并且假设您的队列中有 2786 个项;这意味着下一个向队列添加项的请求将被阻塞,直到队列中的一个或多个元素被移除。一旦队列中有可用空间,就会添加项。
这里的添加元素的请求将被阻塞,这意味着正在向消费者添加数据的生产者线程将在没有自定义实现的情况下处于等待模式。
这看起来不错,因为现在生产者在等待,但超时呢?它应该在一段时间后丢弃请求。这里我们需要使用 TryAdd 方法,它支持超时作为参数之一,这意味着 TryAdd 方法将等待一段时间来向队列添加数据,在此期间消费者可以从队列中移除元素。
当缓冲区为空时,消费者应进入更长时间的睡眠,并在队列中有可供处理的项时恢复其操作
Blocking Collection 还提供了 GetConsumingEnumerable 方法,使用此方法进行枚举会在队列中没有元素时阻塞队列,即将其置于等待或睡眠模式
当生产者完成生成元素并且不再向队列添加元素时,应通知消费者结束处理
Blocking Collection 还提供了 CompleteAdding 方法,此方法用于指示队列生产者已完成向队列添加所有项,或取消 GetConsumingEnumerable 枚举,这意味着不再需要 while 循环来检查队列中的元素,也无需“在没有数据的情况下尝试读取队列”。
有关 Blocking Collection 中在示例中使用的一些重要方法
Blocking Collection,命名空间: System.Collections.Concurrent,程序集: System(在 System.dll 中)
- BlockingCollection<T>.TryAdd 方法 (T) - 尝试在给定时间段内将指定的项添加到 BlockingCollection<T>。
- BlockingCollection<T>.GetConsumingEnumerable - 当队列中没有元素可用时,枚举 GetConsumingEnumerable 会阻塞。
- BlockingCollection<T>.CompleteAdding 方法 – 取消枚举,即队列不再接受任何添加。
使用代码
讨论够多了,需要看看代码吗?
随附 TPL.BlockingCollection.zip 文件,其中实现了使用 Blocking Collection 的生产者-消费者模式。我使用了 Visual Studio 2015 和 Microsoft.Net Framework 4.5
文件 - TaskProducer.cs
- 负责生成任务。在这里,生产者将生成关于 Populating the QueuedObject 类的任务,将其与要执行的 Action 关联,然后在出队后执行,最后将其添加到队列。
namespace TPL.BlockingCollection
{
/// <summary>
/// Produces and Enqueues the Tasks
/// </summary>
public class TaskProducer
{
private TaskQueue _taskQueue;
public TaskProducer(TaskQueue taskQueue) { _taskQueue = taskQueue; }
/// <summary>
/// Produces the tasks.
/// </summary>
public void ProduceTasks()
{
Random random = new Random();
for (int i = 1; i <= 5; i++)
{
// Prepare Queue Object (Hold the Test Data)
var queue = new QueuedObject
{
QueueID = i,
ProducerThreadID = Thread.CurrentThread.ManagedThreadId,
EnqueueDateTime = DateTime.Now,
// Used to Generate Random String
RandomString = new string(Enumerable.Repeat("ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", 5).Select(s => s[random.Next(s.Length)]).ToArray())
};
// Add Task to Queue with Action
_taskQueue.EnqueueTask(() => { ReverseString(queue); });
Console.WriteLine
(
"Enqueued: " + queue.QueueID +
"\t" + "Producer ThreadID :" + queue.ProducerThreadID +
"\t" + queue.EnqueueDateTime.ToLongTimeString() +
"\t" + "RandomString :" + queue.RandomString
);
}
}
/// <summary>
/// Reverses the string. (This is an Associated Action with Task)
/// </summary>
/// <param name="queue">The queue.</param>
public static void ReverseString(QueuedObject queue)
{
string reversedString = new string(Enumerable.Range(1, queue.RandomString.Length).Select(i => queue.RandomString[queue.RandomString.Length - i]).ToArray());
Console.WriteLine
(
"Dequeued: " + queue.QueueID +
"\t" + "Consumer ThreadID :" + Thread.CurrentThread.ManagedThreadId +
"\t" + DateTime.Now.ToLongTimeString() +
"\t" + "ReversedString :" + reversedString
);
}
}
}
TaskQueue.cs
- 充当队列,它使用 BlockingCollection 并执行 Task Producer 添加的任务的入队和出队操作。
public class TaskQueue
{
private BlockingCollection<Task> _workTaskQueue;
public delegate void TaskEventsHandler(TaskProcessingArguments e);
public event TaskEventsHandler TaskStatus;
public TaskQueue(IProducerConsumerCollection<Task> workTaskCollection, int QueueSize, int timeout)
{
_workTaskQueue = new BlockingCollection<Task>(workTaskCollection);
}
/// <summary>
/// Enqueues the task.
/// </summary>
/// <param name="action">The action.</param>
/// <param name="cancelToken">The cancel token.</param>
public void EnqueueTask(Action action, CancellationToken cancelToken = default(CancellationToken))
{
var task = new Task(action, cancelToken);
if (_workTaskQueue.TryAdd(task))
{
TaskStatus?.Invoke
(new TaskProcessingArguments
{
ISTaskAdded = true,
Message = "Task Added to Queue",
PendingTaskCount = _workTaskQueue.Count,
});
}
else
{
TaskStatus?.Invoke
(new TaskProcessingArguments
{
ISTaskAdded = false,
Message = "Timedout while adding Task to Queue",
PendingTaskCount = _workTaskQueue.Count,
});
}
}
/// <summary>
/// Dequeues the task.
/// </summary>
public void DequeueTask()
{
foreach (var task in _workTaskQueue.GetConsumingEnumerable())
try
{
if (!task.IsCanceled) task.RunSynchronously();
// if (!task.IsCanceled) task.Start();
}
catch (Exception ex)
{
}
}
/// <summary>
/// CompleteAdding : Will notify Consumer / Queue - Task Addition from Producer is Completed
/// </summary>
public void Close()
{
_workTaskQueue.CompleteAdding();
}
}
最后,Program.cs 调用生产者和消费者,即多个生产者生成任务,多个消费者消耗任务。请注意 taskQueue.Close() 方法的用法,该方法在所有生产者线程完成向队列添加项后调用,以便消费者知道何时停止或取消枚举。
class Program
{
static void Main(string[] args)
{
// Initialize Task Queue and Specify Capacity and timeout
TaskQueue taskQueue = new TaskQueue(new ConcurrentQueue<Task>(), 1000, 10);
// Subscrive to Queue Processing Events
taskQueue.TaskStatus += WorkQueue_TaskStatus;
//Setup Producers - To Produce Tasks and Associated Action
TaskProducer producerOne = new TaskProducer(taskQueue);
TaskProducer producerTwo = new TaskProducer(taskQueue);
//Start Producing Tasks (Here we have 2 Producers)
Task producerTaskOne = Task.Run(() => producerOne.ProduceTasks());
Task producerTaskTwo = Task.Run(() => producerTwo.ProduceTasks());
//Start Consumers
Task consumerTaskOne = Task.Run(() => taskQueue.DequeueTask());
Task consumerTaskTwo = Task.Run(() => taskQueue.DequeueTask());
//Wait for all Producers to Complete Producing Tasks
Task.WaitAll(producerTaskOne, producerTaskTwo);
//Call Queue Close Method (Indicate Producers have stopped producing tasks)
taskQueue.Close();
//Wait for Consumer to Process Tasks
Task.WaitAll(consumerTaskOne, consumerTaskTwo);
Console.WriteLine("Tasks Processed");
Console.ReadLine();
}
/// <summary>
/// Trigged when attempt is made to Add Task to Queue (Either Success or Timeout)
/// See TaskProcessingArguments
/// </summary>
/// <param name="e">The e.</param>
private static void WorkQueue_TaskStatus(TaskProcessingArguments e)
{
//Console.WriteLine(e.ISTaskAdded);
}
}
我在此附上完整工作源代码的初始版本,并正在进行进一步优化,因为总有改进的空间。
要运行代码,请在 TaskProducer.cs 中更改 for 循环的开始和结束值以进行更大的迭代示例。此外,有界和阻塞可以通过 Program.cs 中 TaskQueue.cs 构造函数传递的相关参数来控制。
在我下一篇关于 Task Parallel Library - Data Flows 的文章中,我将提供更多关于生产者-消费者问题的有趣复杂示例。