创建 C++ 线程类






4.74/5 (27投票s)
A look at platform independent threading in C++.
引言
最近,我弟弟问我是否有简单的方法可以创建一个支持面向对象线程的 C++ 类。我过去写过许多多线程库;然而,它们都是用 C 写的。C 语言一直是我进行底层编程的首选语言;我用 C++ 进行 GUI 开发。尽管 CodeProject 上有许多优秀的面向对象线程的示例,但没有一个引入的类能满足我弟弟所有的需求和我自己的好奇心。他想要一个具有以下属性的线程类:
- 它支持事件驱动和基于间隔的异步线程。
- 它支持同类和专用线程。
- 它提供一个 FCFS(先进先出)堆栈队列,用于发布和处理多个任务。
- 它是可移植的。
- 它易于实现。
为了支持新类 CThread
,还开发了其他支持类。这些类包括 CMutexClass
、CEventClass
和 CTask
类。CMutexClass
和 CEventClass
提供资源管理,而 CTask
类是从派生类派生的基类,用于支持同类异步线程。
什么是线程?
每个进程至少有一个控制线程,并且每个进程一次可以执行至少一个任务。具有多个控制线程的进程定义了一个多线程进程。多线程进程允许多个任务在进程环境中异步运行。
资源管理—线程同步
由于多线程进程中的线程共享相同的资源,因此需要操作系统级别的控制机制来确保数据完整性。当一个线程正在修改一个变量而另一个线程试图读取它,或者两个线程试图同时修改同一个变量时,就会发生数据完整性丢失。为了防止这种情况,操作系统提供了一个互斥对象,简称互斥锁。在多线程应用程序中,通过编程部署的互斥锁可以防止多个线程同时访问单个资源。当一个线程需要访问资源时,它必须首先获取互斥锁。一旦一个线程获得了互斥锁,其他试图获取同一互斥锁的线程将被阻塞,并进入低 CPU 使用率的等待状态。一旦一个线程完成了数据访问,它就会释放相应的互斥锁;这允许其他线程获取它并访问相应的数据。
互斥锁的糟糕实现可能导致资源耗尽,也称为死锁。当一个或多个线程争夺同一资源时,就会发生资源耗尽。
示例
线程 A | 线程 B |
---|---|
获取互斥锁(1)以修改数据项 1 | 获取互斥锁(2)以修改数据项 2 |
想要互斥锁(2)以查看数据项 2 | 想要互斥锁(1)以查看数据项 1 |
上面的示例中发生死锁是因为线程 A 正在阻塞,试图获取线程 B 持有的互斥锁(2)。线程 B 正在阻塞,试图获取线程 A 持有的互斥锁(1)。
与互斥锁类似,条件变量(在 UNIX 中)是另一种形式的同步机制。条件变量允许线程进行会合。它们允许一个线程通知另一个线程发生了变化。在 Windows 中,这些是事件。
操作系统调用
下表列出了用于实现 CMutexClass
、CEventClass
、CTask
和 CThread
类中线程的各种函数。
函数 | 操作系统 | 描述 | 使用的类 |
---|---|---|---|
CreateThread |
Windows | 创建一个 Windows 线程 | CThread |
pthread_create |
UNIX - POSIX 线程 | 创建一个 UNIX 线程 | CThread |
pthread_join |
UNIX - POSIX 线程 | 等待 UNIX 线程终止 | CThread |
pthread_attr_init |
UNIX - POSIX 线程 | 将线程属性结构初始化为默认值 | CThread |
pthread_attr_setstacksize |
UNIX - POSIX 线程 | 设置线程属性结构的大小值 | CThread |
WaitForSingleObject |
Windows | 等待一个对象被信号化 | CThread 、CMutexClass 、CEventClass |
CreateMutex |
Windows | 创建命名或未命名的互斥锁 | CMutexClass |
CloseHandle |
Windows | 释放分配给 Windows 句柄的资源 | CMutexClass 、CEventClass 、CThread |
ReleaseMutex |
Windows | 释放由 WaitForSingleObject 之前锁定的互斥锁 |
CMutexClass 、CEventClass |
pthread_mutexattr_init |
UNIX - POSIX 线程 | 初始化互斥锁属性结构 | CMutexClass 、CEventClass |
pthread_mutex_init |
UNIX - POSIX 线程 | 使用提供的属性结构初始化互斥锁 | CMutexClass 、CEventClass |
pthread_mutex_lock |
UNIX - POSIX 线程 | 锁定互斥锁 | CMutexClass 、CEventClass |
pthread_mutex_unlock |
UNIX - POSIX 线程 | 解锁之前由 pthread_mutex_lock 锁定的互斥锁 |
CMutexClass 、CEventClass |
pthread_mutex_destroy |
UNIX - POSIX 线程 | 释放分配给互斥锁的资源 | CMutexClass 、CEventClass |
CreateEvent |
Windows | 创建一个 Windows 事件对象 | CEventClass |
SetEvent |
Windows | 将 Windows 事件对象设置为信号状态 | CEventClass |
pthread_cond_signal |
UNIX - POSIX 线程 | 解除在 pthread_cond_wait 上阻塞的线程 |
CEventClass |
pthread_cond_wait |
UNIX - POSIX 线程 | 阻塞在条件变量上 | CEventClass |
pthread_cond_init |
UNIX - POSIX 线程 | 初始化条件变量 | CEventClass |
CMutexClass 类
CMutexClass
类封装了系统级别的互斥锁函数和互斥锁同步对象。互斥锁创建发生在对象实例化期间,互斥锁创建为未阻塞状态。该类提供了两个成员函数:Lock
和 Unlock
。Lock
成员函数锁定一个互斥锁,将其分配给调用线程。互斥锁将保持锁定状态,直到调用线程使用 Unlock
成员函数释放它。试图通过调用 Lock
成员函数获取已锁定互斥锁的线程将被阻塞,并进入低 CPU 消耗的等待状态,直到阻塞线程释放互斥锁。
成员函数
函数 | 描述 |
---|---|
void CMutexClass() |
构造函数 |
void Lock() |
锁定互斥锁对象,如果阻塞则等待 |
void Unlock() |
解锁/解除之前阻塞的互斥锁 |
示例
int g_iStorage = 0; CMutexClass MyMutex; void StoreValue( int *pInt ) { MyMutex.Lock(); //the gate keeper. only one thread //allowed in at a time g_iStorage = *pInt; //protected data, critical code section MyMutex.Unlock(); //unblocks, allowing another thread to //access g_iStorage }
CEventClass 类
CEventClass
类封装了 Windows 事件函数、Windows 事件对象、UNIX 条件变量函数以及 UNIX 条件变量。包含在 CEventClass
类中的函数是在 Windows 下的 SetEvent
和 CreateEvent
,以及在 UNIX 下的 phtread_cond_init
、pthread_cond_destroy
、pthread_cond_signal
和 pthread_cond_wait
。事件同步对象在 UNIX 中称为条件变量,但为了简化,我将同时称条件变量和事件对象为事件对象。
成员函数
函数 | 描述 |
---|---|
void Set() |
将事件状态设置为信号状态,通知被阻塞的线程。 |
BOOL Wait() |
将调用线程置于阻塞状态,直到事件状态被设置为信号状态。成功返回 TRUE ,失败返回 FALSE 。 |
void Reset() |
将已信号化的事件重置为未信号化。 |
接收线程使用的事件对象的示例
CEventClass event; . . //thread code . . while(bContinueRunning) { event.Wait(); // wait for an event to occur // perform some task . . event.Reset(); // reset the event to un-signaled } . .
一个线程用于信号化另一个线程的事件对象的示例
CEventClass event; . . // change some data . . event.Set(); // notify thread that an event has occured, // set event to signaled . .
CTask 类和非专用线程
在我见过的许多线程编程示例中,线程处理的数据存储在全局变量中,并受互斥锁保护。操作数据的指令集成在线程函数中。我将这种形式的线程定义为专用异步线程(SAT)。理想情况下,数据和处理数据的相应功能应封装在同一个对象中。我将这种形式的线程定义为同类异步线程(HAT)。在 HAT 下,线程不是专用的。例如,在 HAT 解决方案中不会有打印线程和 I/O 线程。取而代之的是,单个线程可以执行这两种类型的任务,因为任务被实现为完整的对象;也就是说,它们包含处理数据所需的数据和功能。CTask
类是支持 HAT 线程的基类。
typedef enum { TaskStatusNotSubmitted, TaskStatusWaitingOnQueue, TaskStatusBeingProcessed, TaskStatusCompleted } TaskStatus_t; class CTask { private: CMutexClass m_mutex; TaskStatus_t m_state; ThreadId_t m_dwThread; public: void SetTaskStatus(TaskStatus_t state) { m_mutex.Lock(); m_state=state; m_mutex.Unlock(); } void SetId(ThreadId_t *pid) { memcpy(&m_dwThread,pid,sizeof(ThreadId_t)); } /** * * Wait * waits for upto timeoutSeconds for a task * to complete * **/ BOOL Wait(int timeoutSeconds) { timeoutSeconds = timeoutSeconds * 1000; if( Status() != TaskStatusCompleted && timeoutSeconds > 0 ) { Sleep(100); timeoutSeconds = timeoutSeconds - 100; } if( Status() == TaskStatusCompleted ) return TRUE; return FALSE; } /** * * Where * returns current state of a task * **/ TaskStatus_t Status() { TaskStatus_t state ; m_mutex.Lock(); state = m_state; m_mutex.Unlock(); return state; } void Thread(ThreadId_t *pId) { memcpy(pId,&m_dwThread,sizeof(ThreadId_t)); } CTask(){m_state=TaskStatusNotSubmitted; memset(&m_dwThread,sizeof(ThreadId_t),0); } ~CTask(){} virtual BOOL Task()=0; };
成员函数
函数 | 描述 |
---|---|
m_mutex |
互斥锁对象同步对象。 |
virtual BOOL Task() |
由 CThread 对象调用以执行任务。 |
TaskStatus_t Status() |
确定任务状态:TaskStatusNotSubmitted 、TaskStatusWaitingOnQueue 、TaskStatusBeingProcessed 或 TaskStatusCompleted 。 |
void Thread(ThreadId_t *pid) |
返回处理线程的线程 ID。 |
BOOL Wait(int iTimeInSeconds) |
将调用线程置于等待状态,直到任务完成或 iTimeInSeconds 超时。如果任务未在 iTimeInSeconds 内完成,则返回 FALSE ;否则返回 TRUE 。 |
我没有定义 CThread
类;然而,它的定义对于理解它如何与 CTask
对象交互不是必需的。下面的列表显示了这两种对象类型如何交互的概述。
处理 CTask 对象的流程
- 将
CTask
对象传递给CThread
对象进行处理。 CThread
对象将CTask
对象放入先进先出队列。CThread
对象将CTask
对象的状态设置为TaskStatusWaitingOnQueue
。CThread
对象从等待队列中弹出CTask
对象。CThread
对象将CTask
对象的状态更改为TaskStatusBeingProcessed
。CThread
对象调用CTask
对象的成员函数“task”来执行任务。CThread
对象将CTask
对象的状态更改为TaskStateCompleted
。
CThread 类,整合所有内容
成员函数
函数 | 描述 |
---|---|
void CThread() |
构造函数初始化对象数据并启动线程。 |
void ~CThread() |
如果线程正在运行,则终止它,并释放资源。 |
BOOL Event(LPVOID lpvData) |
将数据块放入事件堆栈/队列,并通知对象的线程数据正在等待处理。 |
BOOL Event(CTask *pTask) |
将 CTask 对象放入事件堆栈/队列,并通知对象的线程任务正在等待执行。 |
int GetEventsPending() |
返回事件堆栈上等待的事件数。 |
ThreadId_t GetId() |
返回对象的线程 ID。 |
DWORD GetErrorFlags() |
返回对象的错误标志。如果没有错误,则返回 0 (NO_ERRORS )。如果存在错误,则会设置以下一个或多个标志:MUTEX_CREATION (无法创建互斥锁对象)、EVENT_CREATION (无法创建事件对象)、THREAD_CREATION (无法创建对象的线程)、ILLEGAL_USE_OF_EVENT (对基于间隔的线程调用了 Event 成员函数)。 |
BOOL PingThread(DWORD dwTimeoutMilli) |
确定对象的线程是否正在运行。如果线程正在运行,则返回 TRUE ;如果不在运行,则返回 FALSE 。超时以秒为单位。 |
SetPriority(DWORD dwPriority) |
设置线程优先级,仅限 Windows。 |
BOOL Start() |
启动对象的线程。 |
BOOL Stop() |
停止对象的线程。 |
void SetIdle(DWORD dwIdle) |
更改线程的空闲时间(以毫秒为单位),用于基于间隔的线程。 |
SetThreadType(ThreadType_t typ,DWORD dwIdle) |
在 ThreadTypeEventDriven 和 ThreadTypeIntervalDriven 之间更改线程类型。 |
m_mutex |
用于同步的互斥锁对象,请参阅 CMutexClass 。 |
ThreadState_t ThreadState() |
返回线程的状态:ThreadStateBusy (线程正在处理事件)、ThreadStateWaiting (线程正在等待新事件)、ThreadStateDown (线程未运行)、ThreadStateShutingDown (线程正在关闭过程中)。 |
现在您已经了解了支持类,是时候看看主类 CThread
类了——它是主力。CThread
类支持两种类型的线程:事件驱动和间隔驱动。事件驱动线程是一种线程,它保持在等待状态,阻塞在事件对象上,直到事件对象的状态从无信号变为有信号。当另一个线程将任务放入 CThread
对象的队列并向其事件对象发送信号以通知对象线程时,就会发生新事件。一旦被信号化,线程就会唤醒并从其事件队列中弹出任务,直到队列为空。
CThread
对象为每个任务调用 OnTask
成员函数。任务按先进先出(FCFS)顺序处理。因此,首先处理放入 CThread
对象队列的第一个任务,然后是第二个,依此类推。互斥锁对象同步队列访问,允许在线程处理旧事件的同时将其他事件放入队列。一旦队列为空,线程就会将事件对象重置为无信号状态,并返回等待事件对象。CThread
类支持两种类型的事件驱动线程:专用线程和非专用线程,请参阅 CTask
。
要实现专用线程,必须从 CThread
类派生一个新类。派生类应包含 OnTask
的重新定义实现,以处理对象的各种数据类型。
示例
#include "Thread.h" class CIncrementThread : public CThread { public: int counter; virtual BOOL OnTask( LPVOID lpv ) { ThreadId_t id; GetId(&id); if( lpv ) { int *pInt = (int *)lpv; //don't use cout here, output could be broken up due to //threading printf("\tthread(%ld, counter+%d=%d, counter incremented\n", id,*pInt,(counter+=*pInt)); } return TRUE; } virtual BOOL OnTask() { ThreadId_t id; GetId(&id); //don't use cout here, output could be broken up due to //threading m_mutex.Lock(); // protect the counter variable printf("\tthread(%ld, counter++= %d, counter incremented)\n", id,(++counter)); m_mutex.Unlock(); return TRUE; } int GetValue() { int counterValue = 0; m_mutex.Lock(); // protect the counter variable counterValue = counter; m_mutex.Unlock(); return counter; } void Reset() { m_mutex.Lock(); counter = 0; m_mutex.Unlock(); } CIncrementThread(){counter=0;} ~CIncrementThread(){} }; int main( int argc, char *argv[]) { // object allocated and thread started CIncrementThread MyThread; int two=2; while( MyThread.GetValue() < 20 ) { MyThread.Event(); // increment value by one Sleep(100); // pauses the root thread for 100 // milliseconds } MyThread.Reset(); while( MyThread.GetValue() < 40 ) { MyThread.Event(&two); Sleep(100); } } OUTPUT: thread(5220, counter++= 1, counter incremented) thread(5220, counter++= 2, counter incremented) thread(5220, counter++= 3, counter incremented) thread(5220, counter++= 4, counter incremented) thread(5220, counter++= 5, counter incremented) thread(5220, counter++= 6, counter incremented) thread(5220, counter++= 7, counter incremented) thread(5220, counter++= 8, counter incremented) thread(5220, counter++= 9, counter incremented) thread(5220, counter++= 10, counter incremented) thread(5220, counter++= 11, counter incremented) thread(5220, counter++= 12, counter incremented) thread(5220, counter++= 13, counter incremented) thread(5220, counter++= 14, counter incremented) thread(5220, counter++= 15, counter incremented) thread(5220, counter++= 16, counter incremented) thread(5220, counter++= 17, counter incremented) thread(5220, counter++= 18, counter incremented) thread(5220, counter++= 19, counter incremented) thread(5220, counter++= 20, counter incremented) thread(5220, counter+2=2, counter incremented thread(5220, counter+2=4, counter incremented thread(5220, counter+2=6, counter incremented thread(5220, counter+2=8, counter incremented thread(5220, counter+2=10, counter incremented thread(5220, counter+2=12, counter incremented thread(5220, counter+2=14, counter incremented thread(5220, counter+2=16, counter incremented thread(5220, counter+2=18, counter incremented thread(5220, counter+2=20, counter incremented thread(5220, counter+2=22, counter incremented thread(5220, counter+2=24, counter incremented thread(5220, counter+2=26, counter incremented thread(5220, counter+2=28, counter incremented thread(5220, counter+2=30, counter incremented thread(5220, counter+2=32, counter incremented thread(5220, counter+2=34, counter incremented thread(5220, counter+2=36, counter incremented thread(5220, counter+2=38, counter incremented thread(5220, counter+2=40, counter incremented
在上面的示例中,我从 CThread
类派生了一个 CIncrementThread
类。在类定义中,我重新定义了 OnTask()
和 OnTask(LPVOID)
虚成员函数。在 OnTask()
实现中,我给对象的计数器变量加一。另一个 OnTask
成员函数接受一个指向整数值的指针,并将指针的值添加到计数器成员变量。这个例子说明了线程可以处理的两种类型的事件。由于计数器变量可能会被多个线程访问,我使用 CThread::m_mutex
对象来确保它只被一个线程访问。
HAT(同类异步线程)线程使用 CThread
和 CTask 类来实现。
示例
#include "Thread.h" class CTaskIncrementer: public CTask { private: int counter; int incr; public: void SetIncr(int iValue) { m_mutex.Lock(); incr = iValue; m_mutex.Unlock(); } int GetIncrementValue() { int incrValue; m_mutex.Lock(); incrValue=incr; m_mutex.Unlock(); return incrValue; } int GetValue() { int counterValue = 0; m_mutex.Lock(); // protect the counter variable counterValue = counter; m_mutex.Unlock(); return counter; } BOOL Task() { ThreadId_t id; Thread(&id); m_mutex.Lock(); printf("\tthread(%ld, counter+%d=%d, counter incremented\n", id,incr,(counter+=incr)); m_mutex.Unlock(); return TRUE; } CTaskIncrementer(){counter=0;} ~CTaskIncrementer(){} }; int main(int argc, char *argv[]) { CTaskIncrementer incr; CThread thr; incr.SetIncr(2); while( incr.GetValue() < 40 ) thr.Event(&incr); } OUTPUT: thread(5700, counter+2=2, counter incremented thread(5700, counter+2=4, counter incremented thread(5700, counter+2=6, counter incremented thread(5700, counter+2=8, counter incremented thread(5700, counter+2=10, counter incremented thread(5700, counter+2=12, counter incremented thread(5700, counter+2=14, counter incremented thread(5700, counter+2=16, counter incremented thread(5700, counter+2=18, counter incremented thread(5700, counter+2=20, counter incremented thread(5700, counter+2=22, counter incremented thread(5700, counter+2=24, counter incremented thread(5700, counter+2=26, counter incremented thread(5700, counter+2=28, counter incremented thread(5700, counter+2=30, counter incremented thread(5700, counter+2=32, counter incremented thread(5700, counter+2=34, counter incremented thread(5700, counter+2=36, counter incremented thread(5700, counter+2=38, counter incremented thread(5700, counter+2=40, counter incremented
间隔驱动线程是一种线程,它会在预设的间隔唤醒,检查环境是否有变化,处理环境中的变化,然后进入下一个间隔的睡眠,再次唤醒并重复所有操作。要实现间隔驱动线程,需要派生一个重定义了 OnTask(LPVOID)
的 CThread
类。实例化线程后,调用 SetThreadType
成员函数,并将参数设置为 ThreadTypeIntervalDriven
以及以毫秒为单位的间隔。
示例
#include "Thread.h" class CIncrementThread : public CThread { public: int counter; virtual BOOL OnTask() { ThreadId_t id; GetId(&id); //don't use cout here, output could be broken up due to //threading m_mutex.Lock(); // protect the counter variable printf("\tthread(%ld, counter++= %d, counter incremented)\n", id,(++counter)); m_mutex.Unlock(); return TRUE; } int GetValue() { int counterValue = 0; m_mutex.Lock(); // protect the counter variable counterValue = counter; m_mutex.Unlock(); return counter; } void Reset() { m_mutex.Lock(); counter = 0; m_mutex.Unlock(); } CIncrementThread(){counter=0;} ~CIncrementThread(){} }; int main( int argc, char *argv[] ) { CIncrementThread thr; thr->SetThreadType(ThreadTypeIntervalDriven,100); Sleep(500); } OUTPUT: thread(6104, counter++= 12, counter incremented) thread(6104, counter++= 13, counter incremented) thread(6104, counter++= 14, counter incremented) thread(6104, counter++= 15, counter incremented) thread(6104, counter++= 16, counter incremented)
结论
就这样,一个功能齐全的线程对象。我在 Linux 上进行了测试,类工作正常。我还没有在 SunOS 或这些类应该支持的任何其他 UNIX 平台上进行测试。在 Windows 上编译时,请务必指定 /Mt 或 /Mtd 用于代码生成;这会将您的应用程序标识为多线程应用程序。对于 Linux,以下 makefile 有效
CC=g++
LIBS=-lpthread -lrt
CFLAGS=-DLINUX -DNANO_SECOND_SLEEP
OBJS=Thread.cpp EventClass.cpp MutexClass.cpp main.cpp
EXECS = thread
all: $(EXECS)
thread: $(OBJS)
$(CC) $(CFLAGS) -o thread $(OBJS) $(LIBS)
clean:; rm -f *.o $(EXECS)
历史
- (2007 年 10 月 31 日)
- 增加了对
beginthreadex
的支持。要编译以使用beginthreadex
,请定义USE_BEGIN_THREAD
。默认使用CreateThread
进行线程创建。附加的项目在预处理器部分定义了USE_BEGIN_THREAD
。 - 我忘了
CreateThread
如果与 C 运行时函数一起使用,很容易导致内存泄漏。 - (2007 年 11 月 1 日)
- 我引入了更多的逻辑和
ASSERTION
来确保CThread
对象的完整性。同类和专用线程类型都可以通过SetThreadType
成员函数进行物理设置。如果未设置线程类型,线程将根据对成员函数的调用来确定其类型;但是,这不适用于基于间隔的线程。基于间隔的线程必须使用SetThreadType
成员函数进行隐式标识。新的完整性测试是为了确保与CThread
对象使用的一致性。 - 新增了
AtCapacity
和PercentCapacity
成员函数,用于确定线程是否真正繁忙。AtCapacity
在以下两种情况之一时返回TRUE
:线程正在处理事件且其队列已满,或者线程未运行。PercentCapacity
返回对象队列的满百分比。这些新函数允许将线程对象放入数组中,并根据其工作负载进行任务分配。 - 新成员函数
SetQueueSize
允许调整CThread
对象队列的大小。如果队列中等待的元素数量大于请求的队列大小,该函数将返回false
。 Event
成员函数已修改为在发布事件之前验证线程是否正在运行。这消除了在首次传递事件时调用PingThread
的需要。- 错误标志在调用某些成员函数时会自动重置;这使得错误发生只与特定的调用序列相关。
- (2007 年 11 月 2 日)
- 根据 On Freund 的评论(见下方消息),删除了
CEventClass
类中对 UNIX 平台不必要的代码。 - (2007 年 11 月 2 日)
- 添加了
try/catch
块以确保所有类都能正确使用,响应 On 的消息。 - (2007 年 11 月 8 日)
- 我忘记删除几个用于测试
CEventClass
的try-catch
块的连续调用m_event.Reset
。这只会影响非 Windows 平台。此外,我将m_event.Reset
移到了m_event.Wait
之后。由于我们处理的是队列,这可能会导致瓶颈。我注释掉了旧代码,以防有人更喜欢旧的方式。 - 示例 main.cpp 包含几个连续调用
m_mutex.lock
的情况,这些调用会抛出异常;它们是为了测试CMutexClass
的try-catch
块。这些调用已被删除。
进行了很多改进
这是本文的最后一期变更。我将开始写我的下一篇文章。由于它将使用这些类,因此将在此处发布其他修复。