65.9K
CodeProject 正在变化。 阅读更多。
Home

TPL:生产者消费者模式 - 线程安全队列集合

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.50/5 (3投票s)

2016年7月24日

CPOL

4分钟阅读

viewsIcon

33898

downloadIcon

626

解决生产者消费者问题 - 共享数据的线程安全数据访问。

引言

本文的目的是演示如何在生产者和消费者场景中使用Concurrent Queue来使用线程安全队列集合。


生产者负责通过队列发送消息,而消费者负责处理这些信号并从中移除,即一次处理一个项目。

本文主要关注Queue 和 Concurrent Queue作为生产者发送消息和消费者处理这些消息的机制。

在我的下一篇文章中,我将演示如何解决典型的生产者和消费者问题。在这里,我们将专注于生产者和消费者之间线程安全的数据访问。

背景

生产者将生成一系列任务,每个任务都包含一些数据和相关的操作;同样,消费者将接收这些任务并根据其关联的工作来处理任务。

假设这里的任务是反转随机生成的字符串,我们有两个参与者:生产者,负责创建多个随机字符串;消费者,负责消费数据并反转字符串。

由于 .Net 框架提供了多种方式,我们可以使用以下其中一种:

  • 队列
  • ConcurrentQueue
  • BlockingCollection

Queue是典型的 FIFO(先进先出)集合,具有 Enqueue(入队)、Dequeue(出队)和 Peek(查看)方法。

  • Enqueue 将新项目添加到 Queue 集合的末尾。
  • Dequeue 将移除并返回 Queue 集合开头的对象。
  • Peek 将在不移除 Queue 集合中的项目的情况下,返回开头的对象。

ConcurrentQueue 也是 FIFO 结构,具有 Enqueue(入队)、TryDequeue(尝试出队)和 TryPeek(尝试查看)方法。

  • Enqueue 将新项目添加到 ConcurrentQueue 的末尾。
  • TryDequeue 将尝试移除并返回 ConcurrentQueue 开头的对象。
  • TryPeek 将尝试返回 ConcurrentQueue 开头的对象。

 

那么 ConcurrentQueue 有什么特别之处呢?是的,它是一个线程安全集合。

问题:我们是否需要线程安全共享数据访问?答案是

在生产者-消费者场景中,消息集合由消费者处理;这里我们可以有一个或多个消费者(线程),它们都可以同时并行运行。这意味着有可能某个消费者试图访问一个已经被处理或从消息集合中移除的消息。

Queue 是 FIFO 集合,不提供内置的线程安全数据访问实现。是的,我们可以通过 Lock 来实现这一点,但这却是 ConcurrentQueue 的内置功能。

请看下面的消费者线程,它试图处理一个已经被另一个线程处理并从队列中移除的任务。由于一个或多个消费者(线程)访问的数据不安全,消费者在进入 while 循环时,即使任务当时在队列中,也未能成功 Dequeue。

 

由于 Concurrent Queue 是无锁实现,它有助于实现线程安全的数据访问。请参阅下面的代码片段,了解如何在生产者-消费者场景中使用 Concurrent Queue。

关于代码

在附加的 VS 2015 解决方案中,我们有:

  • QueuedObject.cs
    • 此类将包含一些数据,如 ConsumerThreadID、ProducerThreadID、RandomString 等,并将由生产者(Queue 集合或 Concurrent Queue 集合)入队。
  • Program.cs 包含:
    • 生产者线程生成任务,并将它们与关联的操作一起添加到选定的队列中。
    • 两个消费者线程读取选定的队列,提取任务并执行关联的操作,即反转字符串。
  • QueueService.cs 包含:
    • 对 .Net Queue 集合的实现,负责任务的入队和出队。
  • ConcurrentQueue.cs 包含:
    •  对 .Net Concurrent Queue 集合的实现,负责任务的入队和出队。

请确保注释和取消注释 Queue 和 Concurrent Queue 的调用代码行以及 ConsumeTasks 方法。例如,如果您想演示 Queue 的共享数据访问问题,请在 Program.cs 中注释掉 ProduceTasks 和 ConsumeTasks 方法中与 ConcurrentQueue 相关的调用。

QueuedObject.cs

 public class QueuedObject
    {
        public int QueueID { get; set; }
        public int ConsumerThreadID { get; set; }
        public int ProducerThreadID { get; set; }
        public  string  RandomString { get; set; }
        public DateTime EnqueueDateTime { get; set; }
        public DateTime DequeueDateTime { get; set; }
    }

