65.9K
CodeProject 正在变化。 阅读更多。
Home

使用信号量:多线程生产者/消费者

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.97/5 (24投票s)

2001 年 6 月 15 日

15分钟阅读

viewsIcon

332159

downloadIcon

3090

信号量使用简介

引言

一如既往,这篇博文的起因是在新闻组中一周内看到几次相同的问题。我从 1969 年就开始使用信号量,不久前 Edsgar Dijkstra在他的经典论文[E. W. Dijkstra. Cooperating Sequential Processes. Programming Languages (F. Genuys, ed.), Academic Press, New York, 1968].]中首次提出它们。我在 20 世纪 70 年代中期在 CMU 进行的 C.mmp/Hydra 系统中大量使用它们,并且在 Windows 中广泛使用它们。它们实际上很容易使用。

我发现太多的人使用 Event 来进行同步,而不是使用 Semaphore。它们旨在解决不同的问题,一个不能解决另一个的领域问题。您将在下面的示例中看到我两者都使用,但用途不同。

您可能还想阅读我关于工作线程的文章。

这里的解决方案是完全通用的;它可以与多个生产者和多个消费者一起工作。在附带的示例代码中,我展示了一个生产者和两个消费者,但该解决方案可以推广到多个生产者。

我所做的是在 C++ 中创建一个 Queue 类。为了不做特定的类或模板类(模板类在 MFC 中存在一些问题),我只创建了一个接受 void * 指针并将其入队的类。因此,您应该可以在许多项目中直接使用此代码。

您可能会问,我为什么使用 API 调用进行同步,而不是使用 MFC 同步类,如 CMutexCSemaphoreCEvent。原因有两个。一是教学目的:我希望您能看到原始对象是如何工作的。另一个是实际原因。在 MFC 的早期版本中,同步原始对象做得非常糟糕,以至于它们实际上是错误的,并且完全无法使用。例如,我曾使用 CEvent::SetEvent 来同步两个独立的进程,但微软的一位过于聪明的程序员认为,检查一个布尔标志以查看 Event 是否已被设置并且不必“浪费时间”调用内核(如果已设置)会很酷。愚蠢的程序员;调用 CEvent::ResetEvent 的代码在完全独立的进程中,这正是您对 Event 想要做的事情之一!而且,为了增添无能,程序员实际上对一个布尔标志进行了不同步的测试,以查看 Event 是否应该被设置。做得如此糟糕的代码不值一提。该程序员对基本操作完全一无所知。也许微软已经修复了这段代码。我从未 bother 过查看。我不信任它。MFC 新闻组中的许多其他人也不信任。

代码和演示程序都可以在源下载中找到。

下一节将描述该类的用户界面。

BOOL Queue::AddTail(LPVOID p)

这会将一个项目添加到队列的头部,并通知等待的线程队列不为空。如果成功将请求入队,则返回 TRUE;如果达到信号量限制,则返回 FALSE

注意:测试程序在恢复方面忽略了布尔值,只是丢弃了它正在入队的项。您可能希望这样做。

LPVOID Queue::RemoveHead()

这会将一个项目从队列中移除。如果队列为空,此调用将阻塞。调用 RemoveTail 操作而被阻塞的线程将在调用 shutdown 方法时终止。

void Queue::shutdown()

导致所有被 RemoveTail 阻塞的线程终止。请注意,不会尝试从队列中出队任何待处理的项目;调用者有责任在队列中有任何内容时不要调用此方法。

注意:我不会在关机过程中阻止线程将项目添加到队列。这需要一个布尔标志,该标志将被设置为指示后续的 AddTail 应返回 FALSE。请注意,在这种情况下,AddTail 应调用 ::SetLastError 来设置一个标志,表明正在进行关机,因此调用者可以使用 ::GetLastError 来确定返回 FALSE 的原因。已经有一个错误代码,ERROR_SHUTDOWN_IN_PROGRESS,可以为此目的进行重用。

Queue.h 的代码包含了整个队列类。没有 .cpp 文件。这是完整的代码

