65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Boost 实现线程同步队列

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.29/5 (7投票s)

2012年8月20日

CPOL

4分钟阅读

viewsIcon

57138

使用 STL 和 boost 实现线程同步队列。

引言

目前我正在做一个网络通信项目,并尝试使用 C++ STL 和 boost 进行开发。在开发多线程程序时,同步是一个重要的问题。如果你的程序需要处理流式数据包,那么维护一个队列是个好主意。

背景

这是我第一次使用 boost,由于缺乏好的例子,它并不容易使用。 你可以在 https://boost.ac.cn/ 找到 boost 库和文档。 这是使用 boost 的优势,摘自其网站

总而言之,是生产力。 使用像 Boost 这样高质量的库可以加速初始开发,减少错误,减少重复发明轮子,并降低长期维护成本。 并且由于 Boost 库往往会成为事实上的或法律上的标准,因此许多程序员已经熟悉它们。

在这个例子中,我只使用了 boost 同步类,但是所有函数都可以用 boost 重写,并且可以用于跨平台开发。 boost 同步类看起来很简单,但作为初学者,我仍然犯了一些错误,所以我开发了一个测试项目来验证它的功能。 在了解如何使用它之后,它将帮助你简化代码并减少错误。

使用代码 

在这个例子中,我将线程同步模型实现为生产者-消费者。 生产者线程创建数据并将其插入到队列中,而消费者线程使用数据并从队列中删除数据。 我使用互斥对象来保持两个线程同步。

我正在尝试使用不同的方法来解决同一个问题,然后比较它们的优缺点。

首先,我设计了一个接口来抽象同步队列模型。 ISynchronizedQueue 抽象类只有两个方法:add()get()add() 将在生产者线程中使用,以将数据插入到队列中,而 get() 将在消费者线程中使用,以获取和删除队列中的数据。 该接口有三种不同的实现方式。

  1. SynchronizedDequeue:是一个双端队列,使用 STL deque 实现。
  2. SychronizedVector:是一个环或循环队列,使用 STL vector 实现。
  3. SychronizedVectorNB:是 SychronizedVector 的非阻塞版本。

这是头文件和接口定义:

#include <iostream>
#include <deque>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

using namespace std;
 
#define default_packesize 1280  

class TPacket
{
    int my_size;
    unsigned char my_databuf[default_packesize];
    unsigned int ID;
public:
    TPacket() {std::memset(my_databuf,0,sizeof(my_databuf));my_size=0;}
    ~TPacket() {;}
    int GetSize() {return my_size;}
    void SetSize(int size) {my_size = size;}
    unsigned int GetID() {return ID;}
    void SetID(int id) {ID = id;}
    bool GetData(char* pbuf,int& size) 
    {
        if(my_size>size)
            return false;
        size = my_size;
        memcpy(pbuf,my_databuf,my_size);
        return true;
    }
    bool SetData(char* pbuf,int size) 
    {
        if(size>default_packesize)
            return false;
        memcpy(my_databuf,pbuf,size);
        my_size=size;
        return true;
    }
public:    
    virtual bool IsValid() {return false;}
    virtual bool Encode() {return false;}
    virtual bool Decode() {return false;}
};

//queue interface
template <class T>
class ISynchronizedQueue
{
public:
    virtual bool add(T pkt) = 0;
    virtual bool get(T& pkt) = 0;
    virtual bool read(T& pkt) = 0;
    virtual bool del(T& pkt) = 0;
    virtual bool clear() = 0;
};

让我们看看这些实现。

class SynchronizedDequeue: public ISynchronizedQueue<TPacket>
{
    boost::mutex m_mutex;
    deque<TPacket> m_queue;
    boost::condition_variable m_cond;

public:
    bool add(TPacket pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        if(m_queue.size()>100)
            m_queue.clear();
        m_queue.push_back(pkt);
        return true;
    }
    bool get(TPacket& pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        if (!m_queue.size())
        {
            return false;
        }
        pkt = m_queue.front();
        m_queue.pop_front();
        return true;
    }

    bool read(TPacket& pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        if (!m_queue.size())
        {
            return false;
        }
        pkt = m_queue.front();
        return true;
    }

    bool del(TPacket& pkt)
    {
        return get(pkt);
    }

    bool clear()
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        m_queue.clear();
        return true;
    }
};

SynchronizedDequeue 具有动态队列大小。 优点是如果生产者比消费者快,则不会丢失任何数据,所有生成的数据都将由消费者处理。 缺点是它对内存管理性能有更大的影响。 当数据包插入队列时,它会分配内存,并在我们将数据返回给消费者线程时释放内存。 由于会多次进行内存分配和释放,这可能会减慢同一进程中较大对象的内存回收速度。

