使用信号量:多线程生产者/消费者






4.97/5 (24投票s)
2001 年 6 月 15 日
15分钟阅读

332159

3090
信号量使用简介
引言
一如既往,这篇博文的起因是在新闻组中一周内看到几次相同的问题。我从 1969 年就开始使用信号量,不久前 Edsgar Dijkstra在他的经典论文[E. W. Dijkstra. Cooperating Sequential Processes. Programming Languages (F. Genuys, ed.), Academic Press, New York, 1968].]中首次提出它们。我在 20 世纪 70 年代中期在 CMU 进行的 C.mmp/Hydra 系统中大量使用它们,并且在 Windows 中广泛使用它们。它们实际上很容易使用。
我发现太多的人使用 Event 来进行同步,而不是使用 Semaphore。它们旨在解决不同的问题,一个不能解决另一个的领域问题。您将在下面的示例中看到我两者都使用,但用途不同。
您可能还想阅读我关于工作线程的文章。
这里的解决方案是完全通用的;它可以与多个生产者和多个消费者一起工作。在附带的示例代码中,我展示了一个生产者和两个消费者,但该解决方案可以推广到多个生产者。
我所做的是在 C++ 中创建一个 Queue
类。为了不做特定的类或模板类(模板类在 MFC 中存在一些问题),我只创建了一个接受 void *
指针并将其入队的类。因此,您应该可以在许多项目中直接使用此代码。
您可能会问,我为什么使用 API 调用进行同步,而不是使用 MFC 同步类,如 CMutex
、CSemaphore
和 CEvent
。原因有两个。一是教学目的:我希望您能看到原始对象是如何工作的。另一个是实际原因。在 MFC 的早期版本中,同步原始对象做得非常糟糕,以至于它们实际上是错误的,并且完全无法使用。例如,我曾使用 CEvent::SetEvent
来同步两个独立的进程,但微软的一位过于聪明的程序员认为,检查一个布尔标志以查看 Event 是否已被设置并且不必“浪费时间”调用内核(如果已设置)会很酷。愚蠢的程序员;调用 CEvent::ResetEvent
的代码在完全独立的进程中,这正是您对 Event 想要做的事情之一!而且,为了增添无能,程序员实际上对一个布尔标志进行了不同步的测试,以查看 Event 是否应该被设置。做得如此糟糕的代码不值一提。该程序员对基本操作完全一无所知。也许微软已经修复了这段代码。我从未 bother 过查看。我不信任它。MFC 新闻组中的许多其他人也不信任。
代码和演示程序都可以在源下载中找到。
下一节将描述该类的用户界面。
BOOL Queue::AddTail(LPVOID p)
这会将一个项目添加到队列的头部,并通知等待的线程队列不为空。如果成功将请求入队,则返回 TRUE
;如果达到信号量限制,则返回 FALSE
。
注意:测试程序在恢复方面忽略了布尔值,只是丢弃了它正在入队的项。您可能希望这样做。
LPVOID Queue::RemoveHead()
这会将一个项目从队列中移除。如果队列为空,此调用将阻塞。调用 RemoveTail
操作而被阻塞的线程将在调用 shutdown
方法时终止。
void Queue::shutdown()
导致所有被 RemoveTail
阻塞的线程终止。请注意,不会尝试从队列中出队任何待处理的项目;调用者有责任在队列中有任何内容时不要调用此方法。
注意:我不会在关机过程中阻止线程将项目添加到队列。这需要一个布尔标志,该标志将被设置为指示后续的 AddTail
应返回 FALSE
。请注意,在这种情况下,AddTail
应调用 ::SetLastError
来设置一个标志,表明正在进行关机,因此调用者可以使用 ::GetLastError
来确定返回 FALSE
的原因。已经有一个错误代码,ERROR_SHUTDOWN_IN_PROGRESS
,可以为此目的进行重用。
Queue.h 的代码包含了整个队列类。没有 .cpp 文件。这是完整的代码
class Queue {
public:
//----------------
// Queue
//----------------
Queue(UINT limit)
{
handles[SemaphoreIndex] = ::CreateSemaphore(NULL, // no security attributes
0, // initial count
limit, // max count
NULL); // anonymous
handles[StopperIndex] = ::CreateEvent(NULL, // no security attributes
TRUE, // manual reset
FALSE, // initially non-signaled
NULL); // anonymous
::InitializeCriticalSection(&lock);
} // Queue
//----------------
// ~Queue
//----------------
~Queue()
{
::CloseHandle(handles[SemaphoreIndex]);
::CloseHandle(handles[StopperIndex]);
::DeleteCriticalSection(&lock);
} // ~Queue
//----------------
// AddTail
//----------------
BOOL AddTail(LPVOID p)
{
BOOL result;
::EnterCriticalSection(&lock);
queue.AddTail(p);
result = ::ReleaseSemaphore(handles[SemaphoreIndex], 1, NULL);
if(!result)
{ /* failed */
// caller can use ::GetLastError to determine what went wrong
queue.RemoveTail();
} /* failed */
::LeaveCriticalSection(&lock);
return result;
} // AddTail
//----------------
// RemoveHead
//----------------
LPVOID RemoveHead()
{
LPVOID result;
switch(::WaitForMultipleObjects(2, handles, FALSE, INFINITE))
{ /* decode */
case StopperIndex: // shut down thread
ExitThread(0); // kill thread
return NULL; // return keeps C compiler happy
case SemaphoreIndex: // semaphore
::EnterCriticalSection(&lock);
result = queue.RemoveHead();
::LeaveCriticalSection(&lock);
return result;
case WAIT_TIMEOUT: // not implemented
default:
ASSERT(FALSE); // impossible condition
return NULL;
} /* decode */
} // RemoveHead
//----------------
// shutdown
//----------------
void shutdown()
{
::SetEvent(handles[StopperIndex]);
} // shutdown
protected:
enum {StopperIndex, SemaphoreIndex};
HANDLE handles[2];
CRITICAL_SECTION lock;
CList<LPVOID, LPVOID> queue;
};
基本思想是,CList
称为 queue 的对象仅从提供的两个方法 AddTail
和 RemoveHead
进行操作。由于它是一个 protected
成员(非常重要!),因此没有其他访问权限,因此所有访问都可以受到保护。
现在让我们更详细地看一下代码
构造函数
Queue(UINT limit)
{
handles[SemaphoreIndex] = ::CreateSemaphore(NULL, // no security attributes
0, // initial count
limit, // max count
NULL); // anonymous
handles[StopperIndex] = ::CreateEvent(NULL, // no security attributes
TRUE, // manual reset
FALSE, // initially non-signaled
NULL); // anonymous
::InitializeCriticalSection(&lock);
} // Queue
我们需要一个包含两个句柄的数组,用于稍后的 ::WaitForMultipleObjects
,所以我直接在数组中创建句柄。为了给两个索引提供助记名称,我使用了一个 enum
。枚举常量的选择非常重要。当 ::WaitForMultipleObjects
结束等待时,它会报告最低编号的已发出信号的数组项的索引。因此,您选择这些索引的顺序可以决定是否优先处理关机或队列元素处理。在此示例中,我优先处理关机,因此我选择用于关机的 Event 的索引为 0
,而信号量的索引为 1
。这被编码在受保护的声明中
enum {StopperIndex, SemaphoreIndex};
请注意,enum
按定义从索引 0
开始分配,除非另有指定。
虽然信号量可以防止我们尝试在空队列上执行,并阻塞等待的线程,但它不提供对队列操作的线程安全性。在这种情况下,我还需要提供一个互斥原语,以防止多个线程同时尝试入队或出队数据。为此,我创建了一个 CRITICAL_SECTION
对象,该对象必须由 ::InitializeCriticalSection
进行初始化。
Event 是一个手动重置事件,最初是非信号状态。当需要关闭所有线程时,我所要做的就是调用 ::SetEvent
将 Event 设置为信号状态。请注意,Event 不提供同步保护;它仅提供通知。
析构函数
像任何好的析构函数一样,这个析构函数会释放构造函数分配的资源
~Queue()
{
::CloseHandle(handles[SemaphoreIndex]);
::CloseHandle(handles[StopperIndex]);
::DeleteCriticalSection(&lock);
} // ~Queue
AddTail
添加元素的唯一技巧是,信号量始终有一个上限;尝试添加高于该上限的项将始终失败。我们通过在无法入队时返回 FALSE
来处理此问题。
否则,我们必须确保一次只有一个线程在操作队列。我们通过调用 ::EnterCriticalSection
来阻止其他线程。请注意,您不得在关键部分的范围内执行 return
;您必须始终执行 ::LeaveCriticalSection
,否则 return
将绕过它。这将意味着下一个尝试访问关键部分的线程将被阻塞,并且没有任何东西可以让它释放这个关键部分。
我们所做的是将该项添加到队列,然后尝试释放信号量。如果我们处于信号量的限制内,::ReleaseSemaphore
将返回 TRUE
。但是,如果我们溢出了信号量,现在队列中的项比信号量所能容纳的要多一个,所以我们必须回滚事务,我们通过对 queue 执行 RemoveTail
操作来完成此操作。因此,当我们最终返回 FALSE
时,队列没有净变化。
这种在同步原语范围外执行 return
的技术是基础性的。我也发现编写此类代码更容易,其中等待/释放操作是平衡的,即每个操作只有一个。
BOOL AddTail(LPVOID p)
{
BOOL result;
::EnterCriticalSection(&lock);
queue.AddTail(p);
result = ::ReleaseSemaphore(handles[SemaphoreIndex], 1, NULL);
if(!result)
{ /* failed */
// caller can use ::GetLastError to determine what went wrong
queue.RemoveTail();
} /* failed */
::LeaveCriticalSection(&lock);
return result;
} // AddTail
RemoveHead
此函数可能有点过度;我选择实际关闭线程,而不是返回 FALSE
,尽管您也可以选择这样做。
使此工作正常的是 ::WaitForMultipleObjects
,它允许我们阻塞等待队列项或关机事件。构造函数中创建的 Event 用于发出关机信号。如上所述,项的索引被选择为优先于处理的关机,但如果您切换两个 Event,此代码仍然有效,因为它直接使用索引值。
请注意,即使我调用 ::ExitThread
(并且在这种情况下不调用 ::TerminateThread
!),编译器通常也会抱怨没有返回值。为了保持理智,我添加了一个 return NULL
语句,这可以防止我收到编译器警告。
请注意,对 queue 变量的唯一操作是在 CRITICAL_SECTION
的范围内完成的,这样任何其他线程都不能尝试同时添加或删除队列。同样,请注意值必须存储,然后在 ::LeaveCriticalSection
释放锁之后返回。
我不支持超时,但添加了这种情况;它会落入 default
情况,并执行 ASSERT(FALSE)
以捕获开发过程中可能发生的任何错误。
LPVOID RemoveHead()
{
LPVOID result;
switch(::WaitForMultipleObjects(2, handles, FALSE, INFINITE))
{ /* decode */
case StopperIndex: // shut down thread
::ExitThread(0); // kill thread
return NULL; // return keeps C compiler happy
case SemaphoreIndex: // semaphore
::EnterCriticalSection(&lock);
result = queue.RemoveHead();
::LeaveCriticalSection(&lock);
return result;
case WAIT_TIMEOUT: // not implemented
default:
ASSERT(FALSE); // impossible condition
return NULL;
} /* decode */
} // RemoveHead
关闭
这实际上是一个简单的例程。它所做的只是对“停止器”事件调用 ::SetEvent
,这将释放 ::WaitForMultipleObjects
并返回 StopperIndex
,这将终止所有返回到等待状态的线程。
这不处理您可能想要中止的长时间计算周期的情况。在这种情况下,您还应该设置一个布尔变量,正如我在关于工作线程的文章中所描述的那样。
void shutdown()
{
::SetEvent(handles[StopperIndex]);
} // shutdown
就这些!您现在拥有一个支持多线程的线程安全队列。
为了测试这一点,我编写了一个小测试程序,您可以在下载源代码时获得(或者您可以直接从此页面复制粘贴,但您将无法获得测试程序)。为了使其更真实,我添加了一个 200-700 毫秒的人工延迟,随机选择,因此当您按下按钮将条目入队时,您实际上有机会领先于出队。
这是测试程序中的线程例程
void CQueuetestDlg::run()
{
CString * s = new CString;
s->Format(_T("Thread %08x started"), ::GetCurrentThreadId());
PostMessage(UWM_MESSAGE, (WPARAM)::GetCurrentThreadId(), (LPARAM)s);
while(TRUE)
{ /* thread loop */
LPVOID p = q.RemoveHead();
long n = InterlockedDecrement(&count);
showCount(n);
PostMessage(UWM_MESSAGE, (WPARAM)::GetCurrentThreadId(), (LPARAM)p);
::Sleep(200 + rand() % 500); // make simulated delay
} /* thread loop */
} // CQueuetestDlg::run
我向主 GUI 线程发送一条消息(这在主 CDialog
的上下文中执行,因此 PostMessage
会将消息发布到该对象),告知线程已启动。将此消息放入列表框的列表框由线程 ID 确定,我通过 ::GetCurrentThreadId()
获取该 ID。
每次将某项添加到队列时,我都会调用 InterlockedIncrement
将计数器加一,移除时调用 InterlockedDecement
。该值由 showCount
例程显示。
主循环从队列中检索一项并将其发布回主线程。队列中的项都是 CString *
对象,将由 PostMessage
的接收者删除,正如我在关于工作线程的文章中所述。
然后我引入随机的 ::Sleep
将其减慢到人类交互速度。如果调用 shutdown
,则执行此代码的线程将终止(参见RemoveHead
代码)。
这是程序运行的一个示例
请注意,这些项并非严格交替出现;否则,我们会在线程 1 窗口中看到所有偶数编号的消息,在线程 2 窗口中看到所有奇数编号的消息。
那些短语是什么意思?嗯,我找到了这个 buzzword 表,并编写了一个小 buzzword 生成器。当我需要一种方法来为各种输入上下文生成句子时,我就会把它带到每个项目中。请注意,完成此示例后,您可以在商业计划中使用它们。
与其他机制的比较
我见过许多没有使用信号量的非常糟糕的同步实现。我将在此讨论它们,并展示为什么它们不够。
轮询
我见过许多尝试使用轮询的例子。有时,它是这样的形式
while(counter <= 0) /* do nothing */ ;
然后在其他地方,程序员会这样做
counter++;
并且为了删除某项,程序员会这样做
counter--;
现在,首先要理解的是,counter++
,其中 counter
是可以从多个线程访问的变量,在多线程环境中不能、不可能,而且很可能也不会正确工作。就此而言。如果您认为它会正确工作,那么您就错了。添加 volatile
作为声明也不会有帮助。这只会告诉编译器它不能缓存值的副本在寄存器中;它不会保护值本身不被错误使用。如果您在多处理器上运行,它尤其会惨败。
啊哈,您说。您应该使用 ::InterlockedIncrement
和 ::InterlockedDecrement
。您说得对。这些将保证值被安全地递增和递减,并且实际上能够正确工作。但现在我们还面临轮询的问题。轮询循环会吃掉一个完整的 200 毫秒(大多数 NT/2000/XP 盒子上或多或少)时间片,什么都不做。我有一个很好的例子,我在一个有一个约 40 个控件的控制面板的类中使用它。程序进入轮询循环。当循环运行时弹出控件面板时,您将看到每个…单个…控件…被…重绘…一次…一次…非常…非常…缓慢…因为那个轮询循环正在消耗 CPU 周期。在下一个实验课中,我们将展示如何添加中断并消除轮询。
所以,如果您认为不应该使用信号量,因为它“效率低下”,我可以保证,无论如何,使用信号量总是比轮询更有效。事实上,轮询通常发生在程序中最糟糕的时候,也就是它应该努力创建下一个要入队的对象的时。是的,您可以轮询。但永远不要相信它“更有效”。它几乎肯定不是。
而且,我应该指出,您仍然需要一个 CRITICAL_SECTION
或 Mutex 来保护您的数据对象,因此您可能通过使用轮询并没有获得多少。
CRITICAL_SECTION 阻塞
我见过至少一种实现,它做了类似以下的事情
if(count == 0)
::EnterCriticalSection(&wait);
这行不通。首先,在测试进行时(count == 0
)和进入关键部分之间,计数可能会发生变化。由于 CRITICAL_SECTION
不是计数对象,因此在等待之前释放将不起作用,因此存在一个导致永远阻塞的竞争条件。
Event 阻塞
这与 CRITICAL_SECTION
阻塞的问题完全相同。虽然您可以争辩说,在实际尝试阻塞之前成功设置 Event 是可能的,但由于 Event 不是计数的,您必须在阻塞之前对计数变量进行同步更新。由于一个线程可能在另一个线程递增计数并执行 ::SetEvent
的同时将计数递减到零并执行 ::ResetEvent
,因此您无法保证 ::SetEvent
不会在 ::ResetEvent
之前几百纳秒发生,从而导致顺序错乱并阻塞,而本应可以通过 Event。
信号量不是互斥!
我见过这样的代码
::WaitForSingleObject(semaphore, INFINITE);
queue.RemoveHead(p)
论点是“信号量保护队列操作”。不,信号量不保护队列,除非最大信号量值为 1
。在这种情况下,您拥有信号量的一个特殊情况,即二元信号量,也称为“互斥量”。请注意,真正的 Mutex
对象(如 ::CreateMutex
)实际上具有与信号量略有不同的语义;一个线程据说拥有一个互斥量,如果一个线程试图重新获取它已拥有的 Mutex
,它将被允许通过而不阻塞。您必须执行与在它上执行的 ::WaitFor
操作一样多的 ::ReleaseMutex
操作。信号量不是这样工作的;线程第二次尝试获取它已“拥有”的二元信号量将无限期地阻塞。此外,任何线程都可以释放信号量,但只有拥有互斥量的线程才能释放它。信号量仅为可计数资源(如队列中的项目数)提供流程控制,而不保护其他对象免受并发执行。因此,上面的 ::RemoveHead
操作必须由 Mutex 或 CRITICAL_SECTION
保护。
没有信号量的替代品
如果您认为您发明了一种巧妙、更快、更有效、更简单,或者其他任何方式来执行信号量而不实际使用信号量,那么您只是欺骗了自己。阅读 Dijkstra 早期的论文,他当时正在开发同步原语的概念,这些原语是可抢占线程安全的,而且没有 InterlockedIncrement
操作来帮助他。这些是复杂的论文;技术很微妙。只有当您完全理解同步问题时,您才应考虑尝试类似的东西。其余时间,特别是如果您是并行和同步的新手,请将此作为一条规则:您不知道如何在不使用信号量的情况下创建信号量效果。我已经专业地做了二十五年,我在尝试用其他机制来伪造信号量的功能方面并不自信。相信我:您不想去那里。
这些文章中表达的观点是作者的观点,不代表,也不被微软认可。
如果您对本文有任何疑问或评论,请在下方留言。
版权所有 © 1999 保留所有权利
www.flounder.com/mvp_tips.htm
许可证
本文未附加明确的许可证,但可能在文章文本或下载文件本身中包含使用条款。如有疑问,请通过下面的讨论区联系作者。
作者可能使用的许可证列表可以在此处找到。