65.9K
CodeProject 正在变化。 阅读更多。
Home

C++ 线程间安全易用的对象共享方式

starIconstarIconstarIconstarIconstarIcon

5.00/5 (11投票s)

2016年6月14日

MIT

14分钟阅读

viewsIcon

45334

downloadIcon

345

小心数据竞争的偷袭。

摘要

首先,本文不应被理解为鼓励在异步线程之间共享对象。事实上,我们认为线程间共享对象是一种常常有问题但有时又不可避免的做法。通常,在设计并发代码时,我们建议在可行的情况下优先选择“隔离 + 异步消息”范式。话虽如此,当你确实要在线程之间共享对象时,你需要尽可能安全地进行。

在 C++ 中,对象在线程间共享的场景可以分为两类——一种是“只读”场景,其中对象永远不会被修改;另一种是“非只读”场景。非只读场景需要访问控制机制。

请注意,在 C++ 中,对象被声明为 const 并不保证它不会被修改,因为对象可能包含“可变”(mutable)成员。有时这些可变成员会受到“保护”(通过互斥锁或等效机制),使其成为“线程安全”的。但有时它们不受保护(尤其是在旧代码中)。因此,在确定对象是否可能被修改时,可能需要额外的警惕。

因此,我们首先考虑程序员希望允许共享对象被修改,并且共享对象可能包含不受保护的可变成员的通用场景。对于这些场景,可以使用 mse::TAsyncSharedReadWriteAccessRequester<>,如下面的示例所示。

#include "mseasyncshared.h"

#include <future>
#include <list>
#include <random>

#include <iostream>
#include <ratio>
#include <chrono>
#include <string>

int main()
{
    std::default_random_engine rand_generator1;
    std::uniform_int_distribution<int> udist_0_9(0, 9);
    const size_t num_tasks = 10;
    const size_t num_digits_per_task = 10000;
    const size_t num_digits = num_tasks * num_digits_per_task;

    /* This block contains a simple example demonstrating the use of
    mse::TAsyncSharedReadWriteAccessRequester to safely share an object between threads. */

    class CObj1WithUnprotectedMutable {
    public:
        std::string text() const {
            m_last_access_time = std::chrono::system_clock::now();
            return m_text1;
        }
        void set_text(const std::string& text) {
            m_last_access_time = std::chrono::system_clock::now();
            m_text1 = text;
        }
        std::chrono::system_clock::time_point last_access_time() {
            return m_last_access_time;
        }
    private:
        std::string m_text1 = "initial text";
        /* Note that mutable members can affect the safety of object sharing. */
        mutable std::chrono::system_clock::time_point m_last_access_time;
    };

    class B {
    public:
        static size_t num_occurrences(
            mse::TAsyncSharedReadWriteAccessRequester<CObj1WithUnprotectedMutable>
            obj1_access_requester, const char ch, size_t start_pos, size_t length) {

            /* Here we're counting the number of occurrences of the given character in the
            specified section of the (shared) object's string of digits. */
            auto obj1_readlock_ptr = obj1_access_requester.readlock_ptr();
            auto end_pos = start_pos + length;
            assert(end_pos <= obj1_readlock_ptr->text().length());
            size_t num_occurrences = 0;
            for (size_t i = start_pos; i < end_pos; i += 1) {
                if (obj1_readlock_ptr->text().at(i) == ch) {
                    num_occurrences += 1;
                }
            }
            return num_occurrences;

            /* At the end of the scope, obj1_readlock_ptr will be destroyed and its lock
            on the shared object will be released. */
        }
    };

    /* mse::make_asyncsharedreadwrite<>, like std::make_shared<>, actually allocates the
    target object. */
    auto obj1_access_requester = mse::make_asyncsharedreadwrite<CObj1WithUnprotectedMutable>();

    std::string rand_digits_string;
    for (size_t i = 0; i < num_digits; i += 1) {
        /* Just generating a random string of digits. */
        rand_digits_string += std::to_string(udist_0_9(rand_generator1));
    }
    /* In the next line we temporarily grab a pointer to the object with a "write lock"
    so we can (safely) call a non-const member function. */
    obj1_access_requester.writelock_ptr()->set_text(rand_digits_string);

    std::list<std::future<size_t>> futures;
    for (size_t i = 0; i < num_tasks; i += 1) {
        /* Here we're dividing the (shared) object's string of digits into sections and
        setting up some (potentially) asynchronous tasks to count the number of
        occurrences of the character '5' in each section. */
        futures.emplace_back(std::async(B::num_occurrences, obj1_access_requester, '5',
            i*num_digits_per_task, num_digits_per_task));
    }

    size_t total_num_occurrences = 0;
    for (auto it = futures.begin(); futures.end() != it; it++) {
        total_num_occurrences += (*it).get();
    }
}

