65.9K
CodeProject 正在变化。 阅读更多。
Home

C++ 中的异步多播委托

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.98/5 (63投票s)

2016年12月13日

CPOL

34分钟阅读

viewsIcon

83954

downloadIcon

2351

一个符合 C++ 标准的委托库,能够同步或异步地定位任何可调用函数

引言

似乎没有什么比委托更能引起 C++ 程序员的兴趣了。在其他语言中,委托是头等功能,因此开发人员可以使用这些众所周知的构造。然而,在 C++ 中,委托并非原生可用。但这并不能阻止我们程序员尝试模拟委托存储和调用任何可调用函数的简易性。

委托通常支持同步执行,也就是说,当被调用时,绑定的函数在调用者的控制线程中执行。在多线程应用程序中,理想情况下可以指定目标函数及其应执行的线程,而无需强加函数签名限制。该库负责将委托和所有参数数据传输到目标线程。本文的目的是提供一个 C++ 委托库,其 API 具有一致性,能够对任何可调用函数进行同步和异步调用。

委托库的特性如下:

  1. 任何编译器 – 任何编译器的标准 C++ 代码,无需奇怪的技巧
  2. 任意函数 - 调用任何可调用函数:成员函数、静态函数或自由函数
  3. 任意参数类型 - 支持任何参数类型:值、引用、指针、指针的指针
  4. 多参数 – 支持多个函数参数
  5. 函数式模板参数 - 2022 年库更新新增
  6. 同步调用 – 同步调用绑定的函数
  7. 异步调用 – 在客户端指定的线程上异步调用绑定的函数
  8. 阻塞式异步调用 - 使用阻塞或非阻塞委托进行异步调用
  9. 智能指针支持 - 使用原始对象指针或 std::shared_ptr 绑定实例函数
  10. Lambda 支持 - 使用委托异步绑定和调用 Lambda 函数。
  11. 自动堆处理 – 自动将参数数据复制到堆,以便通过消息队列安全传输
  12. 固定块分配器 – 可选地将堆分配转移到固定块内存池
  13. 任何操作系统 – 轻松移植到任何操作系统。包括 Win32 和 std::thread 移植
  14. 32/64 位 - 支持 32 位和 64 位项目。
  15. CMake 构建 - CMake 支持大多数工具链,包括 Windows 和 Linux。
  16. 单元测试 - 包含委托库的全面单元测试
  17. 无外部库 – 委托不依赖外部库
  18. 易用性 – 尽可能与 FastDelegate API 匹配

委托实现通过在您指定的控制线程上执行带有所有函数参数的委托函数,显著简化了多线程应用程序开发。该框架处理所有底层机制,以安全地在目标线程上调用任何函数签名。

使用 CMake 创建构建文件。CMake 是免费开源软件。支持 Windows、Linux 和其他工具链。有关更多信息,请参阅 **CMakeLists.txt** 文件。

请查看 GitHub 获取最新源代码: 

查看相关 GitHub 项目

2022 库更新

C++ 委托库已更新,具有以下功能:

  1. 函数式委托语法
  2. AsyncInvoke() 简化异步函数调用
  3. 需要 C++11 或更高版本的 C++ 编译器

新旧语法对比见下。旧语法使用标准模板参数。它还需要将函数参数的数量作为委托类型的一部分(例如,DelegateFree1<> 是一个函数参数委托)。

// Create a delegate bound to a free function then invoke
DelegateFree1<int> delegateFree = MakeDelegate(&FreeFuncInt);
delegateFree(123);

// Create a delegate bound to a member function then invoke
DelegateMember1<TestClass, TestStruct*> delegateMember = 
                MakeDelegate(&testClass, &TestClass::MemberFunc);
delegateMember(&testStruct);

新语法使用函数式模板参数来提高可读性

// Create a delegate bound to a free function then invoke
DelegateFree<void (int)> delegateFree = MakeDelegate(&FreeFuncInt);
delegateFree(123);

// Create a delegate bound to a member function then invoke
DelegateMember<void (TestClass(TestStruct*))> 
    delegateMember = MakeDelegate(&testClass, &TestClass::MemberFunc);
delegateMember(&testStruct);

建议使用 2022 年更新的库版本。

本文示例使用旧语法。然而,解释是准确的。

委托背景

如果您不熟悉委托,这个概念非常简单。委托可以被认为是超级函数指针。在 C++ 中,没有指针类型能够指向所有可能的函数变体:实例成员、虚、const、静态和自由(全局)。函数指针不能指向实例成员函数,而成员函数指针有各种限制。然而,委托类可以以类型安全的方式指向任何函数,只要函数签名匹配。简而言之,委托指向任何具有匹配签名的函数,以支持匿名函数调用。

最著名的 C++ 委托实现大概是 Doug Clugston 的 FastDelegate。我已在许多不同的项目中成功使用此代码。它易于使用,而且我还没有找到它无法工作的编译器。

虽然 FastDelegate 的使用非常流畅(且快速!),但检查代码会发现大量的技巧和“可怕”的类型转换,以使其在不同编译器之间普遍适用。当我第一次研究源代码时,我几乎因为其复杂性而没有在一个项目中使用它。然而,这让我思考:如果我不太关心速度呢?是否有可能设计一个符合 C++ 标准的委托?如果可以,它能否匹配 FastDelegate 的接口和可用性?

在实践中,虽然委托很有用,但多播版本显著扩展了其效用。绑定多个函数指针并按序调用所有注册者的能力,构成了一种有效的发布/订阅机制。发布者代码公开了一个委托容器,一个或多个匿名订阅者向发布者注册以接收回调通知。

多线程系统中回调的问题,无论是基于委托还是基于函数指针,都在于回调是同步发生的。必须注意,来自另一个控制线程的回调不能在非线程安全的代码上调用。多线程应用程序开发很困难。对于最初的设计者来说很困难;由于各种技能水平的工程师必须维护代码,所以很困难;由于错误以困难的方式显现,所以很困难。理想情况下,架构解决方案有助于最大限度地减少错误并简化应用程序开发。

有些人可能会质疑为什么没有使用 std::function 作为异步委托的基础。最初,我开始实现一个使用 std::function 来定位任何可调用函数的版本,并且它除了一个关键特性外都工作正常:相等性。我很快发现你不能比较 std::function 的相等性,而这对于从容器中取消注册是必需的。似乎没有简单、通用的方法来解决这个问题。如果没有一种方法来移除之前添加的可调用函数,设计就会失败。但并非一无所有。我最终创建的委托层次结构实际上最终成为了我试图实现的功能集的优势。而且创建它也很有趣。

我在此 CodeProject 上撰写的文章“带有线程间消息的异步多播回调”提供了一种概念上与此处提出的相似的异步多播回调,但回调签名是固定的,并且只支持一个模板化的函数参数。它还将回调函数类型限制为静态成员函数或自由函数。实例成员函数不受支持。接受这些限制的优点是 AsycnCallback<> 实现要简单紧凑得多。

此 C++ 委托实现功能齐全,允许同步或异步调用任何函数,甚至实例成员函数,以及任何参数。delegate 库使得绑定和调用任何函数都变得轻而易举。

Using the Code

我将首先介绍如何使用代码,然后深入探讨实现细节。

委托库由委托和委托容器组成。委托能够绑定到单个可调用函数。多播委托容器在列表中保存一个或多个委托,以按顺序调用。单播委托容器最多只保存一个委托。

主要的委托类如下所示,其中 X 是目标函数签名中的参数数量。例如,如果目标签名使用一个参数,如 void (int),则使用 DelegateFree1<> 版本。类似地,如果使用三个参数,如 void (int, float, char),则使用 DelgateFree3<>

  • DelegateFreeX<>
  • DelegateFreeAsyncX<>
  • DelegateFreeAsyncWaitX<>
  • DelegateMemberX<>
  • DelegateMemberAsyncX<>
  • DelegateMemberAsyncWaitX<>
  • DelegateMemberSpX<>
  • DelegateMemberSpAsyncX<>
  • DelegateRemoteSendX<>
  • DelegateFreeRemoteRecvX<>
  • DelegateMemberRemoteRecvX<>

DelegateFreeX<> 绑定到自由函数或静态成员函数。DelegateMemberX<> 绑定到类实例成员函数。DelegateMemberSpX<> 使用 std::shared_ptr 而不是原始对象指针绑定到类实例成员函数。所有版本都提供同步函数调用。

DelegateFreeAsyncX<>, DelegateMemberAsyncX<>DelegateMemberSpAsyncX<> 的操作方式与其同步对应物相同;不同之处在于这些版本在指定的控制线程上提供非阻塞异步函数执行。

DelegateFreeAsyncWaitX<>DelegateMemberAsyncWaitX<> 在目标线程上提供阻塞式异步函数执行,并带有调用者提供的最大等待超时。

DelegateRemoteSendX<>, DelegateFreeRemoteRecvX<>DelegateMemberRemoteRecvX<> 在文章《使用 C++ 委托的远程过程调用》中进行了讲解。

三个主要的委托容器类是

  • SinglecastDelegateX<>
  • MulticastDelegateX<>
  • MulticastDelegateSafeX<>

