使用 std 或 Boost 线程的工作队列
使用标准或 Boost 线程和 C++ 的工作线程示例。
引言
异步工作队列(们) 独立于调用线程处理请求。这些请求被推送到队列中,队列的访问受到互斥锁的保护。独立的 worker 线程从这些队列中弹出项目来处理它们。在 worker 线程中处理期间,互斥锁被解锁,以便调用线程可以访问队列。条件变量用于发出需要处理项目的信号。作为优化,它仅在所有 worker 线程空闲时才发出条件变量信号。
在典型的生产者-消费者场景中,当消费者无法像生产者那样快速处理请求时,队列可能会被淹没。然后队列会溢出。这个类提供了一个名为 'Overflow
' 的虚拟函数,当项目数量达到最大队列大小时会调用该函数。默认实现会丢弃该项目,但重写的函数可以使用另一种策略(例如,优先处理队列中的项目并丢弃其他项目)。
这里呈现的类名为 KBaseWorkQueue
。以下部分列出了最重要的部分。
模板参数
T
:模板参数,用于指定项目。项目会被复制到队列中以及从队列中复制出来。如果复制项目开销过大(例如,视频图像),请使用 shared_ptr。
数据成员
nThreads
:指定 worker 线程的数量。nMaxSize
:在发出溢出信号之前的最大队列大小。
成员函数
Add
:将项目添加到工作队列的函数Process
:纯虚拟函数,必须重写才能处理项目。请注意,在调用此函数时(在 worker 线程的上下文中),互斥锁会被解锁。Overflow
:在溢出情况下在调用线程的上下文中调用。
背景
这篇文章大量使用了 Butenhof 7.2 中优秀书籍“Programming with POSIX Threads” 中描述的示例。Butenhof 使用 pthreads,而本文使用了 std 或 Boost.Threads 作为库,因此应该与平台无关。Boost.Threads 受 pthreads 的许多概念启发;std 线程受 Boost 启发。
在 www.codeproject.com 上已经有许多文章描述了 worker 队列
- 'CThread - 一个 Worker 线程包装类'。这个使用 Win32 API 函数。
- 'WorkerThread Library':使用 MFC 的
CWinThread
。
Using the Code
示例
//
// Test queue (uses STL version)
//
#include "kbaseworkqueue.hpp"
#include <atomic>
#include <assert.h>
class TestQueue : public KBaseWorkQueue<int>
{
typedef KBaseWorkQueue<int> base;
public:
TestQueue(size_t nThreads, size_t nMaxSize)
: base(nThreads, nMaxSize)
, m_n(0)
{}
virtual void Process(const int& rn) override
{
m_n += rn;
}
std::atomic<int> m_n; // atomic only necessary if more than 1 thread is used
};
int main()
{
const size_t nThreads = 2;
const size_t nQueue = 10;
TestQueue queue{nThreads, nQueue};
for (size_t n = 0; n != 50; ++n)
{
queue.Add(2);
}
queue.Stop();
assert(queue.m_n <= 100);
return 0;
}
有改进空间
多线程代码总是更复杂。我已经在四核处理器上对这个类进行了压力测试,没有发现任何问题。但是,测试无法证明不存在错误。此外,最好将 unlock/lock 调用序列围绕 'Process
' 调用包装在一个 'scope' 对象中。在我的私有实现中,我已经在工作队列溢出时扩展了工作队列,添加了“drop”或“wait”策略。
历史
- 2011 年 11 月 13 日 - 发布原始版本。
- 2018 年 4 月 28 日 - std 版本