65.9K
CodeProject 正在变化。 阅读更多。
Home

C 中的异步多播回调

starIconstarIconstarIconstarIconstarIcon

5.00/5 (12投票s)

2019年1月6日

CPOL

14分钟阅读

viewsIcon

23810

downloadIcon

575

使用这个可移植的 C 语言回调库简化线程之间的数据传递。

引言

回调是一种强大的概念,用于减少两个代码片段之间的耦合。在多线程系统中,回调存在局限性。我一直想要一种能够跨线程的回调机制,并处理所有低级细节,以便安全地将我的函数调用和事件数据从一个线程传递到另一个线程。一个可移植且易于使用的框架。不再需要在线程循环中编写复杂的switch语句,根据枚举对OS消息队列的void*值进行类型转换。创建回调。注册回调。该框架将自动在用户指定的目標线程上调用回调,并传递数据参数,这就是目标。

在使用事件循环(又称消息循环)的系统上,消息队列和switch语句有时用于处理传入的消息。循环等待消息,并将带有数据的函数调用分派到程序中。此解决方案旨在简化和标准化事件循环,以便泛化函数分派和线程之间的数据移动。

此处提出的回调解决方案提供以下功能

  • 异步回调 – 支持与任何线程之间进行异步回调
  • 线程目标 – 指定异步回调的目标线程
  • 回调 – 调用任何具有匹配签名的 C 或 C++ 自由/静态函数
  • 类型安全 – 用户定义的、类型安全的回调函数数据参数
  • 多播回调 – 在数组中存储多个回调以进行顺序调用
  • 线程安全 – 适用于多线程系统
  • 简洁 – 代码库小巧,易于维护,占用最小的代码空间
  • 可移植 – 可移植到嵌入式或 PC 平台
  • 任何编译器 – 支持任何 C 语言编译器
  • 任何操作系统 - 易于移植到任何操作系统
  • 优雅的语法 – 直观且易于使用

异步回调范例通过将回调函数指针和回调数据放置在您指定的控制线程上,极大地简化了多线程应用程序的开发。为单个模块或整个子系统暴露回调接口非常容易。该框架的使用难度不亚于标准的 C 回调,但功能更强大。

本文提出了一种利用异步多播回调的跨线程通信机制。附件中的源代码实现了上述所有功能,我将进行演示。

使用 CMake 创建构建文件。CMake 是免费开源软件。支持 Windows、Linux 和其他工具链。有关更多信息,请参阅 **CMakeLists.txt** 文件。

查看 GitHub 以获取最新源代码

回调背景

函数回调的理念非常有用。在回调术语中,发布者定义回调签名并允许匿名注册回调函数指针。订阅者创建一个符合发布者回调签名的函数实现,并在运行时向发布者注册一个回调函数指针。发布者代码对订阅者代码一无所知——注册和回调调用是匿名的。

现在,在多线程系统中,您需要理解同步与异步回调调用。如果回调是同步的,回调将在调用者的控制线程上执行。如果您在回调中设置断点,堆栈帧将显示发布者函数调用和所有同步调用的发布者回调。在这种情况下没有多线程问题,因为所有内容都在单个线程上运行。

如果发布者代码有自己的线程,它可以调用其控制线程上的回调函数,而不是订阅者的线程。发布者调用的回调可能在任何时候完全独立于订阅者的控制线程发生。这种跨线程可能会给订阅者带来问题,如果回调代码不是线程安全的,因为现在有另一个线程在未知的时间间隔内调用订阅者代码库。

使回调函数线程安全的一种方法是在发布者的回调期间向订阅者的 OS 队列发布一条消息。订阅者的线程稍后将消息出队并调用适当的函数。由于回调实现仅发布消息,因此即使是异步的回调也是线程安全的。在这种情况下,消息队列的异步性代替了软件锁提供了线程安全性。

Using the Code

我将首先介绍如何使用代码,然后深入探讨实现细节。

发布者使用 CB_DECLARE 宏在一个头文件中公开回调接口给潜在订阅者。第一个参数是回调名称。第二个参数是回调函数参数类型。在下面的示例中,int* 是回调函数参数。

CB_DECLARE(TestCb, int*)

