65.9K
CodeProject 正在变化。 阅读更多。
Home

适用于现代 C++ 的传送带线程包装器

2017年3月21日

CPOL

13分钟阅读

viewsIcon

28602

downloadIcon

563

新线程包装器 (v. 2.0) 提供基于阻塞队列和由其他线程提供的委托的编程模型

引言

In the queue, you sons of bitches, in line!

M. A. Bulgakov, 狗的心

目录

引言

这是关于线程包装器系列文章中的第二篇。

  1. 现代 C++ 的线程包装器.
  2. 本篇文章。

这篇新文章结合了几个方面。

首先,新的传送带包装器基于 上一篇文章,其中详细描述了线程和线程同步的大部分原理。我将在几个地方引用这篇文章。

下一个重要组成部分是我文章 The Impossibly Fast C++ Delegates, Fixed 中设计的委托。这些委托比 std::function 更快,并且提供了极大的便利性。

最后,所有这些都与阻塞 队列阻塞集合的概念结合在一起。一个著名的例子是 .NET FCL 模板类 BlockingCollection。在我的文章 用于线程通信和线程间调用的简单阻塞队列 中,我提供了我的阻塞队列实现(这项工作起源于 .NET v.3.5,当时 BlockingCollection 尚未引入),并展示了各种有趣的用例,包括与本主题相关的用例。

这种队列是一些被广泛使用的编程模型的一部分,例如 消息队列。通常,一个线程将数据推送到队列,这通常不是一个阻塞调用;而另一个线程从队列接收数据,这通常是一个阻塞调用,用于在队列为空时将线程置于等待状态。多个线程可以在两端参与此传输。

本文的主题是队列的特例,在我关于阻塞队列的文章中进行了说明。队列元素必须仅仅是数据吗?不。一种重要的方法是队列…委托实例。更准确地说,队列元素可以是委托实例,其中包含委托调用所需的参数值。

这是延迟执行多个任务并串行化执行的方法。连续执行一些任务没有问题,恰恰相反,使用过多或不可预测数量的线程是一个很常见的错误。仍然会使用并行性;并且排队的任务集与 UI 等其他线程并行执行。

这种方法涵盖了一大类实际重要的应用程序。通常,我们需要将一个或多个任务排队以在后台执行,同时在其他线程中执行其他活动,如 UI 操作、网络、通信等。最好在整个应用程序生命周期内重用同一个线程,而不是为每次执行后台任务都创建一个新线程。这可以被认为是比线程池更稳定、更可预测的替代方案。

线程包装器设计

ConveyorThreadWrapperBody 方法使用 final说明符进行密封。用户通过其他形式获得对线程中代码的控制:用户可以提供不同的委托并将它们推送到队列。我将把此类委托实例执行的操作称为任务。该类是一个模板类;其唯一的模板参数是委托参数的类型。要将任务加入队列,使用代码会提供一个委托实例和一个用于委托调用的参数值。

此类包装器可以立即使用,无需创建派生类。显然,委托可以基于此类派生类的成员函数,但更自然的是基于用户类。另请参见“委托和委托参数类型”部分。

唯一需要重写的是虚函数 OnCurrentTaskAborted,但即使这样也不是必需的;相反,自v. 2.0 以来,用户代码可以提供另一个委托 CurrentTaskAborted 来实现相同目的。

用户代码可以在两个不同级别请求中止:浅层深层。深层中止会传播到线程的堆栈帧,从而终止整个线程,而浅层中止仅传播到当前正在执行任务的末尾;在此之后,线程会从队列中获取下一个任务,这会导致线程进入等待状态(如果队列为空)或立即执行下一个任务。

演示

要了解这一切是如何工作的,让我们看看演示。目前,有两个演示按顺序显示;第一个是第一个文章的修改版演示。让我们看看第二个。

