另一个线程池
一个具有最少依赖项的简单线程池
引言
本文介绍了一个简单的线程池。它展示了一些关于多线程编程的基本技术,因此对于初学者和已经有一些多线程编程基本经验的中级开发者来说都可能很有趣。当然,代码可以直接使用。
这个线程池类的基本优点是独立性。它不依赖任何库或框架,如STL或ATL,它只使用基本的Windows API。因此,它可以轻松集成到任何Windows应用程序中,而无需使用ATL、STL或MFC。而且它或多或少都很简单,所以即使您经验不足,也能理解它在做什么以及它是如何工作的。
Using the Code
要使用线程池,请将文件ThreadPool.h和ThreadPool.cpp包含在您的项目中,并在管理线程池的类中添加一个类型为CThreadPool
的成员变量。在示例项目(WTL)中,这是一个对话框
#include "ThreadPool.h"
...
class CMainDlg : public CDialogImpl<CMainDlg>
,public CUpdateUI<CMainDlg>
,public CMessageFilter
,public CIdleHandler
{
private:
CThreadPool m_ThreadPool;
...
}
然后初始化线程池。在示例中,这是在OnInitDialog
中完成的
HRESULT hr = m_ThreadPool.Init(2, 10);
Init()
方法接受最多4个参数
HRESULT Init(UINT uInitalNumberOfThreads, UINT uMaxNumberOfThreads = 0
,SIZE_T dwStackSize = 0, LPSECURITY_ATTRIBUTES pSecurityAttributes = NULL);
uInitalNumberOfThreads
是最初创建的线程数。线程数最多可能增长到uMaxNumberOfThreads
。dwStackSize
和pSecurityAttributes
是要传递给_beginthreadex
/ CreateThread
函数的参数。除了第一个参数外,所有参数都是可选的。
如果一切顺利,池将返回S_OK
;如果已经初始化,则返回E_FAIL
;如果出现问题,则返回包含标准Windows错误代码的HRESULT
。如果不是所有线程都能成功创建(这很可能与内存问题有关,见下文),它将返回S_FALSE
。
您可能还想给您的线程池一些工作。这可以通过派生自CThreadObjectBase
的类的worker对象来完成(在示例中为CThreadObject
)。这些类必须实现CThreadObjectBase
的两个虚方法
class CThreadObjectBase
{
public:
virtual void Run(CThreadPoolThreadCallback &pool) = 0;
virtual void Done() = 0;
};
第一个方法接受一个参数,即对CThreadPoolThreadCallback
对象的引用。这实际上就是线程池本身,但只提供一个方法:CanContinue()
,类型为BOOL
,worker对象用它来询问线程池是否应该终止执行。
Run()
方法由执行线程调用,应该执行实际的工作。此外,它应该不时地检查线程池是否想要关闭,如果是,则正常终止
void CThreadObject::Run(CThreadPoolThreadCallback &pool)
{
for (int n = 0; n < m_nRuns; n++)
{
if (!pool.CanContinue())
return;
// here we do the work
}
}
第二个方法Done()
在Run()
方法返回后调用,或者当worker对象从列表中移除而未运行时调用(因为线程池可能在所有等待对象被处理之前关闭),并应用于清理。在示例项目中,worker对象在通知主窗口已完成其任务后会自我删除
void CThreadObject::Done()
{
InterlockedDecrement(&s_lCount);
if (::IsWindow(m_hWndNotify))
PostMessage(m_hWndNotify, WMX_OBJECT_REMOVED, NULL, (LPARAM)m_n);
delete this;
}
要将对象添加到线程池,请调用CThreadPool
的Add()
方法,并将对象的指针作为参数传递
m_ThreadPool.Add(new CThreadObject());
对象将被放入等待队列,一旦有可用线程,它的Run()
方法就会被执行。
要关闭线程池,请使用Close()
方法。这将清空队列,停止并删除所有线程,并清理所有使用的事件。此外,您还可以通过调用EmptyQueue()
来仅仅清空队列,这将从队列中移除所有等待的worker对象。
CThreadPool
还有一些用于统计信息的方法
GetActiveThreadCount()
返回当前正在运行对象的线程数。GetThreadCount()
返回已创建的线程数。GetMaxThreadCount()
返回可能创建的最大线程数。GetThreadId(UINT n)
返回特定线程的标识符。n
是线程的索引。GetThreadStatus(UINT n)
返回线程的状态:工作(TRUE
)或等待(FALSE
)。n
是线程的索引。
它是如何工作的?
对象队列
首先有一个包含等待执行的worker对象的队列。为了尽可能降低依赖性,我决定避免使用任何STL或ATL列表类。无论如何,这个队列非常简单,它是一个基本的FIFO(先进先出)、单向链表。它只需要提供很少的功能:在列表末尾添加对象、返回列表的第一个元素以及判断列表是否为空。当然,还有一个用于清理列表的RemoveAll()
方法。
但是列表必须是线程安全的,这通过使用CRITICAL_SECTION
和一个访问器类来实现。所有对列表的访问都通过这个访问器类CThreadObjectListAccessor
完成。每次池需要访问列表时都会创建一个实例。访问器的ctor
锁定列表,dtor
解锁列表。所有访问列表的方法都在访问器类中,因此所有操作都通过访问器完成。使用此技术可确保在每次请求之前始终锁定列表,并在执行请求(一个或多个)的函数返回时解锁列表。
线程过程
线程过程看起来是这样的
DWORD WINAPI CThreadPool::CThread::ThreadProc(LPVOID p)
{
LPTHREAD pThis = (LPTHREAD)p;
CThreadPoolThreadCallback& pool =
(CThreadPoolThreadCallback&)(pThis->m_Pool);
HANDLE phWaitHandles[] =
{
pThis->m_Pool.m_hEventShutdown
,pThis->m_Pool.m_hEventObjectAvailable
};
BOOL bRun = TRUE;
DWORD dw = 0;
LPTHREADOBJECT pThreadObject = NULL;
// the main thread loop
while(bRun)
{
dw = WaitForMultipleObjects(2, phWaitHandles, FALSE, INFINITE);
switch(dw)
{
case WAIT_OBJECT_0+1: // object available
pThis->m_bHasObject = pThis->m_Pool.GetNextObject(pThreadObject);
if (pThis->m_bHasObject)
{
InterlockedIncrement((LONG*)&pThis->m_Pool.m_uThreadsActive);
pThreadObject->Run(pool);
pThreadObject->Done();
pThis->m_bHasObject = FALSE;
InterlockedDecrement((LONG*)&pThis->m_Pool.m_uThreadsActive);
}
// fallthrough
case WAIT_TIMEOUT: // timedout
break;
case WAIT_OBJECT_0: // shutdown
default: // some error
bRun = FALSE;
break;
}
}
pThis->m_dwThreadResult = dw;
#ifdef THREADPOOL_USE_CRT
_endthreadex(dw);
#else
ExitThread(dw);
#endif
return dw;
}
线程在一个循环中运行,并等待其中一个句柄被信号。当有worker对象添加到队列时,m_hEventObjectAvailable
会收到信号。只要队列不为空,它就会保持信号状态。当池想要关闭时,m_hEventShutdown
会收到信号,并且线程会像池的CanContinue()
方法一样查询它,以告知正在运行的worker对象是否应停止其工作。
如果m_hEventObjectAvailable
收到信号,线程将通过调用线程池的GetNextObject()
来请求下一个等待对象。如果队列不为空,此方法会将对象传递给pThreadObject
参数并返回TRUE
。然后线程更新m_uThreadsActive
成员,并调用线程对象的Run()
和Done()
成员。
在ThreadProc()
的末尾,您可以看到一个预编译开关THREADPOOL_USE_CRT
,它也用于CThread::Start
HRESULT CThreadPool::CThread::Start(SIZE_T dwStackSize
,LPSECURITY_ATTRIBUTES pSecurityAttributes)
{
// start the thread
#ifdef THREADPOOL_USE_CRT
m_hThread = (HANDLE) _beginthreadex(pSecurityAttributes,
(unsigned int)dwStackSize
,(unsigned (__stdcall*)(void*)) CThreadPool::CThread::ThreadProc
,this, 0, (unsigned int*)&m_dwThreadID);
#else
m_hThread = ::CreateThread(pSecurityAttributes, dwStackSize
,CThreadPool::CThread::ThreadProc, this, 0, &m_dwThreadID);
#endif
HRESULT hr = HRESULT_FROM_WIN32(GetLastError());
return (NULL == m_hThread) ? hr : S_OK;
}
关于_beginthread
、_beginthreadex
和CreateThread
以及何时使用它们的讨论也存在于互联网上。这两个函数的基本规则是,如果您使用CRT,则应使用_beginthread
/ _beginthreadex
;如果您不使用CRT,则应使用CreateThread
。
将Worker对象添加到队列
HRESULT CThreadPool::Add(LPTHREADOBJECT pThreadObject)
{
THREADASSERT(NULL != pThreadObject);
CThreadObjectListAccessor acc(m_ThreadObjectList);
HRESULT hr = acc.Add(pThreadObject);
if (FAILED(hr))
return hr;
if ((m_uThreadsActive == m_uThreadsCreated)
&& (m_uThreadsCreated < m_uThreadCount))
{
AddThread();
}
SetEvent(m_hEventObjectAvailable);
return S_OK;
}
使用访问器,新对象被添加到队列中。这在内存不足时可能会失败(分配新的列表节点),因此Add()
可能会返回E_OUTOFMEMORY
结果。在worker对象添加后,池会检查所有线程是否都在忙,如果是,并且尚未达到最大线程数,则通过调用AddThread()
向池添加新线程。之后,它会将m_hEventObjectAvailable
设置为信号状态,以便下一个空闲线程将获取添加的对象。当队列为空时,在获取worker对象后,m_hEventObjectAvailable
将在GetNextObject()
中重置。
可以创建多少个线程?
这通常是多线程编程论坛上的一个话题。正如许多人已经发现的那样(微软也这么说),线程的最大数量主要与进程的地址空间有关。在win32上,由于每个线程默认使用1MB的堆栈大小,而进程的地址空间是2GB(请参阅http://blogs.msdn.com/oldnewthing/archive/2005/07/29/444912.aspx),最大线程数约为2000个。因此,如果您计划创建超过2000个线程,应该将dwStackSize
参数设置为低于1MB的值。但无论如何,您真的需要2000多个线程吗?
最后
有一点需要注意:池本身不是线程安全的。这意味着,您不应该从一个线程调用例如Init()
,而从另一个线程调用Close()
。这也是worker对象仅通过CThreadPoolThreadCallback
访问池的原因之一。
如果您计划将SECURITY_ATTRIBUTES
传递给池,那么它们必须在池存在期间(至少直到您调用Close()
)保持有效,因为池只是复制指针,而不是SECURITY_ATTRIBUTES
的内容。
历史
- 2010年5月6日:首次发布