SinglecastDelegateX<> 是一个接受单个委托的委托容器。单播版本的优点是它稍微小一些,并且允许绑定函数中返回类型不是 void

MulticastDelegateX<> 是一个委托容器,它实现为一个单向链表,接受多个委托。只有绑定到返回类型为 void 的函数的委托才能添加到多播委托容器中。

MultcastDelegateSafeX<> 是一个线程安全容器,实现为单向链表,接受多个委托。如果多个线程访问容器实例,请务必使用线程安全版本。

每个容器按值存储委托。这意味着委托会根据模式在内部复制到堆内存或固定块内存中。用户无需在插入容器之前手动在堆上创建委托。通常,重载的模板函数 MakeDelegate() 用于根据函数参数创建委托实例。

同步委托

所有委托都使用重载的 MakeDelegate() 模板函数创建。编译器使用模板参数推导来选择正确的 MakeDelegate() 版本,从而无需手动指定模板参数。例如,这是一个简单的自由函数。

void FreeFuncInt(int value)
{
      cout << "FreeCallback " << value << endl;
}

要将自由函数绑定到委托,请使用 MakeDelegate() 创建一个 DelegateFree1<int> 实例。DelegateFree 模板参数是 int 函数参数。MakeDelegate() 返回一个 DelegateFree1<int> 对象,以下一行使用委托调用函数 FreeFuncInt

// Create a delegate bound to a free function then invoke
DelegateFree1<int> delegateFree = MakeDelegate(&FreeFuncInt);
delegateFree(123);

成员函数以相同的方式绑定到委托,只是这次 MakeDelegate() 使用两个参数:一个类实例和一个成员函数指针。两个 DelegateMember1 模板参数是类名和函数参数。

// Create a delegate bound to a member function then invoke
DelegateMember1<TestClass, TestStruct*> delegateMember = 
                MakeDelegate(&testClass, &TestClass::MemberFunc);
delegateMember(&testStruct);

通常使用委托容器来容纳一个或多个委托,而不是创建具体的自由或成员委托。委托容器可以容纳任何委托类型。例如,下面显示了一个绑定到任何具有 void (int) 函数签名的函数的多播委托容器。

MulticastDelegate1<int> delegateA;

单播委托以相同的方式创建。

SinglecastDelegate1<int> delegateB;

通过添加一个额外的模板参数,可以定义一个返回值,例如 `float (int)` 的函数签名。

SinglecastDelegate1<int, float> delegateC;

SinglecastDelegate<> 可以绑定到一个返回值的函数,而多播版本不能。原因是当调用多个回调时,应该使用哪个回调函数的返回值?正确的答案是没有,所以多播容器只接受返回类型为 void 的函数签名的委托。

更多的函数参数意味着使用 MulticastDelegate2MulticastDelegate3 版本。目前,该库支持最多五个函数参数。

MulticastDelegate2<int, int> delegateD;

MulticastDelegate3<float, int, char> delegateE;

当然,支持的不仅仅是内置的按值传递参数类型。

MulticastDelegate3<const MyClass&, MyStruct*, Data**> delegateF;

使用重载的 MakeDelegate() 函数和 operator+= 可以创建委托实例并将其添加到多播委托容器中。绑定自由函数或静态函数只需要一个函数指针参数。

delegateA += MakeDelegate(&FreeFuncInt);

实例成员函数也可以添加到任何委托容器中。对于成员函数,MakeDelegate() 的第一个参数是指向类实例的指针。第二个参数是指向成员函数的指针。

delegateA += MakeDelegate(&testClass, &TestClass::MemberFunc);

首先检查注册的客户端,然后为所有注册的委托调用回调。如果多个委托存储在 MulticastDelegate1<int> 中,则每个委托将按顺序调用。

// Invoke the delegate target functions
if (delegateA)
      delegateA(123);

从委托容器中删除委托实例使用 operator-=

delegateA -= MakeDelegate(&FreeFuncInt);

或者,使用 Clear() 删除容器中的所有委托。

delegateA.Clear();

使用 operator= 将委托添加到单播容器中。

SinglecastDelegate1<int, int> delegateF;
delegateF = MakeDelegate(&FreeFuncIntRetInt);

通过 `Clear()` 或赋值 `0` 进行移除。

delegateF.Clear();
delegateF = 0;

异步非阻塞委托

到目前为止,所有委托都是同步的。异步功能是在同步委托实现之上分层实现的。要使用异步委托,需要一个线程安全且可由多个线程安全访问的委托容器。锁保护类 API 免受同时访问。“Safe”版本如下所示。

MulticastDelegateSafe1<TestStruct*> delegateC;

作为 MakeDelegate() 的最后一个参数的线程指针强制创建异步委托。在这种情况下,添加线程参数会使 MakeDelegate() 返回一个 DelegateMemberAsync1<> 而不是 DelegateMember1<>

delegateC += MakeDelegate(&testClass, &TestClass::MemberFunc, &workerThread1);

调用与同步版本相同,但这次回调函数 TestClass::MemberFunc() 是从 workerThread1 调用的。

if (delegateC)
      delegateC(&testStruct);

这是另一个在 workerThread1 上调用异步委托的示例,带有 std::stringint 参数。

// Create delegate with std::string and int arguments then asychronously
// invoke on a member function
MulticastDelegateSafe2<const std::string&, int> delegateH;
delegateH += MakeDelegate(&testClass, &TestClass::MemberFuncStdString, &workerThread1);
delegateH("Hello world", 2016);

在同步和异步委托之间,库的使用方式是一致的。唯一的区别是 MakeDelegate() 增加了线程指针参数。请记住,在使用异步委托进行跨线程回调时,始终使用线程安全的 MulticastDelegateSafeX<> 容器。

委托库在调用非阻塞异步委托时的默认行为是,未按值传递的参数将被复制到堆内存中,以便安全地传输到目标线程。这意味着所有参数都将被复制。如果您的数据不是普通旧数据 (POD) 并且不能按位复制,那么请务必实现适当的复制构造函数来处理复制。

实际上,有一种方法可以避免复制,真正地传递指针而不复制它所指向的内容。但是,开发人员必须确保 (a) 在目标线程调用绑定函数时,被指向的数据仍然存在,并且 (b) 被指向的对象是线程安全的。这种技术将在文章后面描述。

更多示例请参阅附件源代码中的 main.cppDelegateUnitTests.cpp

绑定到 std::shared_ptr

绑定到实例成员函数需要一个指向对象的指针。委托库支持使用原始指针和 std::shared_ptr 智能指针进行绑定。用法符合预期;只需在调用 MakeDelegate() 时用 std::shared_ptr 替换原始对象指针即可。根据是否向 MakeDelegate() 传递线程参数,将返回 DelegateMemberSpX<>DelegateMemberSpAsyncX<> 实例。

// Create a shared_ptr, create a delegate, then synchronously invoke delegate function
std::shared_ptr<TestClass> spObject(new TestClass());
auto delegateMemberSp = MakeDelegate(spObject, &TestClass::MemberFuncStdString);
delegateMemberSp("Hello world using shared_ptr", 2016);

随附的 VC2008 无法使用 std::shared_ptr,因为编译器不支持该功能。请运行 VS2015 项目以获取使用 std::shared_ptr 的工作示例。

使用原始对象指针的注意事项

某些异步委托使用模式可能导致回调在已删除的对象上发生。问题在于:一个对象函数被绑定到委托并异步调用,但在目标线程上调用发生之前,目标对象已被删除。换句话说,在目标线程消息队列有机会调用回调之前,绑定到委托的对象可能会被删除。以下代码揭示了这个问题。

// Example of a bug where the testClassHeap is deleted before the asychronous delegate 
// is invoked on the workerThread1. In other words, by the time workerThread1 calls
// the bound delegate function the testClassHeap instance is deleted and no longer valid.
TestClass* testClassHeap = new TestClass();
auto delegateMemberAsync = 
    MakeDelegate(testClassHeap, &TestClass::MemberFuncStdString, &workerThread1);
delegateMemberAsync("Function async invoked on deleted object. Bug!", 2016);
delegateMemberAsync.Clear();
delete testClassHeap;

上面的例子是人造的,但它清楚地表明,在等待异步调用发生时,没有什么能阻止对象被删除。在许多嵌入式系统架构中,注册可能发生在单例对象或其生命周期贯穿整个执行的对象上。通过这种方式,应用程序的使用模式可以防止回调到已删除的对象。然而,如果对象出现、临时订阅委托进行回调,然后稍后被删除,则消息队列中可能存在的潜在委托可能会在已删除的对象上调用函数。

幸运的是,C++ 智能指针正是解决这些复杂对象生命周期问题的良方。DelegateMemberSpAsyncX<> 委托使用 std::shared_ptr 而不是原始对象指针进行绑定。现在委托拥有共享指针,对象被过早删除的危险就消除了。共享指针只会在所有引用不再使用时才删除所指向的对象。在下面的代码片段中,客户端代码移除了 testClassSp 的所有引用,但委托复制到队列中阻止了 TestClass 在异步委托回调发生之前被删除。