class Queue {
public:
   //----------------
   // Queue
   //----------------
   Queue(UINT limit)
     {
      handles[SemaphoreIndex] = ::CreateSemaphore(NULL,  // no security attributes
                                                  0,     // initial count
                                                  limit, // max count
                                                  NULL); // anonymous

      handles[StopperIndex] = ::CreateEvent(NULL,  // no security attributes
                                            TRUE,  // manual reset
                                            FALSE, // initially non-signaled
                                            NULL); // anonymous

      ::InitializeCriticalSection(&lock);
     } // Queue

   //----------------
   // ~Queue
   //----------------
   ~Queue()
     {
      ::CloseHandle(handles[SemaphoreIndex]);
      ::CloseHandle(handles[StopperIndex]);
      ::DeleteCriticalSection(&lock);
     } // ~Queue

   //----------------
   // AddTail
   //----------------
   BOOL AddTail(LPVOID p)
     { 
      BOOL result;
      ::EnterCriticalSection(&lock);
      queue.AddTail(p);
      result = ::ReleaseSemaphore(handles[SemaphoreIndex], 1, NULL);
      if(!result)
        { /* failed */
         // caller can use ::GetLastError to determine what went wrong
         queue.RemoveTail();
        } /* failed */
      ::LeaveCriticalSection(&lock);
      return result;
     } // AddTail

   //----------------
   // RemoveHead
   //----------------
   LPVOID RemoveHead()
     { 
      LPVOID result;

      switch(::WaitForMultipleObjects(2, handles, FALSE, INFINITE))
         { /* decode */
          case StopperIndex: // shut down thread
              ExitThread(0); // kill thread
          return NULL; // return keeps C compiler happy

          case SemaphoreIndex: // semaphore
              ::EnterCriticalSection(&lock);
              result = queue.RemoveHead();
              ::LeaveCriticalSection(&lock);
              return result;

          case WAIT_TIMEOUT: // not implemented
          default:
              ASSERT(FALSE); // impossible condition
              return NULL;
         } /* decode */
      } // RemoveHead

   //----------------
   // shutdown
   //----------------
   void shutdown()
      {
       ::SetEvent(handles[StopperIndex]);
      } // shutdown

 protected:
      enum {StopperIndex, SemaphoreIndex};
      HANDLE handles[2];
      CRITICAL_SECTION lock;
      CList<LPVOID, LPVOID> queue;
};

基本思想是,CList 称为 queue 的对象仅从提供的两个方法 AddTailRemoveHead 进行操作。由于它是一个 protected 成员(非常重要!),因此没有其他访问权限,因此所有访问都可以受到保护。

现在让我们更详细地看一下代码

构造函数

   Queue(UINT limit)
     {
      handles[SemaphoreIndex] = ::CreateSemaphore(NULL,  // no security attributes
                                                  0,     // initial count
                                                  limit, // max count
                                                  NULL); // anonymous

      handles[StopperIndex] = ::CreateEvent(NULL,  // no security attributes
                                            TRUE,  // manual reset
                                            FALSE, // initially non-signaled
                                            NULL); // anonymous

      ::InitializeCriticalSection(&lock);
     } // Queue

我们需要一个包含两个句柄的数组,用于稍后的 ::WaitForMultipleObjects,所以我直接在数组中创建句柄。为了给两个索引提供助记名称,我使用了一个 enum。枚举常量的选择非常重要。当 ::WaitForMultipleObjects 结束等待时,它会报告最低编号的已发出信号的数组项的索引。因此,您选择这些索引的顺序可以决定是否优先处理关机或队列元素处理。在此示例中,我优先处理关机,因此我选择用于关机的 Event 的索引为 0,而信号量的索引为 1。这被编码在受保护的声明中

      enum {StopperIndex, SemaphoreIndex};

请注意,enum 按定义从索引 0 开始分配,除非另有指定。

虽然信号量可以防止我们尝试在空队列上执行,并阻塞等待的线程,但它提供对队列操作的线程安全性。在这种情况下,我还需要提供一个互斥原语,以防止多个线程同时尝试入队或出队数据。为此,我创建了一个 CRITICAL_SECTION 对象,该对象必须由 ::InitializeCriticalSection 进行初始化。

