65.9K
CodeProject 正在变化。 阅读更多。
Home

现代 C++ 的线程包装器

2017 年 3 月 20 日

CPOL

16分钟阅读

viewsIcon

127063

downloadIcon

1953

基于 std::thread 的线程包装器 (v. 2.0),提供安全封装的同步、状态控制和终止功能

引言

无家可归的想法总会在某个人的家中找到避难所。

Stanisław Jerzy Lec

目录

引言

这是关于线程包装器的短系列文章的第一篇

  1. 当前文章。
  2. 适用于现代 C++ 的传送带线程包装器.

两篇文章共享相同的可下载源代码。

动机

在文章摘要中提到“现代 C++”,我指的是需要 C++11 或更高版本,因为标准库中的多线程和线程同步特性得到了本质性的使用。多线程的标准化是一个巨大的进步,它使得 C++ 成为一种真正的跨平台技术;但这项标准化活动尚未接近完成。

我提出了“线程包装器”的概念和基本设计已有多年,并首先在 CodeProject 上发布了一些相关的代码示例,以响应一些 问答 问题。我首先回答了 .NET 中的以下问题:如何将 ref 参数传递给线程更改线程(生产者)的参数(启动后)C# 中的多线程将参数传递给有线程的长时间运行进程,以及后来许多其他相关问题。CodeProject 会员 VSNetVbHarry 非常好心地 将我的一个代码片段翻译成了 VB.NET。在这些回答中,我涵盖了传递包装器对象的引用(及其所有实例成员)以便从线程代码访问它们,封装线程交错(用于线程间数据交换),线程节流、连接、线程中止及相关问题,这清楚地展示了该概念的好处。

一些主要思想可以追溯到 Ada 任务机制 的设计。

现在我们有了 C++11 的 std::thread 和其他与多线程相关的标准库特性。GCC 4.8.1 是 2013 年第一个功能完整的 C++11 实现,Clang 也于 2013 年准备就绪;Microsoft 从 2015 年才开始支持 C++11。从那时起,用 C++ 开发类似的线程包装器非常有意义。其他促成此发布的因素是该概念的成熟和经验的积累,以及,足够重要的是,CodeProject 会员们不断提出的许多相关问题,以及我希望通过撰写文章和博文一次性回答许多问题的旧想法

为什么需要线程包装器?

线程包装器类基于 std::thread。此类是标准化的,并且非常好。另一个更灵活的选择可能是 boost::thread,它包含在 Boost 库集 中,但这已经是一个超出了官方 C++ 标准的大型库。

关于 std::thread,这是一个原始的标准类,它允许执行标准库 多线程库 所能提供的所有多线程功能。自然,它提供了通过该库可达到的最大灵活性,但其中有大量微妙之处和潜在的或不太明显的使用技巧。相比之下,线程包装器提供了极大的简洁性,同时涵盖了大多数典型或不典型的应用程序。从另一方面讲,线程包装器的设计基本上是基于通过扩展进行编程的,因此任何更高级的行为都可以自然地在派生类中添加到包装器中。

线程包装器用法

思路是:用户创建一个派生类并重写虚函数 ThreadWrapper::Body。实例一旦构造完成,线程尚未启动。它可以在稍后的任何时间启动,通过调用独立函数 ThreadWrapper::Start(bool)。该参数提供了指定分离(后台)模式的选项,这在“连接非连接线程”部分有详细说明。独立启动是一个非常重要的灵活性特征,这是原始 std::thread 中严重缺失的功能。

还有更多的虚函数可以重写,但这是可选的,因为其余的都是伪抽象函数:Aborted()Terminated()ExceptionCaught(std::exception)UnknownExceptionCaught()。我没有展示它们的用法,因为它们相当简单;它们的调用 在这里 显示。

完整的用法示例显示在最后的代码示例中。

派生的线程包装器类和线程体