mse::TAsyncSharedReadWriteAccessRequester<> 会自动保护共享对象,使其在另一个线程修改时不会被访问。(尽管在这个简单的例子中这不是真正的问题。)

但由于共享对象可能包含不受保护的可变成员,出于审慎考虑,mse::TAsyncSharedReadWriteAccessRequester<> 默认不允许同时访问,即使是通过 readlock_ptr

但有时你可能确实想允许同时进行读取操作。对于这些情况,可以使用 mse::TAsyncSharedObjectThatYouAreSureHasNoUnprotectedMutablesReadWriteAccessRequester<>(以及 mse::make_asyncsharedobjectthatyouaresurehasnounprotectedmutablesreadwrite<>())。它具有与 mse::TAsyncSharedReadWriteAccessRequester<> 相同的接口,但名称有些冗长,以帮助提醒用户使用它的前提条件。

最后,一个常见的场景是简单的只读访问,程序员已可靠地确定共享对象没有不受保护的可变成员。在这种情况下,你可以不使用任何访问控制机制。对于这种情况,可以使用 mse::TReadOnlyStdSharedFixedConstPointer<>,它基本上只是一个 std::shared_ptr 的薄封装(并公开继承自它),它试图确保共享对象是 const 的,并向其他阅读代码的人明确指针的预期用途(即共享一个不会在线程间修改的对象)。

#include "mseasyncshared.h"

#include <future>
#include <list>
#include <random>

#include <iostream>
#include <string>

int main()
{
    /* This block contains an example demonstrating the use of
    mse::TReadOnlyStdSharedFixedConstPointer to share an object between threads in
    simple read only situations. */

    class CObj1WithNoMutables {
    public:
        CObj1WithNoMutables(const std::string& text) : m_text1(text) {}
        std::string text() const {
            return m_text1;
        }
        void set_text(const std::string& text) {
            m_text1 = text;
        }
    private:
        std::string m_text1 = "initial text";
    };

    class B {
    public:
        static size_t num_occurrences(const std::shared_ptr<const CObj1WithNoMutables> obj1_shptr,
            const char ch, size_t start_pos, size_t length) {

            auto end_pos = start_pos + length;
            assert(end_pos <= obj1_shptr->text().length());
            size_t num_occurrences = 0;
            for (size_t i = start_pos; i < end_pos; i += 1) {
                if (obj1_shptr->text().at(i) == ch) {
                    num_occurrences += 1;
                }
            }
            return num_occurrences;
        }
    };

    std::string rand_digits_string;
    for (size_t i = 0; i < num_digits; i += 1) {
        rand_digits_string += std::to_string(udist_0_9(rand_generator1));
    }

    /* mse::make_readonlystdshared<> returns an mse::TReadOnlyStdSharedFixedConstPointer
    which is compatible with the corresponding std::shared_ptr. Aside from enforcing
    constness, the main reason for using mse::make_readonlystdshared<> over
    std::make_shared<> is to make clear the intended purpose of the pointer. Namely, to
    share an object between threads with the intent that the object not be modified. */
    auto obj1_roshfcptr = mse::make_readonlystdshared<CObj1WithNoMutables>(rand_digits_string);

    std::list<std::future<size_t>> futures;
    for (size_t i = 0; i < num_tasks; i += 1) {
        futures.emplace_back(std::async(B::num_occurrences, obj1_roshfcptr, '5',
            i*num_digits_per_task, num_digits_per_task));
    }

    size_t total_num_occurrences = 0;
    for (auto it = futures.begin(); futures.end() != it; it++) {
        total_num_occurrences += (*it).get();
    }
}