Event 是一个手动重置事件,最初是非信号状态。当需要关闭所有线程时,我所要做的就是调用 ::SetEvent 将 Event 设置为信号状态。请注意,Event 提供同步保护;它仅提供通知。

析构函数

像任何好的析构函数一样,这个析构函数会释放构造函数分配的资源

   ~Queue()
     {
      ::CloseHandle(handles[SemaphoreIndex]);
      ::CloseHandle(handles[StopperIndex]);
      ::DeleteCriticalSection(&lock);
     } // ~Queue

AddTail

添加元素的唯一技巧是,信号量始终有一个上限;尝试添加高于该上限的项将始终失败。我们通过在无法入队时返回 FALSE 来处理此问题。

否则,我们必须确保一次只有一个线程在操作队列。我们通过调用 ::EnterCriticalSection 来阻止其他线程。请注意,您不得在关键部分的范围内执行 return;您必须始终执行 ::LeaveCriticalSection,否则 return 将绕过它。这将意味着下一个尝试访问关键部分的线程将被阻塞,并且没有任何东西可以让它释放这个关键部分。

我们所做的是将该项添加到队列,然后尝试释放信号量。如果我们处于信号量的限制内,::ReleaseSemaphore 将返回 TRUE。但是,如果我们溢出了信号量,现在队列中的项比信号量所能容纳的要多一个,所以我们必须回滚事务,我们通过对 queue 执行 RemoveTail 操作来完成此操作。因此,当我们最终返回 FALSE 时,队列没有净变化。

这种在同步原语范围外执行 return 的技术是基础性的。我也发现编写此类代码更容易,其中等待/释放操作是平衡的,即每个操作只有一个。

   BOOL AddTail(LPVOID p)
     { 
      BOOL result;
      ::EnterCriticalSection(&lock);
      queue.AddTail(p);
      result = ::ReleaseSemaphore(handles[SemaphoreIndex], 1, NULL);
      if(!result)
        { /* failed */
         // caller can use ::GetLastError to determine what went wrong
         queue.RemoveTail();
        } /* failed */
      ::LeaveCriticalSection(&lock);
      return result;
     } // AddTail

RemoveHead

此函数可能有点过度;我选择实际关闭线程,而不是返回 FALSE,尽管您也可以选择这样做。

使此工作正常的是 ::WaitForMultipleObjects,它允许我们阻塞等待队列项或关机事件。构造函数中创建的 Event 用于发出关机信号。如上所述,项的索引被选择为优先于处理的关机,但如果您切换两个 Event,此代码仍然有效,因为它直接使用索引值。

请注意,即使我调用 ::ExitThread(并且在这种情况下调用 ::TerminateThread!),编译器通常也会抱怨没有返回值。为了保持理智,我添加了一个 return NULL 语句,这可以防止我收到编译器警告。

请注意,对 queue 变量的唯一操作是在 CRITICAL_SECTION 的范围内完成的,这样任何其他线程都不能尝试同时添加或删除队列。同样,请注意值必须存储,然后在 ::LeaveCriticalSection 释放锁之后返回。

我不支持超时,但添加了这种情况;它会落入 default 情况,并执行 ASSERT(FALSE) 以捕获开发过程中可能发生的任何错误。

   LPVOID RemoveHead()
     { 
      LPVOID result;

      switch(::WaitForMultipleObjects(2, handles, FALSE, INFINITE))
         { /* decode */
          case StopperIndex:   // shut down thread
              ::ExitThread(0); // kill thread
              return NULL;     // return keeps C compiler happy

          case SemaphoreIndex: // semaphore
              ::EnterCriticalSection(&lock);
              result = queue.RemoveHead();
              ::LeaveCriticalSection(&lock);
              return result;

          case WAIT_TIMEOUT: // not implemented
          default:
              ASSERT(FALSE); // impossible condition
              return NULL;
         } /* decode */
      } // RemoveHead

关闭

