使用信号量解决读者/写者问题






3.61/5 (14投票s)
2002年10月12日
4分钟阅读

356957

3159
使用信号量解决读者/写者问题
引言
读者/写者问题是经典的同步问题之一。与“餐桌上的哲学家”问题一样,它经常用于比较和对比同步机制。它也是一个非常实际的问题。
读者/写者问题 - 经典定义
两种类型的进程——读者和写者——共享一个数据库。读者执行检查数据库记录的事务;写者事务既检查数据库又更新数据库。假设数据库最初处于一致状态(即,数据之间的关系有意义)。每个事务,如果单独执行,都将数据库从一个一致状态转换为另一个一致状态。为了防止事务之间的干扰,写者进程必须独占访问数据库。假设没有写者正在访问数据库,任意数量的读者都可以同时执行事务。
不久前,我们在工作中不得不实现一个服务器,该服务器将接收到的数据流中继并翻译给多个(通常大于 32 个)客户端。由于此数据流代表不断变化(但非固定时间基准)的市场价格(股票/期权),因此快速中继至关重要。我们开发了一个解决方案,该解决方案由一个接收线程、多个翻译线程,甚至更多的发送线程组成(因为我们不希望在客户端接收失败时阻止服务器)。
显然,所有线程都需要访问已接收和/或已翻译的数据包。为了在不损坏数据的情况下实现这一点,需要进行同步。搜索 MSDN,找到了几种同步对象(CCriticalSection
、CMutex
等),但没有一个似乎能满足我们的需求:在不写入时允许多个读取访问。因此,我们决定自己编写所需的同步对象:CMutexRW
。
使用信号量的正式读者和写者解决方案
由于我们的问题已经被广泛研究(自 1960 年以来?),我们首先查阅了一些关于并行形式化语义的旧大学课本,以更新我们对该问题的知识。很快,我们就找到了描述读者/写者问题和(几个)解决方案的页面。我们选择使用信号量来实现我们的解决方案(具有读者偏好)。首先,快速(可能可以跳过)回顾一下(形式)信号量:信号量最早由 Dijkstra 在 1968 年引入,他认为它是实现互斥和发出中断等事件信号的有用工具。信号量是一种抽象数据类型的实例:它有一个仅由两个特殊操作——P 和 V——进行操作的表示。由于 Dijkstra 是荷兰人,P 和 V 代表荷兰语单词。具体来说,P 是荷兰语单词 passeren 的首字母,意为“通过”;V 是 vrijgeven 的首字母,意为“释放”。V 操作发出事件发生的信号;P 操作用于延迟进程直到事件发生。具体来说,这两个操作的实现必须能够保持程序中每个信号量都满足以下属性。
信号量不变量:对于信号量 s,设 nP 为已完成的 P 操作次数,设 nv 为已完成的 V 操作次数。如果 init 是 s 的初始值,那么在所有可见的程序状态中,nP <= nV + init。
因此,执行 P 操作可能会延迟,直到执行了足够数量的 V 操作。
使用此信号量定义,可以按如下方式解决读者/写者问题
integer nReaders := 0
semaphore semReaders := 1
semaphore semWriters := 1
Reader[i: 1 .. m] :: do true ->
P( semReaders )
nReaders := nReaders + 1
if nReaders = 1 -> P( semWriters ) fi
V( semReaders )
read the database
P( semReaders )
nReaders := nReaders - 1
if nReaders = 0 -> V( semWriters ) fi
V( semReaders )
od
Writer[j: 1 .. n] :: do true ->
P( semWriters )
write the database
V( semWriters )
od
显然,该解决方案赋予读者比写者更高的优先级;一旦有读者正在访问数据库,所有读者都可以读取数据库,而无需对 semWriters
执行 P 或 V 操作。
C++ 中的解决方案
废话不多说,这是我的CMutexRW
实现。我希望代码不言自明。class CMutexRW { protected: HANDLE m_semReaders; HANDLE m_semWriters; int m_nReaders; public: CMutexRW() : m_semReaders(NULL), m_semWriters(NULL), m_nReaders(0) { // initialize the Readers & Writers variables m_semReaders = ::CreateSemaphore(NULL, 1, 1, NULL); m_semWriters = ::CreateSemaphore(NULL, 1, 1, NULL); m_nReaders = 0; if (m_semReaders == NULL || m_semWriters == NULL) { LPVOID lpMsgBuf; FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPTSTR) &lpMsgBuf, 0, NULL ); TRACE( "***** ERROR: CreateSemaphore: %s\n", (LPCTSTR)lpMsgBuf ); LocalFree( lpMsgBuf ); } }; virtual ~CMutexRW() { if (m_semWriters) VERIFY( ::CloseHandle(m_semWriters) ); m_semWriters = NULL; if (m_semReaders) VERIFY( ::CloseHandle(m_semReaders) ); m_semReaders = NULL; } inline void Lock_DataRead(){ DWORD dwEvent = WAIT_TIMEOUT; // P( semReaders ) dwEvent = ::WaitForSingleObject( m_semReaders, INFINITE ); ASSERT(dwEvent == WAIT_OBJECT_0); m_nReaders++; if (m_nReaders == 1) { // P( semWriters ) dwEvent = ::WaitForSingleObject( m_semWriters, INFINITE ); ASSERT(dwEvent == WAIT_OBJECT_0); } // V( semReaders ) VERIFY( ::ReleaseSemaphore( m_semReaders, 1, NULL ) ); }; inline void Unlock_DataRead(){ DWORD dwEvent = WAIT_TIMEOUT; // P( semReaders ) dwEvent = ::WaitForSingleObject( m_semReaders, INFINITE ); ASSERT(dwEvent == WAIT_OBJECT_0); m_nReaders--; if (m_nReaders == 0) { // V( semWriters ) VERIFY( ::ReleaseSemaphore(m_semWriters, 1, NULL) ); } // V( semReaders ) VERIFY( ::ReleaseSemaphore( m_semReaders, 1, NULL ) ); }; inline void Lock_DataWrite(){ DWORD dwEvent = WAIT_TIMEOUT; // P( semWriters ) dwEvent = ::WaitForSingleObject( m_semWriters, INFINITE ); ASSERT(dwEvent == WAIT_OBJECT_0); } inline void Unlock_DataWrite(){ // V( semWriters ) VERIFY( ::ReleaseSemaphore(m_semWriters, 1, NULL) ); }; };
当然,我们也希望有一些对象能像 MFC 的 CSingleLock
一样工作,即在离开作用域时自动解锁锁定的 CMutexRW
。这是我的 CReadLock
实现。
class CReadLock { protected: CMutexRW* m_pMutexRW; bool m_bIsLocked; public: CReadLock(CMutexRW* pMutexRW, const bool bInitialLock = false) : m_pMutexRW(pMutexRW), m_bIsLocked(false) { ASSERT(m_pMutexRW); if (bInitialLock){ m_pMutexRW->Lock_DataRead(); m_bIsLocked = true; } }; inline const bool& IsLocked() const{ return m_bIsLocked; }; inline void Lock(){ ASSERT(m_bIsLocked == false); m_pMutexRW->Lock_DataRead(); m_bIsLocked = true; }; inline void Unlock(){ ASSERT(m_bIsLocked); m_pMutexRW->Unlock_DataRead(); m_bIsLocked = false; }; virtual ~CReadLock(){ if (m_bIsLocked){ m_pMutexRW->Unlock_DataRead(); } }; };
接着是 CWriteLock
的实现。
class CWriteLock { protected: CMutexRW* m_pMutexRW; bool m_bIsLocked; public: CWriteLock(CMutexRW* pMutexRW, const bool bInitialLock = false) : m_pMutexRW(pMutexRW), m_bIsLocked(false) { ASSERT(m_pMutexRW); if (bInitialLock){ m_pMutexRW->Lock_DataWrite(); m_bIsLocked = true; } }; inline const bool& IsLocked() const{ return m_bIsLocked; }; inline void Lock(){ ASSERT(m_bIsLocked == false); m_pMutexRW->Lock_DataWrite(); m_bIsLocked = true; }; inline void Unlock(){ ASSERT(m_bIsLocked); m_pMutexRW->Unlock_DataWrite(); m_bIsLocked = false; }; virtual ~CWriteLock(){ if (m_bIsLocked){ m_pMutexRW->Unlock_DataWrite(); } }; };
使用示例:
使用这些对象非常简单;当想要读取由CMutexRW
对象保护的数据时,在局部作用域中构造一个 CReadLock
对象,获取锁并执行实际读取。然后,释放锁,或者离开局部作用域。同样,我们可以以线程安全的方式写入数据。class MyExampleObject{ protected: CMutexRW m_mutexRW; int m_Data; public: void Set(const int& srcData){ CWriteLock writeLock(&m_mutexRW, true); m_Data = srcData; } void Get(int& destData){ CReadLock readLock(&m_mutexRW, true); destData = m_Data; } };
(可以观察到)有一个小而烦人的问题;Get
例程不是 const
的,即 MyExampleObject
的行为不像(数据)对象应有的那样。这可以通过如下实现代码轻松解决:
class MyExampleObject{ protected: mutable CMutexRW m_mutexRW; int m_Data; public: void Set(const int& srcData){ CWriteLock writeLock(&m_mutexRW, true); m_Data = srcData; } void Get(int& destData) const{ CReadLock readLock(&m_mutexRW, true); destData = m_Data; } };