进入竞争状态

粗略地说,“竞争条件”是指程序或代码的结果可能因并发线程的相对执行时序而异的情况。例如,假设你有一个整数变量 x,初始值为 5,并且有两个线程。假设其中一个线程会将 x 加 1,而另一个线程会将 x 乘以 2。那么 x 的最终值可能是 11 或 12,这取决于哪个线程先访问 x。对吧?

“数据竞争”是指一个或多个异步线程在内存被修改时(由另一个线程)访问该内存的情况。我们认为数据竞争是竞争条件的一个特定案例,但其他人选择将数据竞争排除在其竞争条件的定义之外。异步线程之间的任何通信都可能足以发生竞争条件。另一方面,数据竞争要求一块内存被多个异步线程“共享”。也就是说,每个线程在一段时间内都可以直接访问该内存。

数据竞争 bug 官方会导致“未定义行为”。“未定义行为”在这里基本上是“最严重后果的可能性”的委婉说法,包括无效内存访问,以及在某些情况下,任意代码执行。特别是,数据竞争 bug 会导致对象在不一致的状态下可访问。例如,如果我们考虑一个 std::vector,我们可以想象它包含一个指向某个已分配内存的指针、一个表示所包含元素数量的整数,以及另一个表示分配内存容量(以元素数量计)的整数。(对用户而言)包含的元素数量应始终小于等于容量。(像这样的应始终成立的关系称为“不变量”。)如果用户发现此关系为 false,则 std::vector 将被视为处于“不一致”或“损坏”状态。在 std::vector 的情况下,可能导致无效内存访问。

无效内存访问可以被认为是 C++ 中最严重的 bug 之一(如果不是最严重的的话)。(尽管理论上,所有“未定义行为” bug 都是等价的。)它们特别糟糕,因为在实践中它们可能导致程序停止执行,暴露存储在通常无法访问的内存中的敏感数据,甚至允许任意代码执行并危及主机环境。因此,通常优先考虑减少或消除无效内存访问的可能性,即使这会带来一些代价(例如性能、灵活性、“隐藏 bug”、增加其他较不严重 bug 的可能性等)。为此,虽然无法防止所有竞争条件 bug,但我们希望能够减少或消除那些不可避免地存在无效内存访问风险的 bug。

这些包括数据竞争 bug,但也包括一组稍微更广泛的我们可能称之为“对象竞争”的 bug。数据竞争 bug 涉及对共享内存的直接访问,而“对象竞争” bug 还包括通过共享对象的任何部分(包括成员函数、友元函数和运算符)间接访问共享内存。这里的想法是,维护一致内部状态的对象通常允许其函数和运算符暂时将内部状态更改为不一致的状态,只要在函数或运算符返回之前恢复一致性。“对象竞争” bug 包括一个线程在另一个异步线程中执行的成员函数或运算符将其暂时置于不一致状态时,允许另一个线程访问共享对象的 bug。

如果我们考虑使 bug 存在问题的​​主要因素

i) 发生的频率/不可预测性
ii) 后果的严重性
iii) 难以重现/调试
iv) 在测试期间逃避检测的能力

我们注意到,“对象竞争” bug 作为一类,实现了“超级全满”——所有四个因素都得到了极大的满足,这使其成为计算机编程中可能最糟糕的一类 bug。

因此,在可行的情况下,最好避免在异步线程之间共享对象,并且在不可避免时,不吝惜任何安全机制。

因此,考虑 std::shared_ptr 如何用于解决“对象生命周期” bug,即当您(尝试)访问已释放的对象时的 bug。如果您可以假设一个程序,其中所有对象都通过 std::make_shared 分配,并且仅通过(适当的)std::shared_ptr 访问,那么您基本上可以保证没有“对象生命周期” bug。对吧?因为 std::shared_ptr 做了两件事——它们提供了对对象的访问,并且它们控制对象何时被释放。因此,它们确保在它们(所有永久)停止提供对对象的访问之前,对象不会被释放。

