65.9K
CodeProject 正在变化。 阅读更多。
Home

ThreadPool 实现

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.36/5 (8投票s)

2005年4月28日

5分钟阅读

viewsIcon

99299

downloadIcon

1818

本文描述了一个线程池的实现。

引言

本文是关于线程池的。线程池管理客户端的请求。一个请求是指向某个函数或方法的指针、该函数的参数、请求的标识号和优先级号。管理是指存储客户端的请求,并通过不同的线程并行执行它们。执行顺序按优先级,这个调度器是非抢占式的——一旦线程开始执行,就没有任何东西可以停止它。

该代码包含了面向对象设计(OOD)、多态、多线程、同步、泛型编程(模板)和 STL 容器等有趣的主题。

我附上了 `pool` 类的源代码和一个简单的 main 函数,展示了如何使用它。

接口

线程池类的接口非常简单,只包含几个函数。

  • 在构造函数 `ctor` 中,它接收一个限制并行运行线程数量的数字。例如,如果这个数字是 1,我们就得到一个顺序执行。
  • Run 函数创建了主线程,它负责所有的管理工作。这个函数不应该被调用超过一次;每个对象池只有一个池。如果你想创建很多池,请以正确的方式创建它们(例如:Pool p1 (2), p2 (2) ;),线程池类不是单例模式。
  • Stop 函数会终止主线程。它会保存正在提交的请求。
  • Enqueue 函数向线程池添加一个新请求。之所以命名为 enqueue(入队),是因为请求被存储在优先级队列中。这个函数是线程安全的。
  • Wait 函数等待直到所有请求都完成。
  • Wait for request 函数等待直到一个特定的请求完成。
  • 在析构函数 `dtor` 中,它会停止管理,也就是说,没有 `Stop`、`Pause` 或类似的功能。如果你想停止一个线程池,就销毁它。

调用函数的顺序并不重要。你可以先调用 Run,然后再 Enqueue 一个新请求。

结构

实现

请求

请求是要执行的函数或方法。函数是指 C 风格函数,也称为“__cdecl”。方法是指类成员函数——类内部的非静态函数,也称为“__thiscall”。对于函数来说,传递函数指针很简单,它就可以工作,但在方法的情况下,事情就不那么容易了。要执行一个方法,你需要一个类的对象。这是执行方法的一种方式。请参阅 ThreadRequestMethod.h 文件。

template <typename ClassT, typename ParamT = LPVOID> 
class ThreadRequestMethod : public ThreadRequestBase 
{ 
      ... 
      virtual void Execute() { (m_Client->*m_ClientMethod)(m_Param); 
      ... 
 private: 
      ClassT *m_Client; 
      void (ClassT::*m_ClientMethod)(ParamT *param); 
      ParamT *m_Param; 
};

优先级队列

为了实现优先级队列,我使用了 STL。STL 中有一个现成的优先级队列。有一个方法可以“教会”STL 队列如何处理队列中成员的优先级。

这种方法是定义一个仿函数:一个继承自“binary_function”对象的结构,并重写 operator ()。这个仿函数定义在线程池的私有 struct 中。

// functor - used in stl container - priority queue.
template <class ClassT>
struct less_ptr : public binary_function<ClassT, ClassT, bool> 
{
    bool operator()(ClassT x, ClassT y) const
    {
        return x->GetPriority() < y->GetPriority();
    }
};

这个仿函数对于包含“int GetPriority ()”函数的任何类都适用。现在,优先级队列的定义如下:

priority_queue <
        RequestWrapper*, 
        vector<RequestWrapper*>, 
        less_ptr<RequestWrapper*>   
    > RequestQueue;

第三个模板定义了如何管理优先级。

队列包含请求包装器(Request Wrappers)。它封装了来自客户端的请求,并包含一个指向线程池的指针。这是必要的,因为可以在单独线程中执行的函数必须是 C 函数或类静态函数(我不知道其他选项)。因此,包装器包含一个指向“this”的指针。

多线程和同步

每个请求都会获得一个线程来执行其函数。可能会有很多线程并行运行,并且有很多变量是每个线程都会访问的。为了保护这些变量的安全,我使用 Windows API 的“临界区”来保护它们。在这个线程池的第一个版本中,我使用了互斥锁(Mutexes),但这样做效率不高,正如 MSDN 上的下表所示:

MSDN

表 1. 同步对象摘要

名称

相对速度

跨进程

资源计数

支持的平台

临界区

快速

否(独占访问)

95/NT/CE

互斥体

Slow

否(独占访问)

95/NT/CE

信号量

Slow

95/NT

事件

Slow

是*

95/NT/CE

受限区域

快速

95/NT/CE

* 事件(Events)可用于资源计数,但它们不会为您跟踪计数。

这是确保一次只有一个线程执行“// do something …”的一种方法。

CRITICAL_SECTION m_CSecQueue; 
InitializeCriticalSection(&m_CSecQueue); 

EnterCriticalSection(&m_CSecQueue);
// do something …
LeaveCriticalSection(&m_CSecQueue);
DeleteCriticalSection(&m_CSecQueue);

我使用的另一个 Windows API 功能是事件(events)。我使用它的原因是为了防止 CPU 浪费。这是创建事件的一种方法:

HANDLE event = CreateEvent (NULL, false, false, "");

这是使用事件的一种方法:

  1. WaitForSingleObject (event, INFINITE);
  2. SetEvent (event);

请参阅“RunMainThread”函数,了解它是如何节省 CPU 时间的。

它是如何工作的

这是请求的生命周期。首先是客户端方面:

创建

ThreadRequestBase *r = new ThreadRequest<Param>(&func, param, priority);

提交到线程池

pool->Enqueue(r);

启动一个线程池

 bool res = Pool->Run();

管理

这是线程池方面(或线程池内部):

请求在 Enqueue 函数中被添加到优先级队列。在主线程函数中,有一个无限循环,循环内部会检查线程池是否可以执行另一个请求。它会从队列中 Dequeue(出队)一个请求,并运行一个线程池函数(不是客户端请求函数),然后增加一个计数正在运行线程数的变量。在线程池函数内部,会执行客户端函数,完成后减少正在运行线程数,并通过 SetEvent 向主线程发出信号,表明一个请求已完成。使用这个事件可以防止主线程浪费 CPU。然后,主线程不能执行另一个请求,它在一个阻塞函数中等待这个信号。

待改进

线程池为每个请求创建一个新线程。当请求完成时,线程就结束了。这不是一种有效的方式,但它非常简单。因此,线程池管理的有效解决方案是创建正确数量的线程并使用它们来执行请求。

© . All rights reserved.