发布者在源文件中使用 CB_DEFINE 宏来完成回调定义。第一个参数是回调名称。第二个参数是回调函数参数类型。第三个参数是回调函数参数指向的数据的大小。最后一个参数是可以为回调通知注册的最大订阅者数量。

CB_DEFINE(TestCb, int*, sizeof(int), MAX_REGISTER)

要订阅回调,请创建一个函数(static 类成员或全局函数),如下所示。我将稍后解释为什么函数签名参数需要 (int*, void*) 函数签名。

void TestCallback1(int* val, void* userData)
{
    printf(“TestCallback1 %d”, *val);
}

订阅者使用 CB_Register() 函数宏注册以接收回调。第一个参数是回调名称。第二个参数是指向回调函数的指针。第三个参数是指向线程分派函数的指针,如果希望同步回调,则为 NULL。最后一个参数是指向可选用户数据的指针,该数据在回调调用期间传递。框架内部不处理用户数据,仅将其传回回调函数。用户数据的值可以是调用者想要的任何值,也可以是 NULL

CB_Register(TestCb, TestCallback1, DispatchCallbackThread1, NULL);

在 C/C++ 混合项目中,userData 回调参数可用于存储 this 类实例指针。为回调函数传递类 static 成员函数指针,并为用户数据传递 this 指针给 CB_Register()。在订阅者回调函数中,将 userData 类型转换为类实例指针。这提供了一种方便的方式来在 static 回调函数中访问类实例函数和数据。

当发布者需要调用所有注册订阅者的回调时,请使用 CB_Invoke()。该函数将回调和数据参数分派到目标控制线程。在下面的示例中,TestCallback1()DispatchCallbackThread1 上被调用。

int data = 123;
CB_Invoke(TestCb, &data);

使用 CB_Unregister() 来取消注册回调。

CB_Unregister(TestCb, TestCallback1, DispatchCallbackThread1);

异步回调可用于轻松地向传入和传出的 API 接口添加异步性。以下示例展示了如何实现。

SysData 发布者示例

SysData 是一个简单的模块,演示了如何公开一个传出的异步接口。该模块存储系统数据,并在模式更改时提供异步订阅者通知。模块接口如下所示。

typedef enum
{
    STARTING,
    NORMAL,
    SERVICE,
    SYS_INOP
} SystemModeType;

typedef struct
{
    SystemModeType PreviousSystemMode;
    SystemModeType CurrentSystemMode;
} SystemModeData;

// Declare a SysData callback interface
CB_DECLARE(SystemModeChangedCb, const SystemModeData*)

void SD_Init(void);
void SD_Term(void);
void SD_SetSystemMode(SystemModeType systemMode);

发布者回调接口是 SystemModeChangedCb。调用 SD_SetSystemMode() 将新模式保存到 _systemMode 并通知所有注册的订阅者。

void SD_SetSystemMode(SystemModeType systemMode)
{
    LK_LOCK(_hLock);

    // Create the callback data
    SystemModeData callbackData;
    callbackData.PreviousSystemMode = _systemMode;
    callbackData.CurrentSystemMode = systemMode;

    // Update the system mode
    _systemMode = systemMode;

    // Callback all registered subscribers
    CB_Invoke(SystemModeChangedCb, &callbackData);

    LK_UNLOCK(_hLock);
}

SysData 订阅者示例

订阅者创建一个符合发布者回调函数签名回调函数。

void SysDataCallback(const SystemModeData* data, void* userData)
{
    cout << "SysDataCallback: " << data->CurrentSystemMode << endl;
}

在运行时,使用 CB_Register()DispatchCallbackThread1 上注册 SysData 回调。

CB_Register(SystemModeChangedCb, SysDataCallback, DispatchCallbackThread1, NULL);

当调用 SD_SetSystemMode() 时,任何关心模式更改的客户端都会在其期望的执行线程上异步收到通知。

SD_SetSystemMode(STARTING);
SD_SetSystemMode(NORMAL);

SysDataNoLock 发布者示例

SysDataNoLock 是一个备用实现,它使用私有回调来异步且无锁地设置系统模式。

