65.9K
CodeProject 正在变化。 阅读更多。
Home

快速多读锁

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.80/5 (15投票s)

2014年8月19日

CPOL

7分钟阅读

viewsIcon

29558

downloadIcon

437

提供真实读者的无锁和无等待并发的多读者锁,适用于多核系统。

引言

读写锁共享-独占锁多读者/单写者锁多读者锁是同一同步原语的不同名称。这个想法简单而优雅——我们有共享内存,我们希望保持其一致性。最直接的方法是使用互斥锁来保护对内存的访问。但我们可以做得更好——如果你只是读取共享内存,一致性就不会被破坏。所以,让我们为读者提供并发(共享)访问,为写者提供独占访问。这正是所有列出的锁所做的。更多详情请参见 http://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock

标准的读者-写者锁实现

标准的读者-写者锁实现使用基本的同步原语来保护内部共享数据。你可以看到一个例子 http://www.glennslayden.com/code/win32/reader-writer-lock.  因此,即使在读者的锁定/解锁操作中,也会使用一些同步原语。这发生在一个很短的时间内,仅仅是为了更新读者计数器,但它确实发生了。

现在,让我们假设我们在一个强大的 32 核系统上运行。我们有 32 个读取线程,它们进行并发读取,并且读取操作本身非常简短——我的意思是读取者解锁的调用时间非常接近读取者锁定。我们真的会使用我们强大的(32 核)系统的所有资源进行并发执行吗?答案是——。读者线程在获取读者互斥锁时实际上会被串行化,仅仅为了一个操作——更新读者计数器。在示例实现中,这会发生在 void EnterReader(void) 这里

40	        EnterCriticalSection(&m_csReaderCount);
41	        if (++m_cReaders == 1)
42	            ResetEvent(m_hevReadersCleared);
43	        LeaveCriticalSection(&m_csReaderCount);

以及在 void LeaveReader(void) 这里

49	        EnterCriticalSection(&m_csReaderCount);
50	        if (--m_cReaders == 0)
51	            SetEvent(m_hevReadersCleared);
52	        LeaveCriticalSection(&m_csReaderCount);

而这正是我们要解决的问题。

快速多读者锁实现

正如我们所见,问题在于读者的锁定/解锁操作所使用的同步原语。如果我们没有待处理的写者,我们将实现无同步原语的读者的锁定/解锁操作。

让我们进入源代码,并回顾所有锁定/解锁操作

#ifndef _FAST_MULT_READ_LOCK_
#define _FAST_MULT_READ_LOCK_

#include <cassert>
#include <atomic>
#include <mutex>
#include <condition_variable>

struct FastMultReadlockConcurTest;

class FastMultReadLock
{
public:
    void ReaderLock()
    {
        do
        {
            if (pending_writer.load())
            {
                std::unique_lock<std::mutex> lk(pending_writer_update_mutex);
                pending_readers_cv.wait(lk, [this]{return pending_writer.load() == false; });
            }

            ++read_count;

            if (pending_writer.load() == false)
            {
                break;
            }

            --read_count;

        } while (true);
    }

    void ReaderUnLock()
    {
        --read_count;
    }

    void WriterLock()
    {
        writer_entry_mutex.lock();

        pending_writer.store(true);

        while (read_count.load() != 0);
    }

    void WriterUnLock()
    {
        {
            std::lock_guard<std::mutex> guard(pending_writer_update_mutex);

            pending_writer.store(false);

            pending_readers_cv.notify_all();
        }

        writer_entry_mutex.unlock();
    }

public:
    FastMultReadLock() : read_count{ 0 }, pending_writer{ false } {}
    ~FastMultReadLock() {}
    FastMultReadLock(FastMultReadLock&) = delete;
    FastMultReadLock(FastMultReadLock&&) = delete;
    FastMultReadLock& operator= (FastMultReadLock&) = delete;
    FastMultReadLock& operator= (FastMultReadLock&&) = delete;

private:
    std::atomic<uint32_t>     read_count;
    std::atomic<bool>         pending_writer;
    std::mutex                writer_entry_mutex;
    std::mutex                pending_writer_update_mutex;
    std::condition_variable   pending_readers_cv;

