65.9K
CodeProject 正在变化。 阅读更多。
Home

原生 C++ 中的跨线程调用

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.85/5 (38投票s)

2006年12月10日

Apache

20分钟阅读

viewsIcon

121181

downloadIcon

1227

本文讨论了多线程应用程序中同步的必要性,并提供了一个用于跨线程调用的通用框架:ThreadSynch。

前言

感谢那些花时间阅读本文的读者。如果您觉得可以留下一个投票(尤其是低分投票),请附上评论说明问题所在。我收到的关于这篇文章的投票大部分都很高,除了偶尔的1分或2分,我真的很想知道是什么最让这些投票者感到不满。反馈是改进的动力。

引言

本文以及附带的库+演示项目旨在描述一种跨线程同步调用的方法。跨线程调用本质上是一个线程指示另一个线程调用函数的过程。

对某些代码段、函数和类进行调用序列化是多线程编程的现实,但有一些方法可以在不引入过多代码或复杂逻辑的情况下完成这项工作。如果您有兴趣,请花时间阅读以下段落。这是对概念的介绍,以及我对解决此问题方法的描述。如果您时间紧迫,请直接跳到“使用代码”部分,其中简要介绍了库ThreadSynch

ThreadSynch 需要

  • VC++ 8 (2005)。未在早期版本上进行测试,但由于其标准符合性问题,不太可能在 VC6 上编译。
  • 优秀的 boost 库 (https://boost.ac.cn)。已在版本 1.33.1 上编译和测试。
  • 爱。

ThreadSynch 的特性

  • 一种通用的跨线程调用方式,可通过拾取策略轻松扩展。
  • C++ 异常在线程间传输。
  • 参数和返回值在线程间传输。
  • 保证在调度超时时不会发生跨线程调用——延迟作业是安全的。

背景

同步调用简介

我假设您,读者,至少对线程有所了解,以及在处理公共数据时它们带来的所有陷阱。如果您不了解,请继续阅读,但您可能会发现自己不明白一开始所有的喧嚣是为了什么。

一个典型的例子是工作线程,它会启动 GUI 类中的回调函数来渲染一些更新的输出。在这种情况下,有许多不同的方法,更不用说模式(例如 Observer)可以使用了。我将完全忽略模式,而专注于实际数据和通知。进行跨线程调用的动机是:1. 简化线程间通知;2. 避免类和函数中存在不必要的同步代码。

想象一下工作类 Worker 和 GUI 类 SomeWindow。它们如何关联几乎没有区别,重要的是 Worker 应该调用 SomeWindow 中的一个函数,和/或更新 SomeWindow 中的数据。应用程序有两个线程。一个“驻留”在 Worker 中,另一个驻留 SomeWindow 中。假设在某个时间点,Worker 对象决定向 SomeWindow 发出通知。这该如何做到?我可以总结几种可能的方法,包括主要的优缺点。

  1. Worker 访问并更新 SomeWindow 中的一个数据成员。
    • 优点:快速。
    • 缺点:不规范。更具体地说,它破坏了封装。如果此操作在没有某种锁定(互斥锁/临界区/信号量/等)的情况下进行,则工作线程和窗口线程可能会同时尝试访问数据成员,这肯定会对我们的应用程序造成灾难。如果我们运气好,只会导致访问冲突。如果 SomeWindow 公开了用于锁定的对象,我们将进一步破坏封装,从而引发死锁等问题。
  2. Worker 调用 SomeWindow 中的一个函数,该函数为我们更新数据成员。
    • 优点:在正确锁定的前提下,相对安全。
    • 缺点:SomeWindow 将充斥着锁定代码,在最坏的情况下,每个可更新的数据成员都有一个锁对象。这些锁的引入也削弱了内聚性。以冗长的方式处理线程、锁定和同步的复杂性对于 GUI 类来说并非理想选择。
  3. Worker 向 SomeWindow 发送一个窗口消息,并将更新数据放在结构中。SomeWindow 处理消息并以某种方式处理数据。
    • 优点:相对安全(如果使用 SendMessage)。
    • 缺点:内聚性略有减弱。参数的转换和传输可能很繁琐,因为每个独特的数值组合都需要自定义或通用结构。这种方法的最大缺点是与窗口消息的联系;它对于非 GUI 场景并不真正实用。
  4. Worker 调用 SomeWindow 中的一个函数,该函数通过同步重调用为我们更新数据成员。
    • 优点:安全。相对有效。没有值得一提的冗余代码。
    • 缺点:内聚性略有减弱。代码基础比没有线程时要复杂一点,但绝非难以理解,最终的代码将相当令人满意。

一个框架的诞生

在过去的几年里,我曾对这个领域的问题进行过多种尝试。通常,我最终会使用上述 #2 和 #3 的混合方法。虽然我做了一些抽象,并将其集成到线程库中,但这并没有什么重大意义。直到我接触了 .NET 框架,特别是 InvokeRequired / BeginInvoke 技术,我才开始考虑在原生框架中做同样的事情。.NET 框架的方法从使用角度来看确实很有吸引力,因为它引入了最少的外部代码来处理业务逻辑。虽然许多人会认为理想的方法是完全避免同步,并依赖操作系统来处理与跨线程调用和并发数据访问相关的复杂性;但这不太可能在任何注重效率的应用程序中很快实现。

我不会详细介绍我最初的几个同步框架,而是将重点放在我专门为这篇阅读文章编写的框架上。正如前面提到的,它是基于 .NET 框架的思想,但并不完全相同。考虑到原生代码和托管代码之间的差异,以及语法上的不平等,其机制必须有所不同,使用方式也不同。该框架的动机显然是简化跨线程调用,这些调用可能访问也可能不访问共享资源。它在安全性、灵活性和可靠性方面都付出了巨大的努力,以兑现对用户的承诺。灵活性是通过引入用于线程间通知的模板策略以及 Boost 的函数对象和参数绑定来实现的。我稍后会详细介绍可靠性方面。

基本原理很简单。线程 A 需要更新或处理与线程 B 逻辑相关的数据。为此,A 希望在B 的上下文中发出调用。线程 B 的性质允许它睡眠或等待来自外部源的命令,因此这将是A 可以采取行动的窗口。线程 B 理想情况下应与 GUI 相关,或为网络服务器/客户端、Observer(如 Observer 模式)或类似功能。

需要做的是

  1. 线程 A 必须调用一个函数来安排在线程 B 中执行,无论是否有参数。
  2. 在调用等待执行时,线程 A 必须暂停。如果调用未在关键时间内完成,则必须将控制权交还给线程 A,并通知调用失败。如果A 被通知调用超时,则必须保证调用不会发生。
  3. 线程 B 被通知应执行调用。我们将此称为拾取策略,因为B 将不得不从A 拾取指令来执行某项任务。这就是策略发挥作用的地方。
  4. 线程 B 将执行计划的调用,该调用可能返回也可能不返回一个值,然后继续执行其业务。
  5. 线程 A 返回结果值,并从中断处继续执行。

拾取策略,或者更具体地说,线程 A线程 B 传递通知的方式,可能涉及多种技术。值得一提的有用户模式异步过程调用 (UserAPC) 和窗口消息。QueueUserAPC() 允许将一个函数排队以便在另一个线程的上下文中调用,并依赖于该线程进入可警报等待状态才能执行调用。可警报等待有其自身的问题,但此处暂不讨论。对于 GUI 类型线程,窗口消息是更好的选择。拾取策略是这个流程中相对简单的一部分,但它们在灵活性方面仍然很重要。

使用跨线程调用的同步示例

好的,我们已经讨论了动机和一些要求。现在是时候举例说明该机制的用法了。为了极其简单起见,我现在不会引入类和对象。只需想象以下简单的控制台程序

char globalBuffer[20];

DWORD WINAPI testThread(PVOID)
{
    // Keep sleeping while the event is unset
    while(WaitForSingleObjectEx(hExternalEvent, INFINITE, TRUE) != 
          WAIT_OBJECT_0)
    {
        Sleep(10);
    }

    // Alter the global data
    for(int i = 0; i < sizeof(globalBuffer) - 1; ++i)
    {
        globalBuffer[i] = 'b';
    }
    globalBuffer[sizeof(globalBuffer) - 1] = 0; // null terminate

    // Return and terminate the thread
    return 0;
}

int main()
{
    DWORD dwThreadId;
    CreateThread(NULL, 0, testThread, NULL, 0, &dwThreadId);
    ...

到目前为止没有什么不寻常的。我们有入口点main,以及一个函数testThread。当main 执行时,它将创建一个新线程并启动testThread。在此示例中,testThread 所做的就是等待一个外部事件被信号,然后修改数据结构globalBuffer。重要的是该线程正在等待某事发生,而在等待期间我们可以指示它执行其他操作。因此,我们的目标是让该线程调用另一个函数testFunction

string testFunction(char c)
{
    for(int i = 0; i < sizeof(globalBuffer) - 1; ++i)
    {
        globalBuffer[i] = c;
    }
    globalBuffer[sizeof(globalBuffer) - 1] = 0; // null terminate
    return globalBuffer;
}

testfunction 将修改全局缓冲区,将除最后一个元素外的所有元素设置为字符参数c 的值,然后将其以 null 结尾,最后返回一个包含全局缓冲区内容的新字符串。我们一眼就可以看出,testFunctiontestThread 可能会修改同一个缓冲区。如果我们的主线程直接执行testFunciton,它可能会在被 CPU 切换出去之前修改全局缓冲区的前 10 个左右元素。如果此时testThread 中的外部事件被信号,该线程也将开始修改缓冲区。从testFunction 返回的字符串显然不会是我们期望的。

虽然此示例本身在现实应用中意义不大,但其概念非常实际。如果愿意,请想象全局缓冲区代表对话框中编辑框的文本,并且testThread 应该基于计时器修改此文本。在特定时间间隔,外部线程也可能希望用其他信息更新同一个编辑框,因此它们会调用 GUI 的类(在此简化示例中由testFunction 表示)。为了避免崩溃、文本混乱或其他奇怪的结果,我们需要同步访问。我们不想在代码中添加大量的互斥锁或临界区,而只是让 GUI 线程调用更新文本的函数。当 GUI 线程单独负责更新其资源时,我们可以保证所有操作都按有序的方式进行。换句话说:不会有令人头疼的崩溃和愤怒的客户。

因此,我们不向testThreadtestFunction(两者都修改全局缓冲区)添加大量锁定代码,而是使用一个跨线程调用库来让拥有共享数据的线程完成所有工作。

int main()
{
    DWORD dwThreadId;
    CreateThread(NULL, 0, testThread, NULL, 0, &dwThreadId);
  
    CallScheduler<APCPickupPolicy>* scheduler = 
        CallScheduler<APCPickupPolicy>::getInstance();
  
    try
    {
        // Create a boost functor with a bound parameter.
        // The functor returns a string, and so will the
        // synchronized call.
        boost::function<string()> callback = 
            boost::bind(testFunction, 'a');

        // Make the other thread call it. The return value
        // is deduced from the functor.
        string dataString = scheduler->syncCall
            (
                dwThreadId,                     // Target thread
                callback,                       // Functor with parameter
                500                             // Milliseconds to wait
            );

        cout << "testFunction returned: " << 
                dataString << endl;
    }
    catch(CallTimeoutException&)
    {
        // deal with the problem
    }
    catch(CallSchedulingFailedException&)
    {
        // deal with the problem
    }
  
    return 0;
}

CallScheduler 在这里发挥了关键作用。通过获取该单例类的指针,并指定首选的拾取策略(在此示例中为APCPickupPolicy),我们可以安排在其他线程的上下文中进行调用,前提是该线程对拾取策略使用的任何机制都开放。在当前示例中,我们知道testThread 的等待是可警报的,这非常适合 APC 策略。为了尝试在另一个线程中执行调用,我们调用syncCall 函数,并传递几个参数。模板参数是我们希望执行的函数的返回类型,在此示例中为string。第一个参数是我们希望执行操作的线程的 ID,第二个参数是 Boost 函数对象,第三个参数是我们愿意等待调用启动的毫秒数。Boost 函数对象的使用还允许我们及时绑定参数。如上面的调用所示,testFunction 应该以字符 'a' 作为其唯一参数被调用。

此时,我们等待。调用将被安排,并有望完成。如果拾取策略能够正常工作,调用将在另一个线程中执行,我们很快就会从testFunction 收到syncCall 返回的字符串。如果拾取失败或超时,将抛出异常。考虑这个例子——它真的应该让一切都变得很清楚。在附带的源代码中,有两个示例项目。一个包含上面展示的代码,另一个更贴近实际应用——在 GUI/工作线程应用程序中。

限制,总是有限制

像这里描述的框架的使用有一些限制。有些只是需要注意的方面,而有些则是“致命错误”。

传递给将从另一个线程调用的函数的参数,不应使用 TLS(线程局部存储)说明符——这不言而喻。声明为 TLS 的变量(__declspec(thread))在它被访问的每个线程都有一个副本。就前面的示例而言,即使值通过同步调用机制传递给testFunction主线程也不会必然看到与testThread 相同的数据。简而言之:没有什么能阻止你传递 TLS,但这样做你会看到一些奇怪的行为。一般指导原则是仔细思考。在确切知道后果之前,不要在线程之间传递任何东西。即使跨线程同步调用的机制,或者更确切地说,原理,付出了巨大的努力来使任务保持简单;总会有绊倒的方式。

线程 A 通过线程 B 同步调用函数 F 的示例场景中,有关参数传递和返回的一些指南和要求:

  • 如果函数 F 需要返回指针或引用,请将其设为 const,以便线程 A 无法修改它们。即使它们是 const,线程 B 也可以释放它们,从而导致线程 A 的读取崩溃。除非您绝对确定不会发生这种情况,否则不要使用指针或引用。返回的指针或引用属于线程 B
  • 如果函数 F 接受来自线程 A 的指针或引用作为参数,请确保在F 返回后,线程 B 不会引用它们,既不读取也不写入。传递的指针或引用属于线程 A
  • 函数 F 按值返回给线程 A 的类类型必须提供一个公共复制构造函数,无论是编译器生成的还是用户实现的。
  • 线程 B 抛出到线程 A 的异常必须提供一个公共复制构造函数,并且不能被副本损坏。

此外,虽然这真的不言而喻,但在多线程环境中同步的代码块或多或少会造成一些“交通堵塞”。在某些情况下,重新设计可能是解决方案,而在其他情况下;这是不可避免的。如果多线程应用程序要安全地访问同一资源,则无法省略锁定,因此一切都取决于您如何最大程度地利用这种情况。一个快速的比喻

想象一下您必须用一堆汽车在 A 和 B 之间来回运送包裹。一次驾驶所有五辆汽车是一项艰巨的任务,所以您请求(阅读:欺骗)一些朋友帮忙。有几辆车的负载比其他车轻,所以它们能够比其余的车更快地通过整个路段。过了一会儿,这些飙车司机实际上已经超过了队伍,绕了一圈,又追上了你们其他人。当然,这完全没问题,如果不是因为只有一条车道的路段。更快的车无法超车,直到道路再次提供多条车道。显然非常低效,因此决定速度快的车在被迫减速时就停下来。一旦停下来,它们就可以做很多其他否则找不到时间做的事情,例如在网上冲浪。当较慢的司机快要到达单车道公路的尽头时,他们会打电话给等待的人——告诉他们道路正在畅通,让他们加速通过。

尽管这个例子在现实世界中可能显得很愚蠢,但对于多线程应用程序来说,它非常有意义。如果线程 A 无法访问一段代码,因为线程 B 放置了一个块,那么 A 最好在重新尝试锁定之前执行更多的计算或操作。因此,即使上述交通堵塞在很大程度上是不可避免的,也有方法可以最大程度地利用它们。

使用代码

我们几乎涵盖了所有需要说的内容。附带的文件中有两个示例项目,此外还有文档化的代码和来自 doxygen 标签的生成 HTML。我将快速总结一下基本知识。

该库严格基于头文件,所以本质上您要做的就是包含ThreadSynch.h 和您选择的拾取策略。

要获取调度程序实例,使用包含的 APCPickupPolicy

WMPickupPoilcy 将通过用户 APC 通知目标线程,前提是目标线程在指定调用超时结束之前进入可警报等待状态。

CallScheduler<APCPickupPolicy>* scheduler = 
    CallScheduler<APCPickupPolicy>::getInstance();

在附带的ThreadSynchTest项目中有一个这方面的示例。

要获取调度程序实例,使用包含的 WMPickupPolicy

WMPickupPoilcy 将通过用户定义的窗口消息通知目标线程。

typedef WMPickupPolicy<WM_USER + 1> WMPickup;
CallScheduler<WMPickup>* scheduler = 
    CallScheduler<WMPickup>::getInstance();

传递给WMPickupPolicy 的模板参数表示要发布到线程的消息。接收线程应在其消息循环中处理此消息代码,例如

while(GetMessage(&msg, NULL, 0, 0)) 
{ 
    if(msg.message == WMPickup::WM_PICKUP)
    {
        WMPickup::executeCallback(msg.wParam, msg.lParam);
        continue;
    }

    TranslateMessage(&msg);
    DispatchMessage(&msg);
}

在附带的 ThreadSynchWM 项目中有一个这方面的示例。

通过另一个线程跨线程调用函数

scheduler->syncCall(dwThreadId, function, timeoutInMS);

跨线程调用返回 int 的类成员函数,参数为 const string 引用

// Class in which our target function resides
MyClass classInstance;

// String parameter
string myString = "hello world";

// Init functor
boost::function<int(const string&)> myFunctor = 
    boost::bind(&MyClass::someFunction,     // Function
                &classInstance,             // Instance
                boost::ref(myString));      // Parameter
    
// Make the call
// The return value template specification can be ommited
// in this case, as it's also deduced from the boost functor.
// I've included it here to show how it can be specified,
// and how it must be specified if mere function pointers
// are used in place of the functors.
int x = scheduler->syncCall
    <
        int            // Return type.
    >
    (
        dwThreadId,    // Target thread
        myFunctor,     // Functor to call from target thread
        timeoutInMS    // Number of milliseconds to wait 
                       // for the call to begin
    );

跨线程调用返回 string 的类成员函数,参数为 const string 引用,并期望可能会抛出一些异常

// Class in which our target function resides
MyClass classInstance;

// String parameter

string myString = "hello world";

// String for the return value
string myReturnedString;

try
{
    // Init functor
    boost::function<string(const string&)> myFunctor = 
        boost::bind(&MyClass::someFunction,    // Function
                    &classInstance,            // Instance
                    boost::ref(myString));     // Parameter
    
    // Make the call
    myReturnedString = scheduler->syncCall
        <
            string,        // Return type
            ExceptionTypes<std::exception, MyException>
        >
        (
            dwThreadId,    // Target thread
            myFunctor,     // Functor to call from target thread
            timeoutInMS    // Number of milliseconds to 
                           // wait for the call to begin
        );
}
catch(CallTimeoutException&)
{
    // The call timed out, do some other stuff and try again
}
catch(CallSchedulingFailedException&)
{
    // The call scheduling failed, 
    // probably caused by the pickup policy not doing its job
}
catch(std::exception& e)
{
    // Deal with e
}
catch(MyException& e)
{
    // Deal with e
}
catch(UnexpectedException&)
{
    // We didn't expect this one. 
    // It's time to read someFunction's docs.
}

您显然不必每次都捕获所有这些异常,但如果您愿意,也可以这样做。这完全取决于您。也就是说,您是否想要一个异常安全的应用程序。

在大多数情况下,您会希望一个函数在函数“所有者线程”的上下文中重调自身,而不是调用特定函数,如上所示。附带的ThreadSynchTestWM 示例演示了如何在updateText 函数中做到这一点。

内部机制

ThreadSynch 库大量使用模板和预处理器宏(例如通过 boost 的 MPL)。如果您想确切了解库如何(以及为何)工作;您应该通读源代码。话虽如此,我将在下面介绍一些基本知识。

每次跨线程调用都有两个主要参与者,“客户端”线程 A 和“目标”线程 B。在跨线程调用之前,线程 B 处于未知状态。由拾取策略来强制改变该状态,或在线程 B 可用时(例如进入可警报等待状态)优雅地处理计划的调用。

线程 A

线程 A 将调用CallScheduler::syncCall,并带有一组模板参数,以及目标线程、函数对象和超时时间。要快速了解接下来会发生什么,请查看此活动图

CallScheduler::syncCall 函数基本上会分配一个CallHandler 实例,该实例是在另一个线程拾取拾取策略所做的通知后负责实际调用的结构。CallHandler 包含用于在线程 B 中捕获异常和返回值的包装类。syncCall 新创建的CallHandler 实例会被排入特定目标线程的调用队列,这可以在此活动图中看到。

调用被排入队列,并且拾取策略已收到通知后,syncCall 将等待事件或超时发生。无论哪个先发生,syncCall 都将通过锁定CallHandler 来进行后续操作。如果计划的调用在超时发生时已经开始执行(但未完成),则此锁定将等待调用完成。获取锁定后,将检查计划调用的状态。如果完成,结果将传回syncCall 的调用者——要么是重新抛出异常,要么是返回返回值。但是,如果在获取CallHandler 锁时调用尚未完成或开始,则该调用将从目标线程的队列中删除。这保证了返回值、异常和参数不会丢失。syncCall 返回的状态将是线程 B 中所发生情况的准确状态。

线程 B

稍作回顾,线程 B 照常进行其业务。然后,在某个任意(但由策略定义的)时间点,拾取策略介入,并让该线程调用CallScheduler 中的一个函数。该函数executeScheduledCalls 将以先进先出的顺序获取并执行当前线程安排的所有CallHandler 回调。请参阅此活动图以了解 CallScheduler::executeScheduledCalls

计划的调用将通过getNextCall 函数获取,直到没有更多调用为止。请参阅此活动图以了解 CallScheduler::getNextCall。此函数关键部分是锁定CallHandler。与库中使用的所有其他锁类型相反,如果CallHandler 已经被锁定,此锁将立即返回。此时找到锁定状态的唯一原因是调用已超时,并且syncCall 即将删除它。这个出队和删除将在线程 A 获得对CallHandler 和线程队列的锁定后进行,它将在getNextCall 返回(从而释放其作用域锁定)时进行。 CallHandler

对于每个执行的CallHandler,有两个层。一个实用类负责异常捕获(ExceptionExpecter),另一个负责返回值捕获(FunctorRetvalBinder)。这两个层的结果将放置在CallHandler 中,并在线程 B 完成调用并释放其锁时由线程 A 进行处理。我不会详细介绍这些层,因为它们已在附加代码中进行了文档化。

进一步研究

如果您打算使用此库,我强烈建议您花时间阅读源代码。考虑到本文提供的信息,应该不难理解事情的流程。如果您发现任何特定的部分令人困惑,请在此处发表评论。这也适用于本文——任何建议都欢迎。

历史

  • 2006 年 12 月 11 日(文章):上传了文章和库版本 0.7.0。
  • 2006 年 12 月 12 日(文章):进行了一些澄清和扩展。
  • 2006 年 12 月 13 日(Bugfix
    • FunctorRetvalBinder 添加了丢失的返回值析构函数。
    • 库版本 0.7.1。
  • 2006 年 12 月 15 日(文章):修复了低分辨率的格式。
  • 2007 年 3 月 13 日(文章 & Bugfix
    • 更新了文章中的一个示例。
    • 修复了syncCall 模板签名的次要问题。
    • 添加了一个单元测试项目。
    • 库版本 0.7.2。
  • 2007 年 7 月 15 日(Bugfix
    • 修复了CallScheduler 中的 scoped_lock 错误,该错误可能导致调用超时时出现未定义行为。
    • 修复了CallHandler 析构函数,它未能清理 exception expecter 实例(效果是轻微的内存泄漏)。
    • 修复了scoped_try_lock 循环,以便在CallScheduler::getNextCallFromQueue 中正确迭代已锁定的CallHandlers
    • 库版本 0.7.3。
© . All rights reserved.