好吧,可以使用类似的技术来确保在另一个线程修改共享对象时不会访问它。为了安全地访问对象,线程需要拥有该对象(写或读)的适当“访问锁”。现在,想象一下智能指针,它们像 std::shared_ptr 一样控制其目标对象的释放时间(即拥有对象的生命周期“所有权”),但此外,还控制访问锁的释放时间(即拥有访问锁的“所有权”)。我们将这些智能指针分别称为“writelock_ptr”和“readlock_ptr”。在它们停止提供对共享对象的访问(即当它们被销毁时)之前,它们不会释放其访问锁。

因此,仅将共享对象的访问限制在 writelock_ptrs 和/或 readlock_ptrs 上将确保访问是安全的。

虽然 writelock_ptrs 和 readlock_ptrs 的使用方式与 std::shared_ptr 相似,但存在差异。特别是,如果您预计将来需要访问目标对象,您可能会考虑存储一个 std::shared_ptr 以备后用。相比之下,您不会对 writelock_ptrs 和/或 readlock_ptrs 这样做,因为只要它们存在,它们就会持有共享对象的锁,可能会阻止其他线程访问该对象。总的来说,您希望最小化 writelock_ptrs 和 readlock_ptrs 的生命周期,以最大限度地提高共享对象的可用性。这需要一个我们称之为“访问请求者”的单独的辅助对象。访问请求者不提供对共享对象的直接访问,也不持有对其的任何锁定。它们确实(尝试)在请求时提供 writelock_ptrs 和/或 readlock_ptrs。因此,如果您预计将来需要访问共享对象,而不是持有 writelock_ptr 或 readlock_ptr,您可以随时使用访问请求者重新获取一个。(请参阅“摘要”部分中的第一个示例和/或下载附带的源代码以获取更全面的示例。)

可变成员问题

C++ 的一个特殊之处在于,const 对象不一定保证是不可修改的,因为,例如,C++ 允许“可变”成员。现在,人们一直认为,虽然 mutable 关键字可以用来破坏 const 的机制,但 const 的语义应该被保留。也就是说,一个声明为 const 的对象,对(其公共接口的)用户来说,应该表现得好像它是 const 的,即使在幕后私有的可变成员实际上正在被修改。但直到 C++11 的出现,按照惯例,保留 const 语义才包含线程安全的概念——即无需问题即可被异步线程同时访问的能力。所以问题是,存在大量具有可变成员的遗留对象,您可能想在线程之间共享它们,但不能假定它们在声明为 const 时是安全可共享的。

而且,这似乎不仅仅是遗留对象的问题。目前的惯例是,具有可变成员的对象应该保留线程安全,作为保留 const 语义的一部分,但这通常会以性能和可扩展性为代价。(可扩展性是因为,例如,线程安全机制通常需要锁定 CPU 缓存行,从而阻止需要访问相同缓存行的其他线程。)因此,在这样的对象不与线程共享的上下文中,通常会支付(微小但)不必要的性能成本。现在,有些人不介意看到确立一个牺牲一点性能来换取安全的惯例。但你不得不怀疑,在一个追求性能的编程社区中,当不清楚这种权衡是否必要时,他们会如何可靠地遵守规则。

考虑一个替代惯例,其中具有可变成员的类被迫清楚地表明(例如,在它们的名称中)它们是否可以安全地在线程之间共享。此外,鼓励任何不安全可共享的类(出于性能原因)与其兼容的安全可共享版本配对提供。最好由作者积极表明他们的类是否可以安全共享(或不能),而不是让用户仅仅假设它。

由于这是 C++,mutable 关键字并不是强制执行 const 的唯一漏洞。例如,您可以声明一个 const 类的实例,打算在线程之间共享它。但如果这个类碰巧是一种“复合”类,它包含对“子”对象(或者说是“间接成员”)的引用,那么 const 性就不会传播到这些子对象,从而使它们容易受到竞争条件和数据竞争 bug 的影响。此类子对象也可以被视为“可变成员”,并且从安全性的角度来看,应如此处理。

