65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 std 或 Boost 线程的工作队列

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.75/5 (7投票s)

2011 年 11 月 13 日

CPOL

2分钟阅读

viewsIcon

34475

downloadIcon

741

使用标准或 Boost 线程和 C++ 的工作线程示例。

引言

异步工作队列(们) 独立于调用线程处理请求。这些请求被推送到队列中,队列的访问受到互斥锁的保护。独立的 worker 线程从这些队列中弹出项目来处理它们。在 worker 线程中处理期间,互斥锁被解锁,以便调用线程可以访问队列。条件变量用于发出需要处理项目的信号。作为优化,它仅在所有 worker 线程空闲时才发出条件变量信号。

WorkQueueCplusplus/queue1.PNG

在典型的生产者-消费者场景中,当消费者无法像生产者那样快速处理请求时,队列可能会被淹没。然后队列会溢出。这个类提供了一个名为 'Overflow' 的虚拟函数,当项目数量达到最大队列大小时会调用该函数。默认实现会丢弃该项目,但重写的函数可以使用另一种策略(例如,优先处理队列中的项目并丢弃其他项目)。

这里呈现的类名为 KBaseWorkQueue。以下部分列出了最重要的部分。

模板参数

  • T:模板参数,用于指定项目。项目会被复制到队列中以及从队列中复制出来。如果复制项目开销过大(例如,视频图像),请使用 shared_ptr。

数据成员

  • nThreads:指定 worker 线程的数量。
  • nMaxSize:在发出溢出信号之前的最大队列大小。

成员函数

  • Add:将项目添加到工作队列的函数
  • Process:纯虚拟函数,必须重写才能处理项目。请注意,在调用此函数时(在 worker 线程的上下文中),互斥锁会被解锁。
  • Overflow:在溢出情况下在调用线程的上下文中调用。

背景

这篇文章大量使用了 Butenhof 7.2 中优秀书籍“Programming with POSIX Threads” 中描述的示例。Butenhof 使用 pthreads,而本文使用了 std 或 Boost.Threads 作为库,因此应该与平台无关。Boost.Threads 受 pthreads 的许多概念启发;std 线程受 Boost 启发。

在 www.codeproject.com 上已经有许多文章描述了 worker 队列

Using the Code

示例

//
// Test queue (uses STL version)
//
#include "kbaseworkqueue.hpp"
#include <atomic>
#include <assert.h>


class TestQueue : public KBaseWorkQueue<int>
{
   typedef KBaseWorkQueue<int> base;

public:
   TestQueue(size_t nThreads, size_t nMaxSize)
      : base(nThreads, nMaxSize)
      , m_n(0)
   {}

   virtual void Process(const int& rn) override
   {
      m_n += rn;
   }
   
   std::atomic<int>  m_n;  // atomic only necessary if more than 1 thread is used
};


int main()
{
   const size_t nThreads = 2;
   const size_t nQueue   = 10;

   TestQueue queue{nThreads, nQueue};

   for (size_t n = 0; n != 50; ++n)
   {
      queue.Add(2);
   }

   queue.Stop();

   assert(queue.m_n <= 100);
   
   return 0;
}

有改进空间

多线程代码总是更复杂。我已经在四核处理器上对这个类进行了压力测试,没有发现任何问题。但是,测试无法证明不存在错误。此外,最好将 unlock/lock 调用序列围绕 'Process' 调用包装在一个 'scope' 对象中。在我的私有实现中,我已经在工作队列溢出时扩展了工作队列,添加了“drop”或“wait”策略。

历史

  • 2011 年 11 月 13 日 - 发布原始版本。
  • 2018 年 4 月 28 日 - std 版本
© . All rights reserved.