// Declare a public SysData callback interface
CB_DECLARE(SystemModeChangedNoLockCb, const SystemModeData*)

void SDNL_Init(void);
void SDNL_Term(void);
void SDNL_SetSystemMode(SystemModeType systemMode);

初始化函数使用私有 SetSystemModeCb 回调进行注册。

// Define a private callback interface
CB_DECLARE(SetSystemModeCb, SystemModeType*)
CB_DEFINE(SetSystemModeCb, SystemModeType*, sizeof(SystemModeType), 1)

void SDNL_Init(void)
{
    // Register with private callback
    CB_Register(SetSystemModeCb, SDNL_SetSystemModePrivate, DispatchCallbackThread1, NULL);
}

下面的 SSNL_SetSystemMode() 函数是一个异步传入接口的示例。对调用者而言,它看起来像一个普通函数,但在后台,一个私有函数调用被异步调用。在这种情况下,调用 SetSystemModeCb 会导致 SDNL_SetSystemModePrivate()DispatchCallbackThread1 上被调用。

void SDNL_SetSystemMode(SystemModeType systemMode)
{
    // Invoke the private callback. SDNL_SetSystemModePrivate() will be called
    // on DispatchCallbackThread1.
    CB_Invoke(SetSystemModeCb, &systemMode);
}

由于此私有函数始终在 DispatchCallbackThread1 上异步调用,因此不需要锁。

static void SDNL_SetSystemModePrivate(SystemModeType* systemMode, void* userData)
{
    // Create the callback data
    SystemModeData callbackData;
    callbackData.PreviousSystemMode = _systemMode;
    callbackData.CurrentSystemMode = *systemMode;

    // Update the system mode
    _systemMode = *systemMode;

    // Callback all registered subscribers
    CB_Invoke(SystemModeChangedNoLockCb, &callbackData);
}

回调签名限制

此设计对所有回调函数施加了以下限制

  1. 每个回调处理一个用户定义的参数类型。
  2. 参数可以是 const 或非 const 指针(例如,const MyData*MyData*)。
  3. 两个回调函数参数始终是:MyData*void*
  4. 每个回调都具有 void 返回类型。

例如,如果一个回调声明为

CB_DECLARE(TestCb, const MyData*)

则回调函数签名为:

void MyCallback(const MyData* data, void* userData);

如果需要,该设计可以扩展以支持多个参数。然而,该设计在某种程度上模仿了嵌入式程序员一直以来所做的事情,例如

  1. 动态创建结构或类的实例并填充数据
  2. 通过 OS 消息以 void* 的形式传递数据指针
  3. 从 OS 消息队列获取数据并将 void* 类型转换回原始类型
  4. 删除动态创建的数据

在此设计中,整个基础设施会自动发生,而无需程序员付出额外的努力。如果需要多个数据参数,则必须将它们打包到单个类/结构中并用作回调数据参数。

实现

回调框架的代码行数出奇地少。去掉注释,可能还有几百行代码,这些代码(希望)易于理解和维护。

实现使用宏和令牌粘贴为每个回调提供一个唯一的类型安全接口。令牌粘贴运算符(##)在预处理器展开宏时用于合并两个令牌。CB_DECLARE 宏如下所示

#define CB_DECLARE(cbName, cbArg) \
    typedef void(*cbName##CallbackFuncType)(cbArg cbData, void* cbUserData); \
    BOOL cbName##_Register(cbName##CallbackFuncType cbFunc, 
         CB_DispatchCallbackFuncType cbDispatchFunc, void* cbUserData); \
    BOOL cbName##_IsRegistered(cbName##CallbackFuncType cbFunc, 
         CB_DispatchCallbackFuncType cbDispatchFunc); \
    BOOL cbName##_Unregister(cbName##CallbackFuncType cbFunc, 
         CB_DispatchCallbackFuncType cbDispatchFunc); \
    BOOL cbName##_Invoke(cbArg cbData); \
    BOOL cbName##_InvokeArray(cbArg cbData, size_t num, size_t size);

在上面使用的 SysData 示例中,编译器预处理器将 CB_DECLARE 展开为

typedef void(*SystemModeChangedCbCallbackFuncType)
            (const SystemModeData* cbData, void* cbUserData); 

