65.9K
CodeProject 正在变化。 阅读更多。
Home

使用信号量解决读者/写者问题

starIconstarIconstarIcon
emptyStarIcon
starIcon
emptyStarIcon

3.61/5 (14投票s)

2002年10月12日

4分钟阅读

viewsIcon

356957

downloadIcon

3159

使用信号量解决读者/写者问题

引言

读者/写者问题是经典的同步问题之一。与“餐桌上的哲学家”问题一样,它经常用于比较和对比同步机制。它也是一个非常实际的问题。

读者/写者问题 - 经典定义

两种类型的进程——读者和写者——共享一个数据库。读者执行检查数据库记录的事务;写者事务既检查数据库又更新数据库。假设数据库最初处于一致状态(即,数据之间的关系有意义)。每个事务,如果单独执行,都将数据库从一个一致状态转换为另一个一致状态。为了防止事务之间的干扰,写者进程必须独占访问数据库。假设没有写者正在访问数据库,任意数量的读者都可以同时执行事务。

不久前,我们在工作中不得不实现一个服务器,该服务器将接收到的数据流中继并翻译给多个(通常大于 32 个)客户端。由于此数据流代表不断变化(但非固定时间基准)的市场价格(股票/期权),因此快速中继至关重要。我们开发了一个解决方案,该解决方案由一个接收线程、多个翻译线程,甚至更多的发送线程组成(因为我们不希望在客户端接收失败时阻止服务器)。

显然,所有线程都需要访问已接收和/或已翻译的数据包。为了在不损坏数据的情况下实现这一点,需要进行同步。搜索 MSDN,找到了几种同步对象(CCriticalSectionCMutex 等),但没有一个似乎能满足我们的需求:在不写入时允许多个读取访问。因此,我们决定自己编写所需的同步对象:CMutexRW

使用信号量的正式读者和写者解决方案

由于我们的问题已经被广泛研究(自 1960 年以来?),我们首先查阅了一些关于并行形式化语义的旧大学课本,以更新我们对该问题的知识。很快,我们就找到了描述读者/写者问题和(几个)解决方案的页面。我们选择使用信号量来实现我们的解决方案(具有读者偏好)。

首先,快速(可能可以跳过)回顾一下(形式)信号量:信号量最早由 Dijkstra 在 1968 年引入,他认为它是实现互斥和发出中断等事件信号的有用工具。信号量是一种抽象数据类型的实例:它有一个仅由两个特殊操作——PV——进行操作的表示。由于 Dijkstra 是荷兰人,PV 代表荷兰语单词。具体来说,P 是荷兰语单词 passeren 的首字母,意为“通过”;Vvrijgeven 的首字母,意为“释放”。V 操作发出事件发生的信号;P 操作用于延迟进程直到事件发生。具体来说,这两个操作的实现必须能够保持程序中每个信号量都满足以下属性。

信号量不变量:对于信号量 s,设 nP 为已完成的 P 操作次数,设 nv 为已完成的 V 操作次数。如果 inits 的初始值,那么在所有可见的程序状态中,nP <= nV + init

因此,执行 P 操作可能会延迟,直到执行了足够数量的 V 操作。

使用此信号量定义,可以按如下方式解决读者/写者问题

integer    nReaders   := 0
semaphore  semReaders := 1
semaphore  semWriters := 1
Reader[i: 1 .. m] :: do true ->
                        P( semReaders )
                            nReaders := nReaders + 1
                            if nReaders = 1 -> P( semWriters ) fi
                        V( semReaders )

                            read the database

                        P( semReaders )
                            nReaders := nReaders - 1
                            if nReaders = 0 -> V( semWriters ) fi
                        V( semReaders )
                     od
Writer[j: 1 .. n] :: do true ->
                        P( semWriters )
                             write the database
                        V( semWriters )
                     od

显然,该解决方案赋予读者比写者更高的优先级;一旦有读者正在访问数据库,所有读者都可以读取数据库,而无需对 semWriters 执行 PV 操作。

C++ 中的解决方案

废话不多说,这是我的 CMutexRW 实现。我希望代码不言自明。
class CMutexRW
{
protected:
    HANDLE        m_semReaders;
    HANDLE        m_semWriters;
    int        m_nReaders;
public:
    CMutexRW() :
        m_semReaders(NULL),
        m_semWriters(NULL),
        m_nReaders(0)
    {
        // initialize the Readers & Writers variables
        m_semReaders    = ::CreateSemaphore(NULL, 1, 1, NULL);
        m_semWriters    = ::CreateSemaphore(NULL, 1, 1, NULL);
        m_nReaders    = 0;

        if (m_semReaders == NULL || m_semWriters == NULL)
        {
            LPVOID lpMsgBuf;
            FormatMessage( 
                FORMAT_MESSAGE_ALLOCATE_BUFFER | 
                FORMAT_MESSAGE_FROM_SYSTEM | 
                FORMAT_MESSAGE_IGNORE_INSERTS,
                NULL,
                GetLastError(),
                MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
                (LPTSTR) &lpMsgBuf,
                0,
                NULL 
                );
            TRACE( "***** ERROR: CreateSemaphore: %s\n", (LPCTSTR)lpMsgBuf );
            LocalFree( lpMsgBuf );            
        }
    };

    virtual ~CMutexRW()
    {
        if (m_semWriters)
            VERIFY( ::CloseHandle(m_semWriters) );    

        m_semWriters = NULL;
        if (m_semReaders)
            VERIFY( ::CloseHandle(m_semReaders) );    
        m_semReaders = NULL;
    }