在实现独立函数 ThreadWrapper::Start 时有一个小问题:std::thread 的实例必须是 ThreadWrapper 的成员,因此它使用默认构造函数进行构造,没有参数。线程体仅在稍后发挥作用。

实现这种行为的一种方法是利用 std::thread 函数 thread& operator=(thread&& other)移动语义

class ThreadWrapper {
public:

    void Start(bool background = false) {
        thread = std::thread(BodyWrapper, this);
        if (background)
            thread.detach();
    } //Start

    // ...

};

这就是线程操作符 = 配合移动语义的思路:构造函数调用表达式 std::thread(BodyWrapper, this) 被编译器识别为一个右值表达式,即临时的;因此,期望的是右值引用 && 参数,而不是左值,因为所有其他赋值运算符都被省略(删除)了。因此,操作符的实现可以安全地修改赋值右侧创建的临时线程实例——保证它稍后不会被使用。线程句柄和新构造线程实例的所有属性都将被移动到左操作数,即实例成员 ThreadWrapper::thread

可以通过另一种方式实现类似行为,即通过函数 swap

class ThreadWrapper {
public:

    void Start(bool background = false) {
        std::thread tmp(BodyWrapper, this);
        thread.swap(tmp);
        if (background)
            thread.detach();
    } //Start

    // ...

};

我想知道是否有人会因为这个看似奇怪的函数 swap 而感到困惑——开发者为什么要交换两个线程句柄?上面的代码示例只是一个用法示例。

分离线程的行为是一个非常特殊的问题;处理方法在“连接非连接线程”部分有描述。

函数 BodyWrapper 需要处理异常和连接。这是如何做到的。

class ThreadWrapper {

// ...

private:

    static void BodyWrapper(ThreadWrapper* instance) {
        try {
            instance->Body();
        } catch (ThreadAbortException&) {
            instance->Aborted();
        } catch (std::exception& ex) {
            instance->ExceptionCaught(ex);
        } catch (...) {
            instance->UnknownExceptionCaught();
        } //exception
        std::lock_guard<std::mutex> lock(instance->joinMutex);
        instance->done = true;
        instance->joinEvent.notify_all();
        instance->Terminated();
    } //BodyWrapper

    std::thread thread;
    std::condition_variable stateEvent, joinEvent;
    std::mutex stateMutex, joinMutex;
    Action state = Action::wakeup; // protected by statusMutex, statusEvent
    bool done = false; // protected by joinMutex, joinEvent

}; 

这种将线程与线程包装器实例绑定的方式实际上是包装器的主要思想。一旦我们有了这种绑定,我们就可以安全地封装线程控制(节流、中止、异常处理)和线程间的同步数据交换。

让我们从线程节流开始。

休眠与唤醒

在某些多线程 API 中,可以找到用于挂起和恢复线程的函数。此类 API 的示例是 Windows 的 SuspendThreadResumeThread。在现代线程 API 中,如 C++ 的 std::threadCLI,此类函数从不包含。它们与 TerminateThread (我们将在下面讨论线程终止)一样危险。

此类 API 的问题在于它们与线程执行完全异步。一个可能的问题是线程在拥有互斥对象时被挂起。在这种情况下,一个线程的挂起将间接导致所有试图获取相同互斥对象的线程被挂起——这些线程将被置于等待状态,直到被挂起的线程释放互斥对象。但这还不是最糟的情况。更糟糕的是,想要唤醒持有互斥对象的线程的同一线程,在挂起后但在释放之前,可能会试图获取相同的互斥对象。这会造成死锁,即两个(或更多)线程处于等待状态并互相等待。甚至更糟的是,这种情况发生的概率非常低,以至于测试无法发现问题;而死锁可能会在最尴尬的情况下发生在客户身上,这完全符合墨菲定律