BOOL SystemModeChangedCb_Register(SystemModeChangedCbCallbackFuncType cbFunc, 
        CB_DispatchCallbackFuncType cbDispatchFunc, void* cbUserData);
 
BOOL SystemModeChangedCb_IsRegistered(SystemModeChangedCbCallbackFuncType cbFunc, 
        CB_DispatchCallbackFuncType cbDispatchFunc); 

BOOL SystemModeChangedCb_Unregister(SystemModeChangedCbCallbackFuncType cbFunc, 
        CB_DispatchCallbackFuncType cbDispatchFunc); 

BOOL SystemModeChangedCb_Invoke(const SystemModeData* cbData); 

BOOL SystemModeChangedCb_InvokeArray(const SystemModeData* cbData, size_t num, size_t size);

请注意,每个 cbName## 位置都被宏名称参数替换,在这种情况下,它是来自下面声明的 SystemModeChangedCb

CB_DECLARE(SystemModeChangedCb, const SystemModeData*)

类似地,CB_DEFINE 宏展开以创建回调函数实现。注意该宏提供了一个轻量级、类型安全的包装器,围绕私有函数,如 _CB_AddCallback()_CB_Dispatch()。如果尝试注册错误的函数签名,编译器将生成错误或警告。宏会自动处理您通常手动编写的单调、样板化的代码。

注册的回调存储在 CB_Info 实例的 static 数组中。调用 CB_Invoke(SystemModeChangedCb, &callbackData) 执行 SystemModeChangedCb_Invoke()。然后 _CB_Dispatch() 遍历 CB_Info 数组并将一个 CB_CallbackMsg 消息分派到每个目标线程。消息数据被动态创建以通过 OS 消息队列传输。

// Macro generated unique invoke function
BOOL SystemModeChangedCb_Invoke(const SystemModeData* cbData) 
{ 
    return _CB_Dispatch(&SystemModeChangedCbMulticast[0], 2, cbData, sizeof(SystemModeData)); 
}

BOOL _CB_Dispatch(CB_Info* cbInfo, size_t cbInfoLen, const void* cbData, 
    size_t cbDataSize)
{
    BOOL invoked = FALSE;

    LK_LOCK(_hLock);

    // For each CB_Info instance within the array
    for (size_t idx = 0; idx<cbInfoLen; idx++)
    {
        // Is a client registered?
        if (cbInfo[idx].cbFunc)
        {
            // Dispatch callback onto the OS task
            if (CB_DispatchCallback(&cbInfo[idx], cbData, cbDataSize))
            {
                invoked = TRUE;
            }
        }
    }

    LK_UNLOCK(_hLock);
    return invoked;
}

目标 OS 任务事件循环会出队一个 CB_CallbackMsg* 并调用 CB_TargetInvoke()。创建的动态数据在函数退出前被释放。

void CB_TargetInvoke(const CB_CallbackMsg* cbMsg)
{
    ASSERT_TRUE(cbMsg);
    ASSERT_TRUE(cbMsg->cbFunc);

    // Invoke callback function with the callback data
    cbMsg->cbFunc(cbMsg->cbData, cbMsg->cbUserData);

    // Free data sent through OS queue
    XFREE((void*)cbMsg->cbData);
    XFREE((void*)cbMsg);
} 

异步回调会施加一些限制,因为回调目标线程所需的一切都必须在堆上创建,打包到 CB_CallbackMsg 结构中,并放入 OS 消息队列。

插入 OS 队列是平台特定的。CB_DispatchCallbackFuncType 函数指针 typedef 为目标平台上的每个线程事件循环提供了要实现的 OS 队列接口。请参阅下面的移植部分以获得更完整的讨论。

typedef BOOL (*CB_DispatchCallbackFuncType)(const CB_CallbackMsg* cbMsg);

一旦消息被放入消息队列,平台特定的代码会解包消息,调用 CB_TargetInvoke() 函数并销毁动态分配的数据。在此示例中,一个简单的 WorkerThreadStd 类提供了利用 C++ 线程支持库的线程事件循环。虽然此示例使用了 C++ 线程,但回调模块是用纯 C 编写的。通过将 OS 细节从回调实现中抽象出来,可以实现这一点。