有两个不同的委托实例实现了两个不同的算法。用户每次只需输入一个字符作为命令,然后按 [Enter]。根据输入的字符,线程可以被暂停、唤醒或中止。用户可以选择深层中止(退出)或浅层中止,后者中止当前正在执行的任务。对于其他字符,输入的字符的代码点作为模板参数类型传递给委托。为了处理浅层中止,编写了一个派生自 ConveyorThreadWrapper 的类。

class IntegerConveyor : public ConveyorThreadWrapper<int> {
protected:
    virtual void OnCurrentTaskAborted(const int& current) override {
        std::cout << "Current task aborted: " << current;
        ConveyorThreadWrapper<int>::OnCurrentTaskAborted(current);
    } //OnCurrentTaskAborted
}; //class IntegerConveyor

注意对基类 OnCurrentTaskAborted 方法的调用。通常,这不是预期的。我这样做的唯一原因是为了演示委托 CurrentTaskAborted 的操作,如果没有基类的方法,该委托将无法被调用。

cv.CurrentTaskAborted = [](int) -> void {
    std::cout << ". Enter any character: ";
}; //cv.CurrentTaskAborted

另请参见完整的演示

此包装器及其嵌入的线程由控制台主线程驱动。

class ConveyorThreadWrapperDemo : public ConveyorThreadWrapper<int> {
    enum class command {
        quit = 'q', // entire thread
        abort = 'a', // current task
        sleep = 's',
        wakeUp = 'w',
    };
    static const char* help() { return "a: task abort, q: quit, s: sleep, w: wake up, key: "; }
    static bool commandIs(char c, command cmd) { return (int)cmd == (int)c; }
public:
    using natural = unsigned long long int;
    static void Run(natural delayMs) {
        natural max = 100;
        IntegerConveyor cv;
        cv.CurrentTaskAborted = [](int) -> void {
            std::cout << ". Enter any character: ";
        }; //cv.CurrentTaskAborted
        using cType = decltype(cv);
        auto sleep = [delayMs] {
            std::this_thread::sleep_for(std::chrono::milliseconds(delayMs));
        }; //sleep
        auto Hailstone = [] (natural a) -> natural {
            if (a % 2 == 0)
                return a / 2;
            else
                return 3 * a + 1;
        }; //Hailstone (iteration)
        auto HailstoneStoppingTime = [Hailstone] (natural n) -> natural {
            natural stoppingTime = 0;
            while (n != 1) {
                n = Hailstone(n);
                ++stoppingTime;
            } //loop
            return stoppingTime;
        }; //HailstoneStoppingTime
        auto lambdaHailstone = [HailstoneStoppingTime, sleep, max] (cType::SyncDelegate& sync, int value) {
            for (natural count = 1; count < max; ++count) {
                natural stoppingTime = HailstoneStoppingTime(count + value);
                sync(false);
                std::cout
                    << count << ": " << help()
                    << value << " => Hailstone " << stoppingTime << std::endl;
                sleep();
            } //loop
        }; //lambdaHailstone
        natural mod = 1;
        mod = mod << 32; // NOT 1 < 32 !!!
        natural multiplier = 1664525; // Numerical Recipes
        natural increment = 1013904223; // Numerical Recipes
        auto Lgc = [mod, multiplier, increment](natural n) -> natural {
            return (n * multiplier + increment) % mod;
        }; //Lgc
        auto lambdaLgc = [Lgc, sleep, max](cType::SyncDelegate& sync, int value) {
            natural n = value;
            for (natural count = 0; count < max; count++) {
                n = Lgc(n);
                sync(false);
                std::cout
                    << help()
                    << value << " => LGC " << (n) << std::endl;
                sleep();
            } //loop
        }; //lambdaLgc
        SA::delegate<void(cType::SyncDelegate&, int)> delHailstone = lambdaHailstone;
        SA::delegate<void(cType::SyncDelegate&, int)> delLgc = lambdaLgc;
        cv.Start();
        char cmd;
        while (true) {
            std::cin >> cmd;
            if (commandIs(cmd, command::abort))
                cv.Abort();
            else if (commandIs(cmd, command::quit)) {
                cv.Abort(cType::AbortDepth::entierThread);
                break;
            } else if (commandIs(cmd, command::sleep))
                cv.PutToSleep();
            else if (commandIs(cmd, command::wakeUp))
                cv.WakeUp();
            else {
                int cmdValue = (int)cmd;
                if (cmdValue % 2 == 0)
                    cv.Enqueue(delHailstone, cmdValue);
                else
                    cv.Enqueue(delLgc, cmdValue);
            } //if
        } //loop
        cv.Join();
    } //Run
}; //class ConveyorThreadWrapperDemo