// Example of the smart pointer function version of the delegate. 
// The testClassSp instance is only deleted after workerThread1 
// invokes the callback function thus solving the bug.
std::shared_ptr<TestClass> testClassSp(new TestClass());
auto delegateMemberSpAsync = 
    MakeDelegate(testClassSp, &TestClass::MemberFuncStdString, &workerThread1);
delegateMemberSpAsync
   ("Function async invoked using smart pointer. Bug solved!", 2016);
delegateMemberSpAsync.Clear();
testClassSp.reset();

实际上,这种技术可以用来调用对象函数,然后对象在回调发生后自动删除。使用上述示例,创建共享指针实例,绑定委托,然后调用委托。现在 testClassSp 可以超出作用域,TestClass::MemberFuncStdString 仍然会在 workerThread1 上安全地调用。在回调完成后,通过 std::shared_ptr<TestClass>TestClass 实例的引用计数变为 0 后会自动删除,无需任何额外的程序员干预。

std::shared_ptr<TestClass> testClassSp(new TestClass());
auto delegateMemberSpAsync = 
    MakeDelegate(testClassSp, &TestClass::MemberFuncStdString, &workerThread1);
delegateMemberSpAsync("testClassSp deletes after delegate invokes", 2016);

异步阻塞委托

阻塞委托会一直等待,直到目标线程执行绑定的委托函数。与非阻塞委托不同,阻塞版本不会将参数数据复制到堆上。它们还允许函数返回类型为 void 以外的类型,而非阻塞委托只能绑定到返回 void 的函数。由于函数参数未修改地传递给目标线程,因此函数的执行方式与您期望的同步版本完全相同,包括传入/传出指针和引用。

按指针/引用传递的堆栈参数不需要是线程安全的。原因是调用线程会阻塞,等待目标线程完成。这意味着委托实现保证只有一个线程可以访问堆栈分配的参数数据。

阻塞委托必须指定一个以毫秒为单位的超时或 WAIT_INFINITE。与保证调用的非阻塞异步委托不同,如果阻塞委托的超时过期,则不会调用该函数。使用 IsSuccess() 来确定委托是否成功。

将超时作为 MakeDelegate() 的最后一个参数,将返回一个 DelegateFreeAsyncWaitX<>DelegateMemberAsyncWaitX<> 实例,具体取决于绑定的是自由函数还是成员函数。“Wait”委托通常不添加到委托容器中。典型的使用模式是在堆栈上创建委托和函数参数,然后调用。下面的代码片段创建了一个阻塞委托,其函数签名为 int (std::string&)。该函数在 workerThread1 上调用。函数 MemberFuncStdStringRetInt() 将更新传出字符串 msg 并向调用者返回一个整数。

DelegateMemberAsyncWait1<TestClass, std::string&, int> delegateI =
              MakeDelegate(&testClass, &TestClass::MemberFuncStdStringRetInt, 
                           &workerThread1, WAIT_INFINITE);
std::string msg;
int year = delegateI(msg);
if (delegateI.IsSuccess())
    cout << msg.c_str() << " " << year << endl;

使用 auto 关键字与委托可以大大简化语法。

auto delegateI = 
    MakeDelegate(&testClass, &TestClass::MemberFuncStdStringRetInt, 
                 &workerThread1, WAIT_INFINITE);
std::string msg;
int year = delegateI(msg);
if (delegateI.IsSuccess())
    cout << msg.c_str() << " " << year << endl;

异步 Lambda 调用

委托可以异步调用非捕获 Lambda 函数。下面的示例在 workerThread1 上调用 LambdaFunc1

auto LambdaFunc1 = +[](int i) -> int
{
    cout << "Called LambdaFunc1 " << i << std::endl;
    return ++i;
};

// Asynchronously invoke lambda on workerThread1 and wait for the return value
auto lambdaDelegate1 = MakeDelegate(LambdaFunc1, &workerThread1, WAIT_INFINITE);
int lambdaRetVal2 = lambdaDelegate1(123);

委托是可调用的,因此可以传递给标准库。下面的示例展示了 CountLambda 通过 std::count_ifworkerThread1 上异步执行。

    std::vector<int> v{ 1, 2, 3, 4, 5, 6, 7, 8, 9 };

    auto CountLambda = +[](int v) -> int
    {
        return v > 2 && v <= 6;
    };
    auto countLambdaDelegate = MakeDelegate(CountLambda, &workerThread1, WAIT_INFINITE);

    const auto valAsyncResult = std::count_if(v.begin(), v.end(),
        countLambdaDelegate);
    cout << "Asynchronous lambda result: " << valAsyncResult << endl;

委托库

委托库包含许多类。单个 include 文件 DelegateLib.h 提供了对所有委托库功能的访问。DelegateOpt.h 中的定义设置了库选项。该库被包装在 DelegateLib 命名空间中。包含的单元测试有助于确保稳健的实现。下表显示了委托类层次结构。

  • DelegateBase
  • Delegate0<>
  • DelegateFree0<>
  • DelegateFreeAsync0<>
  • DelegateFreeAsyncWaitBase0<>
  • DelegateFreeAsyncWait0<>
  • DelegateMember0<>
  • DelegateMemberAsync0<>
  • DelegateMemberAsyncWaitBase0<>
  • DelegateMemberAsyncWait0<>
  • DelegateMemberSp0<>
  • DelegateMemberSpAsync0<>
  • Delegate1<>
  • DelegateFree1<>
  • DelegateFreeAsync1<>
  • DelegateFreeAsyncWaitBase0<>
  • DelegateFreeAsyncWait0<>
  • DelegateMember1<>
  • DelegateMemberAsync1<>
  • DelegateMemberAsyncWaitBase1<>
  • DelegateMemberAsyncWait1<>
  • DelegateMemberSp1<>
  • DelegateMemberSpAsync1<>

等等...

在接下来的讨论中,我将使用委托的单参数版本。

DelegateBase 是所有委托实例共有的非模板抽象基类。比较运算符和 Clone() 方法定义了接口。

class DelegateBase {
public:
      virtual ~DelegateBase() {}

      /// Derived class must implement operator== to compare objects.
      virtual bool operator==(const DelegateBase& rhs) const = 0;
      virtual bool operator!=(const DelegateBase& rhs) { return !(*this == rhs); }


      /// Use Clone to provide a deep copy using a base pointer. Covariant
      /// overloading is used so that a Clone() method return type is a
      /// more specific type in the derived class implementations.
      /// @return A dynamic copy of this instance created with operator new.
      /// @post The caller is responsible for deleting the clone instance.
      virtual DelegateBase* Clone() const = 0;
};

Delegate1<> 提供了一个带有模板化函数参数的模板类。operator() 函数允许使用正确的函数参数调用委托函数。Clone() 的协变重载提供了一个更具体的返回类型。

委托容器类需要 Clone() 函数。委托容器需要复制委托以存储到列表中。由于委托容器只知道抽象基类 Delegate1<> 实例,因此在创建副本时必须使用 Clone() 函数。

template <class Param1, class RetType=void>
class Delegate1 : public DelegateBase {
public:
      virtual RetType operator()(Param1 p1) const = 0;
      virtual Delegate1<Param1, RetType>* Clone() const = 0;
};

将实例成员函数和自由函数有效地存储在同一个类中被证明是困难的。相反,为每种类型的绑定函数创建了两个类。DelegateMember1<> 处理实例成员函数。DelegateFree1<> 处理自由函数和静态函数。

DelegateMember1<> 绑定到实例成员函数。在继承自 Delegate1<> 基类所需的 Param1RetType 之上,添加了一个 TClass 模板参数。TClass 是绑定到实例成员函数所必需的。然而,委托容器无法知道 TClass。容器列表只能存储 DelegateMember1<>DelegateFree1<> 的最常见祖先,即 Delegate1<> 接口。

Clone() 创建类的新实例。Bind() 接受一个类实例和一个成员函数指针。operator() 函数允许调用通过 Bind() 分配的委托函数。

template <class TClass, class Param1, class RetType=void> 
class DelegateMember1 : public Delegate1<Param1, RetType> {
public:
    typedef TClass* ObjectPtr;
    typedef RetType (TClass::*MemberFunc)(Param1); 
    typedef RetType (TClass::*ConstMemberFunc)(Param1) const; 

    DelegateMember1(ObjectPtr object, MemberFunc func) { Bind(object, func); }
    DelegateMember1(ObjectPtr object, ConstMemberFunc func) { Bind(object, func);    }
    DelegateMember1() :    m_object(0), m_func(0) { }

    /// Bind a member function to a delegate. 
    void Bind(ObjectPtr object, MemberFunc func) {
        m_object = object;
        m_func = func; }

    /// Bind a const member function to a delegate. 
    void Bind(ObjectPtr object, ConstMemberFunc func)    {
        m_object = object;
        m_func = reinterpret_cast<MemberFunc>(func); }

    virtual DelegateMember1* Clone() const { return new DelegateMember1(*this); }

    // Invoke the bound delegate function
    virtual RetType operator()(Param1 p1) {
        return (*m_object.*m_func)(p1); }

    virtual bool operator==(const DelegateBase& rhs) const     {
        const DelegateMember1<TClass, Param1, RetType>* derivedRhs = 
            dynamic_cast<const DelegateMember1<TClass, Param1, RetType>*>(&rhs);
        return derivedRhs &&
            m_func == derivedRhs->m_func && 
            m_object == derivedRhs->m_object; }