    inline void Lock_DataRead(){
        DWORD dwEvent = WAIT_TIMEOUT;

        // P( semReaders )
        dwEvent = ::WaitForSingleObject( m_semReaders, INFINITE );
        ASSERT(dwEvent == WAIT_OBJECT_0);

        m_nReaders++;

        if (m_nReaders == 1)
        {
            // P( semWriters )
            dwEvent = ::WaitForSingleObject( m_semWriters, INFINITE );
            ASSERT(dwEvent == WAIT_OBJECT_0);
        }
        // V( semReaders )
        VERIFY( ::ReleaseSemaphore( m_semReaders, 1, NULL ) );
    };

    inline void Unlock_DataRead(){
        DWORD dwEvent = WAIT_TIMEOUT;
        // P( semReaders )
        dwEvent = ::WaitForSingleObject( m_semReaders, INFINITE );
        ASSERT(dwEvent == WAIT_OBJECT_0);

        m_nReaders--;

        if (m_nReaders == 0)
        {
            // V( semWriters )
            VERIFY( ::ReleaseSemaphore(m_semWriters, 1, NULL) );
        }
        // V( semReaders )
        VERIFY( ::ReleaseSemaphore( m_semReaders, 1, NULL ) );
    };

    inline void Lock_DataWrite(){
        DWORD dwEvent = WAIT_TIMEOUT;

        // P( semWriters )
        dwEvent = ::WaitForSingleObject( m_semWriters, INFINITE );
        ASSERT(dwEvent == WAIT_OBJECT_0);
    }

    inline void Unlock_DataWrite(){
        // V( semWriters )
        VERIFY( ::ReleaseSemaphore(m_semWriters, 1, NULL) );
    };

};

当然,我们也希望有一些对象能像 MFC 的 CSingleLock 一样工作,即在离开作用域时自动解锁锁定的 CMutexRW。这是我的 CReadLock 实现。

class CReadLock
{
protected:
    CMutexRW* m_pMutexRW;
    bool      m_bIsLocked;
public:
    CReadLock(CMutexRW* pMutexRW, const bool bInitialLock = false) :
        m_pMutexRW(pMutexRW), m_bIsLocked(false)
    {
        ASSERT(m_pMutexRW);
        if (bInitialLock){
            m_pMutexRW->Lock_DataRead();
            m_bIsLocked = true;
        }
    };

    inline const bool& IsLocked() const{
        return m_bIsLocked;
    };

    inline void Lock(){
        ASSERT(m_bIsLocked == false);
        m_pMutexRW->Lock_DataRead();
        m_bIsLocked = true;
    };

    inline void Unlock(){
        ASSERT(m_bIsLocked);
        m_pMutexRW->Unlock_DataRead();
        m_bIsLocked = false;
    };
    virtual ~CReadLock(){
        if (m_bIsLocked){
            m_pMutexRW->Unlock_DataRead();
        }
    };
};

接着是 CWriteLock 的实现。

class CWriteLock
{
protected:
    CMutexRW*   m_pMutexRW;
    bool        m_bIsLocked;
public:
    CWriteLock(CMutexRW* pMutexRW, const bool bInitialLock = false) :
        m_pMutexRW(pMutexRW), m_bIsLocked(false)
    {
        ASSERT(m_pMutexRW);
        if (bInitialLock){
            m_pMutexRW->Lock_DataWrite();
            m_bIsLocked = true;
        }
    };

    inline const bool& IsLocked() const{
        return m_bIsLocked;
    };

    inline void Lock(){
        ASSERT(m_bIsLocked == false);
        m_pMutexRW->Lock_DataWrite();
        m_bIsLocked = true;
    };

    inline void Unlock(){
        ASSERT(m_bIsLocked);
        m_pMutexRW->Unlock_DataWrite();
        m_bIsLocked = false;
    };
    virtual ~CWriteLock(){
        if (m_bIsLocked){
            m_pMutexRW->Unlock_DataWrite();
        }
    };
};

使用示例:

使用这些对象非常简单;当想要读取由 CMutexRW 对象保护的数据时,在局部作用域中构造一个 CReadLock 对象,获取锁并执行实际读取。然后,释放锁,或者离开局部作用域。同样,我们可以以线程安全的方式写入数据。
class MyExampleObject{
protected:
      CMutexRW  m_mutexRW;
      int       m_Data;
public:
      void Set(const int& srcData){
           CWriteLock writeLock(&m_mutexRW, true);
           m_Data = srcData;
      }
      void Get(int& destData){
           CReadLock readLock(&m_mutexRW, true);
           destData = m_Data;
      }
};

(可以观察到)有一个小而烦人的问题;Get 例程不是 const 的,即 MyExampleObject 的行为不像(数据)对象应有的那样。这可以通过如下实现代码轻松解决:

class MyExampleObject{
protected:
      mutable CMutexRW  m_mutexRW;
      int               m_Data;
public:
      void Set(const int& srcData){
           CWriteLock writeLock(&m_mutexRW, true);
           m_Data = srcData;
      }
      void Get(int& destData) const{
           CReadLock readLock(&m_mutexRW, true);
           destData = m_Data;
      }
};

标准免责声明

尽管我们在软件中广泛使用(并针对性能略有修改)了讨论的对象,但我不能保证 100% 无错误、线程安全等行为。所以请使用它,好好使用它,但永远不要忘记,我只是另一个程序员 ;)。
© . All rights reserved.