多线程编程好容易 (简单线程、线程池、对象池等)
这个简单的库实现了多线程编程的几乎所有方面。
引言
这个简单的库实现了多线程编程的几乎所有方面,并且不需要对多线程概念有深入的理解。我在编写这个库时,始终牢记库必须易于任何具备基本 C++ 和模板知识的程序员使用。
这个库消除了所有与线程管理相关的额外工作,因此您可以将时间集中在业务逻辑的实现上。
使用代码
这个库中有五个最重要的类和接口需要您了解:
CSafeThread
是一个模板类,描述了一个抽象线程。我决定将这个类定义为模板类,因为线程的参数是未知的,最重要的是,我希望在类的析构函数中调用Stop()
虚方法,这只适用于CSafeThread
是一个具体类的情况。CThreadInterface
是一个抽象类(接口)。这个类是您实现具体线程的基类。实现线程需要两个步骤。首先,定义您的线程类并派生自CThreadInterface
接口。BEGIN_THREAD_EVENT
– 当线程开始执行时发生,END_THREAD_EVENT
– 当线程通过退出Thread()
方法完成时发生,TERMINATE_THREAD_EVENT
– 当调用Stop()
方法时发生。CSequentialObjectPool
是一个模板类,实现了工作项队列处理器。对象池在单个线程中逐个处理队列中的对象。工作项是用户定义的具体类,派生自CWorkItemInterface
。- 最后一个最重要的类是
CThreadPool
类。为了在您的应用程序中创建线程池,您需要定义一个派生自CWorkItemInterface
接口的工作项类,然后实例化并初始化CThreadPool
类。最后,将工作项排队以便在线程池中进行处理。
class CMyThread: public mb_thread_lib::CThreadInterface<_bstr_t>
{
protected:
virtual void Thread(mb_thread_lib::smart_ptr<_bstr_t> sp)
{
//this is our thread, sp is a parameter we are passing in
// implement your thread code here
}
};
现在,实例化您的线程
mb_thread_lib::CSafeThread< CMyThread, _bstr_t> Thread;
启动线程
Thread.Start( mb_thread_lib::smart_ptr<_bstr_t>(
new _bstr_t("parameter for thread")) );
等待此线程完成
Thread.Wait();
您可以通过调用 CSafeThread
实例的 Stop()
方法或销毁实例来显式停止线程。如果您想接收线程事件的通知,您需要在您的线程类中实现 Notify()
方法。
void Notify(mb_thread_lib::NOTIFY_EVENT_TYPE evt)
{
switch(evt)
{
case mb_thread_lib::BEGIN_THREAD_EVENT:
m_bStop = false;
break;
case mb_thread_lib::TERMINATE_THREAD_EVENT:
m_bStop = true;
break;
default:
break;
}
};
有三种事件类型:
让我们看一下源代码中的 TestSequentialObjectPool 示例。首先,我定义了 CTestWorkItem
类,这是一个工作项类。这个类实现了一个需要执行的任务。您必须将此类派生自 CWorkItemInterface
,它是一个抽象类。您可以创建不同类型的工作项来与同一个 CSequentialObjectPool
实例配合使用。
class CTestWorkItem: public mb_thread_lib::CWorkItemInterface
{
bool bPrcessing;
public:
CTestWorkItem()
{
}
~CTestWorkItem()
{
}
void ProcessWorkItem()
{
//execute our code here
//…
}
void AbortWorkItem()
{
//abort our item
//…
}
};
现在,我定义了 CPushThread
类。这是一个辅助线程类,用于将工作项推送到 CSequentialObjectPool
实例进行处理。这个类实现了 CSafeThread
类型的类(您已经知道如何创建线程,请参见上文)。
class CPushThread: public mb_thread_lib::CThreadInterface
<mb_thread_lib::CSafeThread<mb_thread_lib::CSequentialObjectPool<_bstr_t>,
_bstr_t> >
{
volatile bool m_bStop;
protected:
void Notify(mb_thread_lib::NOTIFY_EVENT_TYPE evt)
{
switch(evt)
{
case mb_thread_lib::BEGIN_THREAD_EVENT:
m_bStop = false;
break;
case mb_thread_lib::TERMINATE_THREAD_EVENT:
m_bStop = true;
break;
default:
break;
}
};
public:
void Thread(mb_thread_lib::smart_ptr<mb_thread_lib::CSafeThread
<mb_thread_lib::CSequentialObjectPool<_bstr_t>,
_bstr_t> > spWorkQueue)
{
for(int i = 0; ((!m_bStop)&&(i < MY_MAX_ITER*2)); ++i)
{
_bstr_t bstr("Val: ");
_variant_t var;
var = i;
var.ChangeType(VT_BSTR);
bstr = bstr + var.bstrVal;CTestWorkItem *pWorkItem = new CTestWorkItem();
pWorkItem->m_bstr = bstr;
mb_thread_lib::smart_ptr< mb_thread_lib::CWorkItemInterface >
spWorkItem(dynamic_cast<mb_thread_lib::CWorkItemInterface*>(pWorkItem));
if(!spWorkQueue->QueueWorkItem(spWorkItem))
break;
}
}
};
现在,我创建了一个 CSequentialObjectPool
类的实例,并且因为它是一个基于线程的类,所以我调用 Start()
方法来启动一个线程来处理工作项。然后,我调用我的推送线程(它是 CPushThread
类的实例)的 Start()
方法,并将 CSequentialObjectPool
实例的智能指针传递进去。
//create sequental object pool (work queue)
mb_thread_lib::smart_ptr<mb_thread_lib::CSafeThread
<mb_thread_lib::CSequentialObjectPool<_bstr_t>, _bstr_t> >
spWorkQueue(new mb_thread_lib::CSafeThread
<mb_thread_lib::CSequentialObjectPool<_bstr_t>, _bstr_t>());
//create a push thread (this thread feeds work queue
// with objects for sequental processing)
mb_thread_lib::CSafeThread<CPushThread,
mb_thread_lib::CSafeThread<mb_thread_lib::CSequentialObjectPool<_bstr_t>,
_bstr_t> > PushThread;
//start work queue
spWorkQueue->Start(mb_thread_lib::smart_ptr<_bstr_t>(NULL));
//start push thread
PushThread.Start(spWorkQueue);
Sleep(1000);
//create thread pool
mb_thread_lib::CThreadPool ThreadPool;
//initialize thread pool with 2 MIN threads,
//15 MAX threads and thread idle time 10 sec
ThreadPool.InitPool(2, 15, 10000);
_variant_t var;
int i;
//creaet 50000 work items
for(i = 0; i < 50000; ++i)
{
var = i;
var.ChangeType(VT_BSTR);
CTestWorkItem* p = new CTestWorkItem();
p->m_bstr = var.bstrVal;
mb_thread_lib::smart_ptr<mb_thread_lib::CWorkItemInterface> spWorkItem(p);
//push work items into the thread pool
ThreadPool.QueueWorkItem(spWorkItem);
}
另一个有用的类是 CTimer
。为了使用 CTimer
类,您必须定义一个派生自 CWorkItemInterface
的工作项类,创建 CTimer
类的实例,并使用对工作项的引用和到期时间来启动计时器。
CTestWorkItem* p = new CTestWorkItem();
p->m_bstr = L"Timer work item.";
mb_thread_lib::smart_ptr<mb_thread_lib::CWorkItemInterface> spWorkItem(p);
mb_thread_lib::CTimer Timer;
DWORD d = 2000;
printf("execute handler every %d second(s)\n", d/1000);
Timer.Start(spWorkItem,d/* 2 sec. */);
Sleep(10000);
Timer.Stop();
除了这些主要类之外,还有几个同步类和一个智能指针实现。有两种类型的临界区类:CLocalCriticalSec
和 CGlobalCriticalSec
。CLocalCriticalSec
类是临界区 Win32 API 的封装器,而 CGlobalCriticalSec
是互斥量 Win32 API 的封装器。为了实例化一个命名的临界区,无论是本地的还是全局的,库中都有 CCriticalSecFactory
类。要销毁一个命名的临界区,请使用 CCriticalSecDisposer
类。您可以在 "ThreadTest.spp" 文件中找到一个示例,位于以下注释下方:“test critical sections”。CAutoCriticalSec
类允许您通过实例化带有对临界区实例引用的类来自动保护您的部分代码。