此演示展示了lambda 表达式的使用,在许多情况下它们非常方便。或者,委托机制也允许使用静态函数和常量或非常量实例函数,如关于委托的文章中所述。

顺便说一句,注意一个非常明显但重要的 lambda 表达式技术:通过闭包而不是通过参数列表传递一些对象。例如,lambda 表达式 sleep 使用方法 Run 的参数值 delayMs,如 auto sleep = [delayMs]

对于此应用程序,这是至关重要的。它可以作为参数列表传递,但让我们看看 sleep 是如何使用的。最终,所有这些 lambda 表达式都由 lambda 表达式 lambdaHailstonelambdaLgc 调用,并且也通过它们的闭包传递。这两个最终的 lambda 表达式在此上下文中从不被调用。相反,它们被赋值给委托实例。它们的参数列表应符合委托的配置文件(在此处显示),并且不能接受其他实际参数。

请注意,在某个任务执行期间可以连续输入多个字符。这演示了在某个任务执行期间新任务被排队的情况。如果例如在另一个任务执行期间排队了 N 个任务,则最多需要 N+1 次当前任务中止才能将线程带入等待状态。当当前正在执行的任务被中止时,将从队列中获取下一个任务进行执行。

如果一段时间内没有执行浅层中止,当前任务最终将完成执行。这将使线程休眠。通过向队列添加另一个任务可以唤醒线程。我故意指定了较少次数的迭代,以演示此条件。用户代码无需添加另一个虚拟函数或委托来挂钩此事件,因为如果需要,可以将适当的操作简单地写在任务委托代码中。

在此代码示例中,我试图在纯数学算法的代价上缩短文本,而保留“系统”方面的内容。同时,有趣的部分在于这些算法。

演示算法

让我们进行一个与本文主题关系不大的简短有趣休息。在演示应用程序中,为了演示将不同的委托传递给队列,我实现了两种选择(选择的算法取决于用户输入的字符的代码点的奇偶性):计算Hailstone 数列的总停止时间,以及线性同余生成器

第一件事是一个非常复杂的数学问题,很可能在撰写本文时尚未解决;这个问题被认为是“一个极其困难的问题,远远超出了当今数学的范畴”。这个问题被称为 Collatz 猜想,或 3n+1 猜想。它表明,通过递归过程定义的数字序列,对于偶数 nn->n/2,对于奇数 nn->3n+1,最终将达到 1 对于任何初始 n。到目前为止,我们不知道不符合此行为的数字,但尚未证明不存在这样的数字。在我的演示中,我计算了 Hailstone 数列本身,但仅显示了达到 1 所需的迭代次数,这对于给定的初始数字 n 被称为总停止时间。对于初始 n,我取用户输入的字符的代码点与外部循环迭代次数的总和。这个总停止时间值及其对初始数字的依赖性是真正的数学奇迹之一——请阅读文章;它非常有趣。

第二个算法是一个非常简单的生成确定性伪随机数序列的算法:线性同余生成器。该算法需要三个精心选择的数字参数,我根据 Numerical Recipes 来选取。

这些算法的实现过于简单,在此不再展示;欢迎大家在可下载的源代码中查看。

为包装器分配任务

任务就是这样入队的

template<typename PARAM>
class ConveyorThreadWrapper : public ThreadWrapper {
public:
    
    // ...