int main()
{
    class CObjWithIndirectMember {
    public:
        CObjWithIndirectMember() : m_string1(*(new std::string("initial text"))) {}
        ~CObjWithIndirectMember() {
            delete (&m_string1);
        }
        void set_string2(const std::string& string_cref) const {
            /* We know the "mutable" keyword can be used to subvert "const"ness. */
            m_string2 = string_cref;
        }
        void set_string1(const std::string& string_cref) const {
            /* As with members declared with the "mutable" keyword qualifier, "const"ness
            does not propagate to "indirect" members. */
            m_string1 = string_cref;
        }

        mutable std::string m_string2 = "initial text";
        std::string& m_string1;
    };

    const CObjWithIndirectMember const_obj_with_indirect_member;

    /* "const" objects aren't necessarily unmodifiable if they have members declared
    "mutable". */
    const_obj_with_indirect_member.m_string2 = "new text";

    /* Or if they have "indirect" members. That is, members that are actually references
    to other objects. */
    const_obj_with_indirect_member.m_string1 = "new text";

    /* So declaring an object "const" doesn't necessarily make it safe to share without
    access controls. */
}

因此,不幸的是,在 C++ 中,确实没有通用的方法可以确保对象不会被修改,因此也没有通用的方法可以确保共享对象的并发访问是安全的。所以谨慎的做法是,只允许在共享对象足够简单以至于普遍认为没有可变成员问题的情况下,或在对象提供明确的积极指示表明它可以安全共享的情况下,才允许同时访问。在这两种情况下,如果您要允许同时访问,代码应清楚地表明这一点,最好是使用专门为此目的设计的类型(并且可能促进额外的编译时安全)。(例如,倾向于使用 mse::TReadOnlyStdSharedFixedConstPointer 这样的类型,而不是直接使用 std::shared_ptr。)

共享对象越多,问题越多

因此,我们引入了数据类型,它们除其他功能外,还可以保护共享对象的内部状态的一致性。但是,如果您共享多个对象,则这些对象之间的任何关系(又称不变量)的一致性不会自动受到保护。因此,虽然我们建议在可行的情况下避免在线程之间共享对象的做法,但我们更强烈地建议避免在线程之间共享多个相互依赖对象的做法。但是,当您不得不这样做时,您应该将对这些对象的操作视为需要原子地执行的事务。

#include "mseasyncshared.h"

#include <future>
#include <list>
#include <random>

#include <ratio>
#include <chrono>

int main()
{
    /* This is an example of "atomic" transactions when performing operations on multiple
    interdependent shared objects. In this case, funds transfers between accounts. */

    class CAccount {
    public:
        void add_to_balance(double amount) {
            m_balance += amount;
            m_last_transaction_time = std::chrono::system_clock::now();
        }
        double balance() const {
            return m_balance;
        }
    private:
        double m_balance = 0.0;
        std::chrono::system_clock::time_point m_last_transaction_time;
    };

    class B {
    public:
        static bool nonatomic_funds_transfer(
            mse::TAsyncSharedReadWriteAccessRequester<CAccount> source_ar,
            mse::TAsyncSharedReadWriteAccessRequester<CAccount> destination_ar, const double amount)
        {
            /* Non-atomic transactions between shared objects like this can be bad. They
            can result in "race condition" bugs. */

            if (source_ar.readlock_ptr()->balance() >= amount) {
                source_ar.writelock_ptr()->add_to_balance(-amount);

                destination_ar.writelock_ptr()->add_to_balance(amount);
                return true;
            }
            else {
                return false;
            }
        }

        static bool atomic_funds_transfer(
            mse::TAsyncSharedReadWriteAccessRequester<CAccount> source_ar,
            mse::TAsyncSharedReadWriteAccessRequester<CAccount> destination_ar, const double amount)
        {
            /* You want your transactions between shared objects to be atomic like this
            one to avoid "race condition" bugs. */

            /* To make your transaction atomic, first obtain a lock on all the parties
            in the transaction. */
            auto source_writelock_ptr = source_ar.writelock_ptr();
            auto destination_writelock_ptr = destination_ar.writelock_ptr();

            if (source_writelock_ptr->balance() >= amount) {
                source_writelock_ptr->add_to_balance(-amount);
                destination_writelock_ptr->add_to_balance(amount);
                return true;
            }
            else {
                return false;
            }
        }
    };

    /* create the accounts */
    auto bobs_account_access_requester = mse::make_asyncsharedreadwrite<CAccount>();
    auto bills_account_access_requester = mse::make_asyncsharedreadwrite<CAccount>();
    auto barrys_account_access_requester = mse::make_asyncsharedreadwrite<CAccount>();

    /* set initial balances */
    bobs_account_access_requester.writelock_ptr()->add_to_balance(100.0);
    bills_account_access_requester.writelock_ptr()->add_to_balance(200.0);
    barrys_account_access_requester.writelock_ptr()->add_to_balance(300.0);

    /* do some concurrent fund transfers */
    std::future<bool> bob_to_bill_res = std::async(B::atomic_funds_transfer,
        bobs_account_access_requester, bills_account_access_requester, 10.0);

    std::future<bool> bill_to_barry_res = std::async(B::atomic_funds_transfer,
        bills_account_access_requester, barrys_account_access_requester, 20.0);

    std::future<bool> barry_to_bob_res = std::async(B::atomic_funds_transfer,
        barrys_account_access_requester, bobs_account_access_requester, 30.0);

    bool all_transfers_were_executed = (bob_to_bill_res.get() && bill_to_barry_res.get()
        && barry_to_bob_res.get());
}

