使用线程、信号量和事件实现生产者/消费者






4.64/5 (44投票s)
2004年7月7日
15分钟阅读

208133

3522
使用线程、信号量和事件类实现特定版本的生产者/消费者模型
0. 引言
生产者/消费者是计算机科学领域一个著名的模型。要实际实现这个模型,涉及到一些重要且基础的技术,如多线程和同步。本文介绍了三个泛型类,即mySemaphore
、myThread
和myEvent
,然后使用这些类来实现一个版本的生产者/消费者模型。
与其他关于同一主题的文章相比,这些类不是从MFC或C#的任何类构建或基于这些类的,因此它相当轻量级;同时它们也足够通用,可以用于构建其他应用程序——生产者/消费者实现可以看作是这些类使用的一个例子。另一个值得一提的观点是,每当涉及多线程时,如何安全地终止线程似乎总是一个问题。本文还提供了一种终止线程的方法:在这三个类中,myThread
负责创建和运行线程,mySemaphore
负责同步,而myEvent
则用于提供一个通知线程终止的机制:主线程控制某个事件,线程也可以访问该事件,当需要终止线程时,主线程会发出事件信号,然后线程会捕获该信号,从而知道是时候终止了,但在终止之前,线程会完成所有必要的内存清理和其他善后工作。
下一节将描述要实现的特定版本的生产者/消费者模型。然后,我们将讨论如何构建/编译该项目,以便您自己尝试,或者如果您想在其他项目中使用这三个类。接下来的几节将详细介绍这三个类,并描述如何实现生产者/消费者模型;这也可以作为使用这三个泛型类在实际应用程序中的一个例子。
1. 生产者/消费者场景
我们感兴趣的生产者/消费者模型描述如下。在这种情况下,我们有一个生产者和几个消费者(假设是5个消费者)。生产者会产生一条消息,并将该消息保存在一个文件中。5个消费者都会查看该文件,如果文件中存在消息,它们会读取该消息。请注意,消息的开头指定了该消息是为哪个消费者准备的,因此只有正确的消费者才能真正“消费”该消息:它会获取消息并将文件设置为空,表明消息已被消费并消失。对于任何其他消费者,它们要么发现消息不是为自己准备的(因此它们不会更改消息文件),要么消息文件为空。文件从一开始就是空的;因此,所有消费者都只是等待直到文件中有消息。
生产者会一直检查文件:如果文件为空,要么是一切的开始,要么是最后一条消息已被消费,在这两种情况下,生产者都会在文件中生成一条新消息。如果文件不为空,生产者什么也不做,只等待消息被正确的消费者消费。
生产者可以像这样生成消息:“consumerName QUIT
”,表示名为consumerName
的消费者应该被终止。一旦消费者收到一条读取“QUIT
”的消息,它就会安全地终止自己。一旦所有消费者都被终止,消费者主循环也将终止。
2. 如何运行项目
您可以下载源代码,以下.cpp和.h文件应包含在zip文件中
对于消费者
multiConsumer.cpp
myEvent.cpp
myException.cpp
myLog.cpp
mySemaphore.cpp
myThread.cpp
myEvent.h
myException.h
myLog.h
mySemaphore.h
myThread.h
对于生产者
myException.cpp
myLog.cpp
mySemaphore.cpp
producer.cpp
myException.h
myLog.h
mySemaphore.h
在获取所有源代码后(您只需要对代码进行的唯一更改是更改message.txt的路径,该文件同时存在于生产者和消费者代码中,因此您需要更改两个地方),您可以构建两个项目:一个用于生产者,一个用于消费者。编译完成后,您应该先启动生产者,然后启动消费者,开始输入消息,并观察消费者的反应。由于这只是一个小型的玩具项目,提供的保护/灵活性不多,例如,您需要以某种格式输入消息(参见第7节),您也可以输入“BYE
”来停止生产者。
在接下来的几节中,我们将详细讨论这些类,并展示如何使用这些类来实现生产者/消费者模型。
3. myEvent类
本节将介绍myEvent
类。将回答以下问题:
- 人们可以在MFC中找到
CEvent
类,那么为什么我们要重新发明轮子呢? - 该类提供了哪些基本功能?
- 在生产者/消费者实现中,为什么我们需要这个类?
让我们先来回答第一个问题。基本上,myEvent
类为进程之间发送特定事件已发生的信号提供了一种机制。它简单地封装了WIN32的事件相关API。在MFC中,可以找到CEvent
类,但是,这两个类并不相同:myEvent
类提供了在事件发生时调用用户定义回调函数的能力。这被证明是一个非常有趣的功能,至少有两个潜在的好处。为了说明这一点,让我们看一下它的构造函数。
myEvent(string eventName=NULL,PAPCFUNC userAPC=NULL,BOOL queueUserAPC=FALSE)
userAPC
是指向用户以异步过程调用(APC)形式提供的回调函数的指针。如果此指针不是NULL
且queueUserAPC
为TRUE
,那么除了使用给定的eventName
创建事件外,myEvent
的构造函数还会启动一个线程,其任务是等待事件(或其他条件,如排队的APC)。当事件发生时,线程将排队执行回调。
有了这些说明,很容易看出该功能的好处:这提供了一种更智能的等待方式。想象一下,一个主程序必须等待某个事件发生(一旦发生,主程序将执行一些特定的处理),一个简单的解决方案是让这个主程序等待该事件,这将阻塞它的处理。但是,我们可能希望主程序在等待事件的同时执行其他处理。myEvent
类提供了一个更好的解决方案:事件发生时要执行的特定处理被写入用户定义的回调函数中,并将此回调函数传递给事件构造函数。主程序将创建事件,创建事件时,其构造函数将(在您不知道的情况下)创建一个线程,该线程的唯一任务是等待该事件,当事件发生时,该线程将排队执行回调函数。主程序在不知道此等待线程存在的情况下,可以继续执行任何它想执行的处理。
该类的另一个好处是提供一种安全终止线程的方法:线程可以创建一个这样的事件对象,当需要终止该线程时,主线程中的主要控制可以发出事件信号,然后回调可以继续为该线程进行内存/资源清理。
确实,myEvent
类足够通用,可以在Windows平台开发的任何其他应用程序中使用,除了上述所有潜在的好处外,它还提供了一种简单的方式来使用所有WIN32事件处理API。它的方法包括setEvent()
、resetEvent()
、waitForEvent(DWORD waitTime=INFINITE)
、pulseEvent()
等。要查看所有详细信息,请查看您可以在此处下载的类定义和实现。
在本次生产者/消费者实现中,为什么我们需要这个类?同样,我们在这里需要它,因为它被用作终止消费者线程的一种安全方式。在这个特定的应用程序中,清理工作并不复杂,所以我们不需要编写回调,但思路是一样的,并在后面的章节中有所体现。
4. mySemaphore类
本节将简要介绍mySemaphore
类。同样,将回答与第3节相同的问题。首先,很明显,这个类封装了WIN32信号量API。同样,人们可以在MFC中找到CSemaphore
类,它也提供了一些这些API的包装器,但是,信不信由你,CSemaphore
缺少一个等待信号量释放的方法,这也许被认为是该类的关键功能之一。
该类提供的基本功能集非常直观:您可以通过实际创建新信号量或打开现有信号量来创建信号量对象。获取信号量对象后,您可以锁定它(要么等待直到成功锁定,要么立即尝试锁定并失败则返回),解锁它,更改其设置(初始计数、最大计数等),等等。有关详细信息,请参阅下载代码中的类定义/实现。该类也足够通用,可用于Windows平台上的其他应用程序。
mySemaphore
类用于生产者/消费者模型以实现同步控制:如果其中一个消费者正在读取消息文件,生产者就不会生成任何东西;同样,如果生产者正在更新消息文件,消费者就无法访问消息文件;此外,在任何给定时刻,消息文件只能由一个消费者读取和/或更新。即使是日志文件(出于调试目的)也必须使用信号量进行同步。
5. myThread类
要实现一个带有多个消费者的生产者/消费者模型,多线程可能是最佳解决方案——它可以用来“模拟”多个消费者的存在,因为线程使进程能够一次执行多项任务。让我们再次提及线程编程的一个关键好处(已在第3节中提到):通过创建线程,您可以在该线程中调用阻塞函数,让主线程继续处理而不必等待。
与其他两个类一样,myThread
是一个泛型类,可以轻松地用于开发其他应用程序。它的主要功能包括:创建线程、启动线程执行、挂起/恢复线程、等待线程完成并获取其退出代码、访问线程的设置(例如,线程的优先级),以及报告线程的时间统计信息等。同样,有关详细信息,请参阅myThread
类的定义/实现。
现在让我们集中讨论如何安全地终止线程。首先,让我们先研究一下myThread
的构造函数,
myThread(LPTHREAD_START_ROUTINE threadFunction,LPVOID pThreadFuncParameter=NULL,
DWORD exeFlags=0,DWORD sSize=0,BOOL inheritable=FALSE)
在此构造函数中,threadFunction
是回调函数的地址,pThreadFuncParameter
是回调参数的地址。如果回调需要多个参数,一个好的解决方案是构建一个封装了回调所有参数的类,并传递该类对象的指针。回调函数在接收到此指针后,将能够访问所有参数。
这引导我们构建另一个类,让我们称之为myThreadArgument
。当然,不可能确定该类应该拥有哪些数据成员来满足现实世界中可能创建的所有潜在线程的需求。但是,一个参数,或者更准确地说,一个数据成员,总是需要的:指向myEvent
类对象的指针。
如上所述,每当涉及线程编程时,我们总是需要一种安全的方式来终止线程(WIN32确实提供了TerminateThread()
,但该调用不允许线程在退出前清理一些资源,因此不推荐使用)。在此设计中,myEvent
类用于终止线程:当需要终止线程时,主循环将访问线程的参数对象,发出该对象中的事件信号,而不是调用TerminateThread()
。另一方面,线程也会检查其参数对象中的事件,一旦发现事件被信号化,它就会清理一切并退出。基本流程如下所示。
myThreadArgument
的构造函数将构建一个事件对象
myThreadArgument::myThreadArgument(...)
{
// add data members you want here
// this event is always necessary
exitEvent = new myEvent(“exitEvent”);
}
在主循环中
void main (int argc, char* argv[])
{
myThreadArgument* threadArgument = new myThreadArgument();
myThread* thread = new myThread(threadFunc,(void*)threadArgument);
thread->execute();
while (1)
{
// do stuff here ...
// time to stop the thread: signal the thread to stop
threadArgument->getExitEvent()->setEvent();
Sleep(1);
}
return 0;
}
在线程函数中
DWORD WINAPI threadFunc(LPVOID threadInfo)
{
// get the parameters to this calleback
myThreadArgument* threadArgument = (myThreadArgument*)threadInfo;
// do other stuff ...
while (1)
{
// do stuff here ...
// check to see if this thread should terminate
if (threadArgument->getExitEvent()->waitForEvent(0))
{
// clean up everything here !!! then get out
break;
}
}
// do stuff here ...
}
6. 两个辅助类:myLog和myException
在开始实现生产者/消费者模型之前,我们需要另外两个类:myLog
和myException
。
调试多线程系统可能很困难。为了理解每个线程(包括主线程)的操作,我们希望所有线程将其主要操作写入日志文件,而myLog
类就是为此目的而设计的。这主要是为了理解系统中正在发生的事情,如果您不需要日志,您可以在所有.cpp文件中搜索winLog
并注释掉它们。另外,请记住从producer.cpp中删除以下行
myLog winLog("producer.txt");
并从multiConsumer.cpp中删除以下行
myLog winLog("consumer.txt");
为了捕获可能的错误,我们提供了一个简单的myException
类。我们建议您保留此类,因为它相当简单且易于理解。
7. 生产者/消费者模型的实现
现在我们已经完成了所有必要的类,我们可以开始实现生产者/消费者模型。让我们先看看生产者。同样,在我们的模型中,我们只有一个生产者,它的构建非常直接,唯一需要注意的是,为了保护对消息文件的访问,生产者会构建一个信号量,如下所示:
mySemaphore producerSemaphore(string("producerSem"));
在主循环中,生产者尝试锁定此信号量,以便开始将消息“生产”到消息文件中。一旦生产者成功锁定此信号量,它将检查消息文件(文件名为message.txt),如果文件为空,它将生成一条新消息,否则,它将解锁信号量并等待消费者消费该消息——这是producer.cpp中的主流程,您可以在下载的源代码中找到所有详细信息。
主要的消费者逻辑也不难。它将首先构建两个信号量:一个用于访问日志文件,另一个称为mainSemaphore
// semaphore that protects the log file
mySemaphore logSemaphore(string(""),1);
// main semaphore (see the main loop below)
mySemaphore mainSemaphore(string("main"),0,10);
然后,它将创建所有5个消费者线程,并在进入主循环之前启动所有这些线程。
// create all the consumers and their arguments
for ( int i = 0; i < numOfConsumers; i++ )
{
// first create the consumer's name: their name is "0","1","2","3" and "4"
char tmp[64];
memset(tmp,0,sizeof(tmp));
sprintf(tmp,"%d",i);
string consumerName = string(tmp);
// build the argument for each consumer thread
consumerArgument[i] =
new myThreadArgument(consumerName,&logSemaphore,&mainSemaphore,
string("producerSem"),string("message.txt"));
// build each consumer thread using the above argument
consumer[i] = new myThread(consumerThread,(void*)consumerArgument[i]);
// start it!
consumer[i]->execute();
}
在完成所有上述设置工作后,主消费者循环开始:它将首先尝试锁定mainSemaphore
。成功锁定此信号量意味着一个消费者已经消费了该消息,因此现在轮到主循环执行以下操作:如果此消息是“QUIT
”,它将向接收到该消息的消费者发出终止信号;如果所有消费者都已终止,主循环将终止。
每个消费者线程都有以下函数签名
DWORD WINAPI consumerThread(LPVOID threadInfo))
这代表了一个单独的消费者,它的实现如下。首先,它从线程参数对象threadInfo
中获取必要的信息:日志信号量,以便每个消费者可以同步访问日志文件;该消费者的名称(消费者的名称是集合{“0
”、“1
”、“2
”、“3
”、“4
”}中的一个字符);mainSemaphore
,用于通知主循环消息已被消费;它还将获取生产者信号量的名称,以便访问message.txt文件。然后,每个消费者将进入其主循环:在这个主循环中,它将首先尝试锁定生产者信号量。一旦锁定生产者信号量,它将读取消息文件。如果文件中存在消息,并且该消息是为该消费者准备的(消息的第一部分必须是消费者的名称,即集合{“0
”、“1
”、“2
”、“3
”、“4
”}中的一个字符),它会将文件长度设置为0
,表示消息已被消费,它还将把这条消息打印到日志文件中。如果存在消息但不是为该消费者准备的,它不会更改文件,但会向日志中添加一行,显示该消息是为另一个消费者准备的。如果文件为空,它将在日志文件中打印一条消息,表明另一个消费者已经消费了该消息。然后它解锁生产者信号量,让其他消费者有机会读取它,或者让生产者有机会取回它。该消费者线程将做的最后一件事是检查其参数,以确定是否应该终止。
// notice there is no memory/resource clean-up work needs to be done,
// so simply break the main loop in the consumer thread
if (threadArgument->getExitEvent()->waitForEvent(0)) break;
如果在threadArgument
中的exitEvent
已被信号化,它将跳出循环,从而终止消费者线程。否则,它将继续循环,即尝试锁定生产者信号量并读取消息文件,如上所述。
请注意,生产者生成的所有消息都必须遵循以下格式:
consumerName message
例如:
0 this message is for the first consumer
在此消息中,第一个0是消费者名称,其余部分是消息本身。
以下是一个日志文件,显示了生产者在一次运行中生成的所有消息(请注意,一旦您完成运行生产者和消费者,您的本地目录中将会有日志文件,名称为producer.txt和consumer.txt,您运行可执行文件的目录就是日志文件所在的目录)。
DATE: 07/02/04 - 03:00:08 producer.txt
0 this message is for consumer 0
1 this message is for consumer 1
0 for consumer 0 again
2 this message is for consumer 2
3 this message is for consumer 3
4 this message is for consumer 4
1 for consumer 1 again
0 QUIT
1 QUIT
2 this message is for consumer 2 again
2 QUIT
4 you are one of the two last consumers here!
3 QUIT
4 QUIT
BYE
DATE: 07/02/04 - 03:02:31 producer.txt
Execution time: 2 minutes 23 seconds
下面的日志文件显示了上述生产者消息下消费者的跟踪信息。
DATE: 07/02/04 - 03:00:12 consumer.txt
[consumer 0 ] has read the msg: got message: this message is for consumer 0
[consumer 1 ] has read the msg: it is already consumed by other consumer.
[consumer 2 ] has read the msg: it is already consumed by other consumer.
[consumer 3 ] has read the msg: it is already consumed by other consumer.
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[consumer 0 ] has read the msg: it is for consumer 1
[consumer 2 ] has read the msg: it is already consumed by other consumer.
[consumer 3 ] has read the msg: it is already consumed by other consumer.
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[consumer 1 ] has read the msg: got message: this message is for consumer 1
[consumer 2 ] has read the msg: it is already consumed by other consumer.
[consumer 3 ] has read the msg: it is already consumed by other consumer.
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[consumer 1 ] has read the msg: it is already consumed by other consumer.
[consumer 0 ] has read the msg: got message: for consumer 0 again
[consumer 3 ] has read the msg: it is already consumed by other consumer.
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[consumer 1 ] has read the msg: it is already consumed by other consumer.
[consumer 0 ] has read the msg: it is already consumed by other consumer.
[consumer 2 ] has read the msg: got message: this message is for consumer 2
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[consumer 1 ] has read the msg: it is already consumed by other consumer.
[consumer 2 ] has read the msg: it is already consumed by other consumer.
[consumer 0 ] has read the msg: it is already consumed by other consumer.
[consumer 3 ] has read the msg: got message: this message is for consumer 3
[consumer 1 ] has read the msg: it is already consumed by other consumer.
[consumer 2 ] has read the msg: it is already consumed by other consumer.
[consumer 3 ] has read the msg: it is already consumed by other consumer.
[consumer 0 ] has read the msg: it is already consumed by other consumer.
[consumer 4 ] has read the msg: got message: this message is for consumer 4
[consumer 2 ] has read the msg: it is already consumed by other consumer.
[consumer 3 ] has read the msg: it is already consumed by other consumer.
[consumer 0 ] has read the msg: it is already consumed by other consumer.
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[consumer 1 ] has read the msg: got message: for consumer 1 again
[consumer 3 ] has read the msg: it is for consumer 0
[main] will signal [0] to exit...
[consumer 4 ] has read the msg: it is for consumer 0
[consumer 2 ] has read the msg: it is for consumer 0
[consumer 0 ] has read the msg: got message: QUIT
[consumer 1 ] has read the msg: it is for consumer 0
[consumer 0 ] will terminate now
[consumer 3 ] has read the msg: it is for consumer 1
[consumer 4 ] has read the msg: it is for consumer 1
[main] will signal [1] to exit...
[consumer 2 ] has read the msg: it is for consumer 1
[consumer 1 ] has read the msg: got message: QUIT
[consumer 1 ] will terminate now
[consumer 3 ] has read the msg: it is already consumed by other consumer.
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[consumer 2 ] has read the msg:
got message: this message is for consumer 2 again
[consumer 3 ] has read the msg: it is for consumer 2
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[main] will signal [2] to exit...
[consumer 2 ] has read the msg: got message: QUIT
[consumer 2 ] will terminate now
[consumer 3 ] has read the msg: it is for consumer 4
[consumer 4 ] has read the msg:
got message: you are one of the two last consumers here!
[consumer 4 ] has read the msg: it is already consumed by other consumer.
[consumer 3 ] has read the msg: got message: QUIT
[main] will signal [3] to exit...
[consumer 3 ] will terminate now
[consumer 4 ] has read the msg: got message: QUIT
[main] will signal [4] to exit...
all the consumers are dead, the main thread will be terminated!
[consumer 4 ] will terminate now
DATE: 07/02/04 - 03:02:28 consumer.txt
Execution time: 2 minutes 16 seconds
8. 结论
本文介绍了三个泛型类,并使用这些类实现了一个版本的生产者/消费者模型。同样,这些类足够通用,可以用于需要多线程和同步控制的其他应用程序。您也可以通过此特定示例来研究和理解线程、信号量和事件的行为。希望这对您的开发工作有所帮助,我非常欢迎任何建议和意见。
许可证
本文未附加明确的许可证,但可能在文章文本或下载文件本身中包含使用条款。如有疑问,请通过下面的讨论区联系作者。
作者可能使用的许可证列表可以在此处找到。