使用 boost 和 tbb 的 C++ 和 MFC 中的异步调用






4.40/5 (5投票s)
异步调用 MFC 中现有类的函数,同时保留 MFC 和 COM 所需的线程亲和性。
引言
许多用 MFC 编写的应用程序在尝试扩展它们以利用现代平台上的多核时,可以极大地受益于使用 Boost 和 TBB 等可用库。
当我们支持、迁移、改进 MFC 应用程序时,我们就会想,如果能异步调用现有类的某些函数,从而减轻 UI 线程的负担,简化单线程 UI 应用中耗时算法的实现,或者仅仅是为了做更多的事情,那该多好。
最后提一下,我们故意省略了大量的,或者有些人会说全部的理论(甚至更糟是哲学)方面、细节、命名法或设计模式范式,以及编码的严谨性,而是专注于提供非常易于理解、易于阅读、实用的东西。
背景
一旦我们尝试实现异步执行,就会出现各种各样的障碍,就像雨后春笋一样。MFC 对象具有线程亲和性,它们不是线程安全的,其中一些在内部使用 COM 对象,并坚持必须从单线程单元(通常是主 UI 线程)调用。我们需要在调用执行之间处理消息,以满足 UI、COM 或 Winsock(异步)的需求。我们需要封送调用,打包参数和返回值。我们必须担心线程安全以及线程之间的最小锁定,捕获异常并将其正确地传回给调用者。
如果我们要异步运行的类会进行回调,比如可观察对象或具有连接点的 COM/ActiveX 对象,那么挑战将上升到另一个层面。
这令人不知所措,不是吗?我们多么希望能够将这段代码移植到 .Net 并滥用内置的异步模式?
本文的目的是说明一种可能的方法,将现有的 MFC 类(及其所有约束)从同步执行类转换为异步执行类,而不改变该类的实现。
要实现这一目标,一个非常重要的事情是类设计得当,接口定义清晰,回调接口(如果适用)。
拥有接口允许我们实现替代类,这些类将实际调用转发给原始类,而实际执行发生在单独的后台线程中。
而且,正如现实生活中可能出现的那样,让我们假设我们想要异步执行的类具有一些我们想要同步执行的函数,一些返回同步结果的函数,以及一些我们正在异步执行并完成时触发回调的函数。
使用代码
我们必须首先向读者道歉,因为示例应用程序可能不是一个非常有启发性的应用程序。我们不得不在做一些可以说明 Boost 和 TBB 在实现异步调用函数机制或方法方面的作用,与什么都不做之间做出选择。所以我们做了一些我们希望您觉得有用的事情。
代码是用 Visual Studio 2008 SP1 和 TR1 编写的,静态链接了 Boost 1.49 和 TBB 4.0(2012 年 6 月 13 日)。存档应该包含编译发布版和调试版代码所需的一切。
我们有一个名为 CCalculator
的类,它实现了名为 ICalculator
的接口,并在需要时使用构造函数中接收的 ICalculatorEvents
接口触发事件。
class ICalculator
{
public:
virtual bool Start(void) = 0;
virtual int Add(int a, int b) = 0;
virtual void Factorial(unsigned int a) = 0;
virtual void Stop(void) = 0;
};
class ICalculatorEvents
{
public:
virtual void Started(void) = 0;
virtual void Stopped(void) = 0;
virtual void Result(int) = 0;
virtual void Error(int) =0;
};
class CCalculator :
public ICalculator
{
public:
CCalculator(ICalculatorEvents&);
~CCalculator(void);
bool Start(void);
int Add(int a, int b);
void Factorial(unsigned int a);
void Stop(void);
protected:
bool m_running;
ICalculatorEvents& m_callback;
};
CCalculator::CCalculator(ICalculatorEvents& callback):
m_running(false),
m_callback(callback)
{
}
CCalculator::~CCalculator(void)
{
}
bool CCalculator::Start(void)
{
m_running = true;
m_callback.Started();
return true;
}
int CCalculator::Add(int a, int b)
{
return a+b;
}
void CCalculator::Factorial(unsigned int a)
{
int factorial = (a == 0)?0:1;
for (unsigned int i = 1; i < a; i++)
factorial *= i;
m_callback.Result(factorial);
}
void CCalculator::Stop(void)
{
m_running = false;
m_callback.Stopped();
}
我们在 CMainDlg
类中使用 CCalculator
类,在该类中我们实例化它或由 CAsyncCalculator
提供的替代实现。拥有一个接口消除了修改用户代码的需要。
BOOL CMainDlg::OnInitDialog() {
...
//m_calculator = boost::shared_ptr<ICalculator>(new CCalculator(*this));
m_calculator = boost::shared_ptr<ICalculator>(new CAsyncCalculator(*this));
return TRUE; // return TRUE unless you set the focus to a control
}
ICalculator
接口的替代实现,即执行函数和回调的异步实现,称为 CAsyncCalculator
,定义如下:
typedef tbb::concurrent_queue<boost::function<void()>> CallQueue;
class CEventReceiver:public CWnd
{
public:
CEventReceiver(CallQueue& queue);
~CEventReceiver();
void QueueCall(boost::function<void()> &call);
protected:
LRESULT OnExecuteCall(WPARAM, LPARAM);
DECLARE_MESSAGE_MAP()
private:
CallQueue& m_call_queue;
};
class CAsyncCalculator :
public CWinThread,
// Is an ICalculator
public ICalculator,
// Implemented in terms of ...
private ICalculatorEvents
{
public:
CAsyncCalculator(ICalculatorEvents&);
~CAsyncCalculator(void);
// ICalculator
bool Start(void);
int Add(int a, int b);
void Factorial(unsigned int a);
void Stop(void);
protected:
afx_msg void OnExecuteCall(WPARAM, LPARAM);
DECLARE_MESSAGE_MAP()
void QueueCall(boost::function<void()> &call);
private:
// CWinThread overrides
BOOL InitInstance();
int ExitInstance();
// ICalculatorEvents
void Started(void);
void Stopped(void);
void Result(int);
void Error(int);
private:
ICalculatorEvents& m_external_callback;
boost::shared_ptr <ICalculator> m_calculator_impl;
CallQueue m_call_queue;
CallQueue m_callback_queue;
boost::barrier m_worker_thread_started;
CEventReceiver m_event_receiver;
};
关注点
在遗忘之前,让我说一下,当你在 MFC 中使用 多重继承 时,最好将 MFC 类放在第一个,如果你不想遇到任何意外的话:
warning C4407: cast between different pointer to member representations, compiler may generate incorrect code
编译这行时:
ON_THREAD_MESSAGE(UWM_EXECUTE_CALL, OnExecuteCall)
CAsyncCalculator
是一个派生自 ICalculator
接口的类。我们还从 CWinThread
派生它,并在构造函数中创建一个后台线程。该线程将执行我们的调用。我们有一个并发队列用于在后台线程上执行的调用,另一个队列用于回调。回调需要由调用 ICalculator
接口函数的同一个线程执行,也就是创建 CAsyncCalculator
实例的同一个线程。这就是 CEventReceiver
窗口类发挥作用的地方,它是一个消息处理窗口,在 CAsyncCalculator
的构造函数中创建,它接收回调并执行它们。
CAsyncCalculator::CAsyncCalculator(ICalculatorEvents& callback):
m_external_callback(callback),
m_worker_thread_started(2),
m_event_receiver(m_callback_queue)
{
BOOL thread_created = CreateThread();
m_worker_thread_started.wait();
}
我们承认这个 CAsyncCalculator
类有点太拥挤了,它确实有从两个不同线程调用的函数,但请将其视为一个教学示例,而不是您可能想要提交给严格的 OOP 代码审查的东西。
重要的是,要由后台线程创建包含的 CCalculator
对象实例,所有对 CCalculator
实例的调用都将在此执行,并且所有回调都将从此触发。
BOOL CAsyncCalculator::InitInstance()
{
m_calculator_impl = boost::shared_ptr<ICalculator>(new CCalculator(*this));
m_worker_thread_started.wait();
return TRUE;
}
CAsyncCalculator
类中 ICalculator
接口的实现揭示了调用被打包、封送并排队在不同线程上执行的机制。
bool CAsyncCalculator::Start(void)
{
boost::packaged_task<bool> task(boost::bind(&ICalculator::Start,m_calculator_impl.get()));
boost::unique_future<bool> f = task.get_future();
QueueCall(MoveTaskIntoFunction<bool>(task));
// Timed wait
if (!f.timed_wait(CALL_TIMEOUT))
{
throw std::runtime_error("Start call timeout.");
}
return f.get();
}
上面您可以看到我们想要同步执行调用,但有一个超时作为我们允许等待执行完成的最大时长。
MoveTaskIntoFunction
是一件大事,我们将其归功于 **D Drmmr**。您可以在 这里 阅读更多关于该主题的内容。您需要它,因为您想在同一个并发队列中存储具有不同返回类型的任务/函数。
QueueCall
和 OnExecuteCall
看起来像这样:
void CAsyncCalculator::QueueCall(boost::function<void()> &call) { m_call_queue.push(call); PostThreadMessage(UWM_EXECUTE_CALL, 0, 0); } void CAsyncCalculator::OnExecuteCall(WPARAM, LPARAM) { boost::function<void()> call; while(m_call_queue.try_pop(call)) { call(); } }
回调通过类似的劫持过程进行,其中初始回调接口引用被本地存储为成员变量 m_external_callback
,并被派生自 ICalculatorEvents
的私有实现所替换。
一旦回调从 CCalculator
到达 CAsyncCalculator
,它就会被打包并排队等待 CEventReceiver
对象执行。
void CAsyncCalculator::Started(void)
{
boost::packaged_task<void> task(boost::bind(&ICalculatorEvents::Started, &m_external_callback));
m_event_receiver.QueueCall(MoveTaskIntoFunction<void>(task));
}
这在 OnExecuteCall
消息处理方法中发生,与我们在 CAsyncCalculator
中所做的类似。
LRESULT CEventReceiver::OnExecuteCall (WPARAM, LPARAM)
{
boost::function<void()> call;
while(m_call_queue.try_pop(call))
{
call();
}
return NULL;
}
void CEventReceiver::QueueCall(boost::function<void()> &call)
{
m_call_queue.push(call);
::PostMessage(m_hWnd, UWM_EXECUTE_CALL, 0,0);
}
这可能看起来需要输入很多代码,但我相信肯定有缩短它的方法。