    bool Empty() const { return !(m_object && m_func); }
    void Clear() { m_object = 0; m_func = 0; }

    // New-school safe bool
    explicit operator bool() const { return !Empty();  }

private:
    ObjectPtr m_object;      // Pointer to a class object
    MemberFunc m_func;       // Pointer to an instance member function
};

DelegateFree1<> 绑定到自由函数或静态成员函数。请注意,它像 DelegateMember1<> 一样继承自 Delegate1<>Bind() 接受一个函数指针,operator() 允许随后调用绑定的函数。

template <class Param1, class RetType=void> 
class DelegateFree1 : public Delegate1<Param1, RetType> {
public:
    typedef RetType (*FreeFunc)(Param1); 

    DelegateFree1(FreeFunc func) { Bind(func); }
    DelegateFree1() : m_func(0) { }

    /// Bind a free function to the delegate.
    void Bind(FreeFunc func) { m_func = func; }

    virtual DelegateFree1* Clone() const { return new DelegateFree1(*this); }

    /// Invoke the bound delegate function. 
    virtual RetType operator()(Param1 p1) {
        return (*m_func)(p1); }

    virtual bool operator==(const DelegateBase& rhs) const {
        const DelegateFree1<Param1, RetType>* derivedRhs = 
            dynamic_cast<const DelegateFree1<Param1, RetType>*>(&rhs);
        return derivedRhs &&
            m_func == derivedRhs->m_func; }

    bool Empty() const { return !m_func; }
    void Clear() { m_func = 0; }

    // New-school safe bool
    explicit operator bool() const { return !Empty();  }

private:
    FreeFunc m_func;        // Pointer to a free function
};

DelegateMemberAsync1<> 是委托的非阻塞异步版本,允许在客户端指定的控制线程上调用。operator() 函数实际上并不调用目标函数,而是将委托和所有函数参数打包到堆中的 DelegateMsg1<> 实例中,以便使用 DispatchDelegate() 通过消息队列发送。

template <class TClass, class Param1> 
class DelegateMemberAsync1 : public DelegateMember1<TClass, Param1>, 
      public IDelegateInvoker {
public:
    // Constructors take a class instance, member function, and callback thread
    DelegateMemberAsync1(ObjectPtr object, MemberFunc func, DelegateThread* thread) { 
        Bind(object, func, thread); }
    DelegateMemberAsync1(ObjectPtr object, ConstMemberFunc func, 
                         DelegateThread* thread) { 
        Bind(object, func, thread); }
    DelegateMemberAsync1() : m_thread(0) { }

    /// Bind a member function to a delegate. 
    void Bind(ObjectPtr object, MemberFunc func, DelegateThread* thread) {
        m_thread = thread; 
        DelegateMember1<TClass, Param1>::Bind(object, func); }

    /// Bind a const member function to a delegate. 
    void Bind(ObjectPtr object, ConstMemberFunc func, DelegateThread* thread)    {
        m_thread = thread;
        DelegateMember1<TClass, Param1>::Bind(object, func); }

    virtual DelegateMemberAsync1<TClass, Param1>* Clone() const {
        return new DelegateMemberAsync1<TClass, Param1>(*this); }

    virtual bool operator==(const DelegateBase& rhs) const     {
        const DelegateMemberAsync1<TClass, Param1>* derivedRhs = 
            dynamic_cast<const DelegateMemberAsync1<TClass, Param1>*>(&rhs);
        return derivedRhs &&
            m_thread == derivedRhs->m_thread && 
            DelegateMember1<TClass, Param1>::operator == (rhs); }

    /// Invoke delegate function asynchronously
    virtual void operator()(Param1 p1) {
        if (m_thread == 0)
            DelegateMember1<TClass, Param1>::operator()(p1);
        else
        {
            // Create a new instance of the function argument data and copy
            Param1 heapParam1 = DelegateParam<Param1>::New(p1);

            // Create a clone instance of this delegate 
            DelegateMemberAsync1<TClass, Param1>* delegate = Clone();

            // Create a new message instance 
            DelegateMsg1<Param1>* msg = new DelegateMsg1<Param1>(delegate, heapParam1);

            // Dispatch message onto the callback destination thread. DelegateInvoke()
            // will be called by the target thread. 
            m_thread->DispatchDelegate(msg);
        }
    }

    /// Called by the target thread to invoke the delegate function 
    virtual void DelegateInvoke(DelegateMsgBase** msg) {
        // Typecast the base pointer to back to the templatized instance
        DelegateMsg1<Param1>* delegateMsg = static_cast<DelegateMsg1<Param1>*>(*msg);

        // Get the function parameter data
        Param1 param1 = delegateMsg->GetParam1();

        // Invoke the delegate function
        DelegateMember1<TClass, Param1>::operator()(param1);

        // Delete heap data created inside operator()
        DelegateParam<Param1>::Delete(param1);
        delete *msg;
        *msg = 0;

        // Do this last before returning!
        delete this;
    }

private:
    /// Target thread to invoke the delegate function
    DelegateThread* m_thread;
};

参数有不同的类型:按值、按引用、指针和指向指针的指针。对于非阻塞委托,除了按值传递之外,任何数据都需要在堆上创建,以确保数据在目标线程上有效。能够将每个参数保存到 DelegateMsg1<> 中的关键是 DelegateParam<> 类,如下面的 operator() 函数中所用。

/// Invoke delegate function asynchronously
virtual void operator()(Param1 p1) {
    if (m_thread == 0)
        DelegateMember1<TClass, Param1>::operator()(p1);
    else
    {
        // Create a new instance of the function argument data and copy
        Param1 heapParam1 = DelegateParam<Param1>::New(p1);

        // Create a clone instance of this delegate 
        DelegateMemberAsync1<TClass, Param1>* delegate = Clone();

        // Create a new message instance 
        DelegateMsg1<Param1>* msg = new DelegateMsg1<Param1>(delegate, heapParam1);

        // Dispatch message onto the callback destination thread. DelegateInvoke()
        // will be called by the target thread. 
        m_thread->DispatchDelegate(msg);
    }
}

DelegateMemberSpAsync1<> 是一个非阻塞异步委托,它绑定到 std::shared_ptr 而不是原始对象指针。实现与非 Sp 版本相同,只是 TClass* 的所有位置都被替换为 std::shared_ptr<TClass>

template <class TClass, class Param1, class RetType=void> 
class DelegateMemberSp1 : public Delegate1<Param1, RetType> {
public:
    typedef std::shared_ptr<TClass> ObjectPtr;
    typedef RetType (TClass::*MemberFunc)(Param1); 
    typedef RetType (TClass::*ConstMemberFunc)(Param1) const; 

    DelegateMemberSp1(ObjectPtr object, MemberFunc func) { Bind(object, func); }
    DelegateMemberSp1(ObjectPtr object, ConstMemberFunc func) { Bind(object, func); }
    DelegateMemberSp1() : m_object(0), m_func(0) { }

etc...

DelegateMemberAsyncWait1<> 是一个阻塞式异步委托,绑定到类实例成员函数。DelegateMemberAsyncWait1<> 具有 void 返回类型的模板特化,因此支持带返回值和不带返回值的函数。特化共享一个公共基类 DelegateMemberAsyncWaitBase1<>。下面显示了两个关键函数。请注意,该实现需要一个信号量来阻塞调用线程,以及一个软件锁来保护共享数据。

/// Invoke delegate function asynchronously
virtual RetType operator()(Param1 p1) {
    if (this->m_thread == 0)
        return DelegateMemberAsyncWaitBase1<TClass, Param1, RetType>::operator()(p1);
    else {
        // Create a clone instance of this delegate 
        DelegateMemberAsyncWait1<TClass, Param1, RetType>* delegate = Clone();
        delegate->m_refCnt = 2;
        delegate->m_sema.Create();
        delegate->m_sema.Reset();

        // Create a new message instance 
        DelegateMsg1<Param1>* msg = new DelegateMsg1<Param1>(delegate, p1);

        // Dispatch message onto the callback destination thread. DelegateInvoke()
        // will be called by the target thread. 
        this->m_thread->DispatchDelegate(msg);

        // Wait for target thread to execute the delegate function
        if ((this->m_success = delegate->m_sema.Wait(this->m_timeout)))
            m_retVal = delegate->m_retVal;

        bool deleteData = false;
        {
            LockGuard lockGuard(&delegate->m_lock);
            if (--delegate->m_refCnt == 0)
                deleteData = true;
        }
        if (deleteData) {
            delete msg;
            delete delegate;
        }
        return m_retVal;
    }
}

/// Called by the target thread to invoke the delegate function 
virtual void DelegateInvoke(DelegateMsgBase** msg) {
    bool deleteData = false;
    {
        // Typecast the base pointer to back to the templatized instance
        DelegateMsg1<Param1>* delegateMsg = static_cast<DelegateMsg1<Param1>*>(*msg);

        // Get the function parameter data
        Param1 param1 = delegateMsg->GetParam1();

        LockGuard lockGuard(&this->m_lock);
        if (this->m_refCnt == 2) {
            // Invoke the delegate function then signal the waiting thread
            m_retVal = 
              DelegateMemberAsyncWaitBase1
              <TClass, Param1, RetType>::operator()(param1);
            this->m_sema.Signal();
        }

        // If waiting thread is no longer waiting then delete heap data
        if (--this->m_refCnt == 0)
            deleteData = true;
    }
    if (deleteData) {
        delete *msg;
        *msg = 0;
        delete this;
    }
}

参数堆复制

非阻塞异步调用意味着所有参数数据都必须复制到堆中,以便传输到目标线程。DelegateParam<> 类用于 new/delete 参数。模板特化用于根据参数类型定义不同版本的 DelegateParam<>:按值传递、引用、指针、指向指针的指针。下面的代码片段显示了如何使用它在堆上复制函数参数 p1

// Create a new instance of the function argument data and copy
Param1 heapParam1 = DelegateParam<Param1>::New(p1);

上面实际调用的 New() 函数实现取决于 Param1 参数类型。按值传递将调用下面所示的模板版本。它实际上不在堆上创建任何东西,而是直接返回调用者的输入值。原因是按值传递不需要堆复制,因为它本身已经是复制。因此,Delete() 不做任何事情,因为没有要删除的数据。

template <typename Param>
class DelegateParam
{
public:
      static Param New(Param param) { return param; }
      static void Delete(Param param) { }
};

下面的 DelegateParam<Param *> 模板特化处理所有指针类型参数。与按值传递不同,指针指向某物。该某物必须在堆上创建,以便目标线程在回调调用时拥有完整的副本。New() 创建副本,Delete() 删除它。

template <typename Param>
class DelegateParam<Param *>
{
public:
      static Param* New(Param* param) {
            Param* newParam = new Param(*param);
            return newParam;
      }