同时,线程的挂起和恢复是一个非常重要的功能。在某些应用程序中,这是至关重要的。而且它可以是完全安全的。只要将挂起与线程执行同步即可。需要挂起的线程应该反复调用一个用于使其进入等待状态的函数,但要有条件地进行。在当前的线程包装器实现中,这个函数是受保护的函数 ThreadWrapper::SyncPoint。它使用基于 std::condition_variable 的机制。结合 std::mutex,它提供线程节流效果,方式类似于 CLI 中的 System.Threading.EventWaitHandle

这是它的工作原理。

class ThreadWrapper {

    // ...

protected:

    void SyncPoint(bool autoReset = false) {
        std::unique_lock<std::mutex> ul(stateMutex);
        stateEvent.wait(ul, [=] {
            auto result = ((int)state & (int)Action::wakeup) > 0;
            if (state == Action::deepAbort)
                throw ThreadAbortException();
            else if (state == Action::shallowAbort)
                throw ShallowThreadAbortException();
            if (autoReset)
                state = Action::sleep;
            return result;
        });
    } //SyncPoint

};

此函数也用于下面讨论的线程终止。相关的线程同步字段(stateEventstateMutexstate)的声明已在显示 BodyWrapper 的代码示例中展示。

线程包装器用于节流线程的方法被故意命名为不同的名称,PutToSleepWakeUp。即使命名有些难听,它也强调了这些函数与令人厌恶的“suspend”和“resume”无关。从另一方面讲,这种命名表明了线程实际发生的情况:当线程调用 SyncPoint 且其状态设置为 suspend 时,它被置于等待状态,也就是说,它被关闭并且在被唤醒之前不会被重新调度执行。等待状态不意味着自旋等待,没有轮询;线程实际上是通过事件通知机制被唤醒的。

线程节流就是这样实现的。

class ThreadWrapper {
public:

    // ...

    void PutToSleep() {
        std::lock_guard<std::mutex> lock(stateMutex);
        state = Action::sleep;
        stateEvent.notify_one();
    } //PutToSleep

    void WakeUp() {
        std::lock_guard<std::mutex> lock(stateMutex);
        state = Action::wakeup;
        stateEvent.notify_one();
    } //WakeUp

    // ...

}; 

线程终止

对于 std::thread,没有任何类似于 CLI 中的 System.Threading.Thread.Abort 的东西,后者是一种完全与线程执行异步的线程终止机制。使用这种机制有时会引发争论,所以我在这里不讨论它。

相比之下,线程包装器的线程可以通过与线程执行同步的相同 SyncPoint 函数 上面显示 来由另一个线程中止。注意,有两个注意,有两个中止级别:深度和浅度。在 ThreadWrapper 类中,只使用了深度中止,而浅度中止则为后继类保留,并在 单独的文章 中描述的 ConveyorThreadWrapper 类中使用。对于 ThreadWrapper 类,重要的是 SyncPoint 抛出的两个异常彼此派生。

class ThreadWrapper {

// ...

private:

    // ...

    class ThreadAbortException : std::exception {};

protected:

    class ShallowThreadAbortException : ThreadAbortException {};

    // ...

}; 

由于 ThreadAbortExceptionbodyWrapper 函数中被捕获,它也捕获了 ShallowThreadAbortException。这意味着,如果某个派生类抛出此异常(间接,如下所示)并且不处理它,它仍将被捕获。另请参阅“异常”部分。

函数 ThreadWrapper::AbortPutToSleep/WakeUp上面显示)非常相似。

class ThreadWrapper {
public:

    // ...

    void Abort() {
        SetAbort(true);
    } //Abort

    // ...

private:

    enum class Action {
        sleep = 0, wakeup = 1,
        shallowAbort = wakeup | 2, deepAbort = wakeup | 4
    };

    void SetAbort(bool set, bool shallow = true) {
        std::lock_guard<std::mutex> lock(statusMutex);
        if (set) {
            if (shallow)
                state = Action::shallowAbort;
            else
                state = Action::deepAbort;
        } else //clear abort
            state = Action::wakeup;
        statusEvent.notify_one();
    } //SetAbort