这实际上是一个简单的例程。它所做的只是对“停止器”事件调用 ::SetEvent,这将释放 ::WaitForMultipleObjects 并返回 StopperIndex,这将终止所有返回到等待状态的线程。

处理您可能想要中止的长时间计算周期的情况。在这种情况下,您还应该设置一个布尔变量,正如我在关于工作线程的文章中所描述的那样。

   void shutdown()
      {
       ::SetEvent(handles[StopperIndex]);
      } // shutdown

就这些!您现在拥有一个支持多线程的线程安全队列。

为了测试这一点,我编写了一个小测试程序,您可以在下载源代码时获得(或者您可以直接从此页面复制粘贴,但您将无法获得测试程序)。为了使其更真实,我添加了一个 200-700 毫秒的人工延迟,随机选择,因此当您按下按钮将条目入队时,您实际上有机会领先于出队。

这是测试程序中的线程例程

void CQueuetestDlg::run()
    {
     CString * s = new CString;
     s->Format(_T("Thread %08x started"), ::GetCurrentThreadId());
     PostMessage(UWM_MESSAGE, (WPARAM)::GetCurrentThreadId(), (LPARAM)s);

     while(TRUE)
	{ /* thread loop */
	 LPVOID p = q.RemoveHead();
	 long n = InterlockedDecrement(&count);
	 showCount(n);
	 PostMessage(UWM_MESSAGE, (WPARAM)::GetCurrentThreadId(), (LPARAM)p);
	 ::Sleep(200 + rand() % 500); // make simulated delay
	} /* thread loop */
    } // CQueuetestDlg::run

我向主 GUI 线程发送一条消息(这在主 CDialog 的上下文中执行,因此 PostMessage 会将消息发布到该对象),告知线程已启动。将此消息放入列表框的列表框由线程 ID 确定,我通过 ::GetCurrentThreadId() 获取该 ID。

每次将某项添加到队列时,我都会调用 InterlockedIncrement 将计数器加一,移除时调用 InterlockedDecement。该值由 showCount 例程显示。

主循环从队列中检索一项并将其发布回主线程。队列中的项都是 CString * 对象,将由 PostMessage 的接收者删除,正如我在关于工作线程的文章中所述。

然后我引入随机的 ::Sleep 将其减慢到人类交互速度。如果调用 shutdown,则执行此代码的线程将终止(参见RemoveHead 代码)。

这是程序运行的一个示例

请注意,这些项并非严格交替出现;否则,我们会在线程 1 窗口中看到所有偶数编号的消息,在线程 2 窗口中看到所有奇数编号的消息。

那些短语是什么意思?嗯,我找到了这个 buzzword 表,并编写了一个小 buzzword 生成器。当我需要一种方法来为各种输入上下文生成句子时,我就会把它带到每个项目中。请注意,完成此示例后,您可以在商业计划中使用它们。

与其他机制的比较

我见过许多没有使用信号量的非常糟糕的同步实现。我将在此讨论它们,并展示为什么它们不够。

轮询

我见过许多尝试使用轮询的例子。有时,它是这样的形式

while(counter <= 0) /* do nothing */ ;

然后在其他地方,程序员会这样做

counter++; 

并且为了删除某项,程序员会这样做

counter--;

现在,首先要理解的是,counter++,其中 counter 是可以从多个线程访问的变量,在多线程环境中不能、不可能,而且很可能也不会正确工作。就此而言。如果您认为它会正确工作,那么您就错了。添加 volatile 作为声明也不会有帮助。这只会告诉编译器它不能缓存值的副本在寄存器中;它不会保护值本身不被错误使用。如果您在多处理器上运行,它尤其会惨败。

啊哈,您说。您应该使用 ::InterlockedIncrement::InterlockedDecrement。您说得对。这些将保证值被安全地递增和递减,并且实际上能够正确工作。但现在我们还面临轮询的问题。轮询循环会吃掉一个完整的 200 毫秒(大多数 NT/2000/XP 盒子上或多或少)时间片,什么都不做。我有一个很好的例子,我在一个有一个约 40 个控件的控制面板的类中使用它。程序进入轮询循环。当循环运行时弹出控件面板时,您将看到每个…单个…控件…被…重绘…一次…一次…非常…非常…缓慢…因为那个轮询循环正在消耗 CPU 周期。在下一个实验课中,我们将展示如何添加中断并消除轮询。