    void Enqueue(TaskDelegate& task, PARAM argument) {
        std::lock_guard<std::mutex> lock(queueMutex);
        queue.push(new QueueItem(task, argument));
        queueCondition.notify_one();
    } //Enqueue

    // ...

}; //class ConveyorThreadWrapper

线程会被通知有新任务。这很重要,因为如果队列当前为空,线程处于等待状态,需要被唤醒。让我们看看队列同步。

队列同步

队列中存储的元素是 QueueItem 类型,它同时包含指向委托实例的指针和调用所需的参数值。同步原语 queueMutexqueueCondition 用于同步 上面显示的 Enqueue 函数和运行任务的线程的操作。

template<typename PARAM>
class ConveyorThreadWrapper : public ThreadWrapper {
// ...
private:

    // ...

    struct QueueItem {
        QueueItem() = default;
        QueueItem(TaskDelegate& del, PARAM param) {
            delegate = &del;
            parameter = param;
        } //QueueItem
        TaskDelegate* delegate;
        PARAM parameter;
    }; //struct QueueItem

    std::queue<QueueItem*> queue; // protected by queueState and queueMutex
    std::mutex queueMutex;
    std::condition_variable queueCondition;

    void getTask(QueueItem& itemCopy) {
        std::unique_lock<std::mutex> ul(queueMutex);
        queueCondition.wait(ul, [=] {
            return queue.size() > 0;
        });
        auto front = queue.front();
        itemCopy.delegate = front->delegate;
        itemCopy.parameter = front->parameter;
        queue.pop();
        delete front;
    } //getTask
}; //class ConveyorThreadWrapper

此外,我们还需要使任务对任务节流(在上一篇文章中详细描述)和中止保持响应。

线程状态同步

ThreadWrapper 类中,函数 SyncPoint 可以作为受保护的成员函数直接被线程代码访问。在委托方法中,这个函数是不可访问的。取而代之的是,一个基于该函数的委托实例被传递给每个任务委托调用。ThreadWrapper 类提供了一个静态函数,派生类可以使用该函数来创建此类委托实例。

class ThreadWrapper {
// ...
protected:

    // ...

    static void InitializeSyncPointDelegate(SA::delegate<void(bool)>& del, ThreadWrapper* instance) {
        del = SA::delegate<void(bool)>::create<ThreadWrapper, &ThreadWrapper::SyncPoint>(instance);
    } //InitializeSyncPointDelegate

    // ...

}; //class ThreadWrapper

要查看任务委托是如何调用的,我们需要查看重写的函数 Body。我们还可以看到浅层中止是如何处理的。

处理浅层中止

让我们看看重写的方法 Body

class ThreadWrapper {
// ...
protected:

    void Body() override final {
        while (true) {
            QueueItem task;
            getTask(task);
            try {
                (*task.delegate)(syncDelegate, task.parameter);
            } catch (ShallowThreadAbortException&) {
                SetAbort(false);
                OnCurrentTaskAborted(task.parameter);
            } //exception
        } //loop
    } //Body

    // ...

}; //class ThreadWrapper

这段代码示例展示了任务委托是如何调用的。另外,让我们看看 ShallowAbortException 的处理。首先,请注意中止条件已清除。这对于下一个任务在有新元素入队时立即执行是必需的。中止条件在基类 ThreadWrapper 中设置/清除,并进行同步。这在上一篇文章中有所展示。

请记住,所有其他异常,包括常规的“深层ThreadAbortException,都在 Body 的外部上下文中处理。这在上一篇文章中已详细说明。

用户可以通过这种方式选择浅层深层中止。

class ThreadWrapper {
public:

    // ...

    enum class AbortDepth { currentTask, entierThread, };
    void Abort(AbortDepth depth = AbortDepth::currentTask) {
        SetAbort(true, depth == AbortDepth::currentTask);
    } //Abort

    // ...

}; //class ThreadWrapper

委托和委托参数类型

这里是所涉及委托类型的定义方式。