    static FastMultReadlockConcurTest concur_test;
};

// Lock free test for bool and uint32_t atomics
struct FastMultReadlockConcurTest
{
    FastMultReadlockConcurTest()
    {
        assert(std::atomic<uint32_t>().is_lock_free() && std::atomic<bool>().is_lock_free());
    }
};

#endif // _FAST_MULT_READ_LOCK_

我们将回顾所有的 4 种(读者/写者和锁定/解锁操作)

让我们从 ReaderLock() 开始,这是实现中最复杂的部分。

ReaderLock()

void ReaderLock()
{
    do
    {
        if (pending_writer.load())
        {
            std::unique_lock<std::mutex> lk(pending_writer_update_mutex);
            pending_readers_cv.wait(lk, [this]{return pending_writer.load() == false; });
        }

        ++read_count;

        if (pending_writer.load() == false)
        {
            break;
        }

        --read_count;

    } while (true);
}

首先,让我们检查一下 ReaderLock 在没有待处理写者的情况下是否真正是无锁的。第一个if语句为假(没有待处理的写者),我们增加读者计数器,再次检查是否有待处理的写者,现在第二个if语句为真,所以我们break并退出方法。因此,如果没有待处理的写者, ReaderLock() 可以在多个核心上执行,无串行化

现在让我们回顾一下 ReaderLock() 在有待处理的写者时(这些写者可能在 ReaderLock() 执行期间的任何时候出现)的正确性。

如果第一个if语句为真,这意味着有一个(或在检查时有一个)待处理的写者,我们将等待条件变量信号,直到写者完成。请注意,如果在此期间,写者解锁了它的写者锁,条件变量将不会阻塞操作,我们将继续,就像没有待处理的写者一样。但如果我们被阻塞在条件变量上,一旦写者退出,我们将继续,同样是“乐观”的假设——没有待处理的写者。

好的,现在我们到了 ++read_count。无论我们如何到达这里,最初我们都没有待处理的写者,或者我们已经通过了条件变量。我们将增加 read_count 并再次检查待处理的写者。如果在此期间有待处理的写者进来——我们将只是回滚(--read_count)然后重新开始。如果没有待处理的写者——我们很幸运,并且成功地锁定了 ReaderLock()。我们将 break 并退出方法。

但这为什么会阻止写者获取锁呢?因为 read_counter 和 pending_writer 都是具有 memory_order_seq_cst 顺序的原子变量WriterLock() 设置 pending_writer 然后开始轮询 read_count。另一方面,ReaderLock() 增加 read_count 然后再次检查 pending_writer。如果在此时有任何写者进来——ReaderLock() 放弃(减少 read_count)然后重新开始。 但如果没有——没有写者可以获取锁,因为 read_count 不是

ReaderUnLock()

void ReaderUnLock()
{
    --read_count;
}

这里没有什么需要解释的,我们只是减少 read_count。如果最后一个读者退出——read_count 将被设置为零,如果有任何轮询的写者——它将获取写者锁。

WriterLock()

void WriterLock()
{
    writer_entry_mutex.lock();

    pending_writer.store(true);

    while (read_count.load() != 0);
}

首先,我们希望在进入方法本身时序列化所有写者: writer_entry_mutex.lock(); 现在我们可以确定只有一个写者将访问多读者锁的内部。我们只需要设置 pending_writer 标志,并轮询最后一个读者退出。我不喜欢轮询,但这是这里最好的解决方案。我们构建了一个优化了并发读者访问的锁,并且不对 read_count 进行访问保护。一旦我们开始轮询 read_count 直到为零,read_count 只会减少。唯一增加 read_count 的地方是在 ReaderLock() 中,但所有新进来的读者都会因为我们设置了 pending_writer 标志而在条件变量上等待。请注意,此轮询操作只会使用我们多核系统的一个核心,因为所有其他潜在的写者都将被 writer_entry_mutex 阻塞。一旦最后一个读者退出——待处理的写者就可以进来了。

