65.9K
CodeProject 正在变化。 阅读更多。
Home

C++插件线程池设计

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.21/5 (10投票s)

2008年5月15日

GPL3

5分钟阅读

viewsIcon

68300

downloadIcon

1721

使用命令模式和责任链模式实现一个插件式线程池库。

使用命令模式和责任链模式实现线程池

命令模式和责任链模式可以优雅地用于实现线程池设计。顾名思义,线程池是一个可用于执行任何工作的线程集合。拥有线程池的主要优点是它减少了周期性创建和销毁线程的开销。这在需要频繁创建和删除线程的应用场景中尤为重要。

我们的目标是提供一个线程池类的设计,该设计可以集成到现有代码中,而无需更改现有类的结构。也就是说,遵循开闭原则(OC principles),提供一个插件式库,使遗留类能够利用线程池的概念。进一步阐述,如果有一个类“X”,它有一个线程安全的方法“methodX”,那么用户应该能够通过线程池暴露的一个方法,在线程中调用 methodX

下面是一个代码片段,其中类 X = CTaskmethodX = DoSomeTask。创建一个线程池,比如说,初始有 5 个线程,最多 10 个线程。

ThreadPool<CTask> t1(5,10); // Initial thread size 5, max thread size 10

然后,创建一个 Command 对象,并指定你希望在线程池中的某个线程中调用的方法,本例中为 CTask::DoSomeTask

Command<CTask>* cmd =0;
cmd= new Command<CTask>(task1, // pointer to the CTask object whose --
         &CTask::DoSomeTask, // method that will be executed in a
                                 // thread in the threadpool
         task1->GetTimeOut(), // the maximum time for the task
                                 // to finish , before it is signaled 
                                 // as a hang
         "ne2_11", // a key that can be used as a logical bucket 
                             // for tasks (can be unique also)
         PRIO_NORMAL); // the priority of the task

之后,将该 Command 对象推入一个队列中,t1.QueueRequest(cmd);,这就完成了;线程池会处理剩下的事情。

好了,在我们继续之前,我们必须先熟悉命令模式和责任链模式。

命令模式的意图

将一个请求封装成一个对象,从而使你可用不同的请求对客户进行参数化;对请求排队或记录请求日志,以及支持可撤销的操作。[GoF, p233]

基本上,你可以用它来封装对象及其方法;所以,这里的代码片段或类设计会更具启发性。请注意,这只是该类设计的一种方式,另一种方式是使用一个虚方法调用 execute,派生类可以重写并实现它。但是,下面将是我们的实现方式。

class Command
{
private:
    CTest* m_objPtr; // the object pointer
    void (CTest::*m_method)(); // A pointer to the method 

public:
    explicit Command(CTest* ptr;, void (CTest::*pmethod)()) // constructor
    {
        m_objPtr =ptr;
        m_method = pmethod;
    }//The execute method that is invoked by the client
    void execute()
    {
        (m_objptr->*method)(); // Calls the method of the object
    }// And now the destructor
    ~Command(){}; //Note do not call 'delete m_objptr' as we do not own this

}

嗯,这是我们的构建模块,下一步是使这个类变得通用。另外,注意我们对方法的定义将其限制为使用签名为 void method(void); 的方法。这可以被修改,稍后会展示。

好的,下面是这个类的通用版本

