65.9K
CodeProject 正在变化。 阅读更多。
Home

使任何对象都变得线程安全

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.94/5 (47投票s)

2017年4月24日

CPOL

34分钟阅读

viewsIcon

87151

downloadIcon

788

智能指针,可使任何对象对所有操作都线程安全,性能等同于优化的无锁容器。

三篇相关文章

  1. 我们让任何对象都线程安全
  2. 我们将 std::shared_mutex 的速度提升 10 倍
  3. 线程安全的 std::map,速度媲美无锁 map

引言

在这三篇文章中,我将详细介绍原子操作、内存屏障以及线程之间数据的快速交换,以及通过“执行围绕惯用法”的例子来解释“序列点”。同时,我们将一起尝试做一些有用的事情。

标准 C++ 库中没有线程安全的容器(数组、列表、映射等),这些容器可以在多线程环境下无需额外锁即可使用。在使用标准容器进行多线程交换时,存在一个风险,即你可能会忘记用互斥量保护某个代码段,或者错误地使用另一个互斥量来保护它。

显然,如果开发者使用自己的解决方案而不是标准解决方案,他/她会犯更多的错误。如果任务如此复杂以至于没有标准解决方案,那么开发者在尝试寻找解决方案时,将淹没在错误之中。

遵循“实用性胜过纯粹性”的原则,我们将尝试创建一个实用的解决方案,而不是一个理想的解决方案。

在本文中,我们将实现一个智能指针,它可以使任何对象都线程安全,其性能等同于优化的无锁容器。

使用此类指针的简化、非优化示例

int main() { 
    contfree_safe_ptr< std::map< std::string, int > > safe_map_string_int;

    std::thread t1([&]() { safe_map_string_int->emplace("apple", 1); }); 
    std::thread t2([&]() { safe_map_string_int->emplace("potato", 2); }); 
    t1.join(); t2.join();

    std::cout << "apple = " << (*safe_map_string_int)["apple"] << 
        ", potato = " << (*safe_map_string_int)["potato"] << std::endl;    
    return 0;
}

对于我们算法的每一步,我们将引用标准中的一段引文,以保证必要的行为。

我们将详细讨论 C++ 内存模型,以及线程之间同步和数据交换的各种可能版本。

在本文中,我们将开发一个线程安全的指针 safe_ptr<>。在第二篇文章中,我们将实现一个优化的“无竞争共享互斥量”及其上的优化指针 contfree_safe_ptr<>。在第三篇文章中,我们将展示 contfree_safe_ptr<> 的最优利用示例,并提供性能测试,将其与高度优化的无锁容器进行比较。

背景

我们将从开发一个 safe_ptr<T> 智能指针模板开始,它对于任何 T 类型都是线程安全的,并将展示达到优化无锁算法级别的多线程性能。

此外,它将使我们能够原子地、一致地同时处理多个不同的对象,而这些对象在使用无锁数据结构时需要使用锁,并且存在永远死锁的风险。但是,我们将开发一个特殊的互斥锁类来解决死锁情况。然后,我们将实现我们自己的高性能无竞争共享互斥量,它比标准的 std::shared_mutex 快得多。并且,在此基础上,我们将创建一个安全指针 safe_ptr<T> 的优化版本,称为 contfree_safe_ptr<>。最后,我们将运行性能测试,并与无锁库 libCDS 进行比较。我们将通过 contfree_safe_ptr<std::map<>> 的示例,看到接近 libCDSSkipListMapBronsonAVLTreeMap)的性能。

结果是,要使您的任何类都线程安全,您不需要花费太多时间,只需编写:contfree_safe_ptr<T> ptr_thread_safe;

性能将接近于您花费一个月时间在类方法中开发无锁算法的水平。此外,您可以同时原子地更改多个 contfree_safe_ptr<>。与 std::shared_ptr<> 一样,我们的智能指针将包含引用计数。它可以被复制,并且在最后一个副本被移除后,动态分配的内存将被自动释放。

最后,将提供一个 safe_ptr.h 文件,通过 #include “safe_ptr.h” 连接即可使用此类。

多线程数据交换的基本原理

如您所知,我们只能在以下四种情况下从不同线程读写同一对象:

  1. 基于锁。对象受 lock 保护:自旋锁、标准锁(mutexrecursive_mutextimed_mutexshared_timed_mutexshared_mutex ...):https://cppreference.cn/mwiki/index.php?title=Special%3ASearch&search=mutex
  2. 原子操作。对象类型为 std::atomic<T>,其中 T 是指针、bool 或整数类型(std::is_integral<T>::value == true),并且仅当 CPU 级别存在 T 类型的原子操作时:https://cppreference.cn/w/cpp/atomic/atomic (2+1) - (基于锁的原子操作) 否则,如果 T 类型是可平凡复制类型,即满足 std::is_trivially_copyable<T>::value==true 的条件,那么 std::atomic<T> 的工作方式类似于 Lock-based - 它内部会自动使用锁。
  3. 事务安全。为对象操作实现的功能提供 事务安全 的保证(事务内存 TS(ISO/IEC TS 19841:2015)- 实验性 C++):https://cppreference.cn/w/cpp/language/transactional_memory
  4. 无锁。对象操作的功能是基于无锁算法实现的,即它们提供 无锁 的线程安全保证。

如果您完全了解这四种确保线程安全的方法,则可以跳过本章。

让我们按相反的顺序考虑

(4) 无锁 算法非常复杂,通常每个复杂算法都需要几篇科学论文。关于容器的无锁算法示例 - unordered_mapordered_mapqueue ... - 以及科学论文的链接可以在库 Concurrent Data Structures (CDS) 中找到:https://github.com/khizmax/libcds

这些是速度非常快且可靠的多线程数据结构,但到目前为止,还没有计划将它们包含在标准 C++17 或 C++20 库中,并且它们没有包含在标准草案中:http://www.open-std.org/JTC1/SC22/WG21/