    // ...

};

请注意,abort 状态是位图值与 wakeup 结合。SyncPoint实现显示,线程是基于从状态中提取的 wakeup 位被唤醒的。这是为了确保线程可以在等待(休眠)状态下被中止。继续使用 abort 状态执行将抛出 ThreadWrapper::ThreadAbortException 类型的异常,该异常将在线程体的最顶层堆栈帧中被捕获。

使用异常处理进行线程中止是完全安全的。显然,重要的是将线程执行带到最顶层堆栈帧,在该帧中,它将无条件地退出其体函数。如果体函数编写正确,堆栈展开机制将涉及正确销毁到中止点为止构造的所有对象并进行完整清理。

异常

所有线程异常都应在线程堆栈的最顶层堆栈帧处处理,最迟也是如此。如果未这样做,则会调用 std::terminate;最终整个进程将被终止。

C++ 没有一个通用的基类用于所有异常类型,但有一个标准基类用于一些标准异常,即 std:exception。所有派生自 std:exception 的类型的异常都可以通过处理该异常来捕获。因此,应首先处理此异常类型;所有其他异常都应在 try 块的末尾处理。这就是在 BodyWrapper 方法中所做的——该方法构成了线程的最顶层堆栈帧,正如从第一个代码示例中可以看到的那样。

同样的原因,异常处理程序有三个不同的钩子函数Aborted()ExceptionCaught(std::exception)UnknownExceptionCaught()。尽管 ThreadAbortException 在技术上是一个异常类,但抛出和传播此异常不应被视为异常情况。

连接非连接线程

原始 std::thread 的一个导致许多错误的问题是它的 join 函数。它只是阻塞调用线程直到目标线程终止。只有当线程可连接时才能使用它,而当线程被分离时,它就不能使用。分离线程类似于 CLR 中的后台线程:一个分离的或后台线程如果所有前台线程都已终止,则不会使进程继续运行。当 join 用于确保线程在应用程序最终关闭之前终止时,它对于后台线程可能不是必需的。

但是,如果我们仍然需要将调用线程与给定线程的终止同步,该怎么办?std::thread 对象(或其包装器)的状态仍然可访问,因此首先想到的是轮询此状态,即自旋等待。当然,这种解决方案不能被认为是可接受的。

这个问题可以使用相同的 std::condition_variable 机制来解决。

class ThreadWrapper {
public:

    // ...

    void Join() {
        if (thread.joinable())
            thread.join();
        else { //fallback for the case the thread was detached:
            std::unique_lock<std::mutex> ul(joinMutex);
            joinEvent.wait(ul, [=] { return done; });
        } // if
    } //Join

    // ...
};

自然,在线程体最顶层的堆栈帧上,将在线程终止时发送通知。重要的是,与线程节流方法不同,所有可能与线程终止同步的线程都会收到通知。这在 BodyWrapper 的代码中上面显示。

原子化属性

一般情况下,派生自 ThreadWrapper 的类的开发者可以轻松地以封装的方式组织线程间的交错;包装器类使得这非常方便。该类的所有公共成员都有可能从任何其他线程访问。如果它们也在包装线程内部使用,则应应用线程同步。派生包装器可以以某些私有包装器成员的形式添加所有必需的线程同步原语,并在公共成员的实现中使用它们。

然而,在此基础上,我想添加一个辅助模板类来覆盖大多数典型的互斥执行模式,这些模式将涵盖绝大多数应用程序。开始描述这些技术的最简单方法可能是展示整个类。

#include <mutex>

template<typename T>
class InterlockedProperty {
public:

    InterlockedProperty() : InterlockedProperty(nullptr, nullptr) { }
    InterlockedProperty(const T &value) : InterlockedProperty(nullptr, &value) { }
    InterlockedProperty(std::mutex& sharedMutex) : InterlockedProperty(&sharedMutex, nullptr) { }
    InterlockedProperty(std::mutex& sharedMutex, const T &value) : InterlockedProperty(&sharedMutex, &value) { }

