一个同步线程完成的类






4.70/5 (28投票s)
轻松同步线程完成。
引言
将一个复杂的任务分解成一系列小的子任务可能非常有益;作为软件开发者,我们一直在这样做。有些子任务必须按顺序执行,而其他子任务可以并行处理。有时,有些任务可以并行执行,为另一个在所有子任务完成之前无法启动的任务做准备。如果这听起来像多线程,那你就说对了。尤其是在拥有多处理器机器时,很容易启动两个或更多后台线程来执行特定任务。但关键时刻来了!由于每个线程都独立于主应用程序线程运行,因此必须同步线程,以确保在进行下一步之前所有必需的子任务都已完成。这通常涉及大量代码来跟踪线程句柄,以及调用 WaitForMultipleObjects()
来等待所有线程完成。代码本身足够简单,但每次需要时编写它都可能很乏味。因此有了这些类。
基本线程 API
在查看这些类之前,让我们简要回顾一下类所使用的基本线程 API _beginthreadex()
。此 API 允许我们启动一个线程,指定线程过程的地址、一个 void
指针,该指针可用于将数据传递给线程(通常是指向您程序员定义的包含线程所需数据的 struct
的指针)、线程的堆栈大小,以及线程是应立即运行还是应挂起直到稍后。如果挂起,它将一直不运行,直到我们给它开绿灯。
还有两个参数我不太关心,一个允许我们指定线程句柄是否可以被子进程继承,另一个可选地指向一个 32 位位置,该位置将接收线程标识符。 有关这些参数的详细信息,请参阅 MSDN。
我们从 _beginthreadex()
收到的返回值是一个 HANDLE
,我们可以稍后使用它来检测线程是否已终止以及何时终止。这是一个打开的句柄,因此将来某个时候需要关闭句柄。(顺便说一句,我见过的一个非常常见的错误是程序在代表它们的线程终止后不关闭线程句柄。您可以通过打开任务管理器并将“线程”和“句柄”列添加到进程显示中来检测此问题。随着线程的创建和终止,您会看到句柄计数上升并持续上升。即使在 Windows NT 及其派生操作系统上,句柄也是有限的资源,如果您的应用程序运行时间足够长而不清理其线程句柄,它**终将**使系统瘫痪)。
通过调用 WaitForSingleObject()
并传入 _beginthreadex
返回的句柄,我们可以检测线程是否已终止以及何时终止。如果线程已终止,WaitForSingleObject()
将立即返回,返回值为 WAIT_OBJECT_0
。如果线程仍在运行,则 WaitForSingleObject()
将等待直到线程终止,或直到 WaitForSingleObject()
调用中指定的超时时间过去。如果超时时间过去,WaitForSingleObject()
将返回 WAIT_TIMEOUT
。很简单!代码看起来像这样。
HANDLE hHandle = _beginthreadex(NULL, 0, MyTheadProc, NULL, 0, NULL);
switch (WaitForSingleObject(hHandle, 50000))
{
case WAIT_OBJECT_0:
// The thread has terminated - do something
break;
case WAIT_TIMEOUT:
// The timeout has elapsed but the thread is still running
// do something appropriate for a timeout
break;
}
CloseHandle(hHandle);
所有这些都相当直接。50000 是以毫秒为单位的超时时间。另一方面,请记住您必须跟踪线程句柄,确保关闭它,并编写一个 switch
语句来处理这两种情况。如果您需要跟踪的线程不止一个,情况会更复杂。在这种情况下,您需要使用 WaitForMultipleObjects()
并向其传递一个线程句柄数组。WaitForMultipleObjects()
的返回值将是 WAIT_TIMEOUT
、某个错误代码,或介于 WAIT_OBJECT_0
和 WAIT_OBJECT_0 + 正在等待的句柄数 - 1
之间的值。在这种情况下,我遇到的唯一错误是无效句柄错误代码,这表明我正在等待的一个句柄已被关闭。当然,错误代码不会指示关闭了哪个句柄!
句柄是如何被关闭的?可能是程序员(我)过早地显式关闭了句柄,或者通过调用 _beginthread()
,这是一个最初很吸引人的线程 API,因为它打字较少。您调用 _beginthread()
,指定线程过程地址、堆栈大小以及上述指向线程所需数据的 void
指针。调用 _beginthread()
后,线程正在运行(无法挂起)。需要注意的是,_beginthread()
在返回到您的代码之前会关闭线程句柄——这意味着您**无法**等待该句柄。如果您尝试这样做,那么任一等待函数(WaitForSingleObject()
或 WaitForMultipleObjects()
)将立即返回无效错误代码!
最后,为什么使用 _beginthreadex()
而不是 CreateThread()
?很简单,因为 _beginthreadex()
为我们管理线程本地存储,无论是在创建线程时还是在稍后线程结束时。
CBaseThread
此类仅封装单个线程。稍后的类使用 CBaseThread
来同步或异步地等待给定的一组线程终止。该类是 _beginthreadex()
函数的一个薄包装器。它负责跟踪 _beginthreadex()
返回的线程句柄的详细信息,确保最终关闭句柄,并提供一些成员函数来访问线程句柄并等待它。该类看起来像这样
class CBaseThread : public CObject
{
DECLARE_DYNAMIC(CBaseThread);
public:
CBaseThread(HANDLE hStopEvent, volatile bool *bStop,
unsigned(__stdcall *thread)(void *),
bool bWait = false, LPVOID data = NULL);
~CBaseThread();
bool IsWaiting() const { return m_bWaiting; }
volatile bool Stop() const { return *m_bStop; }
HANDLE StopEvent() const { return m_hStopEvent; }
HANDLE ThreadHandle() const { return m_hThreadHandle; }
LPVOID UserData() const { return m_pvUserData; }
virtual bool Wait(DWORD dwTimeout = INFINITE) const;
bool Run() const
{ return ResumeThread(m_hThreadHandle) == 1; }
UINT ThreadID() const { return m_uiThreadID; }
private:
LPVOID m_pvUserData;
HANDLE m_hStopEvent,
m_hThreadHandle;
volatile bool *m_bStop,
m_bWaiting;
UINT m_uiThreadID;
};
在构造函数中,hStopEvent
句柄和 bStop
变量用于为外部世界提供一种信号机制,告诉线程它应该停止运行。我稍后将在文章中讨论这些。第三个参数是线程过程的地址。第四个参数 bWait
参数默认为 false
,表示线程应在创建后立即运行。如果 bWait
为 true
,则线程将被创建但挂起,直到调用 Run()
或 Wait()
方法后才运行。data
参数是上述指向线程所需数据的 void
指针。该类对此参数不做任何解释。Run()
使线程运行(如果它被创建为挂起状态)。Wait()
会在线程被创建为挂起状态时使其运行,然后等待直到线程终止或超时。
析构函数关闭线程句柄。
有一个新类(于2004年10月16日添加)称为 CUserThread
,它在文章的末尾进行了描述。
CSyncRendevouz
是一个封装创建多个线程、跟踪其线程句柄并确保所有线程都已完成,然后才允许处理的下一步开始的类。此类执行同步操作。换句话说,一旦您使用该类启动了一组线程并调用了Wait()
方法,调用 Wait()
的线程的执行就会停止,直到通过该类实例启动的所有线程都终止为止。该类看起来像这样
class CSyncRendevouz : public CObject
{
DECLARE_DYNAMIC(CSyncRendevouz);
public:
CSyncRendevouz(void);
~CSyncRendevouz(void);
void Stop()
{ m_bStop = TRUE; SetEvent(m_hStopEvent); }
virtual bool Wait(DWORD dwTimeout = INFINITE);
void AddThread(unsigned(__stdcall *thread)(void *),
bool bWait = false,
LPVOID data = NULL);
bool AddHandle(HANDLE hHandle);
protected:
CArray<HANDLE, HANDLE> m_handleArray;
CList<CBaseThread*, CBaseThread *> m_threads;
HANDLE m_hStopEvent;
volatile bool m_bStop;
};
用法可能如下所示CSyncRendevouz rendevouz;
rendevouz.AddThread(Thread1);
rendevouz.AddThread(Thread2);
rendevouz.Wait(50000);
这会创建一个 CSyncRendevouz
对象,将 Thread1
和 Thread2
添加到该对象,然后调用 Wait()
。在示例中,它最多等待 50 秒让线程执行。如果两个线程都在超时时间内终止,则 Wait()
调用返回 true
,否则返回 false
。上面我说代码片段将 Thread1
和 Thread2
添加到对象。仔细查看函数原型应该会发现,实际上传递给 AddThread()
方法的是线程过程的**地址**,这意味着 AddThread()
调用是创建线程的地方。这正是发生的情况,通过为每个线程创建一个 CBaseThread
对象。让我们看一下 AddThread()
的代码。
bool CSyncRendevouz::AddThread(unsigned(__stdcall *thread)(void *),
volatile bool bWait,
LPVOID data)
{
if (m_handleArray.GetCount() > MAXIMUM_WAIT_OBJECTS - 1)
return false;
ASSERT(thread);
CBaseThread *pThread = new CBaseThread(m_hStopEvent, &m_bStop, thread,
bWait,
data);
ASSERT(pThread);
ASSERT_KINDOF(CBaseThread, pThread);
m_threads.AddTail(pThread);
m_handleArray.Add(pThread->ThreadHandle());
return true;
}
首先检查此 CSyncRendevouz
实例正在监视多少个句柄,如果达到限制(当前为 64),则返回 false
。(限制由 Windows 设置)。如果我们还没有达到限制,该函数会创建一个新的 CBaseThread
对象,并向其传递用户数据、用户参数以及 CSyncRendevouz
对象的一部分对象。新创建的 CBaseThread
对象保存句柄和指针,并启动一个新线程,将其自己的地址作为线程数据传递。然后,我们将新对象添加到列表中(用于稍后删除),并将线程句柄添加到数组中。
当所有线程都创建完毕后,就该监视它们了。回想我们之前的讨论,线程可能被创建为挂起状态,或者在我们准备调用 Wait()
方法时可能已经在运行。因此,Wait()
方法必须遍历线程对象列表,并首先允许所有挂起的线程运行。这是通过以下代码完成的。
bool CSyncRendevouz::Wait(DWORD dwTimeout)
{
CBaseThread *pThread;
POSITION pos = m_threads.GetHeadPosition();
while (pos != POSITION(NULL))
{
pThread = m_threads.GetNext(pos);
ASSERT(pThread);
ASSERT_KINDOF(CBaseThread, pThread);
if (pThread->IsWaiting())
pThread->Run();
}
return WaitForMultipleObjects(m_handleArray.GetCount(),
m_handleArray.GetData(),
TRUE, dwTimeout) != WAIT_TIMEOUT;
}
请注意,还有一个 AddHandle()
成员,它允许您将任何可等待的句柄添加到等待列表中。
CAsyncRendevouz
此类派生自 CSyncRendevouz
。当您在此类上调用 Wait()
方法时,它会创建一个另一个线程来执行 Wait()
函数并立即返回。当等待结束时(通过所有线程终止或超时),工作线程会向一个窗口发送消息。消息和目标窗口句柄都在创建 CAsyncRendevouz
对象时指定。此类看起来像这样
class CAsyncRendevouz : public CSyncRendevouz
{
DECLARE_DYNAMIC(CAsyncRendevouz);
public:
CAsyncRendevouz(HWND wndTarget, UINT uiMsg,
LPVOID pvUserData = NULL);
~CAsyncRendevouz();
virtual bool Wait(DWORD dwTimeout = INFINITE);
private:
static unsigned __stdcall WaitProc(LPVOID data);
HWND m_wndTarget;
UINT m_uiMsg;
DWORD m_dwTimeout;
LPVOID m_pvUserData;
CBaseThread *m_pThread;
};
这看起来相当简单,除了构造函数中的 LPVOID
参数。这不应与 CSyncRendevouz::AddThread()
方法中同名同类型的参数混淆。CSyncRendevouz::AddThread()
调用中的参数是传递给线程过程的用户数据。CAsyncRendevouz
构造函数中的参数是作为 LPARAM
数据发布到指定窗口的消息的一部分传递的数据。
CAsyncRendevouz::Wait()
看起来像这样。
bool CAsyncRendevouz::Wait(DWORD dwTimeout)
{
m_dwTimeout = dwTimeout;
m_pThread = new CBaseThread(m_hStopEvent, &m_bStop, WaitProc, 0,
LPVOID(this));
return TRUE;
}
这会创建一个 CBaseThread
对象,并将 CAsyneRendevouz
对象作为用户数据传递。线程过程看起来像这样unsigned __stdcall CAsyncRendevouz::WaitProc(LPVOID data)
{
{
DEREF(data);
CAsyncRendevouz *pThis = (CAsyncRendevouz *) pThread->UserData();
ASSERT(pThis);
ASSERT_KINDOF(CAsyncRendevouz, pThis);
bool bResult = pThis->CSyncRendevouz::Wait(pThis->m_dwTimeout);
if (IsWindow(pThis->m_wndTarget))
::PostMessage(pThis->m_wndTarget, pThis->m_uiMsg,
WPARAM(bResult), LPARAM(pThis->m_pvUserData));
}
_endthreadex(0);
// Not reached
return 0;
}
其中有几点并不显而易见。第一个是额外的括号对。严格来说,在这个过程中它们不是必需的,但我学会了总是使用它们。为什么?因为我用对 _endthreadex()
的调用来终止线程。去查阅 MSDN 关于该函数的文档。还是不明白?我一点也不惊讶。文档中没有告诉你的是,调用 _endthreadex()
会立即终止线程,并且不会运行与 _endthreadex(()
调用在同一作用域创建的任何对象的析构函数。我是通过艰难的方式发现这一点的。所以我总是用一对括号将线程的工作代码括起来,并确保 _endthreadex()
调用在工作代码的作用域之外。CAsyncRendevouz::Wait()
调用 CSyncRendevouz::Wait()
,并在等待调用退出后将用户定义的 message
值发布到指定的窗口。退出状态(超时或所有线程终止)作为 WPARAM
值传递,用户数据作为 LPARAM
值传递。
另一个不显眼的东西是 DEREF
宏。它是这样定义的:
#define DEREF(data) \
rendevouz::CBaseThread *pThread = (rendevouz::CBaseThread *) data; \
ASSERT(pThread); \
ASSERT_KINDOF(CBaseThread, pThread);
该宏所做的只是定义一个名为 pThread
的变量,该变量是指向 CBaseThread
对象的指针。还记得我之前说过 CBaseThread
创建线程并将自己的地址作为线程数据传递吗?您很快就会明白原因。
控制线程
有时有必要在线程完成正常执行之前终止它。也许是一个处理图像数据的线程,需要几分钟才能处理完。或者是一个监控远程连接的线程,该连接每小时发送一次数据。无论哪种情况,您都遵循了良好的实践,弹出一个对话框显示进度和一个取消按钮。用户点击取消按钮。如何终止线程?一种非常糟糕的方法是使用 TerminateThread()
API。
TerminateThread
的 MSDN 文档摘录:TerminateThread is used to cause a thread to exit. When this
occurs, the target thread has no chance to execute any user-mode code and its
initial stack is not deallocated. DLLs attached to the thread are not notified
that the thread is terminating.
TerminateThread is a dangerous function that should only be
used in the most extreme cases. You should call TerminateThread
only if you know exactly what the target thread is doing, and you control all of
the code that the target thread could possibly be running at the time of the
termination. For example, TerminateThread can result in the
following problems:
- If the target thread owns a critical section, the critical section will not be released.
- If the target thread is allocating memory from the heap, the heap lock will not be released.
- If the target thread is executing certain kernel32 calls when it is terminated, the kernel32 state for the thread's process could be inconsistent.
- If the target thread is manipulating the global state of a shared DLL, the state of the DLL could be destroyed, affecting other users of the DLL.
听起来很糟糕。事实上,它太糟糕了,以至于我**从不**使用 TerminateThread()
。坦率地说,我宁愿弹出一个紧急错误消息框并终止整个进程。举个例子,如果您恰好在线程获取了程序全局堆锁时终止了该线程,那么所有后续的内存分配尝试都会阻塞,等待一个已不再运行的线程释放锁。它不运行,所以它永远不会释放那个锁!
安全地终止线程的唯一方法是使其了解外部世界,并使其轮询外部变量或将外部事件对象包含在其正在等待的对象列表中。许多线程不需要了解外部世界。它们做一些工作,必要时等待一小段时间,超时然后退出。但其他线程可能正在执行可能需要几分钟或几小时的工作。即使您不给用户中止正在进行的工作的选项,您仍然必须考虑到用户可能希望中止整个程序的可能性。但是,如果其中一个线程未能终止,程序将不会终止。即使用户界面被销毁,程序仍然在运行,并且仍然显示在任务管理器中。
在 《重叠 I/O 的一种用途》[^] 中,我解释了如何使用事件句柄从外部世界向线程发出信号,告诉它何时应终止。这就是从 CSyncRendevouz
传递到 CBaseThread
的事件句柄的目的!您的线程应该执行 WaitForMultipleObjects
,等待它知道的句柄(作为其正在执行的任务的一部分)以及从调用 CBaseThread::StopEvent()
返回的句柄。当 CBaseThread::StopEvent()
的句柄被信号时,就该进行清理并退出。您线程中的示例文本可能如下所示:
unsigned __stdcall MyThreadProc(LPVOID data)
{
{
DEREF(data);
// Create some handle we're going to wait on as part
// of normal processing
HANDLE hFile = CreateFile(PARAMETERS);
HANDLE hWaitArray[2];
bool bExit = false;
hWaitArray[0] = hFile;
hWaitArray[1] = pThread->StopHandle();
while (bExit == false)
{
// Wait on two handles. The first handle is our file handle,
// the second handle is the stop handle which is part of
// the CBaseThread object controlling us.
switch (WaitForMultipleObjects(2, hWaitArray, FALSE, dwTimeout))
{
case WAIT_TIMEOUT:
// Do something relevant
break
case WAIT_OBJECT_0:
// Something happened on the file so do something relevant;
break;
case WAIT_OBJECT_1:
// The stop event was signalled, it's time to exit.
CloseHandle(hFile);
bExit = true;
break;
}
}
}
_endthreadex(0);
// Not reached
return 0;
}
另一方面,您的线程完全可能永远不等待某个外部事件,而是执行一个循环。在这种情况下,它应该通过调用 CBaseThread::Stop()
来定期检查是否应终止。如果调用 CBaseThread::Stop()
返回 true
,则该是时候进行清理并退出了!示例文本可能如下所示:unsigned __stdcall MyThreadProc(LPVOID data)
{
{
DEREF(data);
while (pThread->Stop() == false)
{
// Do some work..
}
}
_endthreadex(0);
// Not reached
return 0;
}
当然,所有这些都只在您编写的线程实际使用停止机制时才有效。它也只在程序中的其他代码有机会调用 CSyncRendevouz::Stop()
方法时才有效。在主线程中创建 CSyncRendevouz
对象,然后在该线程中调用 Wait()
方法,并期望能够调用 Stop()
方法,这是一个错误。请记住,一旦线程调用 Wait()
,该线程就会停止运行,直到由 CSyncRendevouz
对象控制的所有线程都终止为止!演示项目没有这个问题(演示项目使用最多睡眠 5 秒的线程),但实际使用此类时会出现问题。因此,大多数此类用法将通过 CAsyncRendevouz
进行,其中在对象内创建了一个额外的线程来执行等待。主线程继续执行,并可以随时调用 Stop()
方法。然后,主线程将创建一组线程(通过 CSyncRendevouz::AddThread()
方法)来执行工作,并等待所有线程终止的消息,然后再调度下一块工作。CUserThread
这是我为rendevouz
库的第 2 版添加的一个类。它实际上与线程同步无关;我添加这个类是因为我在其他项目(和其他文章)中发现自己使用了 CBaseThread
,并且当没有相关线程时,该类的原始设计有些尴尬。尴尬源于需要在构造函数中传递停止句柄和指向 bool
变量的指针。如果您控制多个线程,共享停止句柄等是有意义的,但如果您只使用 CBaseThread
来控制一个线程,那么在每个使用者类中声明这些变量就有点麻烦了。另一方面,我发现该类极大地简化了线程的控制,以至于我不想停止使用它。 因此,在使用了大约十五个新的、使用 CBaseThread
的项目后,我发现自己添加了停止句柄和 bool
,并认为肯定有更简单的方法。该类非常简单,看起来像这样class CUserThread : public CBaseThread
{
DECLARE_DYNAMIC(CUserThread);
public:
CUserThread(unsigned(__stdcall *thread)(void *),
bool bWait = false, LPVOID data = NULL);
virtual ~CUserThread();
void TerminateThread();
private:
volatile bool m_bStopVar;
};
构造函数在参数列表中省略了停止句柄和 bool
变量,但通过以下方式确保它们被正确设置:CUserThread::CUserThread(unsigned(__stdcall *thread)(void *),
bool bWait, LPVOID data)
: CBaseThread(NULL, &m_bStopVar, thread, bWait, data)
{
m_bStopVar = false;
m_hStopEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
}
析构函数确保类自行清理,并且 TerminateThread()
方法同时设置事件句柄和 bool
变量,以确保由对象控制的线程被信号停止。历史
- 2004年7月16日 - 初始版本。
- 2004年7月17日 - 更改了
CAsyncRendevouz
类中的等待线程,以将等待结果(超时或所有线程终止)作为窗口消息的WPARAM
成员传递。 - 2004年10月16日 - 添加了
CUserThread
类。