(3) 事务安全 计划被包含在 C++ 标准的实验部分,并且已经有 ISO/IEC TS 19841:2015 的草案,地址是:http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2015/N4514.pdf

但是,甚至不是所有的 STL 容器都计划被制成事务安全。例如,std::map 容器甚至不计划被制成事务安全,因为它只有以下函数被定义为事务安全的:beginendsizemax_sizeempty。但是以下函数未被定义为线程安全的:findmodifyinsert。并且要实现具有事务安全成员函数的自己的对象并不容易,否则 std::map 早就被实现了。

(2) 原子操作。这种方法已在 C++11 中标准化,您可以轻松使用它。例如,声明变量 std::atomic<int> shared_val 即可,然后将此变量的引用或指针传递给多个线程,通过 std::atomic 的成员函数和运算符的所有调用都将是线程安全的:[3] http://coliru.stacked-crooked.com/a/9d0219a5cfaa3cd5

成员函数,特化的成员函数:https://cppreference.cn/w/cpp/atomic/atomic

重要的是要理解,如果您执行多个原子变量操作(无论是在一个表达式中还是在多个表达式中),另一个线程可能会在它们之间更改该变量的值。因此,为了原子地执行多个操作,我们使用基于 CAS 函数(比较和交换)compare_exchange_weak() 的无锁方法 - 即:我们将原子变量的值读取到本地变量(old_local_val)中,执行许多必要的操作并将结果写入本地变量(new_local_val),最后,我们使用 CAS 函数将原子变量的当前值与初始值(old_local_val)进行比较;如果不相等,则我们重新执行循环,如果相等 - 这意味着在此期间没有其他线程进行任何更改,然后我们使用新值(new_local_val)更改原子变量的值。同时,比较和赋值是通过一个操作完成的:compare_exchange_weak() - 这是一个原子函数,在它完全执行之前,没有人可以更改该变量的值:[4] http://coliru.stacked-crooked.com/a/aa52b45150e5eb0a

这种带有循环的方法称为乐观锁。悲观锁包括:自旋锁、互斥量...

如果这种循环的所有操作都未经悲观锁执行,则称该算法为无锁。更确切地说,这种算法可能具有一些保证:无障碍、无锁、无等待或无写竞争。

通常,原子 CAS 函数会替换指针,即:分配新内存,将修改后的对象复制到其中,并获取指向它的指针;对该副本执行一系列操作,最后,如果在此期间没有其他线程更改旧指针,CAS 函数将旧指针替换为新对象指针。但如果指针被另一个线程更改,则一切都会重复。

可能出现一个名为“ABA”的问题 - https://en.wikipedia.org/wiki/ABA_problem,当其他线程有时间将指针更改两次,并且指针再次更改为初始值时,但在该地址,其他线程已经能够删除该对象并创建一个新的。也就是说,指针的值已经指向了另一个对象,但我们看到值没有改变,并认为对象没有被替换。有很多方法可以解决这个问题,例如:LL/SC、RCU、hazard_pointer、垃圾回收器...

原子操作是线程间交换数据的最快方式。此外,对于所有原子操作,可以使用要求不那么严格且更快的内存屏障,这将在后面详细讨论。默认情况下,使用最安全、最严格的重排屏障:std::memory_order_seq_cst。但是正如我们上面提到的:使用原子变量实现复杂逻辑需要付出很多努力。

(2) - (1) 原子操作 & 基于锁

但是,如果您需要一次读取或更改多个变量 - std::atomic<int> abc,并且您不想实现无锁算法并解决 ABA 问题,那么您需要使用锁。CPU 的原子 CAS 函数(在大多数 CPU 上)可以检查是否只有一个最大宽度为 64 位变量已被更改,但此时另一个变量可能已被更改。解决方案:std::atomic<T> 允许使用任何大小的结构作为 T 类型。

在 C++ 标准中,如果 T 是“可平凡复制类型”(即满足 std::is_trivially_copyable<T>::value==true 的条件),则可以使用任何类型的 T

C++ 标准说了什么:http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2016/n4606.pdf

§29.5 / 1

引用

有一个通用的类模板 atomic<T>。模板参数的类型 T 必须是可平凡复制的(3.9)。[注意:非静态初始化的类型参数可能难以使用 - 结束注释]

§3.9 / 9

引用

标量类型、可平凡复制的类类型(第 9 章)、此类类型的数组以及这些类型的 cv 限定版本(3.9.3)统称为 可平凡复制类型

但是,如果 CPU 的原子 CAS 函数只能检查一个最大宽度为 64 位变量是否已被更改,而我们有三个 32 位变量,那么 CAS 函数如何在 std::atomic<T> 中工作?对于 T - 可平凡复制类型,CAS 函数和所有其他函数将自动使用标准 std::atomic<T> 实现中包含的 lockstd::mutex 或其他)。

要原子地更改多个变量,我们可以使用变量结构 struct T {int price, count, total; }; 作为 std::atomic<T> 模板的类型。

示例:[5] http://coliru.stacked-crooked.com/a/bdfb3dc440eb6e51

示例输出:10, 7, 70

在此示例中,在任何时候,最后一个值 70 都等于前两个值 p 的乘积 - 10*7 - 也就是说,整个结构只原子地改变。

在 x86 的 gccclang 中,此代码需要使用 -latomic 标志进行编译。

在这种情况下,每次调用 std::atomic<T> shared_val;都会导致其内部产生一个 lock,如函数 shared_val.is_lock_free() == false 的值所示。

也就是说,全局上,我们使用了乐观锁(循环),并且在访问原子变量时,本地使用了 2 个悲观锁:获取旧值和调用 CAS 函数。

(1) 基于锁

但是,由于 T 类型需要“可平凡复制”的必要条件,我们无法将 std::atomic<T> 用于您创建的任何类型。在所有 STL 容器中,我们只能使用 std::array<>。例如,我们不能使用 std::atomic<std::map<int, int>>,因为 std::map<> 类型对于其模板的任何参数都不是可平凡复制的。而且,很可能,您自己的类也不能用作 std::atomic<T>T 类型。