    InterlockedProperty& operator=(InterlockedProperty& other) {
        this->mutex = other.mutex;
        this->value = other.value;
        return *this;
    } //operator=

    void UseSharedMutex(std::mutex& mutext) {
        this->mutex = mutex;
    } //UseSharedMutex

    operator T() const {
        std::lock_guard<std::mutex> lock(*mutex);
        return value;
    } //operator T

    T operator=(const T &value) {
        std::lock_guard<std::mutex> lock(*mutex);
        return this->value = value;
    } //operator=

private:

    InterlockedProperty(std::mutex * sharedMutex, const T * value) {
        if (sharedMutex == nullptr)
            mutex = &uniqueMutex;
        else
            mutex = sharedMutex;
        if (value != nullptr) // don't use mutex to interlock value here: it could be not yet fully constructed
            this->value = *value;
    } //InterlockedProperty

    std::mutex uniqueMutex;
    std::mutex * mutex;
    T value;

}; // class InterockedProperty

模板类 InterlockedProperty 仅定义了读写属性值的两个操作符,并将访问值封装在同一个互斥对象中。

当然,并非所有类型都可以用作模板参数;没有默认构造函数的类不能被使用。

InterlockedProperty 实例的初始化需要一些理解。

InterlockedProperty 的初始化

看看 InterlockedProperty 公共构造函数集。其中两个使用了类内部构造的 uniqueMutex 对象,另外两个使用了构造函数提供的某个外部互斥对象实例。更简单的构造函数(没有外部互斥参数的那些)实现了最简单的情况,即属性的访问是在线程包装器的线程和其他线程之间交错的。这可能涵盖了大多数典型应用程序。

然而,对于另一种同样非常典型的应用程序来说,这还不够。线程包装器可能不止一个属性,并且某些属性必须一起同步。例如,一个线程包装器可能操作两个属性,这两个属性的值不是随机的,而是遵循某种不变量;也就是说,并非所有属性值的组合都是有效的。换句话说,在一个线程中修改一个属性,在另一个线程中修改另一个属性是不正确的,因为线程包装器的中间状态最终可能变得不正确。一组某些属性的修改应该是互斥的。实际上,这就是互斥量的通用目的。

因此,所有这些属性都应该简单地共享同一个互斥实例,这是可以通过使用带有 sharedMutex 参数的构造函数来实现的。共享互斥量的其他方法是:使用 InterlockedProperty& operator=(InterlockedProperty&)(仅适用于相同模板参数类型的两个 InterlockedProperty 实例)或 UseSharedMutex(std::mutex&)(例如,可以从线程包装器构造函数的体中调用)。

现在,有两个构造函数可用于初始化属性值。需要注意的是,构造函数中对属性值的修改不是交错的。首先,它不是必需的。更重要的是,它在所有情况下都不会起作用。一种这样的情况在下面显示的用法示例中。在此示例中,共享互斥量是线程包装器类的成员,并且它在构造函数的初始化列表中传递给属性实例,其中调用了 InterlockedProperty 构造函数,但此时 mutex 对象尚未完全构造。这很容易在调试器下观察到;而 std::mutex 类在这种状态下不能用于锁定。通常,在尝试调用传递给构造函数初始化列表的对象函数时要格外小心。

用法,整合所有

既然所有部分都已解释清楚,我现在可以展示一个或多或少全面的用法示例,其中包含同步在一起的交错属性。

using natural = unsigned long long int;