细节给有兴趣的人

标准库认识到需要允许单个线程多次锁定互斥锁,因此提供了 std::recursive_mutex。它们还认识到需要一个支持共享锁定和在超时后过期的锁定尝试的互斥锁,因此提供了 std::shared_timed_mutex。但对于通用情况,您当然需要一个支持所有这些功能的互斥锁——一个 std::recursive_shared_timed_mutex。但令人沮丧的是(在写作时),标准库不提供这样的互斥锁。可能是因为它不清楚如何实现一种优化了性能和内存占用的类型。

总之,我们不能等待标准库,所以我们提供了互斥锁——mse::recursive_shared_timed_mutex。它是实现我们这样的共享对象通用解决方案所需的互斥锁。如果您发现自己出于其他目的需要此类互斥锁,它就在头文件中。

说到在超时后过期的锁定尝试,虽然我们在示例中没有使用它们,“访问请求者”类型确实支持此功能。

auto access_requester = mse::make_asyncsharedreadwrite<std::string>("some text");
auto writelock_ptr1 = access_requester.try_writelock_ptr();
if (writelock_ptr1) {
    // lock request succeeded
}
auto readlock_ptr2 = access_requester.try_readlock_ptr_for(std::chrono::seconds(10));
auto writelock_ptr3 = access_requester.try_writelock_ptr_until(std::chrono::steady_clock::now()
    + std::chrono::seconds(10));

控制你对危险并行编程实践的热情

在为当今的多处理器架构编程时,共享对象给并发线程似乎既方便又自然。并且很容易忽略这样一个事实:在线程之间共享对象的做法会带来一种根本不同且更具问题性的 bug 类型,与我们习惯处理的 bug 不同。

控制对共享对象的访问的 bug 可能导致该对象的任何部分(或对象引用的任何内容)随时被修改。“任何时候”这一点很特别且具有问题性。这意味着,如果您的程序崩溃,或者断言失败,即使您有完整的堆栈跟踪和内存转储,也仍然可能无法推断出导致故障状态的序列。

更糟糕的是,这类 bug 常常难以重现。无法推断或重现导致故障状态的步骤,这种情况令人不安。

但这并非我们作为一个物种所不熟悉的。例如,古埃及人无法推断导致尼罗河偶尔发生灾难性洪水的原因。他们也无法按需重现这种现象。他们通过向一位长着羊头神献祭来应对这种情况。也许这也能帮助解决我们的数据竞争 bug。:)

而且,在线程之间共享对象时多加谨慎可能也不会有什么坏处。

 

 

© . All rights reserved.