C++插件线程池设计






4.21/5 (10投票s)
使用命令模式和责任链模式实现一个插件式线程池库。
使用命令模式和责任链模式实现线程池
命令模式和责任链模式可以优雅地用于实现线程池设计。顾名思义,线程池是一个可用于执行任何工作的线程集合。拥有线程池的主要优点是它减少了周期性创建和销毁线程的开销。这在需要频繁创建和删除线程的应用场景中尤为重要。
我们的目标是提供一个线程池类的设计,该设计可以集成到现有代码中,而无需更改现有类的结构。也就是说,遵循开闭原则(OC principles),提供一个插件式库,使遗留类能够利用线程池的概念。进一步阐述,如果有一个类“X
”,它有一个线程安全的方法“methodX
”,那么用户应该能够通过线程池暴露的一个方法,在线程中调用 methodX
。
下面是一个代码片段,其中类 X
= CTask
,methodX
= DoSomeTask
。创建一个线程池,比如说,初始有 5 个线程,最多 10 个线程。
ThreadPool<CTask> t1(5,10); // Initial thread size 5, max thread size 10
然后,创建一个 Command
对象,并指定你希望在线程池中的某个线程中调用的方法,本例中为 CTask::DoSomeTask
。
Command<CTask>* cmd =0;
cmd= new Command<CTask>(task1, // pointer to the CTask object whose --
&CTask::DoSomeTask, // method that will be executed in a
// thread in the threadpool
task1->GetTimeOut(), // the maximum time for the task
// to finish , before it is signaled
// as a hang
"ne2_11", // a key that can be used as a logical bucket
// for tasks (can be unique also)
PRIO_NORMAL); // the priority of the task
之后,将该 Command 对象推入一个队列中,t1.QueueRequest(cmd);
,这就完成了;线程池会处理剩下的事情。
好了,在我们继续之前,我们必须先熟悉命令模式和责任链模式。
命令模式的意图
将一个请求封装成一个对象,从而使你可用不同的请求对客户进行参数化;对请求排队或记录请求日志,以及支持可撤销的操作。[GoF, p233]
基本上,你可以用它来封装对象及其方法;所以,这里的代码片段或类设计会更具启发性。请注意,这只是该类设计的一种方式,另一种方式是使用一个虚方法调用 execute,派生类可以重写并实现它。但是,下面将是我们的实现方式。
class Command
{
private:
CTest* m_objPtr; // the object pointer
void (CTest::*m_method)(); // A pointer to the method
public:
explicit Command(CTest* ptr;, void (CTest::*pmethod)()) // constructor
{
m_objPtr =ptr;
m_method = pmethod;
}//The execute method that is invoked by the client
void execute()
{
(m_objptr->*method)(); // Calls the method of the object
}// And now the destructor
~Command(){}; //Note do not call 'delete m_objptr' as we do not own this
}
嗯,这是我们的构建模块,下一步是使这个类变得通用。另外,注意我们对方法的定义将其限制为使用签名为 void method(void);
的方法。这可以被修改,稍后会展示。
好的,下面是这个类的通用版本
template <typename T>
class Command
{
T* m_objptr;
void (T::*method)();
public:
//Takes the pointer to an object and a pointer to the method
explicit Command(T* pObj,void (T::*p_method)())
{
:
其余部分与之前的类结构相同。现在,如果你需要为你的方法使用不同的签名,你必须使用 std::bind1st
或 std::bind2nd
,或者编写一个类似的类和辅助方法。
假设你的方法有以下签名
int CTask::ThreeParameterTask(int par1, int par2, int par3)
我们将看看如何将其适配到命令模式中——为此,首先,你必须编写一个成员函数适配器,以便它可以像函数对象一样被调用。
注意——这很丑陋,也许你可以使用 Boost 的 bind 辅助工具等,但如果你不能或不想用,这是一种方法。
// a template class for converting a member function of the type int function(int,int,int)
//to be called as a function object
template<typename _Ret,typename _Class,typename _arg1,typename _arg2,typename _arg3>
class mem_fun3_t
{
public:
explicit mem_fun3_t(_Ret (_Class::*_Pm)(_arg1,_arg2,_arg3))
:m_Ptr(_Pm) //okay here we store the member function pointer for later use
{}
//this operator call comes from the bind method
_Ret operator()(_Class *_P, _arg1 arg1, _arg2 arg2, _arg3 arg3) const
{
return ((_P->*m_Ptr)(arg1,arg2,arg3));
}
private:
_Ret (_Class::*m_Ptr)(_arg1,_arg2,_arg3);// method pointer signature
};
此外,我们需要一个辅助方法 mem_fun3
来辅助调用上述类。
template<typename _Ret,typename _Class,typename _arg1,typename _arg2,typename _arg3>
mem_fun3_t<_Ret,_Class,_arg1,_arg2,_arg3> mem_fun3 ( _Ret (_Class::*_Pm)(_arg1,_arg2,_arg3) )
{
return (mem_fun3_t<_Ret,_Class,_arg1,_arg2,_arg3>(_Pm));
}
现在,为了绑定参数,我们必须编写一个绑定器函数。所以,它来了
template<typename _Func,typename _Ptr,typename _arg1,typename _arg2,typename _arg3>
class binder3
{
public:
//This is the constructor that does the binding part
binder3(_Func fn,_Ptr ptr,_arg1 i,_arg2 j,_arg3 k)
:m_ptr(ptr),m_fn(fn),m1(i),m2(j),m3(k){}
//and this is the function object
void operator()() const
{
m_fn(m_ptr,m1,m2,m3);//that calls the operator
}
private:
_Ptr m_ptr;
_Func m_fn;
_arg1 m1; _arg2 m2; _arg3 m3;
};
以及一个辅助函数来使用 binder3
类——bind3
//a helper function to call binder3
template <typename _Func, typename _P1,typename _arg1,typename _arg2,typename _arg3>
binder3<_Func, _P1, _arg1, _arg2, _arg3> bind3(_Func func, _P1 p1,_arg1 i,_arg2 j,_arg3 k)
{
return binder3<_Func, _P1, _arg1, _arg2, _arg3> (func, p1,i,j,k);
}
现在,我们必须将它与 Command
类一起使用;使用下面的 typedef
typedef binder3<mem_fun3_t<int,T,int,int,int> ,T* ,int,int,int> F3;
//and change the signature of the ctor
//just to illustrate the usage with a method signature taking more than one parameter
explicit Command(T* pObj,F3* p_method,long timeout,const char* key,
long priority = PRIO_NORMAL ):
m_objptr(pObj),m_timeout(timeout),m_key(key),m_value(priority),method1(0),method0(0),
method(0)
{
method3 = p_method;
}
下面是调用它的方式
F3 f3 = PluginThreadPool::bind3( PluginThreadPool::mem_fun3(
&CTask::ThreeParameterTask), task1,2122,23 );
注意:f3();
将调用方法 task1->ThreeParameterTask(21,22,23);
。
cmd= new Command<CTask>(task1, &f3 ,task1->GetTimeOut(),
"ne2_11",PluginThreadPool::PRIO_NORMAL);
我们已经完成了这些丑陋的部分……
好的,现在转向下一个模式:“责任链”。意图——通过给多个对象处理请求的机会,来避免请求的发送者和接收者之间的耦合。将接收对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理它为止。[GoF, p223]
基本上,这个模式可以被看作是一个对象的链表。我们将设计一个名为 ThreadChain
的类来模拟这个模式。在我们的案例中,我们将用它来作为 ThreadChain
对象的链表。这些对象将作用于一个 Command
对象的容器。每个 ThreadChain
对象都有一个与之关联的线程,以及一个名为 busy
的标志,如果与之关联的线程正在执行一个 Command
,该标志将被设置为 true
。
对象的链接是在 ThreadPool
的构造函数中完成的——请参见代码片段。
///There is at least one thread in the thread pool
ThreadChain<T> *prev = new ThreadChain<T>(0,this);///last node
root = prev;
for(int i=0;i<minthreadCount-1;++i)
{
ThreadChain<T> *temp = new ThreadChain<T>(prev,this);
prev=temp;
root = temp;
}
ThreadChain
类有一个名为 canHandle
的方法,它基本上检查其线程是否空闲以处理任务。如果不是,它就将请求传播到下一个对象。
bool canHandle()
{
if(!busy)
{
SetEvent(g_Event); // signal the waiting thread for handling task
return true;
}
:
if(!next)
{
return false; ///max thread count reached ///If the thread count is already
///max nothing to be done
}
return next->canHandle(); ///Else see if the next object in the chain is free
///to handle the request
}
请注意,ThreadChain
类有一个名为 HandleRequests
的方法,它在与该对象关联的线程中等待某个事件。一旦收到该事件,它便开始处理请求。
void HandleRequest()
{
while(!deleteMe)
///The deleteMe is set in the dtor usually if this threads time
///out is over
{
WaitForSingleObject(g_Event,INFINITE);
///This event is signaled from the
///canHandle method
:
busy=true;
///Now set that this object as busy-it is going to handle a request
:
Command<T>* temp = threadpool->GetRequest();
:
if(temp!=NULL)
{
:
m_timeout = temp->GetTimeOut()*1000; /// The time out is in seconds
temp->execute();
delete temp;
每个 ThreadChain
对象还会检查与之关联的线程是否处于挂起状态;如果是,它将设置 deleteMe
标志并从链中解开自己。
///The method to handle the hung threads
void HandleHungThreads(ThreadChain<T>* prev)
{
bool bIsHung =false;
if(IsHung() || ReleaseIdleThread())
{
bIsHung = true;
if(this == threadpool->root){
threadpool->root = next_p; ///case if root is hanging
prev= next;
} else
// remove this item from thread chain link and link the
// other two prev->next = GetNext();
}
:
if(bIsHung)///if this is a hung thread
{
Release(); ///then release it
next->HandleHungThreads(prev);// 'this' is out of link
}
else
next->HandleHungThreads(this); ///propagate to the next object
此外,如果在突发情况下,线程数从最小数量增加,之后负载下降,那么 ThreadChain
有一个名为 ReleaseIdleThreads
的方法来完成这个任务。
bool ReleaseIdleThread()
{
WaitForSingleObject(threadChkMutex,INFINITE);
if(threadpool->m_threadCount <= threadpool->minthreadCount)
//thread count is equal to the minimum thread count
{
ReleaseMutex(threadChkMutex);
return false;
}
:
}else if(GetTickCount()-lastactivetime > MAX_IDLETIME)
{
printf("Max idle time exceeded for this thread\n");
bReleasethis=true;
}
ThreadPool
是我们设计的一个新类,它充当控制器的角色。它将负责扩大或缩小线程池中的线程数量,以及启动对挂起线程的检查等。它持有一个包含 Command
对象的容器。该容器类名为 CRequestQueue
。
任务可以根据优先级来执行。优先级最高的任务将首先从队列中弹出。此外,每个队列都与一个键(key)相关联。这在有多个引擎注入任务时很有用;通过这种方法,你将以循环方式遍历每个键的第一个任务,从而避免任何一个过载的引擎独占资源。
ThreadPool
类还为客户端提供了一个名为 QueRequest
的接口,用于将客户端构建的 Command
对象推入队列。
class CRequestQueue
{
private:
typedef priority_queue<Command<T>*,//value type
deque<Command<T>*>,//container type
Command<T>::Compare > REQUESTQUEUE; //comapritor
//Map task key to the priority queue
std::map<long,REQUESTQUEUE > RequestQueue
好了,我想我已经向你介绍了线程池模式的思路。关于更精细的实现细节,你可以查看附加的源代码。
注意——对于高于 VC6 的 VC 编译器,我添加了一个小的源码更新 - threapoolvc7.zip。