template <typename T>
class Command
{
    T* m_objptr;
    void (T::*method)();
public:
    //Takes the pointer to an object and a pointer to the method
    explicit Command(T* pObj,void (T::*p_method)())
    {
        :

其余部分与之前的类结构相同。现在,如果你需要为你的方法使用不同的签名,你必须使用 std::bind1ststd::bind2nd,或者编写一个类似的类和辅助方法。

假设你的方法有以下签名

int CTask::ThreeParameterTask(int par1, int par2, int par3)

我们将看看如何将其适配到命令模式中——为此,首先,你必须编写一个成员函数适配器,以便它可以像函数对象一样被调用。

注意——这很丑陋,也许你可以使用 Boost 的 bind 辅助工具等,但如果你不能或不想用,这是一种方法。

// a template class for converting a member function of the type int function(int,int,int)
//to be called as a function object
template<typename _Ret,typename _Class,typename _arg1,typename _arg2,typename _arg3>
class mem_fun3_t
{
public:
    explicit mem_fun3_t(_Ret (_Class::*_Pm)(_arg1,_arg2,_arg3))
        :m_Ptr(_Pm) //okay here we store the member function pointer for later use
    {}

    //this operator call comes from the bind method
    _Ret operator()(_Class *_P, _arg1 arg1, _arg2 arg2, _arg3 arg3) const
    {
        return ((_P->*m_Ptr)(arg1,arg2,arg3));
    }
private:
    _Ret (_Class::*m_Ptr)(_arg1,_arg2,_arg3);// method pointer signature
};

此外,我们需要一个辅助方法 mem_fun3 来辅助调用上述类。

template<typename _Ret,typename _Class,typename _arg1,typename _arg2,typename _arg3>
mem_fun3_t<_Ret,_Class,_arg1,_arg2,_arg3> mem_fun3 ( _Ret (_Class::*_Pm)(_arg1,_arg2,_arg3) )
{
    return (mem_fun3_t<_Ret,_Class,_arg1,_arg2,_arg3>(_Pm));
}

现在,为了绑定参数,我们必须编写一个绑定器函数。所以,它来了

template<typename _Func,typename _Ptr,typename _arg1,typename _arg2,typename _arg3>
class binder3
{
public:
    //This is the constructor that does the binding part
    binder3(_Func fn,_Ptr ptr,_arg1 i,_arg2 j,_arg3 k)
        :m_ptr(ptr),m_fn(fn),m1(i),m2(j),m3(k){}


        //and this is the function object 
        void operator()() const
        {
            m_fn(m_ptr,m1,m2,m3);//that calls the operator
        }
private:
    _Ptr m_ptr;
    _Func m_fn;
    _arg1 m1; _arg2 m2; _arg3 m3;
};

以及一个辅助函数来使用 binder3 类——bind3

//a helper function to call binder3
template <typename _Func, typename _P1,typename _arg1,typename _arg2,typename _arg3>
binder3<_Func, _P1, _arg1, _arg2, _arg3> bind3(_Func func, _P1 p1,_arg1 i,_arg2 j,_arg3 k)
{
    return binder3<_Func, _P1, _arg1, _arg2, _arg3> (func, p1,i,j,k);
}

现在,我们必须将它与 Command 类一起使用;使用下面的 typedef

typedef binder3<mem_fun3_t<int,T,int,int,int> ,T* ,int,int,int> F3;

//and change the signature of the ctor
//just to illustrate the usage with a method signature taking more than one parameter
explicit Command(T* pObj,F3* p_method,long timeout,const char* key,
    long priority = PRIO_NORMAL ):
m_objptr(pObj),m_timeout(timeout),m_key(key),m_value(priority),method1(0),method0(0),
    method(0)
{
    method3 = p_method;
}

下面是调用它的方式

F3 f3 = PluginThreadPool::bind3( PluginThreadPool::mem_fun3( 
          &CTask::ThreeParameterTask), task1,2122,23 );

注意:f3(); 将调用方法 task1->ThreeParameterTask(21,22,23);

cmd= new Command<CTask>(task1, &f3 ,task1->GetTimeOut(),
     "ne2_11",PluginThreadPool::PRIO_NORMAL);

我们已经完成了这些丑陋的部分……

好的,现在转向下一个模式:“责任链”。意图——通过给多个对象处理请求的机会,来避免请求的发送者和接收者之间的耦合。将接收对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理它为止。[GoF, p223]

基本上,这个模式可以被看作是一个对象的链表。我们将设计一个名为 ThreadChain 的类来模拟这个模式。在我们的案例中,我们将用它来作为 ThreadChain 对象的链表。这些对象将作用于一个 Command 对象的容器。每个 ThreadChain 对象都有一个与之关联的线程,以及一个名为 busy 的标志,如果与之关联的线程正在执行一个 Command,该标志将被设置为 true

对象的链接是在 ThreadPool 的构造函数中完成的——请参见代码片段。

///There is at least one thread in the thread pool
ThreadChain<T> *prev = new ThreadChain<T>(0,this);///last node
root = prev;

for(int i=0;i<minthreadCount-1;++i)
{
    ThreadChain<T> *temp = new ThreadChain<T>(prev,this);
    prev=temp;
    root = temp;
}

ThreadChain 类有一个名为 canHandle 的方法,它基本上检查其线程是否空闲以处理任务。如果不是,它就将请求传播到下一个对象。

bool canHandle()
{
    if(!busy)
    {
        SetEvent(g_Event); // signal the waiting thread for handling task
        return true;
    }
    :
    if(!next)
    {
        return false; ///max thread count reached ///If the thread count is already
                                                  ///max nothing to be done
    }
    return next->canHandle(); ///Else see if the next object in the chain is free
                              ///to handle the request
}

请注意,ThreadChain 类有一个名为 HandleRequests 的方法,它在与该对象关联的线程中等待某个事件。一旦收到该事件,它便开始处理请求。

void HandleRequest()
{
    while(!deleteMe)
    ///The deleteMe is set in the dtor usually if this threads time
    ///out is over
    {
        WaitForSingleObject(g_Event,INFINITE);
        ///This event is signaled from the
        ///canHandle method
        :
        busy=true;
        ///Now set that this object as busy-it is going to handle a request
        :
        Command<T>* temp = threadpool->GetRequest();
        :
        if(temp!=NULL)
        {
            :
            m_timeout = temp->GetTimeOut()*1000; /// The time out is in seconds
            temp->execute();
            delete temp;

每个 ThreadChain 对象还会检查与之关联的线程是否处于挂起状态;如果是,它将设置 deleteMe 标志并从链中解开自己。

///The method to handle the hung threads
void HandleHungThreads(ThreadChain<T>* prev)
{
    bool bIsHung =false;
    if(IsHung() || ReleaseIdleThread())
    {
        bIsHung = true;
        if(this == threadpool->root){ 
            threadpool->root = next_p; ///case if root is hanging
            prev= next; 
        } else
            // remove this item from thread chain link and link the
            // other two prev->next = GetNext(); 
    }
    :
    if(bIsHung)///if this is a hung thread 
    {
        Release(); ///then release it
        next->HandleHungThreads(prev);// 'this' is out of link
    }
    else
        next->HandleHungThreads(this); ///propagate to the next object

此外,如果在突发情况下,线程数从最小数量增加,之后负载下降,那么 ThreadChain 有一个名为 ReleaseIdleThreads 的方法来完成这个任务。

bool ReleaseIdleThread()
{
    WaitForSingleObject(threadChkMutex,INFINITE);
    if(threadpool->m_threadCount <= threadpool->minthreadCount)
    //thread count is equal to the minimum thread count
    {
        ReleaseMutex(threadChkMutex);
        return false;
    }
    :
}else if(GetTickCount()-lastactivetime > MAX_IDLETIME)
{
    printf("Max idle time exceeded for this thread\n");
    bReleasethis=true;
}

ThreadPool 是我们设计的一个新类,它充当控制器的角色。它将负责扩大或缩小线程池中的线程数量,以及启动对挂起线程的检查等。它持有一个包含 Command 对象的容器。该容器类名为 CRequestQueue

任务可以根据优先级来执行。优先级最高的任务将首先从队列中弹出。此外,每个队列都与一个键(key)相关联。这在有多个引擎注入任务时很有用;通过这种方法,你将以循环方式遍历每个键的第一个任务,从而避免任何一个过载的引擎独占资源。

ThreadPool 类还为客户端提供了一个名为 QueRequest 的接口,用于将客户端构建的 Command 对象推入队列。

class CRequestQueue
{
private:
    typedef priority_queue<Command<T>*,//value type
        deque<Command<T>*>,//container type
        Command<T>::Compare > REQUESTQUEUE; //comapritor

    //Map task key to the priority queue
    std::map<long,REQUESTQUEUE > RequestQueue

好了,我想我已经向你介绍了线程池模式的思路。关于更精细的实现细节,你可以查看附加的源代码。

注意——对于高于 VC6 的 VC 编译器,我添加了一个小的源码更新 - threapoolvc7.zip

参考文献和链接

  1. GoF - 设计模式
  2. 将 C++ 成员函数转换为函数对象
© . All rights reserved.