      static void Delete(Param* param) {
            delete param;
      }
};

同样,还有处理引用和指针到指针的模板特化。这样,无论参数类型如何,delegate 库都能以一致且正确的方式运行,而无需用户了解或付出特殊努力。

绕过参数堆复制

有时,您可能不希望 delegate 库复制您的指针/引用参数。相反,您只是希望目标线程拥有指向原始副本的指针。也许对象很大或无法复制。或者它是一个保证存在的 static 实例。无论哪种方式,这里是如何真正发送指针而不复制所指向的对象的方法。

诀窍是为您的特定类/结构定义一个 DelegateParam<> 模板特化。在下面的示例中,结构 TestStructNoCopy 将不会被委托库复制。New() 函数只返回原始指针,而 Delete() 什么也不做。现在,任何 TestStructNoCopy* delegate 函数参数都将使用您的 New()/Delete(),而不是库的默认实现。

template <>
class DelegateParam<TestStructNoCopy *>
{
public:
      static TestStructNoCopy* New(TestStructNoCopy* param) { return param; }
      static void Delete(TestStructNoCopy* param) {}
};

使用此技术意味着您传递的指针在目标线程实际调用回调函数时必须存在。此外,如果多个线程正在访问该实例,则类中的代码需要是线程安全的。

在阻塞委托上不需要此方法,因为参数不会被复制。

数组参数堆复制

根据 C 标准,数组函数参数会调整为指针。简而言之,任何声明为 T a[]T a[N] 的函数参数都被视为声明为 T *a。这意味着默认情况下,对于数组类型参数,将调用委托库 DelegateParam<Param *>。由于数组大小未知,DelegateParam<Param *> 将只复制第一个数组元素,这肯定不是预期或希望的结果。例如,下面的函数

void ArrayFunc(char a[]) {}

需要一个委托参数 char*,因为 char a[] 被“调整”为 char *a

MulticastDelegateSafe1<char*> delegateArrayFunc;
delegateArrayFunc += MakeDelegate(&ArrayFunc, &workerThread1);

无法异步按值传递 C 风格数组。最好的方法是使用前面描述的模板特化技术按指针传递数组。下面的类将每个 char*char a[]char a[N] 作为 char* 传递,数组指针将传递给被调用的函数,而无需尝试复制。请记住,您有责任确保指针在目标线程上保持有效。

template <>
class DelegateParam<char *>
{
public:
      static char* New(char* param) { return param; }
      static void Delete(char* param) {}
};

我的建议是,如果可能,在使用异步委托时避免使用 C 风格数组,以避免混淆和错误。

工作线程 (Win32)

operator() 函数完成并且 DelegateMsg1<> 被放入消息队列后,WorkerThread::Process() 最终将在目标线程上调用 DelegateInvoke()。下面的 Win32 线程循环代码来自 WorkerThreadWin.cpp/h

unsigned long WorkerThread::Process(void* parameter)
{
    MSG msg;
    BOOL bRet;

    // Start periodic timer
    MMRESULT timerId = timeSetEvent(100, 10, &WorkerThread::TimerExpired, 
                       reinterpret_cast<DWORD>(this), TIME_PERIODIC);

    while ((bRet = GetMessage(&msg, NULL, WM_USER_BEGIN, WM_USER_END)) != 0)
    {
        switch (msg.message)
        {
            case WM_DISPATCH_DELEGATE:
            {
                ASSERT_TRUE(msg.wParam != NULL);

                // Get the ThreadMsg from the wParam value
                ThreadMsg* threadMsg = reinterpret_cast<ThreadMsg*>(msg.wParam);

                // Convert the ThreadMsg void* data back to a DelegateMsg* 
                DelegateMsgBase* delegateMsg = static_cast<DelegateMsgBase*>
                                               (threadMsg->GetData()); 

                // Invoke the callback on the target thread
                delegateMsg->GetDelegateInvoker()->DelegateInvoke(&delegateMsg);

                // Delete dynamic data passed through message queue
                delete threadMsg;
                break;
            }

            case WM_USER_TIMER:
                Timer::ProcessTimers();
                break;

            case WM_EXIT_THREAD:
                timeKillEvent(timerId);
                return 0;

            default:
                ASSERT();
        }
    }
    return 0;
}

请注意,线程循环与大多数系统不同,后者通常有一个庞大的 switch 语句来处理各种传入数据消息,对 void* 数据进行类型转换,然后调用特定函数。该框架通过单个 WM_DISPATCH_DELEGATE 消息支持所有委托调用。一旦设置好,相同的微小线程循环处理每个委托。新的异步委托随着系统的设计而出现和消失,但中间的代码不会改变。

这是一个巨大的优势,因为在许多系统中,线程之间的数据传输需要大量手动步骤。您必须不断地修改每个线程循环,在发送时创建数据,在接收时销毁数据,并调用各种操作系统服务和类型转换。在这里,您无需执行这些操作。所有中间的东西都为用户整齐地处理了。

DelegateMemberAsync1<>::DelegateInvoke() 函数调用目标函数并删除通过消息队列在 DelegateMsg1<> 内部传输的数据。委托在返回之前删除所有堆数据和自身。

virtual void DelegateInvoke(DelegateMsgBase** msg) const {
        // Typecast the base pointer to back to the templatized instance
        DelegateMsg1<Param1>* delegateMsg = static_cast<DelegateMsg1<Param1>*>(*msg);

        // Get the function parameter data
        Param1 param1 = delegateMsg->GetParam1();

        // Invoke the delegate function
        DelegateMember1<TClass, Param1, RetType>::operator()(param1);

        // Delete heap data created inside operator()
        DelegateParam<Param1>::Delete(param1);
        delete *msg;
        *msg = 0;

        // Do this last before returning!
        delete this;
}

工作线程 (std::thread)

除了 Win32 API 之外,还包含了一个使用 std::thread 类的替代实现。任何支持 std::thread 的 C++11 编译器都能够构建和使用委托库。在 DelegateOpt.h 中,定义 USE_STD_THREADS 而不是 USE_WIN32_THREADS 以使用 WorkerThreadStd.cpp/h 中包含的 WorkerThread 类。LockGuardSemaphore 类也经过条件编译,以使用 C++ 标准库而不是 Win32 API。std::thread 实现的线程循环如下所示:

void WorkerThread::Process()
{
    m_timerExit = false;
    std::thread timerThread(&WorkerThread::TimerThread, this);

    while (1)
    {
        std::shared_ptr<ThreadMsg> msg;
        {
            // Wait for a message to be added to the queue
            std::unique_lock<std::mutex> lk(m_mutex);
            while (m_queue.empty())
                m_cv.wait(lk);

            if (m_queue.empty())
                continue;

            msg = m_queue.front();
            m_queue.pop();
        }

        switch (msg->GetId())
        {
            case MSG_DISPATCH_DELEGATE:
            {
                ASSERT_TRUE(msg->GetData() != NULL);

                // Convert the ThreadMsg void* data back to a DelegateMsg* 
                auto delegateMsg = static_cast<DelegateMsgBase*>(msg->GetData());

                // Invoke the callback on the target thread
                delegateMsg->GetDelegateInvoker()->DelegateInvoke(&delegateMsg);
                break;
            }

            case MSG_TIMER:
                Timer::ProcessTimers();
                break;

            case MSG_EXIT_THREAD:
            {
                m_timerExit = true;
                timerThread.join();
                return;
            }

            default:
                ASSERT();
        }
    }
}

委托调用

绑定的委托函数通过函数 operator() 调用。从容器中调用委托时,需要三个函数调用。委托容器中的一个非虚 operator(),委托上的第二个 virtual operator(),最后调用绑定的函数。

对于多播委托,容器的 operator() 函数遍历列表,调用每个委托的 operator()。请注意,在多播委托容器中执行委托函数时没有返回值。

template<typename Param1>
class MulticastDelegate1 : public MulticastDelegateBase
{
public:
    void operator()(Param1 p1) {
        InvocationNode* node = GetInvocationHead();
        while (node != 0) {
            const Delegate1<Param1, void>* delegate = 
                static_cast<const Delegate1<Param1, void>*>(node->Delegate);
            (*delegate)(p1);    // Invoke delegate callback
            node = node->Next;
        }
    }
};

对于单播委托,委托容器的 operator() 只是调用委托的 operator()。请注意,使用单播容器允许有返回值。

RetType operator()(Param p1) {
      return (*m_delegate)(p1); } // Invoke delegate function

容器对具体的委托类型没有感知。如果容器中存储的委托是同步版本,则同步调用绑定的函数。当异步委托被调用时,委托和参数通过消息队列发送到目标线程。

当使用没有容器的委托时,调用从委托的 virtual operator() 转到绑定的函数。下面的代码片段演示了如何调用实例成员函数。

// Invoke the bound delegate function
virtual RetType operator()(Param1 p1) const {
      return (*m_object.*m_func)(p1); }

获取委托的最大速度并不是优先事项,尤其是当涉及到消息队列时;一些额外的指令可能不会有太大影响。一个符合标准的、能够派生额外功能的委托胜过任何此类优化。

委托容器

委托容器存储一个或多个委托。委托容器层次结构如下所示:

  • MulticastDelegateBase
  • MulticastDelegate0
  • MulticastDelegateSafe0
  • MulticastDelegate1<>
  • MulticastDelegateSafe1<>

等等...

  • SinglecastDelegate0<>
  • SinglecastDelegate1<>

等等...

MulticastDelegateBase 提供了一个非模板基类,用于在列表中存储非模板 DelegateBase 实例。operator+= 将委托添加到列表中,operator-= 将其删除。

class MulticastDelegateBase
{
public:
    /// Constructor
    MulticastDelegateBase() : m_invocationHead(0) {}

    /// Destructor
    virtual ~MulticastDelegateBase() { Clear(); }

    /// Any registered delegates?
    bool Empty() const { return !m_invocationHead; }

    /// Removal all registered delegates.
    void Clear();

protected:
    struct InvocationNode
    {
        InvocationNode() : Next(0), Delegate(0) { }
        InvocationNode* Next;
        DelegateBase* Delegate;
    };

    /// Insert a delegate into the invocation list. A delegate argument 
    /// pointer is not stored. Instead, the DelegateBase derived object is 
    /// copied (cloned) and saved in the invocation list.
    /// @param[in] delegate - a delegate to register. 
    void operator+=(DelegateBase& delegate);

    /// Remove a delegate previously registered delegate from the invocation
    /// list. 
    /// @param[in] delegate - a delegate to unregister. 
    void operator-=(DelegateBase& delegate);

...

MulticastDelegate1<> 提供了函数 operator(),用于顺序调用列表中每个委托。需要进行简单的类型转换,才能将 DelegateBase 类型转换回更具体的 Delegate1<> 实例。

template<typename Param1>
class MulticastDelegate1 : public MulticastDelegateBase
{
public:
    MulticastDelegate1() { }
    void operator()(Param1 p1) {
        InvocationNode* node = GetInvocationHead();
        while (node != 0) {
            Delegate1<Param1>* delegate = 
                static_cast<Delegate1<Param1>*>(node->Delegate);
            (*delegate)(p1);    // Invoke delegate callback
            node = node->Next;
        }
    }
    void operator+=(Delegate1<Param1>& delegate) 
                   { MulticastDelegateBase::operator+=(delegate); }
    void operator-=(Delegate1<Param1>& delegate) 
                   { MulticastDelegateBase::operator-=(delegate); }

private:
    // Prevent copying objects
    MulticastDelegate1(const MulticastDelegate1&);
    MulticastDelegate1& operator=(const MulticastDelegate1&);
};

MulticastDelegateSafe1<> 提供了一个围绕委托 API 的线程安全封装。每个函数都提供一个锁保护,以防止同时访问。锁使用了资源获取即初始化 (RAII) 技术。

template<typename Param1>
class MulticastDelegateSafe1 : public MulticastDelegate1<Param1>
{
public:
    MulticastDelegateSafe1() { LockGuard::Create(&m_lock); }
    ~MulticastDelegateSafe1() { LockGuard::Destroy(&m_lock); }

    void operator+=(Delegate1<Param1>& delegate) { 
        LockGuard lockGuard(&m_lock);
        MulticastDelegate1<Param1>::operator +=(delegate); 
    }
    void operator-=(Delegate1<Param1>& delegate)    { 
        LockGuard lockGuard(&m_lock);
        MulticastDelegate1<Param1>::operator -=(delegate); 
    }
    void operator()(Param1 p1) {
        LockGuard lockGuard(&m_lock);
        MulticastDelegate1<Param1>::operator ()(p1); 
    }
    bool Empty() { 
        LockGuard lockGuard(&m_lock);
        return MulticastDelegate1<Param1>::Empty();
    }
    void Clear() {
        LockGuard lockGuard(&m_lock);
        return MulticastDelegate1<Param1>::Clear();
    }
    explicit operator bool() {
        LockGuard lockGuard(&m_lock);
        return MulticastDelegateBase::operator bool();
    }

private:
    // Prevent copying objects
    MulticastDelegateSafe1(const MulticastDelegateSafe1&);
    MulticastDelegateSafe1& operator=(const MulticastDelegateSafe1&);

    /// Lock to make the class thread-safe
    LOCK m_lock;
};

SysData 示例

一些实际示例将演示常见的委托使用模式。首先,SysData 是一个简单的类,展示了如何公开一个传出的异步接口。该类存储系统数据,并在模式更改时提供异步订阅者通知。类接口如下所示:

class SysData
{
public:
      /// Clients register with MulticastDelegateSafe1 
      /// to get callbacks when system mode changes
      MulticastDelegateSafe1<const SystemModeChanged&> SystemModeChangedDelegate;

      /// Get singleton instance of this class
      static SysData& GetInstance();

      /// Sets the system mode and notify registered clients 
      /// via SystemModeChangedDelegate.
      /// @param[in] systemMode - the new system mode.
      void SetSystemMode(SystemMode::Type systemMode);     

private:
      SysData();
      ~SysData();

      /// The current system mode data
      SystemMode::Type m_systemMode;

      /// Lock to make the class thread-safe
      LOCK m_lock;
};

用于接收回调的订阅者接口是 SystemModeChangedDelegate。调用 SetSystemMode() 将新模式保存到 m_systemMode 中,并通知所有已注册的订阅者。

void SysData::SetSystemMode(SystemMode::Type systemMode)
{
      LockGuard lockGuard(&m_lock);

      // Create the callback data
      SystemModeChanged callbackData;
      callbackData.PreviousSystemMode = m_systemMode;
      callbackData.CurrentSystemMode = systemMode;

      // Update the system mode
      m_systemMode = systemMode;

      // Callback all registered subscribers
      if (SystemModeChangedDelegate)
            SystemModeChangedDelegate(callbackData);
}

SysDataClient 示例

SysDataClient 是一个委托订阅者,并在构造函数中注册 SysData::SystemModeChangedDelegate 通知。

// Constructor
SysDataClient() :
     m_numberOfCallbacks(0)
{
     // Register for async delegate callbacks
     SysData::GetInstance().SystemModeChangedDelegate += 
           MakeDelegate(this, &SysDataClient::CallbackFunction, &workerThread1);
     SysDataNoLock::GetInstance().SystemModeChangedDelegate += 
           MakeDelegate(this, &SysDataClient::CallbackFunction, &workerThread1);
}

当系统模式改变时,SysDataClient::CallbackFunction() 现在在 workerThread1 上被调用。

void CallbackFunction(const SystemModeChanged& data)
{
    m_numberOfCallbacks++;
    cout << "CallbackFunction " << data.CurrentSystemMode << endl;
}

当调用 SetSystemMode() 时,所有对模式更改感兴趣的订阅者都会收到通知,具体是同步还是异步取决于注册的委托类型。

// Set new SystemMode values. Each call will invoke callbacks to all
// registered client subscribers.
SysData::GetInstance().SetSystemMode(SystemMode::STARTING);
SysData::GetInstance().SetSystemMode(SystemMode::NORMAL);

SysDataNoLock 示例

SysDataNoLock 是一个替代实现,它使用一个 private MulticastDelegateSafe1<> 来异步且无需锁地设置系统模式。

class SysDataNoLock
{
public:
    /// Clients register with MulticastDelegateSafe1 to get callbacks 
    /// when system mode changes
    MulticastDelegateSafe1<const SystemModeChanged&> SystemModeChangedDelegate;

