65.9K
CodeProject 正在变化。 阅读更多。
Home

现代 C++ 中的异步多播委托

starIconstarIconstarIconstarIconstarIcon

5.00/5 (25投票s)

2020 年 8 月 19 日

CPOL

27分钟阅读

viewsIcon

27527

downloadIcon

578

一个 C++ 委托库,能够以同步或异步方式匿名调用任何可调用函数

前言

本文档介绍了一个现代 C++ 实现的异步委托。该库实现了匿名的同步和异步函数回调。目标函数将使用所有参数在注册者期望的控制线程上调用。

我之前写的文章“C++ 中的异步多播委托”是基于 C++03 构建的。这个“现代”版本使用了 C++17 的特性。可变参数模板和模板元编程提高了库的可用性,并显著减少了源代码行数。虽然两篇文章的基本思想相似,但新版本是完全重写的。

我已经创建了四种“异步回调”的实现方式;三种 C++ 版本和一种 C 版本。有关其他实现的链接,请参阅文章末尾的“参考文献”部分。

使用 CMake 创建构建文件。CMake 是免费开源软件。支持 Windows、Linux 和其他工具链。有关更多信息,请参阅 **CMakeLists.txt** 文件。

查看 GitHub 以获取最新源代码

相关的 GitHub 存储库

引言

似乎没有什么比委托更能引起 C++ 程序员的兴趣了。在其他语言中,委托是一等公民特性,因此开发者可以轻松使用这些已知的结构。然而,在 C++ 中,委托并非原生支持。但这并不能阻止我们程序员模仿委托存储和调用任何可调用函数的那种便利性。

委托通常支持同步执行,也就是说,在调用时,绑定的函数会在调用者的控制线程中执行。在多线程应用程序中,最好能够指定目标函数及其应执行的线程,而无需对函数签名施加限制。该库会做繁重的工作,将委托和所有参数数据传输到目标线程。本文的目的是提供一个具有一致 API 的 C++ 委托库,该库能够对任何可调用函数进行同步和异步调用。