QueueService.cs

public static class QueueService
    {
        static Queue<Task> _queue;

        static QueueService() { _queue = new Queue<Task>(); }
        public static void Enqueue(Action action, CancellationToken cancelToken = default(CancellationToken))
        {
            Task task = new Task(action, cancelToken);
            _queue.Enqueue(task);
        }

        public static void Dequeue()
        {
            while (true)
            {
                try
                {
                    Task task = _queue.Dequeue();
                    task.RunSynchronously();
                }
                catch (NullReferenceException ex)
                {
                    Debug.WriteLine(ex.Message);
                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex.Message);
                }
            }
        }

    }

ConcurrentQueueService.cs

class ConcurrentQueueService
    {
        static ConcurrentQueue<Task> _queue;

        static ConcurrentQueueService()
        {            
            _queue = new ConcurrentQueue<Task>();
        }

        public static void Enqueue(Action action, CancellationToken cancelToken = default(CancellationToken))
        {
            Task task = new Task(action, cancelToken);
            _queue.Enqueue(task);
        }

        public static void Dequeue()
        {
            while (true)
            {
                try
                {
                    Task task;
                    if (_queue.TryDequeue(out task)) { task.RunSynchronously(); }
                }
                catch (NullReferenceException ex)
                {
                    string w = ex.Message;
                    Debug.WriteLine(ex.Message);
                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex.Message);
                }
            }
        }
    }

Program.cs

    class Program
    {
        static void Main(string[] args)
        {
            var t1 = Task.Factory.StartNew(() => ProduceTasks());
            var t2 = Task.Factory.StartNew(() => ConsumeTasks());
            var t3 = Task.Factory.StartNew(() => ConsumeTasks());
            Task.WaitAll(t1, t2, t3);            
            Console.ReadLine();
        }

        public static void ProcessQueue(QueuedObject queue)
        {
            string reversedString = new string(Enumerable.Range(1, queue.RandomString.Length).Select(i => queue.RandomString[queue.RandomString.Length - i]).ToArray());

            Console.WriteLine
                (
                "Dequeued: " + queue.QueueID +
                "\t" + "Consumer ThreadID :" + Thread.CurrentThread.ManagedThreadId +
                "\t" + DateTime.Now.ToLongTimeString() +
                "\t" + "ReversedString :" + reversedString
                );
        }
                
        public static void ProduceTasks()
        {
            Random random = new Random();
            for (int i = 1; i <= 100000; i++)
            {
                var queue = new QueuedObject
                {
                    QueueID = i,
                    ProducerThreadID = Thread.CurrentThread.ManagedThreadId,
                    EnqueueDateTime = DateTime.Now,
                    // Used to Generate Random String
                    RandomString = new string(Enumerable.Repeat("ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", 5).Select(s => s[random.Next(s.Length)]).ToArray())
                };

                #region Queueing using Queue Collection
                // Uncomment QueueService.Enqueue(() => { ProcessQueue(queue); }) to use .Net Queue Object to Queue Tasks.
                // And Comment ConcurrentQueueService.Enqueue
                // QueueService.Enqueue(() => { ProcessQueue(queue); });
                #endregion

                #region Queueing using ConcurrentQueue
                ConcurrentQueueService.Enqueue(() => { ProcessQueue(queue); });
                #endregion

                Console.WriteLine
                    (
                    "Enqueued: " + queue.QueueID +
                    "\t" + "Producer ThreadID :" + queue.ProducerThreadID +
                    "\t" + queue.EnqueueDateTime.ToLongTimeString() +
                    "\t" + "RandomString   :" + queue.RandomString
                    );
            }
        }
        
        public static void ConsumeTasks()
        {
            //QueueService.Dequeue();
            ConcurrentQueueService.Dequeue();
        }
    }

 

附加代码演示了生产者-消费者场景中的 Queue 和 Concurrent Queue 集合实现。这里使用一个生产者生成任务,并使用两个消费者处理接收到的任务队列集合。

兴趣点:更多生产者-消费者问题(我将在下一版文章中解决)

  • 当队列已满时,生产者应停止添加数据或进入睡眠模式;当消费者移除任何数据时,应向生产者发出通知,以便其重新开始添加数据。

  • 当没有可用数据时,消费者应进入睡眠模式;当有数据可用时,消费者应被唤醒。

 

© . All rights reserved.