    /// Get singleton instance of this class
    static SysDataNoLock& GetInstance();

    /// Sets the system mode and 
    /// notify registered clients via SystemModeChangedDelegate.
    /// @param[in] systemMode - the new system mode. 
    void SetSystemMode(SystemMode::Type systemMode);    

    /// Sets the system mode and notify registered clients 
    /// via a temporary stack created
    /// asynchronous delegate. 
    /// @param[in] systemMode - The new system mode. 
    void SetSystemModeAsyncAPI(SystemMode::Type systemMode);    

    /// Sets the system mode and notify registered clients 
    /// via a temporary stack created asynchronous delegate. 
    /// This version blocks (waits) until the delegate callback
    /// is invoked and returns the previous system mode value. 
    /// @param[in] systemMode - The new system mode. 
    /// @return The previous system mode. 
    SystemMode::Type SetSystemModeAsyncWaitAPI(SystemMode::Type systemMode);

private:
    SysDataNoLock();
    ~SysDataNoLock();

    /// Private callback to get the SetSystemMode call onto a common thread
    MulticastDelegateSafe1<SystemMode::Type> SetSystemModeDelegate;

    /// Sets the system mode and notify registered clients via SystemModeChangedDelegate.
    /// @param[in] systemMode - the new system mode. 
    void SetSystemModePrivate(SystemMode::Type);    

    /// The current system mode data
    SystemMode::Type m_systemMode;
};

构造函数将 SetSystemModePrivate() 注册到 private SetSystemModeDelegate

SysDataNoLock::SysDataNoLock() :
      m_systemMode(SystemMode::STARTING)
{
      SetSystemModeDelegate += 
           MakeDelegate(this, &SysDataNoLock::SetSystemModePrivate, &workerThread2);
      workerThread2.CreateThread();
}

下面的 SetSystemMode() 函数是异步传入接口的一个例子。对调用者而言,它看起来像一个普通函数,但底层通过委托异步调用了一个私有成员函数。在这种情况下,调用 SetSystemModeDelegate 会导致在 workerThread2 上调用 SetSystemModePrivate()

void SysDataNoLock::SetSystemMode(SystemMode::Type systemMode)
{
      // Invoke the private callback. SetSystemModePrivate() 
      // will be called on workerThread2.
      SetSystemModeDelegate(systemMode);
}

由于这个 private 函数始终在 workerThread2 上异步调用,因此它不需要锁。

void SysDataNoLock::SetSystemModePrivate(SystemMode::Type systemMode)
{
      // Create the callback data
      SystemModeChanged callbackData;
      callbackData.PreviousSystemMode = m_systemMode;
      callbackData.CurrentSystemMode = systemMode;

      // Update the system mode
      m_systemMode = systemMode;

      // Callback all registered subscribers
      if (SystemModeChangedDelegate)
            SystemModeChangedDelegate(callbackData);
}

SysDataNoLock 重入示例

虽然创建单独的 private 函数来创建异步 API 确实可行,但使用委托,只需在不同的线程上重新调用相同的函数即可。执行一个简单的检查,判断调用者是否在期望的控制线程上执行。如果不是,则在堆栈上创建一个临时异步委托并调用它。委托和所有调用者的原始函数参数都在堆上复制,并且函数在 workerThread2 上重新调用。这是一种以最小的工作量创建异步 API 的优雅方式。

void SysDataNoLock::SetSystemModeAsyncAPI(SystemMode::Type systemMode)
{
    // Is the caller executing on workerThread2?
    if (workerThread2.GetThreadId() != WorkerThread::GetCurrentThreadId())
    {
        // Create an asynchronous delegate and 
        // re-invoke the function call on workerThread2
        auto delegate = 
             MakeDelegate(this, &SysDataNoLock::SetSystemModeAsyncAPI, &workerThread2);
        delegate(systemMode);
        return;
    }

    // Create the callback data
    SystemModeChanged callbackData;
    callbackData.PreviousSystemMode = m_systemMode;
    callbackData.CurrentSystemMode = systemMode;

    // Update the system mode
    m_systemMode = systemMode;

    // Callback all registered subscribers
    if (SystemModeChangedDelegate)
        SystemModeChangedDelegate(callbackData);
}

SysDataNoLock 阻塞式重入示例

阻塞式异步 API 可以隐藏在类成员函数内部。下面的函数在 workerThread2 上设置当前模式并返回之前的模式。如果调用者不在 workerThread2 上执行,则在堆栈上创建一个阻塞式委托并调用它。对调用者来说,该函数是同步的,但委托确保在返回之前在适当的线程上执行调用。

SystemMode::Type SysDataNoLock::SetSystemModeAsyncWaitAPI(SystemMode::Type systemMode)
{
    // Is the caller executing on workerThread2?
    if (workerThread2.GetThreadId() != WorkerThread::GetCurrentThreadId())
    {
        // Create an asynchronous delegate and 
        // re-invoke the function call on workerThread2
        auto delegate = 
             MakeDelegate(this, &SysDataNoLock::SetSystemModeAsyncWaitAPI, 
             &workerThread2, 
             WAIT_INFINITE);
        return delegate(systemMode);
    }

    // Create the callback data
    SystemModeChanged callbackData;
    callbackData.PreviousSystemMode = m_systemMode;
    callbackData.CurrentSystemMode = systemMode;

    // Update the system mode
    m_systemMode = systemMode;

    // Callback all registered subscribers
    if (SystemModeChangedDelegate)
        SystemModeChangedDelegate(callbackData);

    return callbackData.PreviousSystemMode;
}

Timer 示例

一旦委托框架到位,创建计时器回调服务就变得轻而易举。许多系统需要一种根据超时生成回调的方法。也许是用于低速轮询的周期性超时,或者是在预期时间内没有发生某些事情时的错误超时。无论哪种方式,回调都必须在指定的控制线程上发生。在 Timer 类内部使用 SinglecastDelegate0<> 能够很好地解决这个问题。

class Timer
{
public:
    SinglecastDelegate0<> Expired;

    void Start(UINT32 timeout);
    void Stop();

    //...
};

用户创建计时器实例并注册过期事件。在这种情况下,MyClass::MyCallback() 将在 1000 毫秒后被调用。

m_timer.Expired = MakeDelegate(&myClass, &MyClass::MyCallback, &myThread);
m_timer.Start(1000);

堆与固定块

堆用于创建委托和函数参数的副本。当将委托添加到多播列表时,它会使用运算符 new 进行克隆。异步委托支持需要复制委托和所有参数以放置到消息队列中。通常,内存来自堆。在许多系统中,这不是问题。然而,一些系统不能以不受控制的方式使用堆,因为存在堆碎片内存故障的可能性。这发生在长时间运行后堆内存被切割成小块,从而导致内存请求失败。

源代码中包含了一个固定块内存分配器。只需取消注释 DelegateOpt.h 中的 USE_XALLOCATOR 定义,即可启用固定分配器。启用后,所有源自 delegate 库的动态内存请求都会路由到固定块分配器。xallocator 还具有比堆更快的执行速度的优点,从而限制了动态内存分配对速度的影响。

整个 delegate 层次结构通过 DelegateBase 中的单个 XALLOCATOR 宏路由到固定块使用。

class DelegateBase {
#if USE_XALLOCATOR
      XALLOCATOR
#endif

委托库在必要时复制函数参数以支持异步。内存请求通过 DelegateParam<> 类路由到固定块分配器。请注意,如果定义了 USE_ALLOCATOR,则 New() 中获取的内存来自 xmalloc(),并且使用 placement new 语法在固定块区域内构造对象。在 Delete() 内部,手动调用析构函数,然后使用 xfree() 返回固定块内存。

template <typename Param>
class DelegateParam<Param *>
{
public:
      static Param* New(Param* param) {
#if USE_XALLOCATOR
            void* mem = xmalloc(sizeof(*param));
            Param* newParam = new (mem) Param(*param);
#else
            Param* newParam = new Param(*param);
#endif
            return newParam;
      }

      static void Delete(Param* param) {
#if USE_XALLOCATOR
            param->~Param();
            xfree((void*)param);
#else
            delete param;
#endif
      }
};

有关更多信息,请参阅文章“用快速固定块内存分配器替换 malloc/free”。

移植

该代码很容易移植到任何平台。只需要三个操作系统服务:线程、信号量和软件锁。代码分为五个目录。

  1. Delegate - 核心委托库实现文件
  2. Port – 线程特定文件
  3. Examples – 示例代码展示用法
  4. VS2008 – Visual Studio 2008 项目文件
  5. VS2015 – Visual Studio 2015 项目文件

Eclipse 项目文件位于项目根目录(.cproject.project)。使用 File > Import... > General > Existing Projects into Workspace 选项将项目添加到您的工作区。

该库有一个单独的 abstractDelegateThread,带有一个需要在每个目标操作系统上实现的纯 virtual 函数。

virtual void DispatchDelegate(DelegateMsgBase* msg) = 0;

在大多数项目中,我将底层原始操作系统调用封装到一个线程类中,以封装并强制执行正确的行为。在这里,我提供 ThreadWin 类作为 CreateThread() Windows API 的包装器。

一旦你有了线程类,只需继承 DelegateThread 接口并实现 DispatchDelegate() 函数。使用 Win32 API,只需简单地向消息队列发送一个 post 即可

void ThreadWin::DispatchDelegate(DelegateMsgBase* msg)
{
      // Create a new ThreadMsg
      ThreadMsg* threadMsg = new ThreadMsg(WM_DISPATCH_DELEGATE, msg);

      // Post the message to the this thread's message queue
      PostThreadMessage(WM_DISPATCH_DELEGATE, threadMsg);
}

使用 C++ 标准库的另一种实现将消息添加到由互斥量保护的 std::queue 中。

void WorkerThread::DispatchDelegate(DelegateLib::DelegateMsgBase* msg)
{
    ASSERT_TRUE(m_thread);

    // Create a new ThreadMsg
    std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_DISPATCH_DELEGATE, msg));