在这种情况下,您必须自己创建互斥对象,并在每次使用共享对象之前锁定它们,并在之后解锁。概念:https://cppreference.cn/w/cpp/concept/Mutex

在 C++ 中,存在以下互斥量:std::mutexstd::recursive_mutexstd::timed_mutexstd::recursive_timed_mutexstd::shared_timed_mutexstd::shared_mutex。有关更多信息,请参阅:https://cppreference.cn/w/cpp/thread

例如,我们创建任何 std::map<int, T> 对象并在线程之间共享,并创建一个保护它的互斥量,然后我们将它们的链接传递给多个线程。在每个线程中,我们在使用共享对象之前锁定互斥量:[6] http://coliru.stacked-crooked.com/a/a97e905d54ae1fbb

我们使用 RAII 惯用法进行锁定

std::lock_guard<std::mutex> lock(mtx); - 当此对象被创建时,它的构造函数会锁定互斥量,在该对象生命周期结束时,析构函数会解锁互斥量。因此,我们肯定不会忘记解锁它,因为析构函数会自动调用。

但是仍然有四个主要问题

  1. 死锁 - 如果您的代码编写方式是线程 1 锁定 mtx1,线程 2 锁定 mtx2,并且在持有 lock 的同时,线程 1 尝试捕获 mtx2,而线程 2 尝试捕获 mtx1,那么这些线程将永远相互等待。无锁算法不存在此问题,但并非所有逻辑都可以通过无锁来实现 - 我们将通过一致的原子更改多个容器的示例来演示这一点。
  2. 如果您编写的代码是在互斥量被锁定时将共享对象的链接分配给指针,而该指针的生命周期比 std::lock_guard lock 长,那么在解锁后,您可以通过此指针引用共享对象 - 这将导致数据竞争问题,即它将导致共享对象状态不一致或程序崩溃。在使用迭代器之后解锁互斥量时也会发生同样的情况。
  3. 您可能会混淆互斥量,并锁定保护另一个对象的互斥量 - 数据竞争。
  4. 您可能会忘记在正确的位置锁定互斥量 - 数据竞争。

执行围绕指针的惯用法

除了 RAII 惯用法之外,还有一个有趣的惯用法 - 执行围绕指针(Execute Around Pointer),它有助于解决最后两个问题。

  1. 互斥量将与您的对象合并,您可以锁定您自己的对象,而不是单独的互斥量。
  2. 当访问 protected object 类的任何成员时,互斥量将自动锁定 - 在表达式执行期间,它将被锁定。

结果是,您不会忘记锁定互斥量,也不会混淆互斥量。

我们让任何对象都线程安全

执行围绕指针惯用法是一个具有严格定义执行顺序的著名惯用法,用于从可视化到日志记录的各种目的。

https://wikibooks.cn/wiki/More_C%2B%2B_Idioms/Execute-Around_Pointer

示例:[7] http://coliru.stacked-crooked.com/a/4f3255b5473c73cc

  execute_around< std::vector< int > > vecc(10, 10);
  int res = my_accumulate(vecc->begin(), vecc->end(), 0);

首先,将创建代理类型临时对象。它们将锁定 execute_around 内的互斥量,然后将 begin()end() 函数返回的迭代器传递给函数,然后执行 my_accumulate() 函数,仅在其完成后,代理类型临时对象将被删除,其析构函数将解锁互斥量。

有关更多详细信息,请参阅文章:C++ 模式序列执行。Kevlin Henney:http://hillside.net/europlop/HillsideEurope/Papers/ExecutingAroundSequences.pdf

在 C++ 中,有两个定义严格定义了 C++ 标准 § 1.9 (13) 的操作顺序:顺序前和顺序后。在下面的标准引文中,您将看到两次“顺序前”。

执行围绕指针惯用法中所有操作的原理和顺序在标准中有严格描述。首先,我们将引用 C++ 标准的五段引文,然后我们将展示其中每段引文如何解释执行围绕指针惯用法的行为。