using SyncDelegate = SA::delegate<void(bool)>;
using TaskDelegate = SA::delegate<void(SyncDelegate&, PARAM)>;

TaskDelegate 类型用于存储在队列中,而 TaskDelegate 反映了 SyncPoint 的配置文件。基于 SyncPoint 的委托实例作为第一个参数传递给任务函数,以便应用程序开发者有机会使任务对线程节流和中止保持响应。请注意,委托的实现不必在派生自线程包装器类的类中实现,而可以在用户的类中实现,而该类无法访问 SyncPoint 函数本身。

正如我在关于委托的文章中所解释的那样,委托实例可以由静态函数、实例函数、常量实例函数或lambda 表达式构成。闭包捕获也受支持,这是一个非常微妙的问题(例如,传递 lambda 表达式的实例会破坏捕获)。

类用户负责委托实例的内存管理。首先,委托应在所使用的 lambda 表达式的作用域内调用。C++ 标准明确规定,捕获的上下文不会在 lambda 表达式声明的作用域之外保留。

PARAM 值按值传递并存储在包装器队列中。这似乎是合理的。整个队列的目的是保留委托调用所需的一切。同时,并非所有参数类型都适合作为 ConveyorThreadWrapper 的模板参数。要列出所有要求并不容易,但潜在问题在应用程序开发过程中很容易识别。显然,不应该删除默认构造函数,类型应支持按值复制运算符等。特别是,如果参数结构包含直接或间接指针,那么应用程序开发人员有责任确保在委托调用期间这些指针指向有效的内存对象。这些指针的另一个问题是:如果它们可以在包装器线程中使用,那么它们和被指向的对象应该成为线程同步的主题。应用程序开发人员应负责所有此类问题。一种适合大多数典型情况的封装同步机制在上一篇文章中提供。

兼容性和构建

所有线程包装器解决方案都包含在仅两个文件中,解决方案包含在仅两个文件中。

  • “ThreadWrapper.h”,
  • “InterlockedProperty.h”,
  • “DelegateBase.h”,
  • “Delegate.h”,
  • “ConveyorThreadWrapper.h”,

它们可以添加到任何项目中。

编译器应支持 C++11 或更高版本。对于 GCC,这应该设置为 -std=c++11-std=c++14 等选项。

演示项目提供两种形式:1)使用 Microsoft C++ 编译器和Clang 的 Visual Studio 2015 解决方案和项目 - 请参阅“ThreadWrapper.sln”;2)使用GCC 的 Code::Blocks 项目 - “ThreadWrapper.cbp”。对于所有其他选项,可以通过在解决方案文件目录的子目录“Cpp”中添加所有“*.h”和“*.cpp”文件来组装项目或 make 文件。

我已在 Visual Studio 2015、Clang 4.0.0、GCC 5.1.0 上测试了代码。

C++ 选项包括“禁用语言扩展”(Microsoft 和 Clang 为 /Za),这对于 Microsoft 似乎至关重要。

版本

1.0

初始版本,2017 年 3 月 21 日

2.0

2017年3月24日

通过更好地重用 ThreadWrapper 代码,ConveyorThreadWrapper 类得到了简化,ThreadWrapper 也得到了更新。线程状态同步与队列同步分离,这可能对吞吐量更有利。演示两个类的演示代码经过重新设计。上一篇文章 现代 C++ 的传送带线程包装器 已更新。

2.1

2017年10月29日

修复了 ThreadWrapper::ExceptionCaught 函数签名中的一个错误。应该是:virtual void ExceptionCaught(std::exception& exception) {}

结论

我确实认为,ThreadWrapperConveyorThreadWrapper 这两种线程包装器类型涵盖了大多数典型实际案例中最重要的编程模型。此外,它们还可以帮助简化和优化一般的线程编程。这两种包装器类型中的第一种,由于专业化程度较低,因此不对利用通用 C++ 线程的全部功能施加任何限制。

我将非常感谢所有信息性的批评、问题报告、注释和改进建议。

© . All rights reserved.