    // Add dispatch delegate msg to queue and notify worker thread
    std::unique_lock<std::mutex> lk(m_mutex);
    m_queue.push(threadMsg);
    m_cv.notify_one();
}

软件锁由 LockGuard 类处理。此类的锁可以根据您的选择进行更新,或者您可以使用不同的机制。锁只在少数地方使用。Semaphore 类封装了阻塞委托实现所需的 Windows 事件对象或 std::mutex

简而言之,该库通过在 DelegateOpt.h 中定义 USE_WIN32_THREADSUSE_STD_THREADS 来支持 Win32 和 std::thread 模型。如果您的 C++11 或更高版本编译器支持 std::thread,那么您就可以开始了。对于其他操作系统,只需提供 DelegateThread::DispatchDelegate() 的实现,更新 LockGuardSemaphore 类,并在您的线程循环中加入少量代码来调用 DelegateInvoke(),那么 delegate 库就可以部署在任何平台上了。

摘要

所有委托都可以通过 MakeDelegate() 创建。函数参数决定返回的委托类型。

同步委托使用一个参数(对于自由函数)和两个参数(对于实例成员函数)创建。

auto freeDelegate = MakeDelegate(&MyFreeFunc);
auto memberDelegate = MakeDelegate(&myClass, &MyClass::MyMemberFunc);

添加线程参数将创建一个非阻塞异步委托。

auto freeDelegate = MakeDelegate(&MyFreeFunc, &myThread);
auto memberDelegate = MakeDelegate(&myClass, &MyClass::MyMemberFunc, &myThread);

如果使用 C++11,std::shared_ptr 可以替代同步和非阻塞异步成员委托中的原始实例指针。

std::shared_ptr<MyClass> myClass(new MyClass());
auto memberDelegate = MakeDelegate(myClass, &MyClass::MyMemberFunc, &myThread);

添加超时参数将创建一个阻塞式异步委托。

auto freeDelegate = MakeDelegate(&MyFreeFunc, &myThread, WAIT_INFINITE);
auto memberDelegate = MakeDelegate(&myClass, &MyClass::MyMemberFunc, &myThread, 5000);

委托使用 operator+=operator-= 从多播容器中添加/删除。所有容器都接受所有委托类型。

MulticastDelegate1<int> multicastContainer;
multicastContainer += MakeDelegate(&MyFreeFunc);
multicastContainer -= MakeDelegate(&MyFreeFunc);

当使用异步委托以允许多个线程安全地添加/从容器中删除时,请使用线程安全的多播委托容器。

MulticastDelegateSafe1<int> multicastContainer;
multicastContainer += MakeDelegate(&MyFreeFunc, &myThread);
multicastContainer -= MakeDelegate(&MyFreeFunc, &myThread);

单播委托使用 operator= 进行添加和删除。

SinglecastDelegate1<int> singlecastContainer;
singlecastContainer = MakeDelegate(&MyFreeFunc);
singlecastContainer = 0;

所有委托和委托容器都使用 operator() 调用。

if (myDelegate)
      myDelegate(123);

在使用返回值或传出参数之前,请在阻塞委托上使用 IsSuccess()

if (myDelegate) {
     int outInt = 0;
     int retVal = myDelegate(&outInt);
     if (myDelegate.IsSuccess()) {
            cout << outInt << retVal;
     }        
}

哪种回调实现?

我在这里的 CodeProject 上记录了四种不同的异步多播回调实现。每个版本都有其独特的特性和优点。以下部分重点介绍了每种解决方案之间的主要区别。有关每个文章的链接,请参阅下面的“参考资料”部分。

这种异步委托实现旨在通过使用 C++ 委托在线程之间调用函数和传递数据来简化线程间通信。远程委托将库扩展到包括进程间和处理器间通信。有关使用 C++ 委托实现远程过程调用的解释,请参阅下面的“参考资料”部分。

C 中的异步多播回调

  • 用 C 实现
  • 回调函数仅限自由函数或静态成员
  • 支持一个回调参数
  • 回调参数必须是指针类型
  • 使用 memcpy 复制回调参数数据
  • 通过宏提供类型安全
  • 静态数组存储已注册的订阅者回调
  • 已注册订阅者数量在编译时固定
  • C 语言中的固定块内存分配器
  • 紧凑的实现

带线程间消息传递的异步多播回调

  • 用 C++ 实现
  • 回调函数仅限自由函数或静态成员
  • 支持一个回调参数
  • 回调参数必须是指针类型
  • 使用复制构造函数复制回调参数数据
  • 通过模板提供类型安全
  • 模板使用量少
  • 已注册订阅者回调的动态列表
  • 已注册订阅者数量在运行时扩展
  • C++ 中的固定块内存分配器
  • 紧凑的实现

C++ 中的异步多播委托

  • 用 C++ 实现
  • C++ 委托范式
  • 任何回调函数类型(成员、静态、自由)
  • 支持多个回调参数(最多 5 个)
  • 回调参数任意类型(值、引用、指针、指针的指针)
  • 使用复制构造函数复制回调参数数据
  • 通过模板提供类型安全
  • 大量使用模板
  • 已注册订阅者回调的动态列表
  • 已注册订阅者数量在运行时扩展
  • C++ 中的固定块内存分配器
  • 较大的实现

现代 C++ 中的异步多播委托

  • 用 C++ 实现(即 C++17)
  • C++ 委托范式
  • 函数签名委托参数
  • 任何回调函数类型(成员、静态、自由)
  • 支持多个回调参数(支持 N 个参数)
  • 回调参数任意类型(值、引用、指针、指针的指针)
  • 使用复制构造函数复制回调参数数据
  • 通过模板提供类型安全
  • 大量使用模板
  • 可变参数模板
  • 模板元编程
  • 已注册订阅者回调的动态列表
  • 已注册订阅者数量在运行时扩展
  • 紧凑实现(由于可变参数模板)

结论

多年来我做了很多多线程应用程序开发。在目标线程上用数据调用函数一直是一个手工制作、耗时的过程。这个库将这些构造泛化并封装到一个用户友好的委托库中。

本文提出了一种 C++ 多播委托实现,支持同步和异步函数调用。非阻塞异步委托提供“即发即忘”的调用方式,而阻塞版本允许等待目标线程的返回值和传出引用参数。多播委托容器通过允许多个客户端注册回调通知来扩展委托的实用性。通过让库处理调用函数和跨线程传输数据的底层线程细节,多线程应用程序开发得以简化。线程间代码巧妙地隐藏在库中,用户只需与易于使用的委托 API 进行交互。

参考文献

历史

  • 2016年12月13日
    • 首次发布
  • 2016年12月14日
    • 修正语法错误
    • 修复示例代码中的错误
  • 2016年12月16日
    • 修复了错误并更新了随附的源代码
  • 2016年12月19日
    • 在文章和附件源代码中添加了重入示例
  • 2016年12月20日
    • 为附件源代码添加了 std::shared_ptr 支持
    • 更新文章以解释新功能
  • 2016年12月29日
    • 添加了阻塞委托
    • 对现有实现进行了各种改进
    • 更新文章以解释新功能
    • 更新了附带的源代码
  • 2017年1月1日
    • 更新了委托类,使其能够在 GCC 5.4.0 上构建
  • 2017年1月7日
    • 向库添加了 std::thread 支持
    • 修复了各种错误
    • 添加了用于使用 GCC 构建源代码的 Eclipse 项目文件
    • 更新文章以解释新功能
  • 2017年4月21日
    • 修复了处理指向指针的异步函数参数的错误。更新了 DelegateParam<Param **> 类。
    • 更新了附带的源代码
  • 2019年1月18日
  • 2020年1月28日
    • 修复了 DelegateMsg.h 中的错误。上传了新源代码。
  • 2020年4月4日
    • 向此库添加了远程委托功能。请参阅参考部分。
    • 更新了附带的源代码
  • 2020年10月2日
    • 添加了 Timer
    • 更新了 Win32 和 std::thread 类实现
    • 次要文章更新。添加了对新“现代”委托 C++ 实现文章的引用。
    • 更新了附带的源代码
  • 2022年10月14日
    • 2022年库更新
    • 更新了附带的源代码
  • 2022年9月20日
    • 更新以简化2022年库实现
    • 更新了附带的源代码
  • 2022年12月12日
    • 使用 lambda 的委托更新
    • 附带了新源代码。
© . All rights reserved.