class MyThread : public ThreadWrapper {
public:
    MyThread() : id(mutex, 2), name(mutex) {}
    InterlockedProperty<int> id;
    InterlockedProperty<const char*> name, help;
    InterlockedProperty<natural> delayMs;
protected:
    void Body() override {
        auto sleep = [=] {
            std::this_thread::sleep_for(std::chrono::milliseconds(delayMs));
        }; //sleep
        int count = 0;
        name = oldName;
        while (true) {
            this->SyncPoint();
            std::cout << count++ << help;
            std::cout << "id: " << id << ", name: " << name << std::endl;
            sleep();
        } //loop
    } //Body
private:
    const char* oldName = "none";
    std::mutex mutex;
}; //class MyThread

class ThreadWrapperDemo {
    enum class command {
        abort = 'a', // abort thread
        quit = 'q', // also abort thread
        sleep = 's',
        wakeUp = 'w',
    };
    static const char* help() { return " a, q: quit, s: sleep, w: wake up, else: change property; "; }
    static bool commandIs(char c, command cmd) { return (int)cmd == (int)c; }
public:
    static void Run(natural delayMs) {
        const char* newName = "new";
        MyThread thread;
        thread.help = help();
        thread.delayMs = delayMs;
        //thread.Suspend(); // can be suspended before start, waken up after
        thread.Start(); // thread.Start(true) for the detached (background)
        char cmd;
        while (true) {
            std::cin >> cmd;
            if (commandIs(cmd, command::abort) || commandIs(cmd, command::quit)) {
                thread.Abort();
                break;
            } else if (commandIs(cmd, command::sleep))
                thread.PutToSleep();
            else if (commandIs(cmd, command::wakeUp))
                thread.WakeUp();
            else {
                thread.id = thread.id + 1; // no ++ defined
                thread.name = newName;
            } //if
        } //loop
        thread.Join();
    } //Run
}; // class ThreadWrapperDemo

此代码片段还显示了线程终止以及从两个线程访问线程成员。两个注释显示了在等待(休眠)状态下启动线程的选项,以及在后台(分离)模式下使用线程的选项,但仍然能够连接它。

兼容性和构建

所有线程包装器解决方案都包含在两个文件中。

  • “ThreadWrapper.h”,
  • “InterlockedProperty.h”,

它们可以添加到任何项目中。

编译器应支持 C++11 或更高版本。对于 GCC,这应该设置为 -std=c++11-std=c++14 等选项。

演示项目有两种形式:1) 使用 Microsoft C++ 编译器和 Clang 的 Visual Studio 2015 解决方案和项目——参见“ThreadWrapper.sln”;2) 使用 GCC 的 Code::Blocks 项目——“ThreadWrapper.cbp”。对于所有其他选项,可以通过将解决方案文件目录的子目录“Cpp”中的所有“*.h”和“*.cpp”文件添加到项目中来组装一个项目或 Makefile。

我已在 Visual Studio 2015、Clang 4.0.0、GCC 5.1.0 上测试了代码。

C++ 选项包括“禁用语言扩展”(Microsoft 和 Clang 为 /Za),这对于 Microsoft 似乎是必需的。

版本

初始版本

2017 年 3 月 20 日

1.0

2017 年 3 月 21 日

添加了派生自 ThreadWrapperConveyorThreadWrapper。此类在单独的文章 现代 C++ 的传送带线程包装器 中有详细描述。重新设计了演示应用程序。

2.0

2017 年 3 月 24 日

更改了 ConveyorThreadWrapper 的设计。在 ThreadWrapper 中,扩展了受保护成员的集合,以支持 ConveyorThreadWrapper。线程状态同步与 ConveyorThreadWrapper 阻塞队列同步分开。

2.1

2017 年 10 月 29 日

修复了 ThreadWrapper::ExceptionCaught 函数签名的错误。应为:virtual void ExceptionCaught(std::exception& exception) {}

最终注释

添加了带有 Threading.ThreadWrapper 类的 C# 项目以供参考。我可能会决定撰写更多关于相关主题的文章。在这种情况下,我可能会升级并共享从当前文章页面下载的源代码。

我希望收到关于本文的富有信息性的反馈、批评和建议。感谢您的时间和耐心。

© . All rights reserved.