void WorkerThread::Process()
{
    while (1)
    {
        ThreadMsg* msg = 0;
        {
            // Wait for a message to be added to the queue
            std::unique_lock<std::mutex> lk(m_mutex);
            while (m_queue.empty())
                m_cv.wait(lk);

            if (m_queue.empty())
                continue;

            msg = m_queue.front();
            m_queue.pop();
        }

        switch (msg->GetId())
        {
            case MSG_DISPATCH_DELEGATE:
            {
                ASSERT_TRUE(msg->GetData() != NULL);

                // Convert the ThreadMsg void* data back to a CB_CallbackMsg* 
                const CB_CallbackMsg* callbackMsg = 
                        static_cast<const CB_CallbackMsg*>(msg->GetData());

                // Invoke the callback on the target thread
                CB_TargetInvoke(callbackMsg);

                // Delete dynamic data passed through message queue
                delete msg;
                break;
            }
        }
    }
}

请注意,此线程循环与大多数具有大型 switch 语句处理各种传入数据消息、类型转换 void* 数据然后调用特定函数的系统不同。该框架支持所有回调,只需一个 WM_DISPATCH_DELEGATE 消息。设置完成后,同一个小型线程循环会处理每一个回调。随着系统的设计,新的发布者和订阅者会来来去去,但中间的代码不会改变。

这是一个巨大的好处,因为在许多系统中,在线程之间传递数据需要大量的手动步骤。您必须不断地处理每个线程循环,在发送时创建数据,在接收时销毁数据,并调用各种 OS 服务和类型转换。在这里,您无需执行任何这些操作。中间的所有内容都已为用户妥善处理。

调用顺序

下面的两个列表显示了在 DispatchCallbackThread1 上异步调用回调函数 TestCallback1() 所需的调用顺序。在此示例中,TestCb 是使用 CB_DECLARE/CB_DEFINE 创建的,并且 TestCallback1() 回调函数已使用 CB_Register() 注册到 TestCb

CB_DECLARE(TestCb, int*)
CB_DEFINE(TestCb, int*, sizeof(int), MAX_REGISTER)

CB_Register(TestCb, TestCallback1, DispatchCallbackThread1, NULL);

主线程

  1. CB_Invoke(TestCb, &data) – 宏函数从主线程启动所有注册的回调
  2. TestCb_Invoke(&data) – 由 CB_DECLARE 创建的类型安全宏包装函数
  3. _CB_Dispatch(&TestCbMulticast[0], MAX_REGISTER, &data, sizeof(int))private 函数循环遍历所有注册的订阅者
  4. CB_DispatchCallback(&cbInfo[idx], cbData, cbDataSize) – 将 cbData 回调数据分派到注册的订阅者
  5. DispatchCallbackThread1(cbMsg) – 动态分配的 cbMsg 被放入线程 1 的消息队列

分派回调线程 1

  1. WorkerThread::Process() – 工作线程从消息队列获取 cbMsg
  2. CB_TargetInvoke(cbMsg) – 回调在目标控制线程上被调用
  3. TestCallback1(&data, NULL) – 目标回调函数在线程 1 上使用回调 data 被调用

Heap

动态数据是必需的,以便通过消息队列发送数据结构。请记住,在回调期间,您的回调参数指向的数据会被按位复制。

在某些系统上,使用堆是不希望的。对于这些情况,我使用固定块内存分配器。x_allocator 实现解决了动态存储问题,并且比全局堆更快。要使用它,只需在 callback.c 中定义 USE_CALLBACK_ALLOCATOR。有关 x_allocator 的更多信息,请参阅参考文献部分。

移植

代码易于移植到任何平台。只需要两个 OS 服务:线程和软件锁。代码分为四个目录。

  1. Callback - 核心库实现文件
  2. Port – 特定于 Windows 的文件(线程/锁)
  3. Examples – 显示用法的示例代码
  4. VS2017 – Visual Studio 2017 项目文件

移植到另一个平台需要为每个线程实现一个接受 const CB_CallbackMsg* 的分派函数。下面的函数显示了一个示例。

