创建 C++ 线程类






4.74/5 (27投票s)
A look at platform independent threading in C++.
引言
最近,我弟弟问我是否有简单的方法可以创建一个支持面向对象线程的 C++ 类。我过去写过许多多线程库;然而,它们都是用 C 写的。C 语言一直是我进行底层编程的首选语言;我用 C++ 进行 GUI 开发。尽管 CodeProject 上有许多优秀的面向对象线程的示例,但没有一个引入的类能满足我弟弟所有的需求和我自己的好奇心。他想要一个具有以下属性的线程类:
- 它支持事件驱动和基于间隔的异步线程。
- 它支持同类和专用线程。
- 它提供一个 FCFS(先进先出)堆栈队列,用于发布和处理多个任务。
- 它是可移植的。
- 它易于实现。
为了支持新类 CThread,还开发了其他支持类。这些类包括 CMutexClass、CEventClass 和 CTask 类。CMutexClass 和 CEventClass 提供资源管理,而 CTask 类是从派生类派生的基类,用于支持同类异步线程。
什么是线程?
每个进程至少有一个控制线程,并且每个进程一次可以执行至少一个任务。具有多个控制线程的进程定义了一个多线程进程。多线程进程允许多个任务在进程环境中异步运行。
资源管理—线程同步
由于多线程进程中的线程共享相同的资源,因此需要操作系统级别的控制机制来确保数据完整性。当一个线程正在修改一个变量而另一个线程试图读取它,或者两个线程试图同时修改同一个变量时,就会发生数据完整性丢失。为了防止这种情况,操作系统提供了一个互斥对象,简称互斥锁。在多线程应用程序中,通过编程部署的互斥锁可以防止多个线程同时访问单个资源。当一个线程需要访问资源时,它必须首先获取互斥锁。一旦一个线程获得了互斥锁,其他试图获取同一互斥锁的线程将被阻塞,并进入低 CPU 使用率的等待状态。一旦一个线程完成了数据访问,它就会释放相应的互斥锁;这允许其他线程获取它并访问相应的数据。
互斥锁的糟糕实现可能导致资源耗尽,也称为死锁。当一个或多个线程争夺同一资源时,就会发生资源耗尽。
示例
| 线程 A | 线程 B | 
|---|---|
| 获取互斥锁(1)以修改数据项 1 | 获取互斥锁(2)以修改数据项 2 | 
| 想要互斥锁(2)以查看数据项 2 | 想要互斥锁(1)以查看数据项 1 | 
上面的示例中发生死锁是因为线程 A 正在阻塞,试图获取线程 B 持有的互斥锁(2)。线程 B 正在阻塞,试图获取线程 A 持有的互斥锁(1)。
与互斥锁类似,条件变量(在 UNIX 中)是另一种形式的同步机制。条件变量允许线程进行会合。它们允许一个线程通知另一个线程发生了变化。在 Windows 中,这些是事件。
操作系统调用
下表列出了用于实现 CMutexClass、CEventClass、CTask 和 CThread 类中线程的各种函数。
| 函数 | 操作系统 | 描述 | 使用的类 | 
|---|---|---|---|
| CreateThread | Windows | 创建一个 Windows 线程 | CThread | 
| pthread_create | UNIX - POSIX 线程 | 创建一个 UNIX 线程 | CThread | 
| pthread_join | UNIX - POSIX 线程 | 等待 UNIX 线程终止 | CThread | 
| pthread_attr_init | UNIX - POSIX 线程 | 将线程属性结构初始化为默认值 | CThread | 
| pthread_attr_setstacksize | UNIX - POSIX 线程 | 设置线程属性结构的大小值 | CThread | 
| WaitForSingleObject | Windows | 等待一个对象被信号化 | CThread、CMutexClass、CEventClass | 
| CreateMutex | Windows | 创建命名或未命名的互斥锁 | CMutexClass | 
| CloseHandle | Windows | 释放分配给 Windows 句柄的资源 | CMutexClass、CEventClass、CThread | 
| ReleaseMutex | Windows | 释放由 WaitForSingleObject之前锁定的互斥锁 | CMutexClass、CEventClass | 
| pthread_mutexattr_init | UNIX - POSIX 线程 | 初始化互斥锁属性结构 | CMutexClass、CEventClass | 
| pthread_mutex_init | UNIX - POSIX 线程 | 使用提供的属性结构初始化互斥锁 | CMutexClass、CEventClass | 
| pthread_mutex_lock | UNIX - POSIX 线程 | 锁定互斥锁 | CMutexClass、CEventClass | 
| pthread_mutex_unlock | UNIX - POSIX 线程 | 解锁之前由 pthread_mutex_lock锁定的互斥锁 | CMutexClass、CEventClass | 
| pthread_mutex_destroy | UNIX - POSIX 线程 | 释放分配给互斥锁的资源 | CMutexClass、CEventClass | 
| CreateEvent | Windows | 创建一个 Windows 事件对象 | CEventClass | 
| SetEvent | Windows | 将 Windows 事件对象设置为信号状态 | CEventClass | 
| pthread_cond_signal | UNIX - POSIX 线程 | 解除在 pthread_cond_wait上阻塞的线程 | CEventClass | 
| pthread_cond_wait | UNIX - POSIX 线程 | 阻塞在条件变量上 | CEventClass | 
| pthread_cond_init | UNIX - POSIX 线程 | 初始化条件变量 | CEventClass | 
CMutexClass 类
CMutexClass 类封装了系统级别的互斥锁函数和互斥锁同步对象。互斥锁创建发生在对象实例化期间,互斥锁创建为未阻塞状态。该类提供了两个成员函数:Lock 和 Unlock。Lock 成员函数锁定一个互斥锁,将其分配给调用线程。互斥锁将保持锁定状态,直到调用线程使用 Unlock 成员函数释放它。试图通过调用 Lock 成员函数获取已锁定互斥锁的线程将被阻塞,并进入低 CPU 消耗的等待状态,直到阻塞线程释放互斥锁。
成员函数
| 函数 | 描述 | 
|---|---|
| void CMutexClass() | 构造函数 | 
| void Lock() | 锁定互斥锁对象,如果阻塞则等待 | 
| void Unlock() | 解锁/解除之前阻塞的互斥锁 | 
示例
int g_iStorage = 0; CMutexClass MyMutex; void StoreValue( int *pInt ) { MyMutex.Lock(); //the gate keeper. only one thread //allowed in at a time g_iStorage = *pInt; //protected data, critical code section MyMutex.Unlock(); //unblocks, allowing another thread to //access g_iStorage }
CEventClass 类
CEventClass 类封装了 Windows 事件函数、Windows 事件对象、UNIX 条件变量函数以及 UNIX 条件变量。包含在 CEventClass 类中的函数是在 Windows 下的 SetEvent 和 CreateEvent,以及在 UNIX 下的 phtread_cond_init、pthread_cond_destroy、pthread_cond_signal 和 pthread_cond_wait。事件同步对象在 UNIX 中称为条件变量,但为了简化,我将同时称条件变量和事件对象为事件对象。
成员函数
| 函数 | 描述 | 
|---|---|
| void Set() | 将事件状态设置为信号状态,通知被阻塞的线程。 | 
| BOOL Wait() | 将调用线程置于阻塞状态,直到事件状态被设置为信号状态。成功返回 TRUE,失败返回FALSE。 | 
| void Reset() | 将已信号化的事件重置为未信号化。 | 
接收线程使用的事件对象的示例
CEventClass event; . . //thread code . . while(bContinueRunning) { event.Wait(); // wait for an event to occur // perform some task . . event.Reset(); // reset the event to un-signaled } . .
一个线程用于信号化另一个线程的事件对象的示例
CEventClass event; . . // change some data . . event.Set(); // notify thread that an event has occured, // set event to signaled . .
CTask 类和非专用线程
在我见过的许多线程编程示例中,线程处理的数据存储在全局变量中,并受互斥锁保护。操作数据的指令集成在线程函数中。我将这种形式的线程定义为专用异步线程(SAT)。理想情况下,数据和处理数据的相应功能应封装在同一个对象中。我将这种形式的线程定义为同类异步线程(HAT)。在 HAT 下,线程不是专用的。例如,在 HAT 解决方案中不会有打印线程和 I/O 线程。取而代之的是,单个线程可以执行这两种类型的任务,因为任务被实现为完整的对象;也就是说,它们包含处理数据所需的数据和功能。CTask 类是支持 HAT 线程的基类。
typedef enum { TaskStatusNotSubmitted, TaskStatusWaitingOnQueue, TaskStatusBeingProcessed, TaskStatusCompleted } TaskStatus_t; class CTask { private: CMutexClass m_mutex; TaskStatus_t m_state; ThreadId_t m_dwThread; public: void SetTaskStatus(TaskStatus_t state) { m_mutex.Lock(); m_state=state; m_mutex.Unlock(); } void SetId(ThreadId_t *pid) { memcpy(&m_dwThread,pid,sizeof(ThreadId_t)); } /** * * Wait * waits for upto timeoutSeconds for a task * to complete * **/ BOOL Wait(int timeoutSeconds) { timeoutSeconds = timeoutSeconds * 1000; if( Status() != TaskStatusCompleted && timeoutSeconds > 0 ) { Sleep(100); timeoutSeconds = timeoutSeconds - 100; } if( Status() == TaskStatusCompleted ) return TRUE; return FALSE; } /** * * Where * returns current state of a task * **/ TaskStatus_t Status() { TaskStatus_t state ; m_mutex.Lock(); state = m_state; m_mutex.Unlock(); return state; } void Thread(ThreadId_t *pId) { memcpy(pId,&m_dwThread,sizeof(ThreadId_t)); } CTask(){m_state=TaskStatusNotSubmitted; memset(&m_dwThread,sizeof(ThreadId_t),0); } ~CTask(){} virtual BOOL Task()=0; };
成员函数
| 函数 | 描述 | 
|---|---|
| m_mutex | 互斥锁对象同步对象。 | 
| virtual BOOL Task() | 由 CThread对象调用以执行任务。 | 
| TaskStatus_t Status() | 确定任务状态: TaskStatusNotSubmitted、TaskStatusWaitingOnQueue、TaskStatusBeingProcessed或TaskStatusCompleted。 | 
| void Thread(ThreadId_t *pid) | 返回处理线程的线程 ID。 | 
| BOOL Wait(int iTimeInSeconds) | 将调用线程置于等待状态,直到任务完成或 iTimeInSeconds超时。如果任务未在iTimeInSeconds内完成,则返回FALSE;否则返回TRUE。 | 
我没有定义 CThread 类;然而,它的定义对于理解它如何与 CTask 对象交互不是必需的。下面的列表显示了这两种对象类型如何交互的概述。
处理 CTask 对象的流程
- 将 CTask对象传递给CThread对象进行处理。
- CThread对象将- CTask对象放入先进先出队列。
- CThread对象将- CTask对象的状态设置为- TaskStatusWaitingOnQueue。
- CThread对象从等待队列中弹出- CTask对象。
- CThread对象将- CTask对象的状态更改为- TaskStatusBeingProcessed。
- CThread对象调用- CTask对象的成员函数“task”来执行任务。
- CThread对象将- CTask对象的状态更改为- TaskStateCompleted。
CThread 类,整合所有内容
成员函数
| 函数 | 描述 | 
|---|---|
| void CThread() | 构造函数初始化对象数据并启动线程。 | 
| void ~CThread() | 如果线程正在运行,则终止它,并释放资源。 | 
| BOOL Event(LPVOID lpvData) | 将数据块放入事件堆栈/队列,并通知对象的线程数据正在等待处理。 | 
| BOOL Event(CTask *pTask) | 将 CTask对象放入事件堆栈/队列,并通知对象的线程任务正在等待执行。 | 
| int GetEventsPending() | 返回事件堆栈上等待的事件数。 | 
| ThreadId_t GetId() | 返回对象的线程 ID。 | 
| DWORD GetErrorFlags() | 返回对象的错误标志。如果没有错误,则返回 0 ( NO_ERRORS)。如果存在错误,则会设置以下一个或多个标志:MUTEX_CREATION(无法创建互斥锁对象)、EVENT_CREATION(无法创建事件对象)、THREAD_CREATION(无法创建对象的线程)、ILLEGAL_USE_OF_EVENT(对基于间隔的线程调用了Event成员函数)。 | 
| BOOL PingThread(DWORD dwTimeoutMilli) | 确定对象的线程是否正在运行。如果线程正在运行,则返回 TRUE;如果不在运行,则返回FALSE。超时以秒为单位。 | 
| SetPriority(DWORD dwPriority) | 设置线程优先级,仅限 Windows。 | 
| BOOL Start() | 启动对象的线程。 | 
| BOOL Stop() | 停止对象的线程。 | 
| void SetIdle(DWORD dwIdle) | 更改线程的空闲时间(以毫秒为单位),用于基于间隔的线程。 | 
| SetThreadType(ThreadType_t typ,DWORD dwIdle) | 在 ThreadTypeEventDriven和ThreadTypeIntervalDriven之间更改线程类型。 | 
| m_mutex | 用于同步的互斥锁对象,请参阅 CMutexClass。 | 
| ThreadState_t ThreadState() | 返回线程的状态: ThreadStateBusy(线程正在处理事件)、ThreadStateWaiting(线程正在等待新事件)、ThreadStateDown(线程未运行)、ThreadStateShutingDown(线程正在关闭过程中)。 | 
现在您已经了解了支持类,是时候看看主类 CThread 类了——它是主力。CThread 类支持两种类型的线程:事件驱动和间隔驱动。事件驱动线程是一种线程,它保持在等待状态,阻塞在事件对象上,直到事件对象的状态从无信号变为有信号。当另一个线程将任务放入 CThread 对象的队列并向其事件对象发送信号以通知对象线程时,就会发生新事件。一旦被信号化,线程就会唤醒并从其事件队列中弹出任务,直到队列为空。
CThread 对象为每个任务调用 OnTask 成员函数。任务按先进先出(FCFS)顺序处理。因此,首先处理放入 CThread 对象队列的第一个任务,然后是第二个,依此类推。互斥锁对象同步队列访问,允许在线程处理旧事件的同时将其他事件放入队列。一旦队列为空,线程就会将事件对象重置为无信号状态,并返回等待事件对象。CThread 类支持两种类型的事件驱动线程:专用线程和非专用线程,请参阅 CTask。
要实现专用线程,必须从 CThread 类派生一个新类。派生类应包含 OnTask 的重新定义实现,以处理对象的各种数据类型。
示例
#include "Thread.h" class CIncrementThread : public CThread { public: int counter; virtual BOOL OnTask( LPVOID lpv ) { ThreadId_t id; GetId(&id); if( lpv ) { int *pInt = (int *)lpv; //don't use cout here, output could be broken up due to //threading printf("\tthread(%ld, counter+%d=%d, counter incremented\n", id,*pInt,(counter+=*pInt)); } return TRUE; } virtual BOOL OnTask() { ThreadId_t id; GetId(&id); //don't use cout here, output could be broken up due to //threading m_mutex.Lock(); // protect the counter variable printf("\tthread(%ld, counter++= %d, counter incremented)\n", id,(++counter)); m_mutex.Unlock(); return TRUE; } int GetValue() { int counterValue = 0; m_mutex.Lock(); // protect the counter variable counterValue = counter; m_mutex.Unlock(); return counter; } void Reset() { m_mutex.Lock(); counter = 0; m_mutex.Unlock(); } CIncrementThread(){counter=0;} ~CIncrementThread(){} }; int main( int argc, char *argv[]) { // object allocated and thread started CIncrementThread MyThread; int two=2; while( MyThread.GetValue() < 20 ) { MyThread.Event(); // increment value by one Sleep(100); // pauses the root thread for 100 // milliseconds } MyThread.Reset(); while( MyThread.GetValue() < 40 ) { MyThread.Event(&two); Sleep(100); } } OUTPUT: thread(5220, counter++= 1, counter incremented) thread(5220, counter++= 2, counter incremented) thread(5220, counter++= 3, counter incremented) thread(5220, counter++= 4, counter incremented) thread(5220, counter++= 5, counter incremented) thread(5220, counter++= 6, counter incremented) thread(5220, counter++= 7, counter incremented) thread(5220, counter++= 8, counter incremented) thread(5220, counter++= 9, counter incremented) thread(5220, counter++= 10, counter incremented) thread(5220, counter++= 11, counter incremented) thread(5220, counter++= 12, counter incremented) thread(5220, counter++= 13, counter incremented) thread(5220, counter++= 14, counter incremented) thread(5220, counter++= 15, counter incremented) thread(5220, counter++= 16, counter incremented) thread(5220, counter++= 17, counter incremented) thread(5220, counter++= 18, counter incremented) thread(5220, counter++= 19, counter incremented) thread(5220, counter++= 20, counter incremented) thread(5220, counter+2=2, counter incremented thread(5220, counter+2=4, counter incremented thread(5220, counter+2=6, counter incremented thread(5220, counter+2=8, counter incremented thread(5220, counter+2=10, counter incremented thread(5220, counter+2=12, counter incremented thread(5220, counter+2=14, counter incremented thread(5220, counter+2=16, counter incremented thread(5220, counter+2=18, counter incremented thread(5220, counter+2=20, counter incremented thread(5220, counter+2=22, counter incremented thread(5220, counter+2=24, counter incremented thread(5220, counter+2=26, counter incremented thread(5220, counter+2=28, counter incremented thread(5220, counter+2=30, counter incremented thread(5220, counter+2=32, counter incremented thread(5220, counter+2=34, counter incremented thread(5220, counter+2=36, counter incremented thread(5220, counter+2=38, counter incremented thread(5220, counter+2=40, counter incremented
在上面的示例中,我从 CThread 类派生了一个 CIncrementThread 类。在类定义中,我重新定义了 OnTask() 和 OnTask(LPVOID) 虚成员函数。在 OnTask() 实现中,我给对象的计数器变量加一。另一个 OnTask 成员函数接受一个指向整数值的指针,并将指针的值添加到计数器成员变量。这个例子说明了线程可以处理的两种类型的事件。由于计数器变量可能会被多个线程访问,我使用 CThread::m_mutex 对象来确保它只被一个线程访问。
HAT(同类异步线程)线程使用 CThread 和 CTask 类来实现。
示例
#include "Thread.h" class CTaskIncrementer: public CTask { private: int counter; int incr; public: void SetIncr(int iValue) { m_mutex.Lock(); incr = iValue; m_mutex.Unlock(); } int GetIncrementValue() { int incrValue; m_mutex.Lock(); incrValue=incr; m_mutex.Unlock(); return incrValue; } int GetValue() { int counterValue = 0; m_mutex.Lock(); // protect the counter variable counterValue = counter; m_mutex.Unlock(); return counter; } BOOL Task() { ThreadId_t id; Thread(&id); m_mutex.Lock(); printf("\tthread(%ld, counter+%d=%d, counter incremented\n", id,incr,(counter+=incr)); m_mutex.Unlock(); return TRUE; } CTaskIncrementer(){counter=0;} ~CTaskIncrementer(){} }; int main(int argc, char *argv[]) { CTaskIncrementer incr; CThread thr; incr.SetIncr(2); while( incr.GetValue() < 40 ) thr.Event(&incr); } OUTPUT: thread(5700, counter+2=2, counter incremented thread(5700, counter+2=4, counter incremented thread(5700, counter+2=6, counter incremented thread(5700, counter+2=8, counter incremented thread(5700, counter+2=10, counter incremented thread(5700, counter+2=12, counter incremented thread(5700, counter+2=14, counter incremented thread(5700, counter+2=16, counter incremented thread(5700, counter+2=18, counter incremented thread(5700, counter+2=20, counter incremented thread(5700, counter+2=22, counter incremented thread(5700, counter+2=24, counter incremented thread(5700, counter+2=26, counter incremented thread(5700, counter+2=28, counter incremented thread(5700, counter+2=30, counter incremented thread(5700, counter+2=32, counter incremented thread(5700, counter+2=34, counter incremented thread(5700, counter+2=36, counter incremented thread(5700, counter+2=38, counter incremented thread(5700, counter+2=40, counter incremented
间隔驱动线程是一种线程,它会在预设的间隔唤醒,检查环境是否有变化,处理环境中的变化,然后进入下一个间隔的睡眠,再次唤醒并重复所有操作。要实现间隔驱动线程,需要派生一个重定义了 OnTask(LPVOID) 的 CThread 类。实例化线程后,调用 SetThreadType 成员函数,并将参数设置为 ThreadTypeIntervalDriven 以及以毫秒为单位的间隔。
示例
#include "Thread.h" class CIncrementThread : public CThread { public: int counter; virtual BOOL OnTask() { ThreadId_t id; GetId(&id); //don't use cout here, output could be broken up due to //threading m_mutex.Lock(); // protect the counter variable printf("\tthread(%ld, counter++= %d, counter incremented)\n", id,(++counter)); m_mutex.Unlock(); return TRUE; } int GetValue() { int counterValue = 0; m_mutex.Lock(); // protect the counter variable counterValue = counter; m_mutex.Unlock(); return counter; } void Reset() { m_mutex.Lock(); counter = 0; m_mutex.Unlock(); } CIncrementThread(){counter=0;} ~CIncrementThread(){} }; int main( int argc, char *argv[] ) { CIncrementThread thr; thr->SetThreadType(ThreadTypeIntervalDriven,100); Sleep(500); } OUTPUT: thread(6104, counter++= 12, counter incremented) thread(6104, counter++= 13, counter incremented) thread(6104, counter++= 14, counter incremented) thread(6104, counter++= 15, counter incremented) thread(6104, counter++= 16, counter incremented)
结论
就这样,一个功能齐全的线程对象。我在 Linux 上进行了测试,类工作正常。我还没有在 SunOS 或这些类应该支持的任何其他 UNIX 平台上进行测试。在 Windows 上编译时,请务必指定 /Mt 或 /Mtd 用于代码生成;这会将您的应用程序标识为多线程应用程序。对于 Linux,以下 makefile 有效
CC=g++
LIBS=-lpthread -lrt
CFLAGS=-DLINUX -DNANO_SECOND_SLEEP
OBJS=Thread.cpp EventClass.cpp MutexClass.cpp main.cpp
EXECS = thread
all: $(EXECS)
thread: $(OBJS)
    $(CC) $(CFLAGS) -o thread $(OBJS) $(LIBS)
clean:; rm -f *.o $(EXECS)
历史
- (2007 年 10 月 31 日)
- 增加了对 beginthreadex的支持。要编译以使用beginthreadex,请定义USE_BEGIN_THREAD。默认使用CreateThread进行线程创建。附加的项目在预处理器部分定义了USE_BEGIN_THREAD。
- 我忘了 CreateThread如果与 C 运行时函数一起使用,很容易导致内存泄漏。
- (2007 年 11 月 1 日)
- 我引入了更多的逻辑和 ASSERTION来确保CThread对象的完整性。同类和专用线程类型都可以通过SetThreadType成员函数进行物理设置。如果未设置线程类型,线程将根据对成员函数的调用来确定其类型;但是,这不适用于基于间隔的线程。基于间隔的线程必须使用SetThreadType成员函数进行隐式标识。新的完整性测试是为了确保与CThread对象使用的一致性。
- 新增了 AtCapacity和PercentCapacity成员函数,用于确定线程是否真正繁忙。AtCapacity在以下两种情况之一时返回TRUE:线程正在处理事件且其队列已满,或者线程未运行。PercentCapacity返回对象队列的满百分比。这些新函数允许将线程对象放入数组中,并根据其工作负载进行任务分配。
- 新成员函数 SetQueueSize允许调整CThread对象队列的大小。如果队列中等待的元素数量大于请求的队列大小,该函数将返回false。
- Event成员函数已修改为在发布事件之前验证线程是否正在运行。这消除了在首次传递事件时调用- PingThread的需要。
- 错误标志在调用某些成员函数时会自动重置;这使得错误发生只与特定的调用序列相关。
- (2007 年 11 月 2 日)
- 根据 On Freund 的评论(见下方消息),删除了 CEventClass类中对 UNIX 平台不必要的代码。
- (2007 年 11 月 2 日)
- 添加了 try/catch块以确保所有类都能正确使用,响应 On 的消息。
- (2007 年 11 月 8 日)
- 我忘记删除几个用于测试 CEventClass的try-catch块的连续调用m_event.Reset。这只会影响非 Windows 平台。此外,我将m_event.Reset移到了m_event.Wait之后。由于我们处理的是队列,这可能会导致瓶颈。我注释掉了旧代码,以防有人更喜欢旧的方式。
- 示例 main.cpp 包含几个连续调用 m_mutex.lock的情况,这些调用会抛出异常;它们是为了测试CMutexClass的try-catch块。这些调用已被删除。
进行了很多改进
这是本文的最后一期变更。我将开始写我的下一篇文章。由于它将使用这些类,因此将在此处发布其他修复。