作为替代方案,而不是轮询 read_count,我们可以使用另一个条件变量,该变量检查 read_count 为零的条件,并且必须由最后一个即将退出的读者发出信号。但该解决方案有两个缺点:第一个——我们在 ReaderUnLock() 中必须检查零条件,并为最后一个读者发出条件变量信号——这导致了读者的性能下降。第二个——我们必须带有超时来等待 read_count ——wait_for API。而这仅仅是因为我们无法保护 read_count 的访问, 这正是这个解决方案的重点——无锁读者的锁定。因此,我们必须带超时等待,并在超时到期时——再次 wait_for

我们的解决方案针对读者的性能进行了优化,因此 read_count 轮询更好。

WriterUnLock()

void WriterUnLock()
{
    {
        std::lock_guard<std::mutex> guard(pending_writer_update_mutex);

        pending_writer.store(false);

        pending_readers_cv.notify_all();
    }

    writer_entry_mutex.unlock();
}

我们需要重置 pending_writer 标志,并通知所有待处理的读者。在这里,我们将获取 pending_writer_update_mutex 来同步 WriterUnlock() 和 ReaderLock() 中使用的条件变量之间的 pending_writer 标志的访问。通过这种方式,我们可以确定不会出现虚假等待条件变量的情况。一旦我们重置了标志,我们将通知所有待处理的读者线程。最后,解锁 writer_entry_mutex 并允许其他写者访问。

注意,额外的作用域 {} 和 std::lock_guard 是为了更好地控制 pending_writer_update_mutex 的粒度。

FastMultReadlockConcurTest

我们使用 C++11 原子变量来处理 pending_writer 标志和 read_count 计数器。特别是,我们使用了 std::atomic<bool> 和 std::atomic<uint32_t> 类型。这些类型的读写操作是无锁的吗?如果不是——我们的解决方案将不起作用。对原子变量的任何访问都将使用内部实现锁,我们将遇到我们试图避免的同样的读者串行化

这就是为什么我们需要FastMultReadlockConcurTest

struct FastMultReadlockConcurTest
{
    FastMultReadlockConcurTest()
    {
        assert(std::atomic<uint32_t>().is_lock_free() && std::atomic<bool>().is_lock_free());
    }
};

如果任何类型(std::atomic<bool> 或 std::atomic<uint32_t>)不是无锁的,我们将在断言处失败。

并发和性能分析

让我们分析所有 API 方法的并发和性能。

ReaderLock() - 如果没有待处理的写者,ReaderLock() 是无锁的、无等待的,并且可以在多个核心上并行执行。如果有待处理的写者——读者将被阻塞,但这正是多读者锁所期望的。但写者退出时会发生什么?所有待处理的读者都将是无锁的、无等待的,并且可以在多个核心上并行执行。此外,这将立即并发发生(条件变量 notify_all API)。

ReaderUnLock() - 完全 是无锁的、无等待的,并且可以在多个核心上并行执行。

WriterLock() 和 WriterUnLock() - 绝对是非无锁和非无等待操作。非无锁是其本质(独占访问),非无等待是其实现——这是 ReaderLock() 无锁的代价。但在我们的情况(针对并发频繁的读取和偶尔的写入进行了优化)是可以接受的。仅供提醒,等待(轮询)总是只针对一个写者——所以在多核系统中不会浪费太多资源。

结论

Fast Multi-Reader Lock 在多核系统上表现良好,并为多个读者提供真正的、绝对无锁和无等待的读者并发。

但是,如果典型用例不是重度的并发读者执行,如果写操作频繁,或者系统没有多个核心——标准的读者-写者锁实现可能会提供更好的性能。

© . All rights reserved.