class SynchronizedVector :public ISynchronizedQueue<TPacket>
{
    int queue_size;
    boost::mutex m_mutex;
    std::vector<TPacket> my_vector;
    int start,end;

public:
    SynchronizedVector(int q_size=100) {queue_size = q_size; start=end=0; my_vector.assign(queue_size,TPacket());}
    bool add(TPacket pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        my_vector[end++] = pkt;
        if(end>=queue_size)
            end = 0;
        if(end == start)
            start = end+1;
        if(start>=queue_size)
            start = 0;
        return true;
    }
    bool get(TPacket& pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        if(start==end)
            return false;
        pkt = my_vector[start++];
        if(start>=queue_size)
            start = 0;
        return true;
    }
    bool read(TPacket& pkt)  //not support
    {
        return false;
    }
    bool del(TPacket& pkt) //not support
    {
        return false;
    }

    bool clear()
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        start = end =0;
        return true;
    }
};

SychronizedVector 使用固定大小的队列来避免内存管理开销,但它会覆盖队列中未及时处理的旧数据,同时新数据会进来并将其冲掉。

class SynchronizedVectorNB :public ISynchronizedQueue<TPacket>
{
    int queue_size;
    boost::mutex m_mutex;
    std::vector<TPacket> my_vector;
    int start,end;

public:
    SynchronizedVectorNB(int q_size=100) {queue_size = q_size; start=end=0; my_vector.assign(queue_size,TPacket());}
    bool add(TPacket pkt)
    {
        boost::unique_lock<boost::mutex> lock(m_mutex,boost::try_to_lock_t());
        if(!lock.owns_lock())
            return false;
        my_vector[end++] = pkt;
        if(end>=queue_size)
            end = 0;
        if(end == start)
            start = end+1;
        if(start>=queue_size)
            start = 0;
        return true;
    }
    bool get(TPacket& pkt)
    {
        boost::unique_lock<boost::mutex> lock(m_mutex,boost::try_to_lock_t());
        if(!lock.owns_lock())
            return false;

        if(start==end)
            return false;
        pkt = my_vector[start++];
        if(start>=queue_size)
            start = 0;
        return true;
    }
    bool read(TPacket& pkt) //not support
    {
        return false;
    }
    bool del(TPacket& pkt) //not support
    {
        return false;
    }

    bool clear()
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        start = end =0;
        return true;
    }
};

SychronizedVectorNB 不会被生产者或消费者线程阻塞。 优点是,如果在队列访问线程的同一循环中需要完成其他一些活动,则非阻塞版本将保证响应时间。

上面的两个队列在线程尝试拥有互斥对象时可能会阻塞线程。 如果一个线程拥有互斥锁,然后发生一些异常,则另一个线程也会被阻塞。 它的缺点是,当它无法获得锁时,它可能无法将数据添加到队列中,然后调用者需要再次添加相同的数据。

这是生产者线程的示例代码

DWORD WINAPI ProducerServerThread(LPVOID lpParam)
{
 int count=0;
 
 ISynchronizedQueue<TPacket>* pQ = (ISynchronizedQueue<TPacket>*)lpParam;
 TPacket pkt; 
 LOG("\n-------------------------Producer thread begin-----------------------");
 while(1)
 {
  DWORD t1 = GetTickCount();
  Sleep(50);
 
  if(count++>=1000)
   break;
  
  //initialize packet data to zero.
  memset(&pkt,0,sizeof(pkt));

  //add content to packet, I only set the ID here, you can do something more.
  pkt.SetID(count);


  if(pQ->add(pkt))
   LOG("Add PACKET ID = %d ",pkt.GetID());
  else
   LOG("Add Packet Failed");
  DWORD t2 = GetTickCount();
 
  LOG("ONE-LOOP DURATION = %d",t2-t1);
 }
 LOG("\n-------------------------Producer thread end-----------------------");
 return 0;
}

这是消费者线程的示例代码

DWORD WINAPI ConsumerServerThread(LPVOID lpParam)
{
 int count=0;
 ISynchronizedQueue<TPacket>* pQ = (ISynchronizedQueue<TPacket>*)lpParam;
 TPacket pkt;
 LOG("\n-------------------------Cosumer thread begin-----------------------");
 while(1)
 {
  Sleep(10);
 
  if(count++>=1200)
   break;
 
  if(pQ->get(pkt))
   LOG("Get Packet ID = %d",pkt.GetID());
  else
   LOG("Get Packet Failed");
 }
 LOG("\n-------------------------Cosumer thread end-----------------------");
 return 0;
}

