异步函数调用:轻松处理Win32线程






4.20/5 (15投票s)
一篇关于以更直观的方式利用 Win32 线程的新方法的文章。
引言
当我们创建工作线程时,我们的目的是在单独的线程(即工作线程)中执行任务,而不会阻塞主线程。事实上,公平地说,我们想做的事情只是异步调用一个函数。
不幸的是,当我们需要为这个简单的任务创建一个新线程时,由于相关 API 的限制以及我们必须关注的其他问题,存在一些问题。其中一些问题是简单且微不足道的,而另一些则很微妙,并且通常很难完全确定问题的根源,这就是多线程的本质。
但是,如果多年来我们一直在做的事情都符合相同的脚本,那么样板代码片段就可以一遍又一遍地重复。我们已经学会了如何使用 ::CreateThread()
、_beginthread[ex]()
和 AfxBeginThread()
来生成新的工作线程,以及如何通过 void
指针打包和解包一组信息以传递给工作线程。
让我们假设我们有一个耗时的同步目标函数,需要在一个单独的线程中“异步”执行。
class CMyWindow : public CWnd { int MyLengthyFunction(long param1, std::string tag) { ... } };
为了在单独的线程中执行同步函数,我们需要使用其中一个线程创建 API。
class CMyWindow : public CWnd { int MyLengthyFunction(long param1, std::string tag) { ... } struct pack_parameter { CMyWindow * pthis; long param1; std::string tag; }; bool CreateThreadAndCallMyLengthyFunction(long param1, std::string tag) { // Parameters packing. std::auto_ptr<pack_parameter> param_ptr(new pack_parameter); param_ptr->pthis = this; param_ptr->param1 = param1; param_ptr->tag = tag; CWinThread * pThread = AfxBeginThread( &CMyWindow::MyThreadProc, param_ptr.get() ); ASSERT( pThread ); if( pThread ) { param_ptr.release(); return true; } return false; } static UINT MyThreadProc(LPVOID parameter) { // Parameters unpacking. std::auto_ptr<pack_parameter> param_ptr( static_cast<pack_parameter *>( parameter ) ); try { int result = param_ptr->pthis->MyLengthyFunction( param_ptr->param1, param_ptr->tag ); } catch(...) { ASSERT( false ); } return 0; } void test() { CreateThreadAndCallMyLengthyFunction( 123L, "job#1" ); } };
上面的示例显示了常见的线程创建场景之一,用于在单独的工作线程中执行耗时任务。由于线程过程接受且只接受一个 void
参数作为其输入参数,因此耗时函数的所有输入参数都必须打包并在堆内存上解包,然后传递给线程过程。
而且,还有许多其他问题需要我们进一步关注,例如返回值处理、异常处理、线程同步等等。虽然经验丰富的程序员可以以正确有效的方式实现和处理所有这些微妙的问题,但新手很有可能在不知不觉中忽略其中一些问题,从而创建出难以调试的错误。
我们已经看到市面上有几个线程库,它们只是简单地封装了那些与线程相关的 API,但我会说这些库是半成熟和不完整的,前提是创建工作线程就是异步调用函数。在我看来,它们并不比调用原始线程 API 好。
使用 afc 库,可以在单独的线程中轻松异步调用函数,而不会阻塞主线程。忘记关于线程过程应指定为非成员函数的古老说法。请看下面。
#include "afc.hpp" class CMyWindow : public CWnd { int MyLengthyFunction(long param1, std::string tag) { ... } void test() { // MyLengthyFunction( 123L, "job#1" ); afc::launch<afc::mfc_thread<> >( &CMyWindow::MyLengthyFunction, this, 123L, "job#1" ); } };
afc::launch<>()
是一个函数模板帮助器,用于创建 afc
代理对象(即 afc::detail::afc_proxy_t<>
),该对象委托给生成的工作线程和在该线程中执行的目标函数。
Using the Code
A. 线程特性
使用 afc::launch<>()
辅助函数模板时,应将线程特性明确指定为模板参数。afc 库提供了三种线程特性:afc::win32_thread
、afc::crt_thread
和 afc::mfc_thread
,它们分别对应 ::CreateThread()
、_beginthreadex()
和 AfxBeginThread()
。如果在单独的工作线程中执行的目标函数中需要调用任何 CRT 函数,则必须使用 crt_thread
或 mfc_thread
。在相同上下文中,如果需要调用任何 MFC 函数,则只允许使用 mfc_thread
;否则,将跳过需要在每个线程基础上切换的必要数据结构的初始化,这意味着您的目标函数可能无法按预期工作。
这些线程特性本身是一个类模板,可以指定三个线程参数作为非类型模板参数来定制线程。
namespace afc { template<LPSECURITY_ATTRIBUTES ThreadAttributes = NULL , int Priority = THREAD_PRIORITY_NORMAL, size_t StackSize = 0> struct win32_thread; template<LPVOID security = NULL , int Priority = THREAD_PRIORITY_NORMAL, size_t StackSize = 0> struct crt_thread; template<LPSECURITY_ATTRIBUTES ThreadAttributes = NULL , int Priority = THREAD_PRIORITY_NORMAL, size_t StackSize = 0> struct mfc_thread; }
不可能自动确定需要哪种类型的线程特性,因此每次调用 afc::launch<>()
时都应手动指定正确的线程特性。
#include "afc.hpp" int my_function_use_win32_only(long param1, std::string tag); int my_function_may_use_crt(long param1, std::string tag); int my_function_may_use_mfc(long param1, std::string tag); class CMyWindow : public CWnd { void test() { afc::launch<win32_thread<> >( &my_function_use_win32_only, 123L, "job#1" ); afc::launch<crt_thread<> > ( &my_function_use_win32_only, 234L, "job#2" ); afc::launch<mfc_thread<> > ( &my_function_use_win32_only, 345L, "job#3" ); // afc::launch<win32_thread<> >( &my_function_may_use_crt, 456L, "job#4" ); afc::launch<crt_thread<> > ( &my_function_may_use_crt, 567L, "job#5" ); afc::launch<mfc_thread<> > ( &my_function_may_use_crt, 678L, "job#6" ); // afc::launch<win32_thread<> >( &my_function_may_use_mfc, 789L, "job#7" ); // afc::launch<crt_thread<> > ( &my_function_may_use_mfc, 890L, "job#8" ); afc::launch<mfc_thread<> > ( &my_function_may_use_mfc, 012L, "job#9" ); } };
B. 线程完成例程
afc::launch<>()
不会阻塞调用线程并立即返回;但是,这并不意味着已生成的工作线程已完成。如果您想检索在单独线程中执行的目标函数调用的返回值,或者收到目标函数完成事件的通知,请使用 afc::on_completion<R>()
函数模板,其中 R
是目标函数调用的返回类型,但通常通过自动模板参数推导省略。
#include "afc.hpp" class CMyWindow : public CWnd { int MyLengthyFunction(long param1, std::string tag) { ... return 333; } void OnMyLengthyFunctionComplete(int ret, UINT error_code, ULONG_PTR completion_key) { ASSERT( 333 == ret ); ASSERT( 777 == completion_key ); ... } void test() { afc::launch<afc::mfc_thread<> >( &CMyWindow::MyLengthyFunction, this, 123L, "job#1", afc::on_completion( &CMyWindow::OnMyLengthyFunctionComplete, this, 777 ) ); } };
与 Boost.Function
或 Boost.Bind
类似,此库做了特殊规定,紧随成员函数指针之后指定的第一个输入参数被视为进行成员函数调用的指针或提供 get_pointer()
重载的智能指针对象。在内部,afc 使用 Boost.Bind
来打包目标函数调用的所有输入参数。
void test() { afc::launch<afc::mfc_thread<> >( &CMyWindow::MyLengthyFunction, this, 123L, "job#1", afc::on_completion( &CMyWindow::OnMyLengthyFunctionComplete, this, 777 ) ); afc::launch<afc::mfc_thread<> >( boost::bind( &CMyWindow::MyLengthyFunction, this, 123L, "job#1" ), afc::on_completion( &CMyWindow::OnMyLengthyFunctionComplete, this, 777 ) ); }
当 MyLengthyFunction()
函数调用在单独的工作线程中执行完成时,将调用 OnMyLengthyFunctionComplete()
,并将指定为 MyLengthyFunction()
函数调用返回值的第一个参数作为其输入参数。
完成例程的函数签名已预定义,应通过 afc::on_completeion<>()
函数模板提供与预定义签名匹配的完成例程。
- 非 void 返回目标函数的完成例程的预定义函数调用签名
void (R, UINT, ULONG_PTR)
void (UINT, ULONG_PTR)
#include "afc.hpp" void OnMyLengthyFunctionComplete1(int ret, UINT error_code, ULONG_PTR completion_key) { ASSERT( 999 == ret ); ASSERT( 111 == completion_key ); ... } void OnMyLengthyFunctionComplete2(UINT error_code, ULONG_PTR completion_key) { ASSERT( 222 == completion_key ); ... } class CMyWindow : public CWnd { int MyLengthyFunction1(long param1, std::string tag) { ... return 999; } void MyLengthyFunction2(long param1, std::string tag) { ... } void OnMyLengthyFunctionComplete3(int ret, UINT error_code, ULONG_PTR completion_key) { ASSERT( 999 == ret ); ASSERT( 333 == completion_key ); ... } void OnMyLengthyFunctionComplete4(UINT error_code, ULONG_PTR completion_key) { ASSERT( 444 == completion_key ); ... } void test() { afc::launch<afc::mfc_thread<> >( &CMyWindow::MyLengthyFunction1, this, 123L, "job#1", afc::on_completion( &OnMyLengthyFunctionComplete1, 111 ) ); afc::launch<afc::mfc_thread<> >( &CMyWindow::MyLengthyFunction2, this, 123L, "job#1", afc::on_completion( &OnMyLengthyFunctionComplete2, 222 ) ); afc::launch<afc::mfc_thread<> >( &CMyWindow::MyLengthyFunction1, this, 123L, "job#1", afc::on_completion( &CMyWindow::OnMyLengthyFunctionComplete3, this, 333 ) ); afc::launch<afc::mfc_thread<> >( &CMyWindow::MyLengthyFunction2, this, 123L, "job#1", afc::on_completion( &CMyWindow::OnMyLengthyFunctionComplete4, this, 444 ) ); } };
completion_key
可用于将任何类型的用户定义信息传递给完成例程(从 afc::launch<>()
),如果不需要则可以忽略。
C. 异常处理程序
尝试异步调用函数时的困难之一是如何处理在目标函数调用过程中可能抛出的异常。如果目标函数保证不会抛出异常,那么它可能不再是我们的关注点,但在许多情况下我们必须处理异常。
默认情况下,afc 会捕获目标函数抛出的所有异常,并且不允许它们未处理地传播。当目标函数抛出未处理的异常时,完成例程将立即被调用,其 UINT
类型的 error_code
设置为 AFC_ERROR_UNHANDLED_EXCEPTION
。
#include "afc.hpp" class CMyWindow : public CWnd { int MyLengthyFunction(long param1, std::string tag) { throw "unhandled exception"; ... return 333; } void OnMyLengthyFunctionComplete(int ret, UINT error_code, ULONG_PTR completion_key) { ASSERT( 0 == ret ); ASSERT( AFC_ERROR_UNHANDLED_EXCEPTION == error_code ); ASSERT( 777 == completion_key ); ... } void test() { afc::launch<afc::mfc_thread<> >( &CMyWindow::MyLengthyFunction, this, 123L, "job#1", afc::on_completion( &CMyWindow::OnMyLengthyFunctionComplete, this, 777 ) ); } };
但是,这种默认行为可以通过提供自定义异常处理程序轻松定制和扩展。
#include "afc.hpp" class CMyWindow : public CWnd { int MyLengthyFunction(long param1, std::string tag) { throw "unhandled exception"; ... return 333; } struct MyExceptionHandler { template<typename TFxn> int operator ()(TFxn fxn, UINT & error_code) const { try { return fxn(); // (pMyWnd->*&CMyWindow::MyLengthyFunction)( 123L, "job#1" ); } catch(char const * e) { error_code = 444; TRACE( _T("%s\n"), e ); // Traces "unhandled exception". return 333; } } }; void OnMyLengthyFunctionComplete(int ret, UINT error_code, ULONG_PTR completion_key) { ASSERT( 333 == ret ); ASSERT( 444 == error_code ); ASSERT( 777 == completion_key ); ... } void test() { afc::launch<afc::mfc_thread<> >( &CMyWindow::MyLengthyFunction, this, 123L, "job#1", // Target function MyExceptionHandler(), // Exception handler afc::on_completion( &CMyWindow::OnMyLengthyFunctionComplete, this, 777 ) ); } };
异常处理程序也应根据预定义的函数签名是可调用的。它应该是一个模板函数,第一个模板函数参数是一个零元函数,代表目标函数。调用零元函数被转换为调用带有解包参数的目标函数。异常处理程序的返回值和 error_code
将作为输入参数传递给完成例程。
- 异常处理程序的预定义函数调用签名
R (TFxn, UINT &)
*R
应隐式转换为目标函数的返回类型。
D. 线程间通信 #1 - 来自调用线程
如前所述,afc::launch<>()
创建一个临时 afc
代理对象,该对象委托给生成的工作线程。通过访问代理对象的成员函数,可以控制工作线程。
#include "afc.hpp" class CMyWindow : public CWnd { int MyLengthyFunction(long param1, std::string tag) { ... return 333; } void test() { afc::proxy p = afc::launch<afc::mfc_thread<> >( &CMyWindow::MyLengthyFunction, this, 123L, "job#1" ); TRACE( _T("Worker Thread ID: %x, Worker Thread Handle: %x\n") , p.get_thread_id(), p.get_thread_handle() ); DWORD res = 0; res = p.wait_with_message_loop( 3000 ); // Waits the target function call to complete for 3000 // milliseconds with the second message loop running. switch( res ) { case WAIT_OBJECT_0: // Target function call is completed. break; case WAIT_TIMEOUT: // Timeout. p.abort(); // 'Signals' the worker thread to abort. break; } while( p.is_running() ) { } } };
afc::proxy
类的所有可用成员函数如下所示。顺便说一下,afc::proxy
是 可复制构造的(CopyConstructible) 和 可赋值的(Assignable),因此它可以存储到 STL 容器中。
namespace afc { // Synopsis class proxy { public: HANDLE get_thread_handle() const; DWORD get_thread_id() const; BOOL abort() const; bool is_running() const; BOOL set_thread_priority(int priority) const; // ::SetThreadPriority() int get_thread_priority() const; // ::GetThreadPriority() DWORD suspend() const; // ::SuspendThread() DWORD resume() const; // ::ResumeThread() BOOL terminate(DWORD exit_code = AFC_EXIT_CODE_TERMINATION) const; // ::TerminateThread() DWORD wait(DWORD timeout) const; // ::WaitForSingleObject() DWORD wait_with_message_loop(DWORD timeout) const; // AtlWaitWithMessageLoop() }; }
调用 abort()
会导致线程特定的中止事件同步对象被置位,并且在特定工作线程中运行的目标函数可能会检查该事件对象以决定是否中止执行。这将在下面进行说明。
E. 线程间通信 #2 - 来自工作线程
利用线程特定局部存储(TLS),使工作线程能够访问并与调用线程通信。
#include "afc.hpp" class CMyWindow : public CWnd { int MyLengthyFunction(long param1, std::string tag) { for(int i = 0; i < INT_MAX; ++i) { if( afc::thread_specific::check_abort() ) { // Abort event has been signaled in the caller thread. (2) return 999; } // Some lengthy operations. ... } return 333; } void OnMyLengthyFunctionComplete(int ret, UINT error_code, ULONG_PTR completion_key) { ASSERT( 999 == ret ); ASSERT( 777 == completion_key ); ... } void test() { afc::proxy p = afc::launch<afc::mfc_thread<> >( &CMyWindow::MyLengthyFunction, this, 123L, "job#1", afc::on_completion( &CMyWindow::OnMyLengthyFunctionComplete, this, 777 ) ); p.abort(); // Signals the worker thread to abort. (1) } };
提供两个线程特定的单例访问器
namespace afc { // Synopsis class thread_specific { public: static bool check_abort(); static HANDLE get_caller_thread_handle(); }; }
为了使内部用于实现单例模式的范围静态初始化线程安全,afc::launch<>()
的设计宗旨是确保工作线程过程已开始,并且所有必要的线程特定局部存储初始化在它返回之前完成。
如果从主线程(即非 afc 线程)调用上述单例访问器,则请求将被简单地忽略,并分别返回 false
和 NULL
。
F. 线程收集器
最后一个但并非最不重要的问题是如何确保所有生成的 afc 线程在主程序退出之前安全终止。afc::thread_collector
旨在管理和帮助清理通过 afc::launch<>()
启动的 afc 线程。线程收集器类似于垃圾收集器,但它不是收集垃圾内存,而是收集特定线程在生成时分配的垃圾资源。
#include "afc.hpp" #include "afc_thread_collector.hpp" class CMyApp { void InitInstance() { ... afc::thread_collector::init(); } }; class CMyWindow : public CWnd { int MyLengthyFunction(long param1, std::string tag) { ... } void test() { afc::proxy p = afc::launch<afc::mfc_thread<> >( &CMyWindow::MyLengthyFunction, this, 123L, "job#1" ); afc::thread_collector::contract( p ); } };
任何与 afc::thread_collector
签约的 afc
代理都保证在程序退出时被收集。当程序退出时,afc::thread_collector
将对内部维护列表中的每个 afc 线程发出中止事件信号,然后等待预定义的超时时间(默认为 AFC_THREAD_COLLECTOR_WAIT_TIMEOUT
= 5000 毫秒)。当完整的超时时间已过,但仍有一些 afc 线程存活并运行时,afc::thread_collector
将强制终止这些剩余线程。
由于 afc::thread_collector
使用范围静态初始化器来实现单例模式,因此在多线程环境中可能不是线程安全的。为了使其线程安全,应该在创建可能使用 afc::thread_collector
服务的第二个线程之前,在主线程中调用 afc::thread_collector::init()
。
namespace afc { // Synopsis class afc_thread_collector { public: static unusable init(); static void contract(afc::proxy const & p); static void recede(afc::proxy const & p); }; }
注释
1. 请记住,目标函数和完成例程(如果指定)是在工作线程的上下文中执行的。
由于异步调用函数变得如此容易,我们可能会忘记这些函数是在与主线程不同的线程上下文中执行的事实。当将 afc 与需要为其自身目的初始化一些线程特定局部存储的框架一起使用时,您应该特别注意。
#include "afc.hpp" class CMyWindow : public CWnd { int MyLengthyFunction(long param1, std::string tag) { CWnd * pWnd1 = CWnd::FromHandle( this->m_hWnd ); ASSERT( pWnd1 != this ); Attach( Detach() ); // Synchonizes the thread specific handle map. CWnd * pWnd2 = CWnd::FromHandle( this->m_hWnd ); ASSERT( pWnd2 == this ); return 0; } void test() { afc::launch<afc::mfc_thread<> >( &CMyWindow::MyLengthyFunction, this, 123L, "job#1" ); } };
2. 使用同步对象来线程安全地访问成员变量。
出于同样的原因,我们可能会忘记,如果成员变量在目标函数或完成例程中被访问,则需要一个锁对象来同步对它们的访问。
#include "afc.hpp" class CMyWindow : public CWnd { std::map<long, std::string> myMap_; mutex lock_; int MyLengthyFunction(long param1, std::string tag) { lock_.acquire(); myMap_[param1] = tag; lock_.release(); return 0; } void test() { afc::launch<afc::mfc_thread<> >( &CMyWindow::MyLengthyFunction, this, 123L, "job#1" ); lock_.acquire(); myMap_[123L] = "job#1"; lock_.release(); } };
3. 不要传递局部变量的指针或引用,并使用“按值传递”语义。
当一个人试图使用 afc 将同步函数调用转换为等效的异步函数调用时,他或她可能会很容易地犯下面的错误。
int MyOriginalFunction(std::string const & name) { ... } void test(std::string const & name) { MyOriginalFunction( name ); }
我们可以使用 afc 异步调用 MyOriginalFunction()
,如下所示。
#include "afc.hpp" int MyOriginalFunction(std::string const & name) { ... } void test(std::string const & name) { afc::launch<crt_thread<> >( &MyOriginalFunction, name ); }
你能看出问题吗?乍一看并不容易识别,但如果你仔细查看示例,你可能会注意到它无意中传递了对局部变量的引用。
void test(std::string const & name) { afc::launch<crt_thread<> >( &MyOriginalFunction, name ); return 0; } void test_all() { std::string myName = "Jae"; test( myName ); }
如果 test_all()
在对 MyOriginalFunction()
的异步函数调用完成之前返回,则很可能会访问在 test_all()
的局部函数作用域中栈内存上分配 myName
的无效内存空间。
更改函数签名以使用“按值传递”语义将解决这种情况。
#include "afc.hpp" int MyOriginalFunction(std::string name) // 'pass by value' { ... } void test(std::string const & name) { afc::launch<crt_thread<> >( &MyOriginalFunction, name ); } void test_all() { std::string myName = "Jae"; test( myName ); }