65.9K
CodeProject 正在变化。 阅读更多。
Home

RWMutex: 一个共享/独占递归互斥锁

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.98/5 (18投票s)

2015年12月12日

CPOL

3分钟阅读

viewsIcon

64919

一个具有升级/降级功能的共享/独占访问互斥锁

引言

我的目标是创建一个可以充当读/写锁定机制的东西。任何线程都可以锁定它以进行读取,但只有一个线程可以锁定它以进行写入。在写入线程释放它之前,所有其他线程都会等待。在任何其他线程释放之前,写入线程不会获取互斥锁。

我可以使用 Slim Reader/Writer 锁,但是

  • 它们不是递归的,例如,如果同一个线程之前调用了同一个函数,那么调用 AcquireSRWLockExclusive() 将会阻塞。
  • 它们不可升级,例如,已锁定锁以进行读取访问的线程无法锁定它以进行写入。
  • 它们不是可复制的句柄。

我可以尝试 C++ 14 shared_lock,但我仍然需要 C++ 11 支持。此外,我还不确定它是否真的能满足我的要求。

因此,我不得不手动实现它。 由于缺少 WaitForMultipleObjects (nyi),普通的 C++ 11 方法已被删除。 现在具有升级/降级功能。

RWMUTEX

我的类相当简单。