这是主线程的示例代码

SynchronizedDequeue m_q[5];
//SynchronizedVector m_q[5];
//SynchronizedVectorNB m_q[5]

int _tmain(int argc, _TCHAR* argv[])
{
   int thread_count =5;
   HANDLE server_threads[10]; 
 
   for (int i=0; i < thread_count ;i++)
    {
        server_threads[i] = CreateThread(
                                        NULL,
                                        0,
                                        ProducerServerThread,
                                        &m_q[i],
                                        0,
                                        NULL
                                        );
        if (server_threads[i] == NULL)
        {
            LOG( "Create Thread failed: %d\n", GetLastError());
            return 0;
        }
    }
 
    for (int i= 0; i < thread_count ;i++)
    {
        server_threads[i+thread_count] = CreateThread(
                                        NULL,
                                        0,
                                        ConsumerServerThread,
                                        &m_q[i],
                                        0,
                                        NULL
                                        );
        if (server_threads[i] == NULL)
        {
            LOG( "Create Thread failed: %d\n", GetLastError());
            return 0;
        }
    }
 
  // Wait until the threads exit, then cleanup
    int retval = WaitForMultipleObjects(
                                   2*thread_count,
                                   server_threads,
                                   TRUE,
                                   INFINITE
                                   );
    if ((retval == WAIT_FAILED) || (retval == WAIT_TIMEOUT))
    {
        LOG( "WaitForMultipleObjects failed: %d\n", GetLastError());
        return 0;
    }
}

在测试代码中,我创建了五个生产者、五个消费者和五个队列。 每个生产者都有与其关联的消费者,它们使用相同的队列。 你可以通过数据包 ID 验证每个生成的数据包是否由消费者线程按顺序处理。 你可以自己定义 LOG 宏,我使用了一个线程安全的 LOG 宏,带有日志时间输出。 通过日志时间,你可以更清楚地看到线程性能。

19:33:50:106  5972 info: ConsumerServerThread: Get Packet Failed
19:33:50:106  4244 info: ConsumerServerThread: Get Packet Failed
19:33:50:122  5808 info: ConsumerServerThread: Get Packet Failed
19:33:50:122  8464 info: ConsumerServerThread: Get Packet Failed
19:33:50:122  7760 info: ProducerServerThread: Add PACKET ID = 1 
19:33:50:122  7416 info: ConsumerServerThread: Get Packet ID = 1
19:33:50:122  7760 info: ProducerServerThread: ONE-LOOP DURATION = 63
19:33:50:138  5808 info: ConsumerServerThread: Get Packet Failed
19:33:50:138  5972 info: ConsumerServerThread: Get Packet Failed
19:33:50:138  8464 info: ConsumerServerThread: Get Packet Failed
19:33:50:138  4244 info: ConsumerServerThread: Get Packet Failed
19:33:50:138  8268 info: ProducerServerThread: Add PACKET ID = 1 
19:33:50:138  7416 info: ConsumerServerThread: Get Packet Failed
19:33:50:138  8268 info: ProducerServerThread: ONE-LOOP DURATION = 62
19:33:50:153  4244 info: ConsumerServerThread: Get Packet ID = 1
19:33:50:153  5808 info: ConsumerServerThread: Get Packet Failed
19:33:50:153  8836 info: ProducerServerThread: Add PACKET ID = 1 
19:33:50:153  7352 info: ProducerServerThread: Add PACKET ID = 1 
19:33:50:153  8464 info: ConsumerServerThread: Get Packet Failed
19:33:50:153  5972 info: ConsumerServerThread: Get Packet ID = 1
19:33:50:153  8836 info: ProducerServerThread: ONE-LOOP DURATION = 63
19:33:50:153  7352 info: ProducerServerThread: ONE-LOOP DURATION = 63

在测试了具有 5 个生产者-消费者线程对的三种不同同步队列,每个线程添加和获取 1000 个数据包之后,它们的性能基本相同。 日志本身将花费大约 10 毫秒。 你可以修改它,以查看这三种类型的队列在更大的数据集、更长的运行时间或更大的对象内存分配中的表现。

关注点

第一次使用 C++ boost 库的一小部分真是太有趣了。

历史

  • 第一个版本,08/17/2012。
  • 第二个版本,08/19/2012:更新文章,提供更多说明。
  • 第三个版本,10/28/2014:更新代码以支持模板。
© . All rights reserved.