// C language interface to a callback dispatch function
extern "C" BOOL DispatchCallbackThread1(const CB_CallbackMsg* cbMsg)
{
    workerThread1.DispatchCallback(cbMsg);
    return TRUE;
}

void WorkerThread::DispatchCallback(const CB_CallbackMsg* msg)
{
    ASSERT_TRUE(m_thread);

    // Create a new ThreadMsg
    ThreadMsg* threadMsg = new ThreadMsg(MSG_DISPATCH_DELEGATE, msg);

    // Add dispatch delegate msg to queue and notify worker thread
    std::unique_lock<std::mutex> lk(m_mutex);
    m_queue.push(threadMsg);
    m_cv.notify_one();
}

线程事件循环获取消息并调用 CB_TargetInvoke() 函数。通过队列发送的数据在完成后会被删除。

case MSG_DISPATCH_DELEGATE:
{
    ASSERT_TRUE(msg->GetData() != NULL);

    // Convert the ThreadMsg void* data back to a CB_CallbackMsg* 
    const CB_CallbackMsg* callbackMsg = static_cast<const CB_CallbackMsg*>(msg->GetData());

    // Invoke the callback on the target thread
    CB_TargetInvoke(callbackMsg);

    // Delete dynamic data passed through message queue
    delete msg;
    break;
}

软件锁由 LockGuard 模块处理。此文件可以使用您选择的锁进行更新,或者您可以使用不同的机制。锁只在少数几个地方使用。在 callback.c 中定义 USE_LOCKS 以使用 LockGuard 模块锁。

哪种回调实现?

我在 CodeProject 上记录了三种不同的异步多播回调实现。每个版本都有其独特的特性和优势。下面的部分重点介绍了每个解决方案之间的主要区别。有关每个文章的链接,请参阅下面的参考文献部分。

C 中的异步多播回调

  • 用 C 实现
  • 回调函数仅限自由函数或静态成员
  • 支持一个回调参数
  • 回调参数必须是指针类型
  • 使用 memcpy 复制回调参数数据
  • 通过宏提供类型安全
  • 静态数组存储已注册的订阅者回调
  • 已注册订阅者数量在编译时固定
  • C 语言中的固定块内存分配器
  • 紧凑的实现

带线程间消息传递的异步多播回调

  • 用 C++ 实现
  • 回调函数仅限自由函数或静态成员
  • 支持一个回调参数
  • 回调参数必须是指针类型
  • 使用复制构造函数复制回调参数数据
  • 通过模板提供类型安全
  • 模板使用量少
  • 已注册订阅者回调的动态列表
  • 已注册订阅者数量在运行时扩展
  • C++ 中的固定块内存分配器
  • 紧凑的实现

C++ 中的异步多播委托

  • 用 C++ 实现
  • C++ 委托范式
  • 任何回调函数类型(成员、静态、自由)
  • 支持多个回调参数(最多 5 个)
  • 回调参数任意类型(值、引用、指针、指针的指针)
  • 使用复制构造函数复制回调参数数据
  • 通过模板提供类型安全
  • 大量使用模板
  • 已注册订阅者回调的动态列表
  • 已注册订阅者数量在运行时扩展
  • C++ 中的固定块内存分配器
  • 较大的实现

参考文献

结论

设计发布者/订阅者回调系统有很多方法。此 C 语言版本集成了独特的功能,标准化了事件循环,并简化了在客户端指定的控制线程上生成异步回调。实现被保持在最低限度,以便于移植到任何嵌入式或其他系统。

此回调实现适用于 C 和 C++ 项目。但是,如果您正在开发严格的 C++ 项目,您可以考虑使用参考文献部分中列出的 C++ 回调实现之一。

我在项目中成功地使用了这种技术。每个模块或子系统都可以使用 CB_DECLARECB_DEFINE 宏公开一个或多个传出接口。系统内的任何代码都可以连接并接收异步回调,而无需担心跨线程或使其正常工作的机制。这样的功能简化了应用程序设计,并通过一种易于理解的回调范例在架构上标准化了跨线程通信。

历史

  • 2019 年 1 月 6 日
    • 首次发布
  • 2019 年 1 月 29 日
© . All rights reserved.