class RWMUTEX
    {
    private:
        HANDLE hChangeMap;
        std::map<DWORD, HANDLE> Threads;
        RWMUTEX(const RWMUTEX&) = delete;
        RWMUTEX(RWMUTEX&&) = delete;

我需要一个 std::map<DWORD,HANDLE> 来存储尝试访问共享资源的所有线程的句柄,并且还需要一个互斥锁句柄来确保对此映射的所有更改都是线程安全的。

构造函数

RWMUTEX(const RWMUTEX&) = delete;
void operator =(const RWMUTEX&) = delete;

RWMUTEX()
    {
    hChangeMapWrite = CreateMutex(0,0,0);
    }

只需创建一个指向更改映射互斥锁的句柄。 该对象不应该是可复制的。

CreateIf

HANDLE CreateIf(bool KeepReaderLocked = false)
    {
                WaitForSingleObject(hChangeMap, INFINITE);
                DWORD id = GetCurrentThreadId();
                if (Threads[id] == 0)
                    {
                    HANDLE e0 = CreateMutex(0, 0, 0);
                    Threads[id] = e0;
                    }
                HANDLE e = Threads[id];
                if (!KeepReaderLocked)
                      ReleaseMutex(hChangeMap);
                return e; 
    }

当您调用 LockRead() LockWrite() 来锁定对象时,会调用这个 private 成员函数。 如果当前线程尚未将自己注册到可能访问此互斥锁的线程,则此函数会为该线程创建一个互斥锁。 如果某些其他线程已锁定此互斥锁以进行写入访问,则此函数将阻塞,直到写入线程释放该对象。 此函数返回当前线程的互斥锁句柄。

LockRead/ReleaseRead

HANDLE LockRead()
    {
    auto f = CreateIf();
    WaitForSingleObject(f,INFINITE);
    return f;
    }
void ReleaseRead(HANDLE f)
    {
    ReleaseMutex(f);
    }

当您想要锁定对象以进行读取访问并在以后释放它时,会调用这些函数。

LockWrite/ReleaseWrite

void LockWrite()
    {
                CreateIf(true);

                // Wait for all 
                vector<HANDLE> AllThreads;
                AllThreads.reserve(Threads.size());
                for (auto& a : Threads)
                    {
                    AllThreads.push_back(a.second);
                    }

                WaitForMultipleObjects((DWORD)AllThreads.size(), AllThreads.data(), TRUE, INFINITE);

                // Reader is locked
    }

void ReleaseWrite()
    {
    
    // Release All
    for (auto& a : Threads)
        ReleaseMutex(a.second);
    ReleaseMutex(hChangeMap);
    }

当您想要锁定对象以进行写入访问并在以后释放它时,会调用这些函数。 LockWrite() 函数确保

  1. 在锁定期间不注册新线程,并且
  2. 任何读取线程都已释放锁

析构函数

~RWMUTEX()
    {
    CloseHandle(hChangeMap);
    hChangeMap = 0;
    for (auto& a : Threads)
        CloseHandle(a.second);
    Threads.clear();
    }

析构函数确保所有句柄都被清除。

可升级/可降级锁

有时,您希望将读取锁升级为写入锁,而无需先解锁,以提高效率。 因此,LockWrite 被修改如下

    void LockWrite(DWORD updThread = 0)
    {
        CreateIf(true);

        // Wait for all 
        AllThreads.reserve(Threads.size());
        AllThreads.clear();
        for (auto& a : Threads)
        {
            if (updThread == a.first) // except ourself if in upgrade operation
                continue;
            AllThreads.push_back(a.second);
        }
        auto tim = WaitForMultipleObjects((DWORD)AllThreads.size(), AllThreads.data(), TRUE, wi);

        if (tim == WAIT_TIMEOUT && wi != INFINITE)
            OutputDebugString(L"LockWrite debug timeout!");

        // We don't want to keep threads, the hChangeMap is enough
        // We also release the handle to the upgraded thread, if any
        for (auto& a : Threads)
            ReleaseMutex(a.second);

        // Reader is locked
    }

    void Upgrade()
    {
        LockWrite(GetCurrentThreadId());
    }

    HANDLE Downgrade()
    {
        DWORD id = GetCurrentThreadId();
        auto z = Threads[id];
        auto tim = WaitForSingleObject(z, wi);
        if (tim == WAIT_TIMEOUT && wi != INFINITE)
            OutputDebugString(L"Downgrade debug timeout!");
        ReleaseMutex(hChangeMap);
        return z;
    }

现在调用 Upgrade() 会导致

  • 更改映射被锁定
  • 等待所有读取线程退出,除了我们自己的

然后我们释放我们自己的线程互斥锁,因为锁定更改映射就足够了。

调用 Downgrade() 会导致

  • 直接从映射获取句柄,无需重新锁定
  • 像在读取模式下一样锁定此句柄
  • 释放更改映射

所以整个代码是(带有一些调试帮助)

// RWMUTEX
    class RWMUTEX
        {
        private:
            HANDLE hChangeMap = 0;
            std::map<DWORD, HANDLE> Threads;
            DWORD wi = INFINITE;
            RWMUTEX(const RWMUTEX&) = delete;
            RWMUTEX(RWMUTEX&&) = delete;
            operator=(const RWMUTEX&) = delete;

        public:

            RWMUTEX(bool D = false)
                {
                if (D)
                    wi = 10000;
                else
                    wi = INFINITE;
                hChangeMap = CreateMutex(0, 0, 0);
                }

            ~RWMUTEX()
                {
                CloseHandle(hChangeMap);
                hChangeMap = 0;
                for (auto& a : Threads)
                    CloseHandle(a.second);
                Threads.clear();
                }

            HANDLE CreateIf(bool KeepReaderLocked = false)
                {
                auto tim = WaitForSingleObject(hChangeMap, INFINITE);
                if (tim == WAIT_TIMEOUT && wi != INFINITE)
                    OutputDebugString(L"LockRead debug timeout!");
                DWORD id = GetCurrentThreadId();
                if (Threads[id] == 0)
                    {
                    HANDLE e0 = CreateMutex(0, 0, 0);
                    Threads[id] = e0;
                    }
                HANDLE e = Threads[id];
                if (!KeepReaderLocked)    
                    ReleaseMutex(hChangeMap);
                return e;
                }

            HANDLE LockRead()
                {
                auto z = CreateIf();
                auto tim = WaitForSingleObject(z, wi);
                if (tim == WAIT_TIMEOUT && wi != INFINITE)
                    OutputDebugString(L"LockRead debug timeout!");
                return z;
                }

    void LockWrite(DWORD updThread = 0)
    {
        CreateIf(true);

        // Wait for all 
        AllThreads.reserve(Threads.size());
        AllThreads.clear();
        for (auto& a : Threads)
        {
            if (updThread == a.first) // except ourself if in upgrade operation
                continue;
            AllThreads.push_back(a.second);
        }
        auto tim = WaitForMultipleObjects((DWORD)AllThreads.size(), AllThreads.data(), TRUE, wi);

        if (tim == WAIT_TIMEOUT && wi != INFINITE)
            OutputDebugString(L"LockWrite debug timeout!");

        // We don't want to keep threads, the hChangeMap is enough
        // We also release the handle to the upgraded thread, if any
        for (auto& a : Threads)
            ReleaseMutex(a.second);

        // Reader is locked
    }

    void ReleaseWrite()
    {
        ReleaseMutex(hChangeMap);
    }

    void ReleaseRead(HANDLE f)
    {
        ReleaseMutex(f);
    }

    void Upgrade()
    {
        LockWrite(GetCurrentThreadId());
    }

    HANDLE Downgrade()
    {
        DWORD id = GetCurrentThreadId();
        auto z = Threads[id];
        auto tim = WaitForSingleObject(z, wi);
        if (tim == WAIT_TIMEOUT && wi != INFINITE)
            OutputDebugString(L"Downgrade debug timeout!");
        ReleaseMutex(hChangeMap);
        return z;
    }              
};

要使用 RWMUTEX,您可以简单地创建锁定类

class RWMUTEXLOCKREAD
    {
    private:
        RWMUTEX* mm = 0;
    public:

        RWMUTEXLOCKREAD(const RWMUTEXLOCKREAD&) = delete;
        void operator =(const RWMUTEXLOCKREAD&) = delete;

        RWMUTEXLOCKREAD(RWMUTEX*m)
            {
            if (m)
                {
                mm = m;
                mm->LockRead();
                }
            }
        ~RWMUTEXLOCKREAD()
            {
            if (mm)
                {
                mm->ReleaseRead();
                mm = 0;
                }
            }
    };

class RWMUTEXLOCKWRITE
    {
    private:
        RWMUTEX* mm = 0;
    public:
        RWMUTEXLOCKWRITE(RWMUTEX*m)
            {
            if (m)
                {
                mm = m;
                mm->LockWrite();
                }
            }
        ~RWMUTEXLOCKWRITE()
            {
            if (mm)
                {
                mm->ReleaseWrite();
                mm = 0;
                }
            }
    };

以及一个用于升级机制的新类

class RWMUTEXLOCKREADWRITE
{
private:
    RWMUTEX* mm = 0;
    HANDLE lm = 0;
    bool U = false;
public:

    RWMUTEXLOCKREADWRITE(const RWMUTEXLOCKREADWRITE&) = delete;
    void operator =(const RWMUTEXLOCKREADWRITE&) = delete;

    RWMUTEXLOCKREADWRITE(RWMUTEX*m)
    {
        if (m)
        {
            mm = m;
            lm = mm->LockRead();
        }
    }

    void Upgrade()
    {
        if (mm && !U)
        {
            mm->Upgrade();
            lm = 0;
            U = 1;
        }
    }

    void Downgrade()
    {
        if (mm && U)
        {
            lm = mm->Downgrade();
            U = 0;
        }
    }

    ~RWMUTEXLOCKREADWRITE()
    {
        if (mm)
        {
            if (U)
                mm->ReleaseWrite();
            else
                mm->ReleaseRead(lm);
            lm = 0;
            mm = 0;
        }
    }
};

使用示例

RWMUTEX m;

// ... other code
void foo1() {
  RWMUTEXLOCKREAD lock(&m);
  }

void foo2() {
 RWMUTEXLOCKWRITE lock(&m);
}

历史

  • 2018年12月13日:添加了升级机制
  • 2017年12月10日:添加了调试辅助功能,并通过允许 ReleaseRead() 不调用 CreateIf() 修复了读/写死锁
  • 2017年5月12日:修复了写入者中罕见的死锁,并简化了类
  • 2016年8月23日:修复了读取器解锁中的死锁
  • 2015年12月17日:修复了读取中的竞争错误(并删除了 C++ 11 实现)
  • 2015年12月12日:首次发布
© . All rights reserved.