现代 C++ 委托库的特性包括:

  1. 任意编译器 - 标准 C++17 代码,适用于任何编译器,无需奇怪的技巧
  2. 任意函数 - 调用任何可调用函数:成员函数、静态函数或自由函数
  3. 任意参数类型 - 支持任何参数类型:值、引用、指针、指针的指针
  4. 多个参数 - 支持绑定函数的可变数量参数
  5. 同步调用 - 同步调用绑定的函数
  6. 异步调用 - 在客户端指定的线程上异步调用绑定的函数
  7. 阻塞异步调用 - 使用阻塞或非阻塞委托异步调用
  8. 智能指针支持 - 使用裸对象指针或 std::shared_ptr 绑定实例函数
  9. Lambda 支持 - 使用委托异步绑定和调用 lambda 函数。
  10. 自动堆内存处理 - 自动将参数数据复制到堆内存,以便安全地通过消息队列传输
  11. 任意操作系统 - 易于移植到任何操作系统。包含 C++11 std::thread 端口
  12. 32/64 位 - 支持 32 位和 64 位项目。
  13. CMake 构建 - CMake 支持包括 Windows 和 Linux 在内的大多数工具链。
  14. 单元测试 - 包含委托库的广泛单元测试
  15. 无外部库 - 委托不依赖于外部库
  16. 易用性 - 函数签名模板参数(例如,MulticastDelegate<void(TestStruct*)>

委托的实现通过在您指定的控制线程上执行带有所有函数参数的委托函数,极大地简化了多线程应用程序的开发。该框架处理了所有底层机制,以安全地在目标线程上调用任何函数签名。CMake 构建方便在 Windows、Linux 和其他平台上进行实验。

委托背景

如果您不熟悉委托,它的概念非常简单。委托可以被看作是一个超级函数指针。在 C++ 中,没有指针类型能够指向所有可能的函数变体:实例成员、虚函数、const 函数、静态函数和自由(全局)函数。函数指针无法指向实例成员函数,而指向成员函数的指针则有各种限制。然而,委托类可以以类型安全的方式指向任何函数,只要函数签名匹配。简而言之,委托指向任何具有匹配签名的函数,以支持匿名函数调用。

在实践中,虽然委托很有用,但多播版本极大地扩展了它的实用性。能够绑定多个函数指针并按顺序调用所有注册者的能力,使其成为有效的发布/订阅机制。发布者代码公开委托容器,一个或多个匿名订阅者注册到发布者以接收回调通知。

在多线程系统中,回调(无论是基于委托还是基于函数指针)的问题在于它们是同步发生的。必须小心,不要将来自另一个控制线程的回调调用到非线程安全的代码上。多线程应用程序开发很困难。对于原始设计者来说很难;对于不同技能水平的工程师维护代码来说也很难;因为 bug 的表现方式很难捉摸。理想情况下,一种架构解决方案有助于最大限度地减少错误并简化应用程序开发。

这个 C++ 委托实现功能齐全,允许以同步或异步方式调用任何函数(包括实例成员函数),并带上任意参数。委托库使得绑定和调用任何函数都变得轻而易举。

快速入门

一个简单的发布/订阅异步委托示例。

出版社

通常,委托被插入到委托容器中。AlarmCd 是一个委托容器。

Figure 1
图 1:AlarmCb 委托容器

1. MulticastDelegateSafe - 委托容器类型。
2. void(int, const string&) - 委托容器接受的函数签名。可以插入任何匹配的函数,例如类成员、静态函数或 lambda 函数。
3. AlarmCb - 委托容器名称。

调用委托容器以通知订阅者。

MulticastDelegateSafe<void(int, const string&)> AlarmCb; 
 
void NotifyAlarmSubscribers(int alarmId, const string& note) 
{ 
    // Invoke delegate to generate callback(s) to subscribers 
    AlarmCb(alarmId, note); 
}

订阅者

通常,订阅者会注册到委托容器实例以接收回调,无论是同步的还是异步的。

Figure 2
图 2:插入到 AlarmCb 委托容器

1. AlarmCb - 发布者委托容器实例。
2. += - 将函数目标添加到容器。
3. MakeDelegate - 创建委托实例。
4. &alarmSub - 订阅者对象指针。
5. &AlarmSub::MemberAlarmCb - 订阅者回调成员函数。
6. workerThread1 - 将调用回调的线程。添加线程参数会将回调类型从同步更改为异步。

创建符合委托签名的函数。将可调用函数插入委托容器。

class AlarmSub 
{ 
    void AlarmSub() 
    { 
        // Register to receive callbacks on workerThread1
        AlarmCb += MakeDelegate(this, &AlarmSub::HandleAlarmCb, workerThread1); 
    } 
    
    void ~AlarmSub() 
    { 
        // Unregister from callbacks
        AlarmCb -= MakeDelegate(this, &AlarmSub::HandleAlarmCb, workerThread1); 
    } 
    
    void HandleAlarmCb(int alarmId, const string& note) 
    { 
        // Handle callback here. Called on workerThread1 context. 
    } 
}

这是一个简单的示例。存在许多其他使用模式,包括异步 API、带有超时的阻塞委托等。

项目构建

CMake 用于创建构建文件。CMake 是免费且开源的软件。支持 Windows、Linux 和其他工具链。项目根目录内的示例 CMake 控制台命令

Windows Visual Studio

cmake -G "Visual Studio 17 2022" -A Win32 -B ../AsyncMulticastDelegateModernBuild -S .

cmake -G "Visual Studio 17 2022" -A x64 -B ../AsyncMulticastDelegateModernBuild -S .

cmake -G "Visual Studio 17 2022" -A x64 -B ../AsyncMulticastDelegateModernBuild -S . -DENABLE_UNIT_TESTS=ON

执行后,在 AsyncMulticastDelegateModernBuild 目录中打开 Visual Studio 项目。

Figure 3
图 3:Visual Studio 构建

Linux Make

cmake -G "Unix Makefiles" -B ../AsyncMulticastDelegateModernBuild -S .

cmake -G "Unix Makefiles" -B ../AsyncMulticastDelegateModernBuild -S . -DENABLE_UNIT_TESTS=ON

执行后,在 AsyncMulticastDelegateModernBuild 目录中使用 make 命令构建软件。使用 ./DelegateApp 运行控制台应用程序。

Figure 4
图 4:Linux Makefile 构建

Using the Code

我将首先介绍如何使用代码,然后深入探讨实现细节。

委托库由委托和委托容器组成。委托能够绑定到单个可调用函数。多播委托容器包含一个或多个委托,按顺序调用。单播委托容器最多包含一个委托。

主要的委托类如下所示:

  • DelegateFree<>
  • DelegateFreeAsync<>
  • DelegateFreeAsyncWait<>
  • DelegateMember<>
  • DelegateMemberAsync<>
  • DelegateMemberAsyncWait<>
  • DelegateMemberSp<>
  • DelegateMemberSpAsync<>

DelegateFree<> 绑定到自由函数或静态成员函数。DelegateMember<> 绑定到类实例成员函数。DelegateMemberSp<> 使用 std::shared_ptr 而不是裸对象指针来绑定到类实例成员函数。所有版本都提供同步函数调用。

DelegateFreeAsync<>DelegateMemberAsync<>DelegateMemberSpAsync<> 的操作方式与其同步对应物相同;不同之处在于这些版本在指定的控制线程上提供非阻塞异步函数执行。

DelegateFreeAsyncWait<>DelegateMemberAsyncWait<> 提供阻塞异步函数执行,在目标线程上,并带有调用者提供的最大等待超时时间。

三个主要的委托容器类是:

  • SinglecastDelegate<>
  • MulticastDelegate<>
  • MulticastDelegateSafe<>

SinglecastDelegate<> 是一个接受单个委托的委托容器。单播版本的优势在于它体积稍小,并且允许绑定的函数返回非 void 类型。

MulticastDelegate<> 是一个实现的单向链表委托容器,接受多个委托。只有绑定到返回类型为 void 的函数的委托才能添加到多播委托容器中。

MultcastDelegateSafe<> 是一个线程安全的容器,实现为单向链表,接受多个委托。如果多个线程访问容器实例,请始终使用线程安全版本。

每个容器都按值存储委托。这意味着委托被内部复制到堆内存或固定块内存中,具体取决于模式。用户无需在将委托插入容器之前手动创建委托。通常,重载的模板函数 MakeDelegate() 用于创建基于函数参数的委托实例。

同步委托

所有委托都使用重载的 MakeDelegate() 模板函数创建。编译器使用模板参数推导来选择正确的 MakeDelegate() 版本,从而无需手动指定模板参数。例如,这是一个简单的自由函数。

void FreeFuncInt(int value)
{
      cout << "FreeCallback " << value << endl;
}

要将自由函数绑定到委托,请使用 MakeDelegate() 创建 DelegateFree<void(int)> 实例。DelegateFree 模板参数是完整的函数签名:void(int)MakeDelegate() 返回一个 DelegateFree<void(int)> 对象,接下来的行使用委托调用函数 FreeFuncInt

// Create a delegate bound to a free function then invoke
DelegateFree<void(int)> delegateFree = MakeDelegate(&FreeFuncInt);
delegateFree(123);

成员函数以相同的方式绑定到委托,只是这次 MakeDelegate() 使用两个参数:一个类实例和一个成员函数指针。两个 DelegateMember 模板参数是类名(即 TestClass)和绑定的函数签名(即 void(TestStruct*))。

// Create a delegate bound to a member function then invoke    
DelegateMember<TestClass, void(TestStruct*)> delegateMember = 
      MakeDelegate(&testClass, &TestClass::MemberFunc);    
delegateMember(&testStruct);

通常使用委托容器来保存一个或多个委托,而不是创建具体的自由或成员委托。委托容器可以包含任何委托类型。例如,下面是一个绑定到任何具有 void (int) 函数签名的函数的委托容器:

MulticastDelegate<void(int)> delegateA;

单播委托的创建方式相同

SinglecastDelegate<void(int)> delegateB;

返回值的函数签名也是可能的。委托容器接受一个 float 参数并返回一个 int 的函数。

SinglecastDelegate<int(float)> delegateC;

SinglecastDelegate<> 可以绑定到返回值的函数,而多播版本则不能。原因是,当调用多个回调时,应该使用哪个回调函数的返回值?正确的答案是都不使用,因此多播容器仅接受函数签名使用 void 作为返回类型的委托。

MulticastDelegate 容器绑定到一个或多个函数。

MulticastDelegate<void(int, int)> delegateD;

MulticastDelegate<void(float, int, char)> delegateE;

当然,支持的不仅仅是内置的按值传递的参数类型。

MulticastDelegate<void(const MyClass&, MyStruct*, Data**)> delegateF;

使用重载的 MakeDelegate() 函数和 operator+= 来创建委托实例并将其添加到多播委托容器中。绑定自由函数或 static 函数仅需要一个函数指针参数。

delegateA += MakeDelegate(&FreeFuncInt);

实例成员函数也可以添加到任何委托容器中。对于成员函数,MakeDelegate() 的第一个参数是指向类实例的指针。第二个参数是指向成员函数的指针。

delegateA += MakeDelegate(&testClass, &TestClass::MemberFunc);

首先检查已注册的客户端,然后为所有已注册的委托调用回调。如果 MulticastDelegate<void(int)> 中存储了多个委托,则每个委托都会被依次调用。

// Invoke the delegate target functions
if (delegateA)
      delegateA(123);

使用 operator-= 从委托容器中删除委托实例。

delegateA -= MakeDelegate(&FreeFuncInt);

或者,使用 Clear() 删除容器中的所有委托。

delegateA.Clear();

使用 operator= 将委托添加到单播容器中。

SinglecastDelegate<int(int)> delegateF;
delegateF = MakeDelegate(&FreeFuncIntRetInt);

使用 Clear() 或赋值 0 进行删除。

delegateF.Clear();
delegateF = 0;

异步非阻塞委托

到目前为止,所有委托都是同步的。异步功能是建立在同步委托实现之上的。要使用异步委托,需要一个可以被多个线程安全访问的线程安全委托容器。锁会保护类 API 免受同时访问。下面显示了“Safe”版本。

MulticastDelegateSafe<void(TestStruct*)> delegateC;

将线程指针作为 MakeDelegate() 的最后一个参数会强制创建异步委托。在这种情况下,添加线程参数会导致 MakeDelegate() 返回 DelegateMemberAsync<> 而不是 DelegateMember<>

delegateC += MakeDelegate(&testClass, &TestClass::MemberFunc, &workerThread1);

调用方式与同步版本相同,只是这次回调函数 TestClass::MemberFunc() 是从 workerThread1 中调用的。

if (delegateC)
      delegateC(&testStruct);

这是另一个异步委托在 workerThread1 上使用 std::stringint 参数调用的示例。

// Create delegate with std::string and int arguments then asynchronously    
// invoke on a member function
MulticastDelegateSafe<void(const std::string&, int)> delegateH;
delegateH += MakeDelegate(&testClass, &TestClass::MemberFuncStdString, &workerThread1);
delegateH("Hello world", 2020);

库的使用在同步和异步委托之间是一致的。唯一的区别是将线程指针参数添加到 MakeDelegate()。在通过线程边界回调时,请始终记住使用线程安全的 MulticastDelegateSafe<> 容器。

当调用非阻塞异步委托时,委托库的默认行为是将参数复制到堆内存中,以便安全地传输到目标线程。这意味着所有参数都会被复制。如果你的数据不是普通旧数据 (POD),无法进行按位复制,请确保实现适当的复制构造函数来自己处理复制。

有关更多示例,请参阅附件源代码中的 main.cppDelegateUnitTests.cpp

绑定到 std::shared_ptr

绑定到实例成员函数需要一个对象指针。委托库支持使用裸指针和 std::shared_ptr 智能指针进行绑定。用法和你预期的差不多;只需在调用 MakeDelegate() 时使用 std::shared_ptr 替换裸对象指针即可。根据是否将线程参数传递给 MakeDelegate(),将返回 DelegateMemberSp<>DelegateMemberSpAsync<> 实例。

// Create a shared_ptr, create a delegate, then synchronously invoke delegate function
std::shared_ptr<TestClass> spObject(new TestClass());
auto delegateMemberSp = MakeDelegate(spObject, &TestClass::MemberFuncStdString);
delegateMemberSp("Hello world using shared_ptr", 2020);

警告:使用裸对象指针

某些异步委托用法模式可能导致在已删除的对象上调用回调。问题是这样的:一个对象的函数绑定到一个委托并异步调用,但在目标线程上调用之前,目标对象已被删除。换句话说,绑定到委托的对象可能在目标线程的消息队列有机会调用回调之前就被删除了。以下代码暴露了问题:

    // Example of a bug where the testClassHeap is deleted 
    // before the asychronous delegate is invoked on the workerThread1. 
    // In other words, by the time workerThread1 calls
    // the bound delegate function the testClassHeap instance is deleted 
    // and no longer valid.
    TestClass* testClassHeap = new TestClass();
    auto delegateMemberAsync = 
         MakeDelegate(testClassHeap, &TestClass::MemberFuncStdString, &workerThread1);
    delegateMemberAsync("Function async invoked on deleted object. Bug!", 2020);
    delegateMemberAsync.Clear();
    delete testClassHeap;

上面的示例是人为设计的,但它确实清楚地表明没有什么可以阻止一个对象在等待异步调用发生时被删除。在许多嵌入式系统架构中,注册可能发生在单例对象或其生命周期跨越整个执行的对象上。这样,应用程序的使用模式就防止了对已删除对象的调用。然而,如果对象临时出现,暂时订阅委托以进行回调,然后在稍后被删除,则消息队列中可能存在的延迟委托可能会调用已删除对象上的函数。

幸运的是,C++ 智能指针正是解决这些复杂对象生命周期问题的关键。DelegateMemberSpAsync<> 委托使用 std::shared_ptr 而不是裸对象指针进行绑定。现在委托拥有了共享指针,对象过早删除的危险就消除了。共享指针只会在所有引用不再使用后才删除指向的对象。在下面的代码片段中,客户端代码删除了对 testClassSp 的所有引用,但委托放入队列的副本可以防止 TestClass 在异步委托回调发生后才被删除。TestClass 实例将通过 std::shared_ptr<TestClass> 删除,一旦回调完成后智能指针引用计数变为 0,而无需额外的程序员介入。

    // Example of the smart pointer function version of the delegate. 
    // The testClassSp instance is only deleted after workerThread1 
    // invokes the callback function thus solving the bug.
    std::shared_ptr<TestClass> testClassSp(new TestClass());
    auto delegateMemberSpAsync = MakeDelegate
         (testClassSp, &TestClass::MemberFuncStdString, &workerThread1);
    delegateMemberSpAsync("Function async invoked using smart pointer. 
                           Bug solved!", 2020);
    delegateMemberSpAsync.Clear();
    testClassSp.reset();

实际上,这项技术可以用来调用对象函数,然后对象在回调发生后自动删除。使用上面的例子,创建一个共享指针实例,绑定一个委托,然后调用委托。现在 testClassSp 可以超出范围,TestClass::MemberFuncStdString 仍将在 workerThread1 上安全调用。TestClass 实例将通过 std::shared_ptr<TestClass> 删除,一旦回调完成后智能指针引用计数变为 0,而无需额外的程序员介入。

std::shared_ptr<TestClass> testClassSp(new TestClass());
auto delegateMemberSpAsync =
    MakeDelegate(testClassSp, &TestClass::MemberFuncStdString, &workerThread1);
delegateMemberSpAsync("testClassSp deletes after delegate invokes", 2020);

异步阻塞委托

阻塞委托会等待目标线程执行绑定的委托函数。与非阻塞委托不同,阻塞版本不将参数数据复制到堆内存。它们还允许返回非 void 类型,而非阻塞委托只绑定返回 void 的函数。由于函数参数未经修改地传递到目标线程,因此函数将按您期望的同步版本执行,包括传入/传出指针和引用。

通过指针/引用传递的堆栈参数无需线程安全。原因是调用线程会阻塞等待目标线程完成。这意味着委托实现保证只有一个线程能够访问堆栈分配的参数数据。

阻塞委托必须指定超时时间(以毫秒为单位)或 WAIT_INFINITE。与非阻塞异步委托(保证被调用)不同,如果阻塞委托的超时时间过期,则不会调用该函数。使用 IsSuccess() 来确定委托是否成功。

将超时时间作为 MakeDelegate() 的最后一个参数会根据是绑定自由函数还是成员函数,返回 DelegateFreeAsyncWait<>DelegateMemberAsyncWait<> 实例。通常不会将“Wait”委托添加到委托容器中。典型的使用模式是创建委托和堆栈上的函数参数,然后调用。下面的代码片段创建了一个阻塞委托,函数签名为 int (std::string&)。函数在 workerThread1 上调用。函数 MemberFuncStdStringRetInt() 将更新传出的 string msg 并向调用者返回一个整数。

    // Create a asynchronous blocking delegate and invoke. 
    // This thread will block until the msg and year stack values 
    // are set by MemberFuncStdStringRetInt on workerThread1.
    auto delegateI = 
          MakeDelegate(&testClass, &TestClass::MemberFuncStdStringRetInt, 
                       &workerThread1, WAIT_INFINITE);
    std::string msg;
    int year = delegateI(msg);
    if (delegateI.IsSuccess())
    {
        cout << msg.c_str() << " " << year << endl;
    }

异步 Lambda 调用

委托可以异步调用非捕获 lambda 函数。下面的示例在 workerThread1 上调用 LambdaFunc1

    auto LambdaFunc1 = +[](int i) -> int
    {
        cout << "Called LambdaFunc1 " << i << std::endl;
        return ++i;
    };

    // Asynchronously invoke lambda on workerThread1 and wait for the return value
    auto lambdaDelegate1 = MakeDelegate(LambdaFunc1, &workerThread1, WAIT_INFINITE);
    int lambdaRetVal2 = lambdaDelegate1(123);

委托是可调用的,因此可以传递给标准库。下面的示例展示了 CountLambda 通过 std::count_ifworkerThread1 上异步执行。

    std::vector<int> v{ 1, 2, 3, 4, 5, 6, 7, 8, 9 };

    auto CountLambda = +[](int v) -> int
    {
        return v > 2 && v <= 6;
    };
    auto countLambdaDelegate = MakeDelegate(CountLambda, &workerThread1, WAIT_INFINITE);

    const auto valAsyncResult = std::count_if(v.begin(), v.end(),
        countLambdaDelegate);
    cout << "Asynchronous lambda result: " << valAsyncResult << endl;

委托库

委托库包含许多类。一个包含文件 DelegateLib.h 提供了对所有委托库特性的访问。该库包装在 DelegateLib 命名空间中。包含的单元测试有助于确保实现 robust。下表显示了委托类层次结构。

  • DelegateBase
  • Delegate<>
  • DelegateFree<>
  • DelegateFreeAsync<>
  • DelegateFreeAsyncWaitBase<>
  • DelegateFreeAsyncWait<>
  • DelegateMember<>
  • DelegateMemberAsync<>
  • DelegateMemberAsyncWaitBase<>
  • DelegateMemberAsyncWait<>
  • DelegateMemberSp<>
  • DelegateMemberSpAsync<>

DelegateBase 是所有委托实例的非模板、抽象基类。比较运算符和 Clone() 方法定义了接口。

class DelegateBase {
public:
    virtual ~DelegateBase() {}

    /// Derived class must implement operator== to compare objects.
    virtual bool operator==(const DelegateBase& rhs) const = 0;
    virtual bool operator!=(const DelegateBase& rhs) { return !(*this == rhs); }


    /// Use Clone to provide a deep copy using a base pointer. Covariant
    /// overloading is used so that a Clone() method return type is a
    /// more specific type in the derived class implementations.
    /// @return A dynamic copy of this instance created with operator new.
    /// @post The caller is responsible for deleting the clone instance.
    virtual DelegateBase* Clone() const = 0;
};

Delegate<> 提供了一个带模板化函数参数的模板类。operator() 函数允许使用正确的函数参数调用委托函数。Clone() 的协变重载提供了更具体的返回类型。

Clone() 函数是委托容器类所必需的。委托容器需要创建委托的副本以存储在列表中。由于委托容器只知道抽象基类 Delegate<> 实例,因此它必须在创建副本时使用 Clone() 函数。

template <class R>
struct Delegate; // Not defined

template <class RetType, class... Args>
class Delegate<RetType(Args...)> : public DelegateBase {
public:
    virtual RetType operator()(Args... args) = 0;
    virtual Delegate* Clone() const = 0;
};

RetType 是绑定的函数返回类型。Args 参数包是零个或多个绑定的函数参数。operator() 根据派生类的实现,同步或异步地调用绑定的函数。

在同一个类中高效地存储实例成员函数和自由函数是很困难的。因此,为每种类型的绑定函数创建了两个类。DelegateMember<> 处理实例成员函数。DelegateFree<> 处理自由函数和静态函数。

Clone() 创建类的新实例。Bind() 接受类实例和成员函数指针。operator() 函数允许调用使用 Bind() 分配的函数。

template <class C, class R>
struct DelegateMember; // Not defined

template <class TClass, class RetType, class... Args>
class DelegateMember<TClass, RetType(Args...)> : public Delegate<RetType(Args...)> {
public:
    typedef TClass* ObjectPtr;
    typedef RetType(TClass::*MemberFunc)(Args...);
    typedef RetType(TClass::*ConstMemberFunc)(Args...) const;

    DelegateMember(ObjectPtr object, MemberFunc func) { Bind(object, func); }
    DelegateMember(ObjectPtr object, ConstMemberFunc func) { Bind(object, func); }
    DelegateMember() : m_object(nullptr), m_func(nullptr) { }

    /// Bind a member function to a delegate. 
    void Bind(ObjectPtr object, MemberFunc func) {
        m_object = object;
        m_func = func;
    }

    /// Bind a const member function to a delegate. 
    void Bind(ObjectPtr object, ConstMemberFunc func) {
        m_object = object;
        m_func = reinterpret_cast<MemberFunc>(func);
    }

    virtual DelegateMember* Clone() const { return new DelegateMember(*this); }

    // Invoke the bound delegate function
    virtual RetType operator()(Args... args) {
        return std::invoke(m_func, m_object, args...);
    }

    virtual bool operator==(const DelegateBase& rhs) const {
        const DelegateMember<TClass, RetType(Args...)>* 
        derivedRhs = 
        dynamic_cast<const DelegateMember<TClass, RetType(Args...)>*>(&rhs);
        return derivedRhs &&
            m_func == derivedRhs->m_func &&
            m_object == derivedRhs->m_object;
    }

    bool Empty() const { return !(m_object && m_func); }
    void Clear() { m_object = nullptr; m_func = nullptr; }

    explicit operator bool() const { return !Empty(); }

private:
    ObjectPtr m_object;      // Pointer to a class object
    MemberFunc m_func;       // Pointer to an instance member function
};

请注意,std::invoke 用于在 operator() 中调用绑定的函数。使用 RetValArgs 参数包模板参数,这个单一的 DelegateMember 类可以处理所有目标函数签名。

DelegateFree<> 绑定到自由函数或静态成员函数。请注意,它与 DelegateMember<> 一样继承自 Delegate<>Bind() 接受一个函数指针,operator() 允许后续调用绑定的函数。

template <class R>
struct DelegateFree; // Not defined

template <class RetType, class... Args>
class DelegateFree<RetType(Args...)> : public Delegate<RetType(Args...)> {
public:
    typedef RetType(*FreeFunc)(Args...);

    DelegateFree(FreeFunc func) { Bind(func); }
    DelegateFree() : m_func(nullptr) { }

    /// Bind a free function to the delegate.
    void Bind(FreeFunc func) { m_func = func; }

    virtual DelegateFree* Clone() const { return new DelegateFree(*this); }

    /// Invoke the bound delegate function. 
    virtual RetType operator()(Args... args) {
        return std::invoke(m_func, args...);
    }

    virtual bool operator==(const DelegateBase& rhs) const {
        const DelegateFree<RetType(Args...)>* 
        derivedRhs = dynamic_cast<const DelegateFree<RetType(Args...)>*>(&rhs);
        return derivedRhs &&
            m_func == derivedRhs->m_func;
    }

    bool Empty() const { return !m_func; }
    void Clear() { m_func = nullptr; }

    explicit operator bool() const { return !Empty(); }

private:
    FreeFunc m_func;        // Pointer to a free function
};

DelegateMemberAsync<> 是非阻塞异步版本委托,允许在客户端指定的控制线程上调用。operator() 函数实际上不调用目标函数,而是将委托和所有函数参数打包到堆内存中,形成一个 DelegateMsgHeapArgs<> 实例,以便通过消息队列使用 DispatchDelegate() 发送。调用 operator() 后,目标线程调用 DelegateInvoke() 函数来实际调用绑定的函数。

template <class C, class R>
struct DelegateMemberAsync; // Not defined

template <class TClass, class... Args>
class DelegateMemberAsync<TClass, void(Args...)> : 
      public DelegateMember<TClass, void(Args...)>, public IDelegateInvoker {
public:
    typedef TClass* ObjectPtr;
    typedef void (TClass::*MemberFunc)(Args...);
    typedef void (TClass::*ConstMemberFunc)(Args...) const;

    // Constructors take a class instance, member function, and callback thread
    DelegateMemberAsync(ObjectPtr object, MemberFunc func, DelegateThread* thread) : 
                        m_sync(false)
        { Bind(object, func, thread); }
    DelegateMemberAsync(ObjectPtr object, ConstMemberFunc func, 
                        DelegateThread* thread) : m_sync(false)
        { Bind(object, func, thread); }
    DelegateMemberAsync() : m_thread(nullptr), m_sync(false) { }

    /// Bind a member function to a delegate. 
    void Bind(ObjectPtr object, MemberFunc func, DelegateThread* thread) {
        m_thread = thread;
        DelegateMember<TClass, void(Args...)>::Bind(object, func);
    }

    /// Bind a const member function to a delegate. 
    void Bind(ObjectPtr object, ConstMemberFunc func, DelegateThread* thread) {
        m_thread = thread;
        DelegateMember<TClass, void(Args...)>::Bind(object, func);
    }

    virtual DelegateMemberAsync<TClass, void(Args...)>* Clone() const {
        return new DelegateMemberAsync<TClass, void(Args...)>(*this);
    }

    virtual bool operator==(const DelegateBase& rhs) const {
        const DelegateMemberAsync<TClass, void(Args...)>* 
        derivedRhs = 
        dynamic_cast<const DelegateMemberAsync<TClass, void(Args...)>*>(&rhs);
        return derivedRhs &&
            m_thread == derivedRhs->m_thread &&
            DelegateMember<TClass, void(Args...)>::operator == (rhs);
    }

    /// Invoke delegate function asynchronously
    virtual void operator()(Args... args) {
        if (m_thread == nullptr || m_sync)
            DelegateMember<TClass, void(Args...)>::operator()(args...);
        else
        {
            // Create a clone instance of this delegate 
            auto delegate = 
            std::shared_ptr<DelegateMemberAsync<TClass, void(Args...)>>(Clone());

            // Create the delegate message
            auto msg = std::shared_ptr<DelegateMsgHeapArgs<Args...>>
                       (new DelegateMsgHeapArgs<Args...>(delegate, args...));

            // Dispatch message onto the callback destination thread. DelegateInvoke()
            // will be called by the target thread. 
            m_thread->DispatchDelegate(msg);
        }
    }

    /// Called by the target thread to invoke the delegate function 
    virtual void DelegateInvoke(std::shared_ptr<DelegateMsgBase> msg) {
        // Typecast the base pointer to back to the templatized instance
        auto delegateMsg = static_cast<DelegateMsgHeapArgs<Args...>*>(msg.get());

        // Invoke the delegate function
        m_sync = true;
        std::apply(&DelegateMember<TClass, void(Args...)>::operator(),
            std::tuple_cat(std::make_tuple(this), delegateMsg->GetArgs()));
    }

private:
    /// Target thread to invoke the delegate function
    DelegateThread* m_thread;
    bool m_sync;
};

与使用 std::invoke 的同步委托不同,异步版本使用 std::apply 来调用目标线程上的绑定函数,并使用由 make_tuple_heap() 创建并通过消息队列发送的参数元组。

// Invoke the delegate function 
m_sync = true;
std::apply(&DelegateMember<TClass, void(Args...)>::operator(), 
    std::tuple_cat(std::make_tuple(this), delegateMsg->GetArgs()));

DelegateMemberAsyncWait<> 是一个阻塞异步委托,它绑定到类实例成员函数。下面显示了两个主要函数。当调用 operator() 时,它会阻塞等待 DelegateInvoke() 在目标线程上被调用或超时过期。“Wait”版本不使用 make_tuple_heap(),因为原始数据类型直接传递到目标线程以支持输出参数。

template <class C, class R>
struct DelegateMemberAsyncWait; // Not defined

template <class TClass, class RetType, class... Args>
class DelegateMemberAsyncWait<TClass, RetType(Args...)> : 
      public DelegateMember<TClass, RetType(Args...)>, public IDelegateInvoker {
public:
    /// ...

    /// Invoke delegate function asynchronously
    virtual RetType operator()(Args... args) {
        if (this->m_thread == nullptr || m_sync)
            return DelegateMember<TClass, RetType(Args...)>::operator()(args...);
        else {
            // Create a clone instance of this delegate 
            auto delegate = std::shared_ptr<DelegateMemberAsyncWait<TClass, 
                            RetType(Args...)>>(Clone());
            delegate->m_sema.Create();
            delegate->m_sema.Reset();

            // Create a new message instance 
            auto msg = std::shared_ptr<DelegateMsg<Args...>>
                       (new DelegateMsg<Args...>(delegate, args...)); 

            // Dispatch message onto the callback destination thread. DelegateInvoke()
            // will be called by the target thread. 
            this->m_thread->DispatchDelegate(msg);

            // Wait for target thread to execute the delegate function
            if ((m_success = delegate->m_sema.Wait(m_timeout)))
                m_invoke = delegate->m_invoke;

            return m_invoke.GetRetVal();
        }
    }

    /// Called by the target thread to invoke the delegate function 
    virtual void DelegateInvoke(std::shared_ptr<DelegateMsgBase> msg) {
        // Typecast the base pointer to back to the templatized instance
        auto delegateMsg = static_cast<DelegateMsg<Args...>*>(msg.get());

        // Invoke the delegate function then signal the waiting thread
        m_sync = true;
        m_invoke(this, delegateMsg->GetArgs());
        this->m_sema.Signal();
    }

    /// ...

堆模板参数包

非阻塞异步调用意味着所有参数数据都必须复制到堆内存中以传输到目标线程。参数有不同的样式:按值、按引用、指针和指针的指针。对于非阻塞委托,除按值传递外,其他类型都需要在堆上创建数据,以确保数据在目标线程上有效。能够将每个参数保存到 DelegateMsgHeapArgs<> 中的关键是 make_tuple_heap() 函数。这个模板元编程函数创建一个参数元组,其中每个元组元素都在堆上创建。

/// @brief Terminate the template metaprogramming argument loop
template<typename... Ts>
auto make_tuple_heap(std::list<std::shared_ptr<heap_arg_deleter_base>>& heapArgs, 
                     std::tuple<Ts...> tup)
{
    return tup;
}

/// @brief Creates a tuple with all tuple elements created on the heap using
/// operator new. Call with an empty list and empty tuple. 
/// The empty tuple is concatenated with each heap element. 
/// The list contains heap_arg_deleter_base objects for each 
/// argument heap memory block that will be automatically deleted after the bound
/// function is invoked on the target thread. 
template<typename Arg1, typename... Args, typename... Ts>
auto make_tuple_heap(std::list<std::shared_ptr<heap_arg_deleter_base>>& heapArgs, 
                     std::tuple<Ts...> tup, Arg1 arg1, Args... args)
{
    auto new_tup = tuple_append(heapArgs, tup, arg1);
    return make_tuple_heap(heapArgs, new_tup, args...);
}

模板元编程在代码中使用 C++ 模板系统执行编译时计算。注意 make_tuple_heap() 的递归编译器调用,随着 Arg1 模板参数被函数消耗,直到没有参数剩余,递归才终止。上面的片段显示了堆分配的元组函数参数的连接。这允许将参数复制到动态内存中,通过消息队列传输到目标线程。

make_tuple_heap.h 中的这段代码创建起来很棘手,因为每个参数都必须分配内存、复制数据、附加到元组,然后根据其类型进行后续的去分配。更复杂的是,所有这些都必须以泛型方式完成,处理 N 个不同的模板参数。这是通过消息队列传递参数模板包的关键。然后 DelegateMsgHeapArgs 存储元组参数,以便目标线程方便使用。目标线程使用 std::apply() 调用绑定的函数,并使用堆分配的元组参数。

下面显示了 tuple_append() 的指针参数实现。它为参数创建动态内存,复制参数数据,将其添加到删除器列表中以便在目标函数调用完成后稍后清理,最后返回附加的元组。

/// @brief Append a pointer argument to the tuple
template <typename Arg, typename... TupleElem>
auto tuple_append(std::list<std::shared_ptr<heap_arg_deleter_base>>& heapArgs, 
                  const std::tuple<TupleElem...> &tup, Arg* arg)
{
    Arg* heap_arg = nullptr;
    try
    {
        heap_arg = new Arg(*arg);

        std::shared_ptr<heap_arg_deleter_base> deleter
                       (new heap_arg_deleter<Arg*>(heap_arg));
        heapArgs.push_back(deleter);

        return std::tuple_cat(tup, std::make_tuple(heap_arg));
    }
    catch (std::bad_alloc&)
    {
        if (heap_arg)
            delete heap_arg;
        throw;
    }
}

下面实现了指针参数删除器。当目标函数调用完成时,heap_arg_deleter 的析构函数将删除堆参数内存。堆参数不能更改为智能指针,因为它会改变目标函数签名中使用的参数类型。因此,heap_arg_deleter 作为智能指针包装器,围绕(潜在的)非智能堆参数。

/// @brief Frees heap memory for pointer heap argument
template<typename T>
class heap_arg_deleter<T*> : public heap_arg_deleter_base
{
public:
    heap_arg_deleter(T* arg) : m_arg(arg) { }
    virtual ~heap_arg_deleter()
    {
        delete m_arg;
    }
private:
    T* m_arg;
};

参数堆拷贝

非阻塞异步调用意味着所有参数数据都必须复制到堆内存中以传输到目标线程。这意味着所有参数,无论参数类型如何,都将被复制,包括:值、指针、指针的指针、引用。如果你的数据不是普通旧数据 (POD),无法进行按位复制,请确保实现适当的复制构造函数来自己处理复制。

例如,异步调用此函数时,参数 TestStruct 将被复制。

void TestFunc(TestStruct* data);

绕过参数堆拷贝

有时,您可能不希望委托库复制您的参数。相反,您只希望目标线程拥有原始副本的指针。以下是如何真正传递指针而不复制对象的方法。使用 shared_ptr 作为函数参数可防止对象被复制。

例如,异步调用此函数时,不会复制 TestStruct 对象。

void TestFunc(std::shared_ptr<TestStruct> data);

数组参数堆拷贝

根据 C 标准,数组函数参数会被调整为指针。简而言之,任何声明为 T a[]T a[N] 的函数参数都被视为声明为 T *a。由于数组大小未知,库无法复制整个数组。例如,下面的函数

void ArrayFunc(char a[]) {}

需要委托参数 char*,因为 char a[] 被“调整”为 char *a

MulticastDelegateSafe1<char*> delegateArrayFunc;
delegateArrayFunc += MakeDelegate(&ArrayFunc, &workerThread1);

使用异步委托时,无法按值异步传递 C 风格数组。我的建议是,如果可能,尽量避免使用 C 风格数组,以避免混淆和错误。

工作线程 (std::thread)

下面显示了 std::thread 实现的线程循环。该循环在每个异步委托实例上调用 DelegateInvoke() 函数。

void WorkerThread::Process()
{
    while (1)
    {
        std::shared_ptr<ThreadMsg> msg;
        {
            // Wait for a message to be added to the queue
            std::unique_lock<std::mutex> lk(m_mutex);
            while (m_queue.empty())
                m_cv.wait(lk);

            if (m_queue.empty())
                continue;

            msg = m_queue.front();
            m_queue.pop();
        }

        switch (msg->GetId())
        {
            case MSG_DISPATCH_DELEGATE:
            {
                ASSERT_TRUE(msg->GetData() != NULL);

                // Convert the ThreadMsg void* data back to a DelegateMsg* 
                auto delegateMsg = msg->GetData();

                // Invoke the callback on the target thread
                delegateMsg->GetDelegateInvoker()->DelegateInvoke(delegateMsg);
                break;
            }

            case MSG_EXIT_THREAD:
            {
                return;
            }

            default:
                ASSERT();
        }
    }
}

任何项目特定的线程循环都可以调用 DelegateInvoke()。这只是一个示例。唯一的要求是您的工作线程类继承自 DelegateLib::DelegateThread 并实现 DispatchDelegate() 抽象函数。DisplatchDelegate() 会将共享消息指针插入线程队列进行处理。

委托容器

委托容器存储一个或多个委托。委托容器层次结构如下所示:

  • MulticastDelegateBase
  • MulticastDelegate<>
  • MulticastDelegateSafe<>
  • SinglecastDelegate<>

MulticastDelegate<> 提供 operator() 函数以按顺序调用列表中的每个委托。

MulticastDelegateSafe<> 提供委托 API 的线程安全包装器。每个函数都提供一个锁守护程序,以防止同时访问。使用 RAII(资源获取即初始化)技术来处理锁。

template <class R>
struct MulticastDelegateSafe; // Not defined

/// @brief Thread-safe multicast delegate container class. 
template<class RetType, class... Args>
class MulticastDelegateSafe<RetType(Args...)> : 
      public MulticastDelegate<RetType(Args...)>
{
public:
    MulticastDelegateSafe() { LockGuard::Create(&m_lock); }
    ~MulticastDelegateSafe() { LockGuard::Destroy(&m_lock); }

    void operator+=(const Delegate<RetType(Args...)>& delegate) {
        LockGuard lockGuard(&m_lock);
        MulticastDelegate<RetType(Args...)>::operator +=(delegate);
    }
    void operator-=(const Delegate<RetType(Args...)>& delegate) {
        LockGuard lockGuard(&m_lock);
        MulticastDelegate<RetType(Args...)>::operator -=(delegate);
    }
    void operator()(Args... args) {
        LockGuard lockGuard(&m_lock);
        MulticastDelegate<RetType(Args...)>::operator ()(args...);
    }
    bool Empty() {
        LockGuard lockGuard(&m_lock);
        return MulticastDelegate<RetType(Args...)>::Empty();
    }
    void Clear() {
        LockGuard lockGuard(&m_lock);
        MulticastDelegate<RetType(Args...)>::Clear();
    }

    explicit operator bool() {
        LockGuard lockGuard(&m_lock);
        return MulticastDelegate<RetType(Args...)>::operator bool();
    }

private:
    // Prevent copying objects
    MulticastDelegateSafe(const MulticastDelegateSafe&) = delete;
    MulticastDelegateSafe& operator=(const MulticastDelegateSafe&) = delete;

    /// Lock to make the class thread-safe
    LOCK m_lock;
};

SysData 示例

几个实际示例将演示常见的委托使用模式。首先,SysData 是一个简单的类,展示了如何公开一个出站异步接口。该类存储系统数据,并在模式更改时提供异步订阅者通知。类接口如下所示:

class SysData
{
public:
    /// Clients register with MulticastDelegateSafe1 to get callbacks 
    /// when system mode changes
    MulticastDelegateSafe<void(const SystemModeChanged&)> SystemModeChangedDelegate;

    /// Get singleton instance of this class
    static SysData& GetInstance();

    /// Sets the system mode and notify registered clients 
    /// via SystemModeChangedDelegate.
    /// @param[in] systemMode - the new system mode. 
    void SetSystemMode(SystemMode::Type systemMode);    

private:
    SysData();
    ~SysData();

    /// The current system mode data
    SystemMode::Type m_systemMode;

    /// Lock to make the class thread-safe
    LOCK m_lock;
};

接收回调的订阅者接口是 SystemModeChangedDelegate。调用 SetSystemMode() 将新模式保存到 m_systemMode 中,并通知所有已注册的订阅者。

void SysData::SetSystemMode(SystemMode::Type systemMode)
{
    LockGuard lockGuard(&m_lock);

    // Create the callback data
    SystemModeChanged callbackData;
    callbackData.PreviousSystemMode = m_systemMode;
    callbackData.CurrentSystemMode = systemMode;

    // Update the system mode
    m_systemMode = systemMode;

    // Callback all registered subscribers
    if (SystemModeChangedDelegate)
        SystemModeChangedDelegate(callbackData);
}

SysDataClient 示例

SysDataClient 是一个委托订阅者,在构造函数中注册 SysData::SystemModeChangedDelegate 通知。

    // Constructor
    SysDataClient() :
        m_numberOfCallbacks(0)
    {
        // Register for async delegate callbacks
        SysData::GetInstance().SystemModeChangedDelegate += 
                 MakeDelegate(this, &SysDataClient::CallbackFunction, &workerThread1);
        SysDataNoLock::GetInstance().SystemModeChangedDelegate += 
                       MakeDelegate(this, &SysDataClient::CallbackFunction, 
                       &workerThread1);
    }

当系统模式更改时,SysDataClient::CallbackFunction() 现在将在 workerThread1 上调用。

    void CallbackFunction(const SystemModeChanged& data)
    {
        m_numberOfCallbacks++;
        cout << "CallbackFunction " << data.CurrentSystemMode << endl;
    }

当调用 SetSystemMode() 时,任何对模式更改感兴趣的方都会收到同步或异步通知,具体取决于注册的委托类型。

// Set new SystemMode values. Each call will invoke callbacks to all
// registered client subscribers.
SysData::GetInstance().SetSystemMode(SystemMode::STARTING);
SysData::GetInstance().SetSystemMode(SystemMode::NORMAL);

SysDataNoLock 示例

SysDataNoLock 是一个替代实现,它使用一个 private MulticastDelegateSafe<> 来异步设置系统模式且不使用锁。

class SysDataNoLock
{
public:
    /// Clients register with MulticastDelegateSafe to get callbacks 
    /// when system mode changes
    MulticastDelegateSafe<void(const SystemModeChanged&)> SystemModeChangedDelegate;

    /// Get singleton instance of this class
    static SysDataNoLock& GetInstance();

    /// Sets the system mode and notify registered clients 
    /// via SystemModeChangedDelegate.
    /// @param[in] systemMode - the new system mode. 
    void SetSystemMode(SystemMode::Type systemMode);    

    /// Sets the system mode and notify registered clients 
    /// via a temporary stack created
    /// asynchronous delegate. 
    /// @param[in] systemMode - The new system mode. 
    void SetSystemModeAsyncAPI(SystemMode::Type systemMode);    

    /// Sets the system mode and notify registered clients 
    /// via a temporary stack created asynchronous delegate. 
    /// This version blocks (waits) until the delegate callback
    /// is invoked and returns the previous system mode value. 
    /// @param[in] systemMode - The new system mode. 
    /// @return The previous system mode. 
    SystemMode::Type SetSystemModeAsyncWaitAPI(SystemMode::Type systemMode);

private:
    SysDataNoLock();
    ~SysDataNoLock();

    /// Private callback to get the SetSystemMode call onto a common thread
    MulticastDelegateSafe<void(SystemMode::Type)> SetSystemModeDelegate; 

    /// Sets the system mode and notify registered clients 
    /// via SystemModeChangedDelegate.
    /// @param[in] systemMode - the new system mode. 
    void SetSystemModePrivate(SystemMode::Type);    

    /// The current system mode data
    SystemMode::Type m_systemMode;
};

构造函数将 SetSystemModePrivate() 注册到 private SetSystemModeDelegate

SysDataNoLock::SysDataNoLock() :
    m_systemMode(SystemMode::STARTING)
{
    SetSystemModeDelegate += MakeDelegate
                 (this, &SysDataNoLock::SetSystemModePrivate, &workerThread2);
    workerThread2.CreateThread();
}

下面的 SetSystemMode() 函数是一个异步入站接口的示例。对调用者来说,它看起来像一个普通函数,但底层是通过委托异步调用私有成员。在这种情况下,调用 SetSystemModeDelegate 会导致 SetSystemModePrivate()workerThread2 上被调用。

void SysDataNoLock::SetSystemMode(SystemMode::Type systemMode)
{
    // Invoke the private callback. SetSystemModePrivate() 
    // will be called on workerThread2.
    SetSystemModeDelegate(systemMode);
}

由于这个 private 函数总是异步调用到 workerThread2,所以它不需要锁。

void SysDataNoLock::SetSystemModePrivate(SystemMode::Type systemMode)
{
      // Create the callback data
      SystemModeChanged callbackData;

      callbackData.PreviousSystemMode = m_systemMode;
      callbackData.CurrentSystemMode = systemMode;

      // Update the system mode
      m_systemMode = systemMode;

      // Callback all registered subscribers
      if (SystemModeChangedDelegate)
            SystemModeChangedDelegate(callbackData);
}

SysDataNoLock 重调用示例

虽然创建一个单独的 private 函数来创建异步 API 有效,但使用委托,可以直接在另一个线程上重调用同一个函数。执行一个简单的检查,看调用者是否正在期望的控制线程上执行。如果不是,则在堆栈上创建一个临时的异步委托,然后调用它。委托和调用者的所有原始函数参数都会被复制到堆上,并且函数会在 workerThread2 上被重调用。这是用最少的努力创建异步 API 的一种优雅方式。

void SysDataNoLock::SetSystemModeAsyncAPI(SystemMode::Type systemMode)
{
    // Is the caller executing on workerThread2?
    if (workerThread2.GetThreadId() != WorkerThread::GetCurrentThreadId())
    {
        // Create an asynchronous delegate and 
        // re-invoke the function call on workerThread2
        auto delegate = 
          MakeDelegate(this, &SysDataNoLock::SetSystemModeAsyncAPI, &workerThread2);
        delegate(systemMode);
        return;
    }

    // Create the callback data
    SystemModeChanged callbackData;
    callbackData.PreviousSystemMode = m_systemMode;
    callbackData.CurrentSystemMode = systemMode;

    // Update the system mode
    m_systemMode = systemMode;

    // Callback all registered subscribers
    if (SystemModeChangedDelegate)
        SystemModeChangedDelegate(callbackData);
}

SysDataNoLock 阻塞重调用示例

阻塞异步 API 可以隐藏在类成员函数中。下面的函数在 workerThread2 上设置当前模式并返回之前的模式。如果调用者不在 workerThread2 上执行,则会在堆栈上创建一个阻塞委托并调用它。对调用者而言,函数看起来是同步的,但委托确保在返回之前调用在正确线程上执行。

SystemMode::Type SysDataNoLock::SetSystemModeAsyncWaitAPI(SystemMode::Type systemMode)
{
    // Is the caller executing on workerThread2?
    if (workerThread2.GetThreadId() != WorkerThread::GetCurrentThreadId())
    {
        // Create an asynchronous delegate and 
        // re-invoke the function call on workerThread2
        auto delegate =
            MakeDelegate(this, &SysDataNoLock::SetSystemModeAsyncWaitAPI, 
                         &workerThread2, WAIT_INFINITE);
        return delegate(systemMode);
    }

    // Create the callback data
    SystemModeChanged callbackData;
    callbackData.PreviousSystemMode = m_systemMode;
    callbackData.CurrentSystemMode = systemMode;

    // Update the system mode
    m_systemMode = systemMode;

    // Callback all registered subscribers
    if (SystemModeChangedDelegate)
        SystemModeChangedDelegate(callbackData);

    return callbackData.PreviousSystemMode;
}

Timer 示例

一旦建立了委托框架,创建定时器回调服务就变得微不足道了。许多系统需要一种基于超时生成回调的方法。可能是周期性超时用于低速轮询,也可能是错误超时,以防某些事情未在预期时间内发生。无论哪种情况,回调都必须在指定的控制线程上发生。在 Timer 类中使用的 SinglecastDelegate<void(void)> 可以很好地解决这个问题。

/// @brief A timer class provides periodic timer callbacks on the client's 
/// thread of control. Timer is thread safe.
class Timer 
{
public:
    static const DWORD MS_PER_TICK;

    /// Client's register with Expired to get timer callbacks
    SinglecastDelegate<void(void)> Expired;

    /// Constructor
    Timer(void);

    /// Destructor
    ~Timer(void);

用户创建一个定时器实例并注册过期事件。在这种情况下,MyClass::MyCallback() 将在 1000 毫秒后调用。

m_timer.Expired = MakeDelegate(&myClass, &MyClass::MyCallback, &myThread);
m_timer.Start(1000);

摘要

所有委托都可以使用 MakeDelegate() 创建。函数参数决定了返回的委托类型。

同步委托使用一个参数创建自由函数,使用两个参数创建实例成员函数。

auto freeDelegate = MakeDelegate(&MyFreeFunc);
auto memberDelegate = MakeDelegate(&myClass, &MyClass::MyMemberFunc);

添加线程参数会创建一个非阻塞异步委托。

auto freeDelegate = MakeDelegate(&MyFreeFunc, &myThread);
auto memberDelegate = MakeDelegate(&myClass, &MyClass::MyMemberFunc, &myThread);

std::shared_ptr 可以替换同步和非阻塞异步成员委托中的裸实例指针。

std::shared_ptr<MyClass> myClass(new MyClass());
auto memberDelegate = MakeDelegate(myClass, &MyClass::MyMemberFunc, &myThread);

添加 timeout 参数会创建一个阻塞异步委托。

auto freeDelegate = MakeDelegate(&MyFreeFunc, &myThread, WAIT_INFINITE);
auto memberDelegate = MakeDelegate(&myClass, &MyClass::MyMemberFunc, &myThread, 5000);

委托使用 operator+=operator-= 添加/删除到多播容器。所有容器都接受所有委托类型。

MulticastDelegate<void(int)> multicastContainer;
multicastContainer += MakeDelegate(&MyFreeFunc);
multicastContainer -= MakeDelegate(&MyFreeFunc);

使用异步委托时,请使用线程安全的(thread-safe)多播委托容器,以允许多个线程安全地添加/删除容器。

MulticastDelegateSafe<void(int)> multicastContainer;
multicastContainer += MakeDelegate(&MyFreeFunc, &myThread);
multicastContainer -= MakeDelegate(&MyFreeFunc, &myThread);

使用 operator= 添加和删除单播委托。

SinglecastDelegate<void(int)> singlecastContainer;
singlecastContainer = MakeDelegate(&MyFreeFunc);
singlecastContainer = 0;

使用 operator() 调用所有委托和委托容器。

if (myDelegate)
      myDelegate(123);

在使用返回值或出参前,对阻塞委托使用 IsSuccess()

if (myDelegate) 
{
     int outInt = 0;
     int retVal = myDelegate(&outInt);
     if (myDelegate.IsSuccess()) 
     {
          cout << outInt << retVal;
     }
}

哪种回调实现?

我在 CodeProject 上记录了四种不同的异步多播回调实现。每种版本都有其独特的特性和优点。下面的部分将重点介绍每种解决方案的主要区别。请参阅下文的参考文献部分,获取每篇文章的链接。

C 中的异步多播回调

  • 用 C 实现
  • 回调函数仅限自由函数或静态成员
  • 支持一个回调参数
  • 回调参数必须是指针类型
  • 使用 memcpy 复制回调参数数据
  • 通过宏提供类型安全
  • 静态数组存储已注册的订阅者回调
  • 已注册订阅者数量在编译时固定
  • C 语言中的固定块内存分配器
  • 紧凑的实现

带线程间消息传递的异步多播回调

  • 用 C++ 实现
  • 回调函数仅限自由函数或静态成员
  • 支持一个回调参数
  • 回调参数必须是指针类型
  • 使用复制构造函数复制回调参数数据
  • 通过模板提供类型安全
  • 模板使用量少
  • 已注册订阅者回调的动态列表
  • 已注册订阅者数量在运行时扩展
  • C++ 中的固定块内存分配器
  • 紧凑的实现

C++ 中的异步多播委托

  • 用 C++ 实现
  • C++ 委托范式
  • 任何回调函数类型(成员、静态、自由)
  • 支持多个回调参数(最多 5 个)
  • 回调参数任意类型(值、引用、指针、指针的指针)
  • 使用复制构造函数复制回调参数数据
  • 通过模板提供类型安全
  • 大量使用模板
  • 已注册订阅者回调的动态列表
  • 已注册订阅者数量在运行时扩展
  • C++ 中的固定块内存分配器
  • 较大的实现

现代 C++ 中的异步多播委托

  • 用 C++(即 C++17)实现
  • C++ 委托范式
  • 函数签名委托参数
  • 任何回调函数类型(成员、静态、自由)
  • 支持多个回调参数(支持 N 个参数)
  • 回调参数任意类型(值、引用、指针、指针的指针)
  • 使用复制构造函数复制回调参数数据
  • 通过模板提供类型安全
  • 大量使用模板
  • 可变参数模板
  • 模板元编程
  • 已注册订阅者回调的动态列表
  • 已注册订阅者数量在运行时扩展
  • 紧凑的实现(得益于可变参数模板)

限制

Lambda 函数目前不支持作为目标绑定函数。

委托库目前不支持调用位于单独进程或 CPU 中的函数的 远程委托

目前不支持固定块分配器。所有动态内存都使用 operator newdelete 从堆中获取。

参考文献

结论

多年来,我做了大量的多线程应用程序开发。在目标线程上带有数据地调用函数一直是一个手工制作、耗时耗力的过程。这个库将这些构造通用化,并将它们封装到一个用户友好的委托库中。

本文提出了一种现代 C++ 多播委托实现,支持同步和异步函数调用。非阻塞异步委托提供“即发即忘”的调用方式,而阻塞版本允许从目标线程等待返回值和出参。多播委托容器通过允许多个客户端注册回调通知来扩展委托的可用性。通过让库处理调用函数和跨线程移动数据的底层线程细节,简化了多线程应用程序的开发。线程间代码巧妙地隐藏在库中,用户只需与易于使用的委托 API 进行交互。

历史

  • 2020 年 8 月 19 日:首次发布
  • 2020 年 8 月 23 日:添加了新章节并改进了解释。
  • 2020 年 8 月 24 日:修复了“Wait”委托类型的输出参数。附带了新的源代码。
  • 2020 年 9 月 3 日:更新了“Timer”章节。添加了 Timer 类。附带了新的源代码。
  • 2022 年 9 月 20 日:简化代码。移除不必要的锁。附带了新的源代码。
  • 2022 年 12 月 12 日:使用委托与 lambda 的更新。附带了新的源代码。
  • 2024 年 10 月 11 日:添加 CMake 构建。小的代码改进。添加了快速入门和项目构建章节。

 

© . All rights reserved.