所以,如果您认为不应该使用信号量,因为它“效率低下”,我可以保证,无论如何,使用信号量总是比轮询更有效。事实上,轮询通常发生在程序中最糟糕的时候,也就是它应该努力创建下一个要入队的对象的时。是的,您可以轮询。但永远不要相信它“更有效”。它几乎肯定不是。

而且,我应该指出,您仍然需要一个 CRITICAL_SECTION 或 Mutex 来保护您的数据对象,因此您可能通过使用轮询并没有获得多少。

CRITICAL_SECTION 阻塞

我见过至少一种实现,它做了类似以下的事情

if(count == 0)
    ::EnterCriticalSection(&wait);

这行不通。首先,在测试进行时(count == 0)和进入关键部分之间,计数可能会发生变化。由于 CRITICAL_SECTION 不是计数对象,因此在等待之前释放将不起作用,因此存在一个导致永远阻塞的竞争条件。

Event 阻塞

这与 CRITICAL_SECTION 阻塞的问题完全相同。虽然您可以争辩说,在实际尝试阻塞之前成功设置 Event 是可能的,但由于 Event 不是计数的,您必须在阻塞之前对计数变量进行同步更新。由于一个线程可能在另一个线程递增计数并执行 ::SetEvent 的同时将计数递减到零并执行 ::ResetEvent,因此您无法保证 ::SetEvent 不会在 ::ResetEvent 之前几百纳秒发生,从而导致顺序错乱并阻塞,而本应可以通过 Event。

信号量不是互斥!

我见过这样的代码

::WaitForSingleObject(semaphore, INFINITE);
queue.RemoveHead(p)

论点是“信号量保护队列操作”。不,信号量保护队列,除非最大信号量值为 1。在这种情况下,您拥有信号量的一个特殊情况,即二元信号量,也称为“互斥量”。请注意,真正的 Mutex 对象(如 ::CreateMutex)实际上具有与信号量略有不同的语义;一个线程据说拥有一个互斥量,如果一个线程试图重新获取它已拥有的 Mutex,它将被允许通过而不阻塞。您必须执行与在它上执行的 ::WaitFor 操作一样多的 ::ReleaseMutex 操作。信号量不是这样工作的;线程第二次尝试获取它已“拥有”的二元信号量将无限期地阻塞。此外,任何线程都可以释放信号量,但只有拥有互斥量的线程才能释放它。信号量仅为可计数资源(如队列中的项目数)提供流程控制,而不保护其他对象免受并发执行。因此,上面的 ::RemoveHead 操作必须由 Mutex 或 CRITICAL_SECTION 保护。

没有信号量的替代品

如果您认为您发明了一种巧妙、更快、更有效、更简单,或者其他任何方式来执行信号量而不实际使用信号量,那么您只是欺骗了自己。阅读 Dijkstra 早期的论文,他当时正在开发同步原语的概念,这些原语是可抢占线程安全的,而且没有 InterlockedIncrement 操作来帮助他。这些是复杂的论文;技术很微妙。只有当您完全理解同步问题时,您才应考虑尝试类似的东西。其余时间,特别是如果您是并行和同步的新手,请将此作为一条规则:您不知道如何在不使用信号量的情况下创建信号量效果。我已经专业地做了二十五年,在尝试用其他机制来伪造信号量的功能方面并不自信。相信我:您不想去那里。


这些文章中表达的观点是作者的观点,不代表,也不被微软认可。

如果您对本文有任何疑问或评论,请在下方留言。
版权所有 © 1999 保留所有权利
www.flounder.com/mvp_tips.htm

许可证

本文未附加明确的许可证,但可能在文章文本或下载文件本身中包含使用条款。如有疑问,请通过下面的讨论区联系作者。

作者可能使用的许可证列表可以在此处找到。

© . All rights reserved.