65.9K
CodeProject 正在变化。 阅读更多。
Home

一个异常安全的OO线程池框架

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.86/5 (58投票s)

2003年1月14日

CPOL

7分钟阅读

viewsIcon

325337

downloadIcon

2677

提供了一个插件式的多线程环境,使用异常安全的线程池和函数对象。

引言

我查阅了互联网上关于多线程和线程池设计的许多文章,但都无法很好地适应我的需求。我看到的大多数线程池,其线程管理逻辑都与线程正在执行的实际函数交织在一起。我想要一个不同的概念视图。为此,我放弃了从线程派生并分配函数指针来执行,而是转向了命令模式,该模式来自《设计模式:可复用面向对象软件元素》一书中非常有用的Gamma、Helm 等人。请求作为函数对象提交,允许函数对象维护自己的环境。

从根本上说,您将一个函数对象形式的请求提交到一个队列,然后让线程池处理其余的事情。我不想“加入”线程回主线程,或者“等待”完成。线程仅仅是一个执行进程。它不关心它正在执行什么,它只是继续做路径中的下一件事。

特点

  • 异常安全
  • 可配置的线程数
  • 可配置的队列长度

用法

首先,您需要

#include "ThreadPool.h"

然后,创建ThreadPool对象。最简单的方法就是创建ThreadPool对象,如下所示:

ThreadPool myPool;

完成此操作后,您可以调用myPool.accept()来准备线程池接受连接。

第二种方法提供了更多的控制。我们可以从ThreadPool类派生,并覆盖onThreadStartonThreadFinish方法来提供线程特定的信息。

class COMPool : public ThreadPool
{
public:
    void onThreadStart(int threadId)
    {
        ::OleInitialize(NULL);
    }
    void onThreadFinish(int threadId)
    {
        ::OleUninitialize();
    }
};

我们还需要创建一或多个函数对象,以便请求线程池来处理。这些函数对象是通过从ThreadRequest派生来创建的。如果您希望传递参数或从函数对象中检索信息,可以在构造函数中提供它们。此外,任何必要的清理都可以在析构函数中完成。

class SpecialRequest : public ThreadRequest
{
public:
    SpecialRequest(int param1, int param2, int& retStat) : 
        myLocal1(param1), myLocal2(param2), myStatus(retStat)
    {
        // Any constructor setup stuff, like transaction begin or 
        //file opens or whatever else the functor
        // may need to operate with.
    }

    void operator()(int)
    {
        // Do whatever you want in here - 
        //but DON'T let exceptions propogate.
        retStatus = myLocal1 + myLocal2;   
        // OK - so it just adds 2 integers....   but it's multithreaded!!!
    }

private:
    int myLocal1;
    int myLocal2;
    int& retStatus;
};

要提交请求,我们使用预先定义的ThreadPool对象,并将函数对象提交到队列中。在这种情况下,同时创建它:

myPool.submitRequest(new SpecialRequest(1, 2, returnVal);

请注意,SpecialRequest有两个参数正在传递,1和2,以及一个returnValue的引用,供函数对象在完成后填充。请注意,函数对象必须使用new在堆内存上创建,因为线程池将删除它。

一旦我们完成了线程池的工作,就可以使用以下命令将其关闭:

myPool.shutdown();

因此,我们的主循环如下所示:

int main(int, char*)
{
    int result;
    COMPool myCOMThreadPool;
    // We tell it to accept requests now
    myCOMThreadPool.accept();
    // Add 1 and 2, and store in result     
    myCOMThreadPool.submitRequest(new SpecialRequest(1, 2, result));
    myCOMThreadPool.shutdown();
     // And output the result.
    std::cout << result;
}

演示项目包含一个更明确的示例,使用了多个线程、循环和线程统计信息。

注意:处理代码可能抛出的任何异常非常重要,您需要在重写的operator()(int)中进行处理。acceptHandler不会允许任何异常传播出其循环,以维护完整性。这实际上不是一件坏事,因为您的函数对象无论如何都应该能够处理自己的异常。

细节

对于那些真正想知道它是如何工作的个人来说,整个cpp和h文件都使用doxygen进行了注释,棘手的聪明之处也进行了注释。我在这里将尝试解释我为什么做了某些事情。

我试图遵循的基本原则是,线程是一个完全独立的实体,与它正在执行的事物分开。这样做的目的是完全分离线程的管理和执行。这将使线程池能够轻松地用于数据库调用——有效地模拟异步调用——通过fast-cgi处理HTTP响应,以及计算PI到(在此处插入任意大数)小数位,同时为我制作咖啡。我不希望线程知道函数/函数对象的执行状态,也不希望函数对象知道它是在线程中执行的。

这提供了一种与预期略有不同的编程模型。在我看到的许多实现中,工作线程负责发出完成信号,而“主”执行线程负责等待直到工作线程发出完成信号。然后,主线程需要清理工作线程,或查询工作线程以获取工作线程的结果。其中一些实现传递了任意数量的指针来表示参数和返回值。

在此实现中,工作线程根本不等待。它执行任务然后自行清理。如果您需要获取结果,可以在构造函数中提供一个指向结构的指针或引用,并使用函数对象在其实现中填充该结构。例如,您可以将数据库提交的结果发送到传递的构造函数中,并随时检查它是否已提交。或者,您可以将整个业务对象,如采购订单,传递到构造函数中,并让函数对象负责填充值。线程不在乎。它所做的就是填充并自行清理。如果您的应用程序需要等待某事发生,您可以在函数对象中包含一个信号(事件),并将主线程置于睡眠状态,直到信号被发出。(建议?)同样,您是在等待函数对象完成,而不是线程。这是一个概念上的区别,但我认为它是一个更准确的表示。

我必须添加的第二件事是onThreadStartonThreadFinish调用。在简单的情况下,这是一个空操作,但是通过从ThreadPool派生,您可以使它们执行任何您喜欢的操作。我必须添加这些,因为在我将它用于OLEDB调用时,COM需要每个线程进行初始化。

当队列达到最大长度时,队列本身会阻塞,在提供的示例中,这也会阻塞主线程。其效果是阻止额外的请求,从而为池腾出赶上时间。

请参阅底部的我的待办事项列表,了解我对此的一些改进想法。

我之所以使用函数对象,是因为对于OLEDB调用,我确实需要事务的完整性。为了提供这一点,函数对象的构造函数可以执行beginTransaction(),析构函数则会根据私有的transactionSuccess标志调用commit()rollback()。这确保了无论结果如何,都会调用commit或rollback,从而大大提高了异常安全性。在这样实现之后,我意识到函数对象是多么有效,所以我最终将它们用于通用解决方案。

这样做的优点是函数对象包含执行所需的所有内容。因为它可以维护状态,所以您可以实际在构造函数中传递参数,并使用这些参数来获取有关当前函数对象的信息,包括其执行状态和返回的数据。但最棒的是,通过使用函数对象,您可以很容易地在多线程环境之外测试函数对象本身——从而轻松集成单元测试。

函数对象的另一个好处是它们可以维护自己的环境。在FastCGI应用程序的情况下,我需要传递输出结构的地址,以便将输出返回给Web服务器。我通过在构造函数中将环境(包括CGI参数、错误流和输出流)传递给函数对象来实现这一点。这意味着同一个函数对象是完全线程安全的,因为它是一个完全独立的对象的,但它却能访问当时创建的环境。在执行时,它总是会写入正确的输出流。这提供了线程安全,而无需互斥锁和关键部分等。

acceptHandler已被声明为不抛出异常。这是必需的,因为异常可能会破坏其他线程的安全性。如果函数对象抛出异常,并且它没有在函数对象中处理(这是糟糕的!),处理程序将简单地吞噬该异常,您将看不到它。但是,您不应该依赖此功能,因为它可能会更改。(不确定如何或为什么,但如果有人提出了更好的方法,我会立即修改它)

待办事项

所有这些都是初步的想法,在实际需要之前我不会实现它们。

  • 更多的调试信息——即线程信息块等。
  • 线程池监控
    • 平均队列长度
    • 平均响应时间
  • 一个可选的QueueFullException,而不是仅仅阻塞。我可能会将其作为模板策略类来实现(参见Modern C++ Design,Andrei Alexandrescu)。
  • 动态添加和删除线程。
  • 强制关闭

致谢

特别感谢Taka审阅了本文的代码。

更改和编辑

  • 2002/01/18:修正了它上面的撇号,因为它让我感到烦恼。
© . All rights reserved.