C++ 标准中的五段引文,工作草案,C++ 编程语言标准 N4606 2016-07-12:http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2016/n4606.pdf

  1. 对于除原始指针之外的所有类型:x->m 被解释为 (x.operator->())->m,即表达式 (x->m) 将反复展开 ((x.operator->().operator->())->m,直到我们得到原始指针。示例:三个含义相同的表达式:[8] http://coliru.stacked-crooked.com/a/dda2474024b78a0b
    引用

    § 13.5.6

    operator-> 应为不带参数的非静态成员函数。它实现了使用 -> 的类成员访问语法。

    postfix-expression -> template opt id-expression

    postfix-expression -> pseudo-destructor-name

    如果 T::operator->() 存在并且该运算符被重载解析机制(13.3)选为最佳匹配函数,则对于类型 T 的类对象 x,表达式 x->m 被解释为 (x.operator->())->m。

  2. 调用函数时,即使函数是“内联”的,函数参数的求值以及任何表达式的计算和副作用也将在函数体开始执行之前执行。
    引用

    § 1.9 / 16

    调用函数时(无论函数是否为内联),与任何参数表达式或指定被调用函数的后缀表达式相关的每个值计算和副作用,都 **在被调用函数的函数体中的每个表达式或语句执行之前被排序**。

  3. 表达式将在临时对象销毁之前完全执行。
    引用

    § 1.9 / 10

    void f() {
      if (S(3).v()) 	// full-expression includes lvalue-to-rvalue and
                        // int to bool conversions, performed before
                        // temporary is deleted at end of full-expression
      { }
    } 
  4. 在整个表达式完全执行后,临时对象将按照创建它们的顺序的逆序销毁。
    引用

    § 1.9 注脚 8

    如 12.2 中所述,在完整表达式求值之后,会进行零个或多个临时对象析构函数调用的序列,通常按照创建每个临时对象的逆序进行。

  5. 三种临时对象销毁时间与完整表达式结束点不同的情况 - 2 种是初始化数组元素时,第 3 种是创建临时对象引用时。
    引用

    § 12.2 临时对象

    § 12.2 / 5

    有三种情况,其中临时对象的销毁时间与完整表达式的结束点不同。第一种情况是在调用默认构造函数来初始化没有相应初始值设定项的数组元素(8.6)时。第二种情况是在复制整个数组时调用复制构造函数来复制数组元素(5.1.5、12.8)。在这两种情况下,如果构造函数有一个或多个默认参数,则在创建的每个临时对象中的默认参数的销毁 **在下一个数组元素的构造之前被排序**(如果存在)。第三种情况是当引用绑定到临时对象时。

例如,我们有一个简化的类 execute_around<>

template< typename T, typename mutex_type = std::recursive_mutex >
class execute_around {
  std::shared_ptr< mutex_type > mtx;
  std::shared_ptr< T > p;
 
  void lock() const { mtx->lock(); }
  void unlock() const { mtx->unlock(); }
  public:
    class proxy {
      std::unique_lock< mutex_type > lock;
      T *const p;
      public:
        proxy (T * const _p, mutex_type& _mtx) : 
                   lock(_mtx), p(_p)  { std::cout << "locked \n";} 
        proxy(proxy &&px) : lock(std::move(px.lock)), p(px.p)  {}
        ~proxy () { std::cout << "unlocked \n"; }
        T* operator -> () {return p;}
        const T* operator -> () const {return p;}
    };
 
    template< typename ...Args >
    execute_around (Args ... args) : 
        mtx(std::make_shared< mutex_type >()), p(std::make_shared< T >(args...))  {}  
 
    proxy operator->() { return proxy(p.get(), *mtx); }
    const proxy operator->() const { return proxy(p.get(), *mtx); }
    template< class Args > friend class std::lock_guard;
};

我们像这样使用我们的模板类 execute_around<>,例如:[45] http://coliru.stacked-crooked.com/a/d2e02b61af6459f5

int main() {
    typedef execute_around< std::vector< int > > T;
    T vecc(10, 10); 
    int res = my_accumulate(vecc->begin(), vecc->end(), 0);
    return 0;
}

然后,经过几次转换后,最后一个表达式可以简化为以下形式。

  1. 根据标准的第一个引文,x->m 被解释为 (x.operator->())->m
    int res = my_accumulate(
        (vecc.operator->())->begin(),
        (vecc.operator->())->end(), 
        0);
  2. 由于 vecc.operator->() 返回临时对象 T::proxy(),因此我们得到
    int res = my_accumulate( 
        T::proxy(vecc.p.get(), *vecc.mtx)->begin(),
        T::proxy(vecc.p.get(), *vecc.mtx)->end(),
        0);
  3. 此外,根据引文 2、3 和 4 - 临时代理类型对象将在函数开始执行前创建,并在函数结束后(整个表达式结束后)销毁。
    T::proxy tmp1(vecc.p.get(), *vecc.mtx);	// lock-1 std::recursive_mutex
    T::proxy tmp2(vecc.p.get(), *vecc.mtx);	// lock-2 std::recursive_mutex
      int res = my_accumulate(tmp1->begin(), tmp2->end(), 0);
    
    tmp2.~T::proxy();				// unlock-2 std::recursive_mutex
    tmp1.~T::proxy();				// unlock-1 std::recursive_mutex
  4. 再次,根据第一个引文
    • tmp1->begin() 等同于 (tmp1.operator->())->begin()
    • tmp1.operator->() 返回 p

结果是,其中 p 是指向我们 std::vector<int> 类型对象的指针。

typedef execute_around< std::vector< int > > T;
T vecc(10, 10); 
T::proxy tmp1(vecc.p.get(), *vecc.mtx);	// lock-1 std::recursive_mutex
T::proxy tmp2(vecc.p.get(), *vecc.mtx);	// lock-2 std::recursive_mutex

  int res = my_accumulate(tmp1.p->begin(), tmp2.p->end(), 0);

tmp2.~T::proxy();				// unlock-2 std::recursive_mutex
tmp1.~T::proxy();				// unlock-1 std::recursive_mutex

我们分四步描述了惯用法所有操作的严格顺序。请注意,标准不保证临时变量 tmp1tmp2 创建的倒序,即,首先可能创建 tmp2,然后是 tmp1;但这不会改变我们程序的逻辑。

请注意,我们没有引用标准的第五段引文,因为它描述了 3 种临时对象移除时间可能与给定时间不同的情况,并且正如我们所见,这些情况都不适用于我们的案例。标准引文中的前两种情况是数组的初始化或复制,它们缩短了临时对象的生命周期,第三种情况是由于存在其链接而延长了临时对象的生命周期。

Using the Code

线程安全的关联数组

您是否同意,拥有一个像 safe_ptr<> 这样的模板类,可以传入任何类型,并获得线程安全的结果类型,这将很方便?

safe_ptr<std::map<std::string, std::pair<std::string, int> >> safe_map_strings;

之后,您可以像处理关联数组的指针一样使用此对象。

std::shared_ptr<std::map<std::string, std::pair<std::string, int> >> shared_map_string;

但是现在,我们可以安全地从不同线程处理它,并且每个单独的表达式都将是线程安全的。

    (*safe_map_strings)["apple"].first = "fruit";
    (*safe_map_strings)["potato"].first = "vegetable";

    safe_map_strings->at("apple").second = safe_map_strings->at("apple").second * 2;
    safe_map_strings->find("potato")->second.second++;

让我们来看一个线程安全关联数组的案例研究:std::map<>

[9] http://coliru.stacked-crooked.com/a/5def728917274b22

#include < iostream >
#include < string >
#include < vector >
#include < memory >
#include < mutex >
#include < thread >
#include < map >

template< typename T, typename mutex_t = std::recursive_mutex, typename x_lock_t = 
std::unique_lock< mutex_t >, typename s_lock_t = std::unique_lock < mutex_t > >
class safe_ptr {
    typedef mutex_t mtx_t;
    const std::shared_ptr< T > ptr;
    std::shared_ptr< mutex_t > mtx_ptr;

    template< typename req_lock >
    class auto_lock_t {
        T * const ptr;
        req_lock lock;
    public:
        auto_lock_t(auto_lock_t&& o) : ptr(std::move(o.ptr)), lock(std::move(o.lock)) { }
        auto_lock_t(T * const _ptr, mutex_t& _mtx) : ptr(_ptr), lock(_mtx){}
        T* operator -> () { return ptr; }
        const T* operator -> () const { return ptr; }
    };

    template< typename req_lock >
    class auto_lock_obj_t {
        T * const ptr;
        req_lock lock;
    public:
        auto_lock_obj_t(auto_lock_obj_t&& o) : 
ptr(std::move(o.ptr)), lock(std::move(o.lock)) { }
        auto_lock_obj_t(T * const _ptr, mutex_t& _mtx) : ptr(_ptr), lock(_mtx){}
        template< typename arg_t >
        auto operator [] (arg_t arg) -> decltype((*ptr)[arg]) { return (*ptr)[arg]; }
    };

    void lock() { mtx_ptr->lock(); }
    void unlock() { mtx_ptr->unlock(); }
    friend struct link_safe_ptrs;
    template< typename mutex_type > friend class std::lock_guard;
    //template< class... mutex_types > friend class std::lock_guard;    // C++17
public:
  template< typename... Args >
  safe_ptr(Args... args) : ptr(std::make_shared< T >(args...)), 
           mtx_ptr(std::make_shared< mutex_t >()) {}

  auto_lock_t< x_lock_t > operator-> () 
             { return auto_lock_t< x_lock_t >(ptr.get(), *mtx_ptr); }
  auto_lock_obj_t< x_lock_t > operator* () 
             { return auto_lock_obj_t< x_lock_t >(ptr.get(), *mtx_ptr); }
  const auto_lock_t< s_lock_t > operator-> () const 
             { return auto_lock_t< s_lock_t >(ptr.get(), *mtx_ptr); }
  const auto_lock_obj_t< s_lock_t > operator* () const 
             { return auto_lock_obj_t< s_lock_t >(ptr.get(), *mtx_ptr); }
};
// ---------------------------------------------------------------

safe_ptr< std::map< std::string, std::pair< std::string, int > > > safe_map_strings_global;

void func(decltype(safe_map_strings_global) safe_map_strings) 
{
    //std::lock_guard< decltype(safe_map_strings) > lock(safe_map_strings);

    (*safe_map_strings)["apple"].first = "fruit";
    (*safe_map_strings)["potato"].first = "vegetable";

    for (size_t i = 0; i < 10000; ++i) {
        safe_map_strings->at("apple").second++;
        safe_map_strings->find("potato")->second.second++;
    }

    auto const readonly_safe_map_string = safe_map_strings;

    std::cout << "potato is " << readonly_safe_map_string->at("potato").first <<
        " " << readonly_safe_map_string->at("potato").second <<
        ", apple is " << readonly_safe_map_string->at("apple").first <<
        " " << readonly_safe_map_string->at("apple").second << std::endl;
}

int main() {

    std::vector< std::thread > vec_thread(10);
    for (auto &i : vec_thread) i = std::move(std::thread(func, safe_map_strings_global)); 
    for (auto &i : vec_thread) i.join();

    std::cout << "end";
    int b; std::cin >> b;

    return 0;
}

输出

引用

土豆是蔬菜 65042,苹果是水果 65043

土豆是蔬菜 81762,苹果是水果 81767

土豆是蔬菜 84716,苹果是水果 84720

土豆是蔬菜 86645,苹果是水果 86650

土豆是蔬菜 90288,苹果是水果 90291

土豆是蔬菜 93070,苹果是水果 93071

土豆是蔬菜 93810,苹果是水果 93811

土豆是蔬菜 95788,苹果是水果 95790

土豆是蔬菜 98951,苹果是水果 98952

土豆是蔬菜 100000,苹果是水果 100000

end

因此,我们得出两个结论:

  1. 100,000 的最终值意味着这 10 个线程中的每一个线程的每一次加法都是以线程安全的方式进行的。确实,只需更改我们的代码,使 operator-> 返回指向对象本身的指针而不是 auto_lock_tauto_lock_obj_t 类型,我们就会看到如果代码不是线程安全的会发生什么 - 数据竞争:[10] http://coliru.stacked-crooked.com/a/45d47bcb066adf2e
  2. 每隔一万的中间值表明线程是并行或伪并行执行的,也就是说,它们在任何操作的中间被中断,而此时正在执行另一个线程,也就是说,在每一次 operator++ 增量之前,互斥量都被锁定,增量完成后立即解锁,然后另一个执行增量的线程可以锁定该互斥量。我们可以通过使用 std::lock_guard<> 来立即在每个线程的开头锁定互斥量,直到线程函数执行结束,我们将看到如果线程是顺序执行而不是伪并行执行会发生什么:[11] http://coliru.stacked-crooked.com/a/cc252270fa9f7a78

这两个结论都证实了我们的 safe_ptr<T> 智能指针类的模板能够自动确保 protected 的 T 类型对象的线程安全。

线程安全性、原子性和多个对象的一致性

我们将演示如何同时原子地更改多个对象,以保持它们的一致性。我们将演示何时有必要这样做以及如果不这样做会发生什么。

让我们给出一个简化的例子,假设我们有两个表

  1. user_accounts(INT user_id, STRING user_name, INT money) - 包含每个客户金钱数量的表 - 按 user_id 字段排序
  2. cash_flows(INT unique_id, INT src_id, INT dst_id, INT time, INT money) - 显示金钱流动的表 - 每个条目都通过两个关联数组引用,这两个数组已排序:按 src_id 字段和按 dst_id 字段排序。
// 1-st table 
struct user_accounts_t { 
    std::string user_name; int64_t money; 
    user_accounts_t(std::string u, int64_t m) : user_name(u), money(m) {}
};

std::map< uint64_t, user_accounts_t > user_accounts;

// 2-nd table 
struct cash_flows_t { uint64_t unique_id, src_id, dst_id, time; int64_t money; };

std::atomic< uint64_t > global_unique_id;	// SQL-sequence
std::multimap< uint64_t, std::shared_ptr< cash_flows_t > > cash_flows_src_id;
std::multimap< uint64_t, std::shared_ptr< cash_flows_t > > cash_flows_dst_id;

就 RDBMS 而言

  • 第一个带有 user_id 字段索引的表 - 它是索引组织的表(Oracle)或具有聚集索引的表(MS SQL)。
  • 第二个表 - 它是一个表,具有两个分别按 src_id 字段和 dst_id 字段组织的索引。

在实际任务中,一个表可能包含数百万个客户条目和数十亿个资金流动条目,在这种情况下,按字段(user_idsrc_iddst_id)的索引可以加快搜索速度数十万倍,因此它们极其必要。

假设有三个用户在三个并行线程中发出了执行三个任务的请求

  1. move_money() - 线程将资金从一个客户转移到另一个客户
    1. 它从一个客户那里扣钱
    2. 它向另一个客户添加相同金额的钱
    3. id-source 字段索引添加一个指向资金条目的指针
    4. id-destination 字段索引添加指向同一笔资金条目的指针(在实际任务中不需要,但我们将作为示例进行)
  2. show_total_amount() - 显示所有客户的总金额
    1. 在循环中,我们遍历每个客户并累加所有金钱。
  3. show_user_money_on_time() - 显示指定 user_id 的客户在特定时间点的金钱数量
    1. incoming - 从指定时间点开始计算所有到达客户的资金总额(使用 id-source 字段索引)
    2. outcoming - 从指定时间点开始计算所有已从客户流出的资金总额(使用 id-destination 字段索引)
    3. user_money - 获取客户的当前金钱数量
    4. user_ user_money - 进账 + 出账 - 这是该客户在特定时间点的金钱数量。

我们知道,任何线程都可能随时被操作系统中断,例如,以便将 CPU 核心交给另一个线程。最危险的是,这种情况发生得极其罕见,您可能永远不会在调试时遇到它,但有一天,您的客户会遇到。如果这导致数据竞争,那么资金可能会从金融系统中消失。

因此,我们在最关键的地方故意添加 wait 函数,将线程暂停几毫秒,以便立即看到错误。

我们将使用 safe_ptr<> 使我们的表(user_accountscash_flows_src_idcash_flows_dst_id)线程安全,但整个程序会因此变得线程安全吗?

[12] http://coliru.stacked-crooked.com/a/5bbba59b63384a2b

让我们看看程序输出中用 <<< 标记的“基本行”。

引用

初始化表 safe_user_accounts
时间 = 0 <<<
1 => John Smith, 100
2 => John Rambo, 150
- 开始事务... show_total_amount()
1 => John Smith, 100
2 => John Rambo, 100
结果:所有账户总金额 = 200 <<<

- 开始事务... show_user_money_on_time()
1 => John Smith, 150, 时间 = 0 <<<

立即显现两个错误:

  1. 最初,所有(两个)用户的总金额为 250,而 show_total_amount() 函数仅显示 200,其余 50 的金额不知去向。
  2. 在时间点 time=0,用户 1 有 100 金钱,但 show_user_money_on_time() 函数显示的结果错误 - 用户 1 在时间点 0 时有 150 金钱。

问题在于原子性仅在单个表和索引级别上得到保证,而不是在整体上,因此一致性被破坏。解决方案是锁定所有使用的表和索引,以执行所有必须原子地执行的操作 - 这将保持一致性。

添加的行用黄色高亮显示。差异

正确示例:[13] http://coliru.stacked-crooked.com/a/c8bfc7f5372dfd0c

让我们看看程序输出中用 <<< 标记的“基本行”。

引用

初始化表 safe_user_accounts

时间 = 0 <<<

1 => John Smith, 100

2 => John Rambo, 150

结果:所有账户总金额 = 250 <<<

1 => John Smith, 100, 时间 = 0 <<<

现在一切都正确了,所有客户的总金额为 250,并且客户 1 在时间点 0 时的金额为 100。

也就是说,我们可以同时对一个对象和 3 个对象执行操作,并在任何操作中保持数据的一致性。

但这里也可能出现另一个问题。如果您(或其他开发者)在一个函数中以不同的顺序锁定容器的互斥量,那么就可能出现死锁的情况 - 两个线程永远互相等待。

在正确的示例中,我们在两个函数 - move_money()show_user_money_on_time() - 中以相同的顺序锁定了互斥量:

  • lock1(safe_user_accounts)
  • lock2(safe_cash_flows_src_id)
  • lock3(safe_cash_flows_dst_id)

现在让我们看看如果在每个函数中以不同的顺序锁定容器的互斥量会发生什么。

  1. move_money()
    • lock2(safe_cash_flows_src_id)
    • lock3(safe_cash_flows_dst_id)
    • lock1(safe_user_accounts)
  2. show_user_money_on_time()
    • lock3(safe_cash_flows_dst_id)
    • lock2(safe_cash_flows_src_id)
    • lock1(safe_user_accounts)

函数 move_money() 锁定了 lock2 并等待 lock3 释放才能锁定它。函数 show_user_money_on_time() 锁定了 lock3 并等待 lock2 释放才能锁定它。它们将永远互相等待。

示例:[14] http://coliru.stacked-crooked.com/a/0ddcd1ebe2be410b

示例

引用

初始化表 safe_user_accounts

时间 = 0 <<<

1 => John Smith, 100

2 => John Rambo, 150

- 开始事务... move_money()

- 开始事务... show_total_amount()

1 => John Smith, 100

2 => John Rambo, 150

也就是说,函数 move_money()show_user_money_on_time() 没有完成,永远死锁。

有四种解决方案:

  1. 所有开发者始终以相同的顺序锁定互斥量,并且他们从不错过 - 这是一个非常不可靠的假设。
  2. 您最初将所有将要原子使用的对象组合到一个结构中,并使用该结构的类型安全指针:struct all_t { std::map<int,int> m1; std::multimap<int,int> m2; … }; safe_ptr<all_t> safe_all_obj; - 但是,如果您最初只将这些 2 个容器单独用于 safe_ptr<map<int,int>> m1; safe_ptr<multimap<int,int>> m2;,并且您已经编写了大量代码,之后您决定将它们合并到一个由单个互斥量保护的结构中,那么您将不得不重写所有使用它们的地方,例如,而不是 m2->at(5);,您需要编写 safe_all_obj->m2.at(5); 重写大量代码很不方便。
  3. 一次,您可以将一起使用的 safe_ptr<> 组合起来,使它们使用相同的递归互斥量,之后锁定顺序无关紧要;这些对象的一致性将始终保持,并且永远不会死锁。为此,您只需添加 1 行 - 这非常方便。但这会降低性能,因为现在锁定其中一个容器总是会导致锁定所有关联的容器。即使在不需要一致性时也会获得一致性,但代价是降低性能。示例:[15] http://coliru.stacked-crooked.com/a/2a6f1535e0b95f7b

    代码中的所有更改都只是一行。

    static link_safe_ptrs tmp_link
      (safe_user_accounts, safe_cash_flows_src_id, safe_cash_flows_dst_id);

    结论 - 基本线条已显示。

    引用

    初始化表 safe_user_accounts

    时间 = 0 <<<

    1 => John Smith, 100

    2 => John Rambo, 150

    结果:所有账户总金额 = 250 <<<

    1 => John Smith, 100, 时间 = 0 <<<

  4. 您可以使用超时设置同时锁定不同类型的多个互斥量。如果您在此时间内无法锁定至少一个互斥量,那么所有先前锁定的互斥量都将被解锁,线程等待一段时间,然后再次尝试逐个锁定所有互斥量。为此,只需在使用容器之前添加一行即可。
    lock_timed_any_infinity lock_all
    (safe_cash_flows_src_id, safe_cash_flows_dst_id, safe_user_accounts);

    并且锁定容器互斥量的顺序无关紧要。示例:[16] http://coliru.stacked-crooked.com/a/93206f216ba81dd6

也就是说,即使我们以不同的顺序锁定互斥量

因此,通过锁,我们解决了可组合性问题,并且保证了不会发生永久死锁:https://en.wikipedia.org/wiki/Lock_(computer_science)#Lack_of_composability

您可以从文章顶部的链接获取 Windows/Linux 的此示例。

测试于

  • Windows x86_64 (MSVS 2013 和 2015)
  • Linux x86_64 (g++ 6.3.0 和 clang 3.8.0)

此代码在线编译器中可用:http://coliru.stacked-crooked.com/a/a97a84ff26c6e9ee

使用 safe_ptr<> 和 rw-lock

要将安全指针与 rw-lock 结合使用而不是 unique-lock,只需 #include <safe_ptr.h> 并这样写:shared_mutex_safe_ptr<std::map<int,int>>,而不是 safe_ptr<std::map<int,int>>。或者最好写 contfree_safe_ptr<std::map<int,int>> ptr;,它使用速度快得多的共享互斥量 - 我们在第二篇文章中描述了它。第二篇文章。然后使用 slock_safe_ptr(ptr)->find(5); 或使用 auto const& ptr_read = ptr; ptr_read->find(5); 将调用只读共享锁 - 我们在第三篇文章中描述了这些方法。

附加背景

可组合性和死锁

如上所述,我们使用锁来确保线程安全,因此我们的算法称为基于锁的算法。

在无锁容器、基于事务内存的算法方面,关于死锁的缺失,一切真的都很好吗?现代 RDBMS:MSSQL(基于锁的 IL)和 Oracle(多版本并发控制)中是否存在死锁?

无锁算法不允许一次更改多个容器。RDBMS 在死锁方面存在与基于锁的算法相同的问题,并且它们通常通过锁超时或锁图来解决。C++ 标准中新的事务安全部分不允许您安全地使用 std::map<> 等复杂算法。

无锁算法不具有可组合操作的属性 - 联合原子地使用多个无锁算法。也就是说,多个无锁数据结构不能一次更改或读取。例如,您可以从 libCDS 使用关联数组的无锁容器,它们将是单独线程安全的。但是,如果您想一次性原子地操作多个无锁容器并保持一致性,那么您无法做到,因为它们的 API 不提供对多个容器同时进行无锁操作的功能。当您更改或读取一个容器时,另一个容器可能已被更改。为了避免这种情况,您将需要使用锁,在这种情况下,它将是基于锁的容器,这意味着它们将具有基于锁算法的所有问题,即可能发生死锁。此外,有时在使用单个容器时也会使用锁。

在事务性 RDBMS 中,例如 MSSQL(基于锁)和 Oracle(多版本并发控制),也使用锁,这就是为什么会出现死锁问题,例如,可以通过构建锁图并查找循环等待来自动解决,或者通过设置锁等待时间 select col from tbl where id in (....) for update wait 30; 如果延迟时间已过或在锁图中找到死锁,则会回滚其中一个事务 - 也就是说,取消该事务已进行的所有更改;解锁所有已锁定的内容;然后您可以尝试从头开始执行事务(并多次尝试)。

Oracle Database 在线文档 11g Release 1 (11.1) - Deadlocks:https://docs.oracle.com/cd/B28359_01/server.111/b28318/consist.htm#i5337

Oracle 锁https://docs.oracle.com/cd/B28359_01/server.111/b28318/consist.htm#i5249

反之,所有表达式都使用独占不兼容锁:Insert/Update/Delete/Select-For-Update

MSSQL 检测和结束死锁:https://technet.microsoft.com/en-us/library/ms178104(v=sql.105).aspx

MS SQL 锁https://technet.microsoft.com/en-us/library/ms186396(v=sql.105).aspx

反之,事务内存与无锁容器不同,它可以原子地处理大量容器/数据。也就是说,事务内存具有可组合操作的功能。内部使用悲观锁(有死锁冲突的可能性)或乐观锁(更有可能出现竞争修改的冲突修改)。在发生任何冲突的情况下,事务会自动取消并从头开始重复,这会导致多次重复所有操作 - 这会产生巨大的开销。他们正试图通过在 CPU 级别创建硬件事务内存来减少开销,但到目前为止还没有显示出可接受性能的实现,尽管 Intel 已经在 Haswell CPU 中添加了硬件事务内存。他们还承诺将事务内存包含在 C++ 标准中,但目前仅作为实验性使用,并且不支持与 std::map 的工作。也就是说,到目前为止,一切都只在理论上是好的。但在未来,这可能会取代常规的同步方法。

最终

  • 基于锁的算法不可组合,除非在其实现中提供了此选项;但可以实现此选项,并且我们在前面的章节中已成功实现。
  • 无锁算法不可组合,并且在不使用锁的情况下组合它们是一项非常复杂的任务;但使用锁,这样的算法就不再是无锁的,并且存在永久死锁的风险。
  • RDBMS:MSSQL(基于锁的 IL)和 Oracle(MVCC)- 存在死锁的可能性,这些死锁通过锁图或超时来消除。
  • C++ 标准实验部分中的事务内存目前仅限于在最简单的算法中使用,并且不允许使用 std::map<> 方法或更复杂的算法。

结论:死锁问题存在于所有类型的高性能算法和系统中,这些系统一次调用多个数据结构。因此,我们为 safe_ptr<> 提供了两种解决方案。

  1. static link_safe_ptrs tmp_link(safe_user_accounts, safe_cash_flows_src_id, safe_cash_flows_dst_id); - 为多个容器使用一个互斥量。
  2. lock_timed_any_infinity lock_all(safe_cash_flows_src_id, safe_cash_flows_dst_id, safe_user_accounts); - 使用锁超时;在时间到期后,解锁所有内容并再次尝试锁定。

如果仅为 safe_ptr<> 使用一个容器和一个递归互斥量,则 safe_ptr<> 中不可能发生死锁,因为死锁至少需要两个递归互斥量(或 1 个非递归互斥量)。

基于锁算法的可组合性

通常认为,基于锁的程序是不可组合的,即,如果您简单地取两个基于锁的数据结构并逐个原子地更改它们,那么在任何时候您都不会得到一致的状态。

但是上面我们轻松地组合了三个基于锁的容器。我们是怎么做到的?在这方面有一个小的澄清 - 加粗显示。

也许最根本的反对意见 [...] 是基于锁的程序不可组合:正确的片段在组合时可能会失败。例如,考虑一个具有线程安全插入和删除操作的哈希表。现在假设我们要从表 t1 中删除一个项目 A,并将其插入到表 t2 中;但中间状态(两者都不包含该项目)不应被其他线程看到。除非哈希表的实现者预见到这种需求,否则根本无法满足此要求。 [...] 简而言之,单个操作(插入、删除)的正确性无法组合成更大的正确操作。

—Tim Harris 等人,“Composable Memory Transactions”,第 2 节:背景,第 2 页。

https://www.microsoft.com/en-us/research/wp-content/uploads/2005/01/2005-ppopp-composable.pdf

事实是,基于锁的算法除非在其实现时提供可组合性,否则是不可组合的。也就是说,基于锁的数据结构不能自动组合,但可以手动组合。例如,就像我们使用 lock_timed_any_infinity 类所做的那样,如果外部可以访问它们的互斥量来进行组合操作。

我们实现了基于锁的模板类 safe_ptr<T>,并且(对于其中的任何 T 类型),我们为可组合性和解决死锁的使用组合操作提供了支持:link_safe_ptrslock_timed_any_infinitylock_timed_any_once

那么,为什么我们选择锁及其悲观选项?

  • 锁是确保操作系统和 C++ 语言线程安全的标准机制。
  • 通过锁,我们可以实现多个数据结构的可组合性和一致性。
  • 悲观锁可能发生死锁,如果您忘记正确组合锁。很难找到这种死锁,但它很容易解决,并且很少发生。在乐观锁中,在任何情况下都可能发生冲突修改。它们很容易找到,但需要额外的 C++ 代码来解决它们,并且发生得更频繁。
  • Tom Kyte 是 Oracle 服务器技术部门的高级技术架构师 - 他是 Oracle DB(多版本并发控制)中悲观锁的支持者:https://asktom.oracle.com/pls/asktom/f?p=100:11:0::::P11_QUESTION_ID:5771117722373                        关于悲观锁可能导致锁定和死锁的情况,他写道:
引用

我非常喜欢所谓的悲观锁定。用户已非常清楚地宣布了他们更新数据的意图。他们提到的锁定很容易通过会话超时(琐碎)来处理,而死锁非常罕见,而且肯定是一个应用程序错误(在 Oracle 和 RDB 中)。

  • 死锁是一种错误。如果您认为最好减缓几个线程而不是完全停止它们,同时修复您的错误,那么请使用 lock_timed_any_infinity。否则,如果您想永久停止程序,请使用:link_safe_ptrsstd::lock_guard<>
  • 也不需要自动升级锁。例如,Oracle DB 绝不会这样做:https://docs.oracle.com/cd/B28359_01/server.111/b28318/consist.htm#CIHCFHGE

Oracle 数据库从不升级锁。锁升级大大增加了死锁的可能性。

在接下来的文章中,我们将继续定期引用工业 RDBMS 的丰富经验来实现我们的算法。

结论

我们证明了执行围绕指针惯用法在自动提供多线程安全访问方面是正确的 - 严格符合 C++ 标准。我们展示了其可组合性的示例。我们还演示了使用悲观锁定来确保线程安全的优势。

历史

  • 2017年4月24日 - 初始版本
  • 2017年5月1日 - 添加了GitHub链接
© . All rights reserved.