带线程间消息传递的异步多播回调






4.93/5 (44投票s)
使用这个多线程、可移植的 C++ 异步回调框架,简化线程间数据的传递。
引言
回调是一种强大的概念,用于减少两段代码之间的耦合。在多线程系统中,回调存在局限性。我一直想要的是一种能够跨线程的回调机制,并且能够安全地处理将事件数据从一个线程传递到另一个线程的所有底层细节。我需要一个小型、可移植且易于使用的框架。不再需要在线程循环中使用类型转换 OS 消息队列 void* 值并基于枚举的庞大 switch 语句。目标是创建一个回调,注册一个回调,然后框架会自动使用用户指定的线程上的数据参数调用回调。
本文提出的回调解决方案提供以下功能:
- 异步回调 – 支持与任何线程之间进行异步回调。
- 线程目标 – 指定异步回调的目标线程。
- 回调 – 调用具有匹配签名的任何 C 或 C++ 自由函数。
- 类型安全 – 用户定义的、类型安全的回调函数数据参数。
- 成员函数 – 调用实例成员函数。
- 多播回调 – 在列表中存储多个回调以顺序调用。
- 线程安全 – 适用于多线程系统。
- 紧凑 – 代码库小巧,易于维护,占用代码空间最少。
- 可移植 – 可移植到嵌入式或 PC 平台。
- 任何编译器 – 无需高级 C++ 语言特性。
- 任何操作系统 - 易于移植到任何操作系统。
- 优雅的语法 – 直观且易于使用。
回调范例通过将回调和回调数据放置在您指定的控制线程上来极大地简化了多线程应用程序的开发。为单个类、模块或整个子系统公开异步回调接口非常容易。框架的使用难度不亚于标准的 C 回调,但功能更强大。
本文提出了一种利用异步多播回调的跨线程通信机制。附件源代码实现了上述所有功能,我将进行演示。
提供了三种异步多播回调实现:两种 C++ 和一种 C。有关另外两篇相关文章,请参阅“参考文献”部分。
使用 CMake 创建构建文件。CMake 是免费开源软件。支持 Windows、Linux 和其他工具链。有关更多信息,请参阅 **CMakeLists.txt** 文件。
查看 GitHub 以获取最新源代码
- 跨线程消息传递的异步多播回调 - 作者:David Lafreniere
回调背景
函数回调的概念非常有用。在回调术语中,发布者定义回调签名并允许匿名注册函数指针。订阅者创建一个符合发布者回调签名的函数实现,并在运行时向发布者注册回调函数指针。发布者代码对订阅者代码一无所知——注册和回调调用是匿名的。
现在,在多线程系统中,您需要了解同步与异步回调调用。如果回调是同步的,则回调将在调用者的控制线程上执行。如果您在回调内部设置断点,堆栈帧将显示发布者函数调用以及所有同步调用的发布者回调。此场景没有多线程问题,因为所有内容都在单个线程上运行。
如果发布者代码有自己的线程,它可能会在其控制线程上调用回调函数,而不是在订阅者的线程上。发布者调用的回调可能在任何时候发生,完全独立于订阅者的控制线程。如果回调代码不是线程安全的,这种跨线程可能会给订阅者带来问题,因为现在您有另一个线程在未知的时间间隔调用订阅者代码库。
使回调函数线程安全的一种解决方案是在发布者的回调期间将消息发布到订阅者的 OS 队列。订阅者的线程稍后将消息出队并调用适当的函数。由于回调实现仅发布消息,因此回调(即使是异步的)也是线程安全的。在这种情况下,消息队列的异步性提供了线程安全,而不是软件锁。
回调通常是自由函数,无论是类 static
成员还是全局函数。在 C++ 中,实例成员函数的处理方式不同,并且在成员函数指针方面存在显著限制。我不会深入讨论所有细节,该主题已被广泛讨论,但足以说明您不能让单个指针指向所有函数类型。此框架支持调用自由函数,但提供支持以便将调用回实例成员函数。
Using the Code
我将首先介绍如何使用代码,然后深入介绍实现细节。
发布者使用 AsycCallback<>
类向潜在订阅者公开回调接口。实例用一个模板参数创建——函数回调参数的用户数据类型。在下面的示例中,int
将成为回调函数的参数。
AsyncCallback<int> callback;
要订阅回调,请创建一个自由函数(static
成员或全局函数),如下所示。我稍后将解释为什么 <int>
参数需要 (const int&, void*)
函数签名。
void SimpleCallback(const int& value, void* userData)
{
cout << "SimpleCallback " << value << endl;
}
订阅者使用 Register()
函数注册以接收回调。第一个参数是指向回调函数的指针。第二个参数是指向要调用回调的线程的指针。
callback.Register(&SimpleCallback, &workerThread1);
当发布者需要为所有已注册的订阅者调用回调时,请使用 operator()
或 Invoke()
。这两个函数都不会同步执行回调;而是将每个回调分派到目标控制线程。
callback(123);
callback.Invoke(123);
使用 Unregister()
取消注册回调。
callback.Unregister(&SimpleCallback, &workerThread1);
或者,要取消注册所有回调,请使用 Clear()
。
callback.Clear();
始终使用以下两种方法之一检查是否有人订阅了回调:
if (callback)
callback(123);
if (!callback.Empty())
callback(123);
AsyncCallback<>
可轻松用于为传入和传出的 API 接口添加异步性。以下示例展示了如何实现。
SysData 示例
SysData
是一个简单的类,展示了如何公开一个传出的异步接口。该类存储系统数据,并在模式更改时提供异步订阅者通知。类接口如下所示:
class SysData
{
public:
/// Clients register with AsyncCallback to get callbacks when system mode changes
AsyncCallback<SystemModeChanged> SystemModeChangedCallback;
/// Get singleton instance of this class
static SysData& GetInstance();
/// Sets the system mode and notify registered clients via SystemModeChangedCallback.
/// @param[in] systemMode - the new system mode.
void SetSystemMode(SystemMode::Type systemMode);
private:
SysData();
~SysData();
/// The current system mode data
SystemMode::Type m_systemMode;
/// Lock to make the class thread-safe
LOCK m_lock;
};
用于接收回调的订阅者接口是 SystemModeChangedCallback
。调用 SetSystemMode()
将新模式保存到 m_systemMode
并通知所有已注册的订阅者。
void SysData::SetSystemMode(SystemMode::Type systemMode)
{
LockGuard lockGuard(&m_lock);
// Create the callback data
SystemModeChanged callbackData;
callbackData.PreviousSystemMode = m_systemMode;
callbackData.CurrentSystemMode = systemMode;
// Update the system mode
m_systemMode = systemMode;
// Callback all registered subscribers
if (SystemModeChangedCallback)
SystemModeChangedCallback(callbackData);
}
SysDataClient 示例
SysDataClient
是一个回调订阅者,并在构造函数中注册通知。请注意,Register()
的第三个参数是指向 this
的指针。该指针作为 userData
在每次回调时传回。框架内部不对 userData
做任何处理,除了将其传回回调调用。userData
值可以是调用者想要的任何内容。
// Constructor
SysDataClient() :
m_numberOfCallbacks(0)
{
// Register for async callbacks
SysData::GetInstance().SystemModeChangedCallback.Register(&SysDataClient::CallbackFunction,
&workerThread1, this);
}
现在,当系统模式更改时,将调用 SysDataClient::CallbackFunction()
。请注意,userData
参数被类型转换回 SysDataClient
实例。由于 Register()
提供了 this
指针,回调函数能够在执行期间访问任何对象实例或函数。
static void CallbackFunction(const SystemModeChanged& data, void* userData)
{
// The user data pointer originates from the 3rd argument in the Register() function
// Typecast the void* to SysDataClient* to access object instance data/functions.
SysDataClient* instance = static_cast<SysDataClient*>(userData);
instance->m_numberOfCallbacks++;
cout << "CallbackFunction " << data.CurrentSystemMode << endl;
}
当调用 SetSystemMode()
时,任何关心模式更改的人都会在其期望的执行线程上异步收到通知。
// Set new SystemMode values. Each call will invoke callbacks to all
// registered client subscribers.
SysData::GetInstance().SetSystemMode(SystemMode::STARTING);
SysData::GetInstance().SetSystemMode(SystemMode::NORMAL);
SysDataNoLock 示例
SysDataNoLocks
是一个替代实现,它使用一个 private
AsyncCallback<>
来异步无锁地设置系统模式。
class SysDataNoLock
{
public:
/// Clients register with AsyncCallback to get callbacks when system mode changes
AsyncCallback<SystemModeChanged> SystemModeChangedCallback;
/// Get singleton instance of this class
static SysDataNoLock& GetInstance();
/// Sets the system mode and notify registered clients via SystemModeChangedCallback.
/// @param[in] systemMode - the new system mode.
void SetSystemMode(SystemMode::Type systemMode);
private:
SysDataNoLock();
~SysDataNoLock();
/// Private callback to get the SetSystemMode call onto a common thread
AsyncCallback<SystemMode::Type> SetSystemModeCallback;
/// Sets the system mode and notify registered clients via SystemModeChangedCallback.
/// @param[in] systemMode - the new system mode.
/// @param[in] userData - a 'this' pointer to SysDataNoLock.
static void SetSystemModePrivate(const SystemMode::Type& systemMode, void* userData);
/// The current system mode data
SystemMode::Type m_systemMode;
};
构造函数使用 SetSystemModePrivate()
和 private
SetSystemModeCallback
进行注册。
SysDataNoLock::SysDataNoLock() :
m_systemMode(SystemMode::STARTING)
{
SetSystemModeCallback.Register(&SysDataNoLock::SetSystemModePrivate, &workerThread2, this);
workerThread2.CreateThread();
}
下面的 SetSystemMode()
函数是一个异步传入接口的示例。对调用者来说,它看起来像一个普通函数,但在幕后,会异步调用一个 private
成员。在这种情况下,调用 SetSystemModeCallback
会导致 SetSystemModePrivate()
在 workerThread2
上被调用。
void SysDataNoLock::SetSystemMode(SystemMode::Type systemMode)
{
// Invoke the private callback. SetSystemModePrivate() will be called on workerThread2.
SetSystemModeCallback(systemMode);
}
由于这个 private
函数始终异步调用在 workerThread2
上,因此不需要锁。
void SysDataNoLock::SetSystemModePrivate(const SystemMode::Type& systemMode, void* userData)
{
SysDataNoLock* instance = static_cast<SysDataNoLock*>(userData);
// Create the callback data
SystemModeChanged callbackData;
callbackData.PreviousSystemMode = instance->m_systemMode;
callbackData.CurrentSystemMode = systemMode;
// Update the system mode
instance->m_systemMode = systemMode;
// Callback all registered subscribers
if (instance->SystemModeChangedCallback)
instance->SystemModeChangedCallback(callbackData);
}
Timer 示例
一旦就绪了回调框架,创建计时器回调服务就非常简单了。许多系统需要一种方法来基于超时生成回调。可能是低速轮询的周期性超时,也可能是为了防止某些事情在预期时间内未发生而设置的错误超时。无论哪种情况,回调都必须发生在指定的控制线程上。在 Timer
类中使用 AsyncCallback<>
可以很好地解决这个问题。
class Timer
{
public:
AsyncCallback<TimerData> Expired;
void Start(UINT32 timeout);
void Stop();
//...
};
用户创建一个计时器实例并注册过期事件。在这种情况下,MyCallback()
将在 1000ms 后被调用。
m_timer.Expired.Register(&MyClass::MyCallback, &myThread, this);
m_timer.Start(1000);
示例中未提供 Timer
实现。但是,文章“C++ 带有线程的状态机”包含一个 Timer
类,该类展示了一个完整的集成 C++ 状态机的多线程 AsyncCallback<>
示例。
回调签名限制
此设计对所有回调函数施加了以下限制:
- 每个回调处理单个用户定义参数类型(
TData
)。 - 两个回调函数参数始终是:
const TData&
和void*
。 - 每个回调的返回类型均为
void
。
例如,如果声明了一个 AsyncCallback<>
:
AsyncCallback<MyData> myCallback;
则回调函数签名为:
void MyCallback(const MyData& data, void* userData);
如果需要,该设计可以扩展以支持多个参数。但是,该设计在一定程度上模仿了嵌入式程序员一直以来所做的事情,例如:
- 动态创建
struct
或class
的实例并填充数据。 - 通过 OS 消息以
void*
的形式传递数据指针。 - 从 OS 消息队列获取数据并将
void*
类型转换回原始类型。 - 删除动态创建的数据。
在此设计中,整个基础架构会自动完成,而无需程序员付出额外的努力。如果需要多个数据参数,则必须将它们打包到单个 class
/struct
中并用作回调数据参数。
实现
回调框架的代码行数出奇地少。去除注释,可能还有几百行代码(希望)易于理解和维护。
AsyncCallback<>
和 AsyncCallbackBase
构成了发布回调接口的基础。这些类是线程安全的。基础版本是非模板化的,以减少代码空间。AsyncCallbackBase
提供了调用列表和线程安全机制。
AsyncCallback::Invoke()
遍历列表并将回调消息分派给每个目标线程。数据被动态创建以通过 OS 消息队列传输。
void Invoke(const TData& data)
{
LockGuard lockGuard(GetLock());
// For each registered callback
InvocationNode* node = GetInvocationHead();
while (node != NULL)
{
// Create a new instance of callback and copy
const Callback* callback = new Callback(*node->CallbackElement);
// Create a new instance of the callback data and copy
const TData* callbackData = new TData(data);
// Create a new message instance
CallbackMsg* msg = new CallbackMsg(this, callback, callbackData);
// Dispatch message onto the callback destination thread. TargetInvoke()
// will be called by the target thread.
callback->GetCallbackThread()->DispatchCallback(msg);
// Get the next registered callback subscriber
node = node->Next;
}
}
AsyncCallback::TargetInvoke()
由目标线程调用以实际执行回调。动态数据在调用回调后被删除。
virtual void TargetInvoke(CallbackMsg** msg) const
{
const Callback* callback = (*msg)->GetCallback();
// Typecast the void* back to a TData type
const TData* callbackData = static_cast<const TData*>((*msg)->GetCallbackData());
// Typecast a generic callback function pointer to the CallbackFunc type
CallbackFunc func = reinterpret_cast<CallbackFunc>(callback->GetCallbackFunction());
// Execute the registered callback function
(*func)(*callbackData, callback->GetUserData());
// Delete dynamically data sent through the message queue
delete callbackData;
delete callback;
delete *msg;
*msg = NULL;
}
异步回调会施加某些限制,因为回调目标线程所需的一切都必须在堆上创建,打包到一个类中,并放入 OS 消息队列。
插入 OS 队列是平台特定的。CallbackThread
类提供了要在每个目标平台实现的接口。有关更完整的讨论,请参阅下面的“移植”部分。
class CallbackThread
{
public:
virtual void DispatchCallback(CallbackMsg* msg) = 0;
};
一旦消息放入消息队列,平台特定的代码将解包消息并调用 AsyncCallbackBase::TargetInvoke()
函数,并在调用完成后销毁动态分配的数据。
unsigned long WorkerThread::Process(void* parameter)
{
MSG msg;
BOOL bRet;
while ((bRet = GetMessage(&msg, NULL, WM_USER_BEGIN, WM_USER_END)) != 0)
{
switch (msg.message)
{
case WM_DISPATCH_CALLBACK:
{
ASSERT_TRUE(msg.wParam != NULL);
// Get the ThreadMsg from the wParam value
ThreadMsg* threadMsg = reinterpret_cast<ThreadMsg*>(msg.wParam);
// Convert the ThreadMsg void* data back to a CallbackMsg*
CallbackMsg* callbackMsg = static_cast<CallbackMsg*>(threadMsg->GetData());
// Invoke the callback callback on the target thread
callbackMsg->GetAsyncCallback()->TargetInvoke(&callbackMsg);
// Delete dynamic data passed through message queue
delete threadMsg;
break;
}
case WM_EXIT_THREAD:
return 0;
default:
ASSERT();
}
}
return 0;
}
请注意,线程循环与大多数系统不同,大多数系统都有一个巨大的 switch
语句来处理各种传入的数据消息,对 void*
数据进行类型转换,然后调用特定函数。该框架通过单个 WM_DISPATCH_CALLBACK
消息支持所有回调。设置完成后,相同的精简线程循环会处理每次回调。新的发布者和订阅者会随着系统的设计而出现和消失,但中间的代码不会改变。
这是一个巨大的优势,因为在许多系统中,在线程之间传递数据需要大量的手动步骤。您需要不断地处理每个线程循环,在发送时创建数据,在接收时销毁数据,并调用各种 OS 服务和类型转换。在这里,您无需做任何这些。中间的所有内容都已为用户整齐地处理。
Heap
堆用于创建动态数据。这是由于使用了调用列表以及需要通过消息队列发送数据对象。请记住,您的回调数据在回调过程中会被复制和销毁。大多数情况下,回调数据是 POD(普通旧数据结构)。如果您有无法按位复制的更复杂类型,请务必为回调数据实现拷贝构造函数。
在某些系统上,使用堆是不受欢迎的。对于这些情况,我使用固定块内存分配器。xallocator
实现解决了动态存储问题,并且比全局堆快得多。要使用它,只需包含 xallocator.h 并将宏 XALLOCATOR
添加到类声明中。整个类层次结构可以通过将 XALLOCTOR
放在基类中来使用固定块分配器。
#include "xallocator.h"
class Callback
{
XALLOCATOR
// ...
};
使用 xallocator
后,调用 operator new
或 delete
会让固定块分配器接管存储职责。对象的创建和销毁方式完全相同,只是内存的来源不同。有关 xallocator
的更多信息,以及获取源代码,请参阅文章“用快速固定块内存分配器替换 malloc/free”。唯一需要的文件是 Allocator.h/cpp 和 xallocator.h/cpp。
要在回调框架中使用 xallocator
,请将 XALLOCATOR
宏放在以下类定义中:
回调
CallbackMsg
InvocationNode
对于平台特定的文件,您也需要包含 XALLOCATOR
。在此示例中,它们是:
ThreadMsg
SystemModeChanged
移植
代码易于移植到任何平台。只需要两个 OS 服务:线程和一个软件锁。代码分为五个目录。
AsyncCallback
- 核心框架实现文件PortWin
– Windows 特定文件(线程/锁)Examples
– 示例代码VS2008
– Visual Studio 2008 项目文件VS2015
– Visual Studio 2015 项目文件
该库有一个单一的 abstract
类 CallbackThread
,带有一个纯 virtual
函数:
virtual void DispatchCallback(CallbackMsg* msg) = 0;
在大多数项目中,我会将底层的原始 OS 调用封装到一个线程类中,以封装并强制执行正确的行为。在这里,我提供 ThreadWin
作为 Windows API CreateThread()
的包装器。
一旦有了线程类,只需继承 CallbackThread
接口并实现 DispatchCallback()
函数。在 Windows 上,只需向消息队列发布消息即可。
void ThreadWin::DispatchCallback(CallbackMsg* msg)
{
// Create a new ThreadMsg
ThreadMsg* threadMsg = new ThreadMsg(WM_DISPATCH_CALLBACK, msg);
// Post the message to the this thread's message queue
PostThreadMessage(WM_DISPATCH_CALLBACK, threadMsg);
}
Windows 线程循环获取消息并为传入的实例调用 TargetInvoke()
函数。通过队列发送的数据在完成后会被删除。
switch (msg.message)
{
case WM_DISPATCH_CALLBACK:
{
ASSERT_TRUE(msg.wParam != NULL);
// Get the ThreadMsg from the wParam value
ThreadMsg* threadMsg = reinterpret_cast<ThreadMsg*>(msg.wParam);
// Convert the ThreadMsg void* data back to a CallbackMsg*
CallbackMsg* callbackMsg = static_cast<CallbackMsg*>(threadMsg->GetData());
// Invoke the callback callback on the target thread
callbackMsg->GetAsyncCallback()->TargetInvoke(&callbackMsg);
// Delete dynamic data passed through message queue
delete threadMsg;
break;
}
case WM_EXIT_THREAD:
return 0;
default:
ASSERT();
}
软件锁由 LockGuard
类处理。此类可以更新为您选择的锁,或者您可以使用其他机制。锁仅在少数几个地方使用。
代码大小
为了衡量使用此技术的影响,代码使用 Keil 为 ARM CPU 构建。如果部署在项目上,将创建许多 AsyncCallback<>
实例,因此它需要节省空间。
使用完全优化时,每个额外的 AsyncCallback<>
、一个订阅者、一个注册调用和一个回调调用的增量代码大小约为 120 字节。手动移动线程间数据肯定会占用这么多代码。
哪种回调实现?
我在 CodeProject 上记录了三种不同的异步多播回调实现。每个版本都有其独特的特性和优势。下面的部分重点介绍了每个解决方案之间的主要区别。请参阅下面的“参考文献”部分获取指向每篇文章的链接。
C 中的异步多播回调
- 用 C 实现
- 回调函数仅限自由函数或静态成员
- 支持一个回调参数
- 回调参数必须是指针类型
- 使用
memcpy
复制回调参数数据 - 通过宏提供类型安全
- 静态数组存储已注册的订阅者回调
- 已注册订阅者数量在编译时固定
- C 语言中的固定块内存分配器
- 紧凑的实现
带线程间消息传递的异步多播回调
- 用 C++ 实现
- 回调函数仅限自由函数或静态成员
- 支持一个回调参数
- 回调参数必须是指针类型
- 使用复制构造函数复制回调参数数据
- 通过模板提供类型安全
- 模板使用量少
- 已注册订阅者回调的动态列表
- 已注册订阅者数量在运行时扩展
- C++ 中的固定块内存分配器
- 紧凑的实现
C++ 中的异步多播委托
- 用 C++ 实现
- C++ 委托范式
- 任何回调函数类型(成员、静态、自由)
- 支持多个回调参数(最多 5 个)
- 回调参数任意类型(值、引用、指针、指针的指针)
- 使用复制构造函数复制回调参数数据
- 通过模板提供类型安全
- 大量使用模板
- 已注册订阅者回调的动态列表
- 已注册订阅者数量在运行时扩展
- C++ 中的固定块内存分配器
- 较大的实现
参考文献
- C 语言异步多播回调 - 作者:David Lafreniere
- C++ 异步多播委托 - 作者:David Lafreniere
- 用快速固定块内存分配器替换 malloc/free - 作者:David Lafreniere
- 带同步启动的 Win32 线程包装器 - 作者:David Lafreniere
- C++ 带有线程的状态机 - 作者:David Lafreniere
结论
设计发布者/订阅者回调系统有很多方法。此版本包含我以前从未见过的独特功能,特别是异步回调生成到客户端指定控制线程的简易性。实现被最小化,以便于移植到任何嵌入式或其他系统。
我曾在项目中使用过这项技术并取得了巨大成功。每个类或子系统都可以使用 AsyncCallback<>
实例公开一个或多个传出接口。系统内的任何代码都可以连接并接收异步回调,而无需担心跨线程或实现所有功能的机制。这样的功能简化了应用程序设计,并通过易于理解的回调范例在架构上标准化了跨线程通信。
历史
- 2016年4月15日
- 首次发布
- 2016年4月17日
- 更新了文章以更正错误
- 2016年4月18日
- 更新了源代码以包含 VS2008 和 VS2015 项目文件,以加快评估测试
- 文章小幅修正
- 2016年4月22日
- 新增参考文献部分
- 对源代码进行了小的简化更新
- 更新文章文本
- 2016年4月29日
- 移除了
ThreadWin
的堆使用 - 更新了附带的源代码
- 移除了
- 2016年11月19日
- 源代码有小的 bug 修复
- 更新了计时器和参考文献部分
- 2016年12月2日
- 修复
Callback::operator==()
中的 bug - 更新了附带的源代码
- 修复
- 2019年1月25日
- 添加了“哪个回调实现?”部分
- 更新了参考文献部分