65.9K
CodeProject 正在变化。 阅读更多。
Home

Python 并发编程:您应该了解的 6 种 Python 同步原语

starIconstarIconstarIconstarIconstarIcon

5.00/5 (3投票s)

2018 年 4 月 11 日

CPOL

12分钟阅读

viewsIcon

11651

这是 Elliot Forbes 所著的《Python 并发编程学习》一书的摘录。由 Packt Publishing 出版。

竞态条件是并发编程中一个麻烦且备受诟病的问题,它困扰着全球成百上千的程序。

竞态条件的标准定义如下:

竞态条件或竞态危害是指电子、软件或其他系统的行为,其输出取决于其他不可控事件的顺序或时序。

在应用程序中实现并发时,我们需要防范的主要问题之一就是竞态条件。这些竞态条件会削弱我们的应用程序,并导致难以调试甚至更难修复的错误。为了防止这些问题,我们需要同时了解这些竞态条件是如何发生的,以及如何使用本章将要介绍的同步原语来防范它们。

如果你想在 Python 中创建线程安全的、高性能的程序,理解同步以及可用的基本原语至关重要。幸运的是,Python 的 threading 模块提供了许多不同的同步原语,可以帮助我们在各种并发场景中进行处理。

在本节中,我将简要概述所有可用的同步原语,并提供一些如何在程序中使用它们的简单示例。完成后,您应该能够实现自己的并发 Python 程序,以线程安全的方式访问资源。

join 方法

在开发极其重要的企业系统时,能够确定某些任务的执行顺序至关重要。幸运的是,Python 的线程对象允许我们对此进行某种程度的控制,因为它们带有 join 方法。

join 方法本质上会阻止父线程进一步执行,直到该线程确认已终止。这可能是由于自然结束,也可能是由于线程抛出了未处理的异常。

锁是访问多个执行线程的共享资源时必不可少的机制。最好的比喻就是想象你只有一个浴室,但有很多室友——当你想要洗漱或淋浴时,你会锁上门,这样其他人就不能同时使用浴室了。

Python 中的锁是一种同步原语,可以让我们有效地锁上浴室门。它可以处于“锁定”或“解锁”状态,并且我们只能在锁处于“解锁”状态时获取它。

RLocks(重入锁)

重入锁,或称 RLocks,是一种同步原语,其工作方式与标准锁原语非常相似,但如果一个线程已经拥有它,它可以多次被该线程获取。

例如,假设 thread-1 获取了 RLock,那么 thread-1 每次获取锁时,RLock 中的计数器就会加 1。如果 thread-2 试图获取 RLock,那么它必须等到 RLock 的计数器降至 0 才能被获取。thread-2 将进入阻塞状态,直到满足此 0 条件。

但这有什么用呢?嗯,当你想要确保类中访问其他类方法的某个方法能够线程安全地访问时,它就会派上用场。

条件

条件(Condition)是一种同步原语,它等待另一个线程的信号。例如,另一个线程可能已经完成了执行,当前的线程可以继续对结果执行某种计算。

信号量

在第一章中,我们曾提及并发的历史,并谈到了 Dijkstra。Dijkstra 是将铁路系统的信号灯概念转化为我们可以在复杂的并发系统中使用的东西的人。

信号灯(Semaphore)有一个内部计数器,每次调用 acquire 或 release 时都会增加或减少。初始化时,除非另有设置,否则此计数器默认为 1。如果计数器将变为负整数,则无法获取信号灯。

假设我们用信号灯保护了一段代码,并将信号灯的值设置为 2。如果一个线程获取了信号灯,那么信号灯的值将减至 1。然后,如果另一个线程尝试获取信号灯,信号灯的值将减至 0。此时,如果还有另一个线程进来,信号灯将拒绝其获取请求,直到其中一个原始线程调用 release 方法,并且计数器增加到之前的 0。

有界信号灯(Bounded Semaphores)

有界信号灯与普通信号灯几乎相同。除了以下几点:

有界信号灯会检查其当前值是否超过其初始值。如果超过,则会引发 ValueError。在大多数情况下,信号灯用于保护容量有限的资源。

如果信号灯被释放的次数过多,这通常是错误的迹象。如果未给出值,则默认值为 1。

这些有界信号灯通常用于 Web 服务器或数据库实现中,以防止在过多用户同时尝试连接或同时执行特定操作时出现资源耗尽的情况。

通常,使用有界信号灯比使用普通信号灯更好。如果我们使用 threading.BoundedSemaphore(4) 来更改我们信号灯示例的前面代码并再次运行它,我们会看到几乎完全相同的行为,只是我们已经保护了我们的代码免受一些非常简单的程序错误的影响,这些错误否则将无法捕获。

事件

事件(Event)是非常有用但也很简单的,用于多个并发运行线程之间通信的一种形式。使用事件,一个线程通常会发出事件已发生的信号,而其他线程则积极监听此信号。

事件本质上是具有内部标志的对象,该标志为 true 或 false。在我们的线程中,我们可以连续轮询此事件对象来检查其状态,然后在标志状态更改时选择以我们想要的方式执行。

在上一章中,我们讨论了 Python 中没有真正的原生杀死线程的机制,这一点仍然成立。然而,我们可以利用这些事件对象,让我们的线程仅在我们的事件对象保持未设置状态时运行。虽然当发送 SIGKILL 信号时这不太有用,但在某些需要优雅关闭的情况下,它可能很有用,但前提是我们可以等待线程完成当前正在执行的任务,然后再终止。

Event 对象有四个 public 函数,我们可以用它们来修改和利用它:

  • isSet():检查事件是否已设置。
  • set():设置事件。
  • clear():重置我们的事件对象。
  • wait():阻塞直到内部标志设置为 true

屏障(Barrier)

屏障是一种同步原语,它在 Python 语言的第三个主要版本中引入,并解决了仅通过条件和信号灯的复杂组合才能解决的问题。

这些屏障是控制点,可用于确保只有在所有参与线程都到达同一点之后,一组线程才能继续前进。

这听起来可能有点复杂且不必要,但在某些情况下它可能极其强大,而且肯定可以降低代码的复杂性。

深入了解 join 方法

总而言之,join 方法本质上会阻止父线程进一步执行,直到该线程确认已终止。这可能是由于自然结束,也可能是由于线程抛出了未处理的异常。让我们通过以下示例来理解这一点:

import threading
import time
def ourThread(i):
 print("Thread {} Started".format(i))
 time.sleep(i*2)
 print("Thread {} Finished".format(i))
def main():
 thread1 = threading.Thread(target=ourThread, args=(1,))
 thread1.start()
 print("Is thread 1 Finished?")
 thread2 = threading.Thread(target=ourThread, args=(2,))
 thread2.start()
 thread2.join()
 print("Thread 2 definitely finished")
if __name__ == '__main__':
 main()

分解说明

前面的代码示例展示了如何利用 join 方法使线程化程序的流程在一定程度上确定化的示例。

我们首先定义一个名为 myThread 的非常简单的函数,它接受一个参数。此函数的作用是打印出它何时启动,休眠指定值乘以 2 的时间,然后打印出它何时完成执行。

在我们的 main 函数中,我们定义了两个线程,第一个线程被恰当地命名为 thread1,并传入 1 作为其唯一参数。然后我们启动该线程并执行一个 print 语句。重要的是要注意,第一个 print 语句在 thread1 完成之前执行。

然后,我们创建第二个线程对象,并富有想象力地将其命名为 thread2,这次传入 2 作为唯一参数。但关键区别在于,我们在启动该线程后立即调用 thread2.join()。通过调用 thread2.join(),我们可以保留执行 print 语句的顺序,并且在输出中可以看到,Thread 2 Is Definitely Finished 确实是在 thread2 终止之后打印的。

整合

虽然 join 方法可能非常有用,并且提供了一种快速简洁的方法来确保代码的顺序,但同样重要的是要注意,我们可能会通过最初使代码多线程来抵消我们所做的所有改进。

考虑前面示例中的 thread2 对象——我们通过将其多线程化获得了什么?我知道这是一个相当简单的程序,但重点是我们在启动它后立即对其进行了 join,基本上阻塞了我们的主线程,直到 thread2 完成其执行。我们在 thread2 执行期间有效地将我们的多线程应用程序变成了单线程。

深入了解锁

总而言之,Python 中的锁是一种同步原语,可以让我们有效地锁上浴室门。它可以处于“锁定”或“解锁”状态,并且我们只能在锁处于“解锁”状态时获取它。

示例

在第 2 章“并行化”中,我们回顾了以下代码示例:

import threading
import time
import random
counter = 1
def workerA():
 global counter
 while counter < 1000:
   counter += 1
   print("Worker A is incrementing counter to {}".format(counter))
   sleepTime = random.randint(0,1)
   time.sleep(sleepTime)
def workerB():
 global counter
 while counter > -1000:
   counter -= 1
   print("Worker B is decrementing counter to {}".format(counter))
   sleepTime = random.randint(0,1)
   time.sleep(sleepTime)
def main():
 t0 = time.time()
 thread1 = threading.Thread(target=workerA)
 thread2 = threading.Thread(target=workerB)
 thread1.start()
 thread2.start()
 thread1.join()
 thread2.join()
 t1 = time.time()
 print("Execution Time {}".format(t1-t0))
if __name__ == '__main__':
 main()

在前面的示例中,我们看到两个线程不断地竞争以增加或减少一个计数器。通过添加锁,我们可以确保这些线程能够以确定且安全的方式访问我们的计数器。

import threading
import time
import random
counter = 1
lock = threading.Lock()
def workerA():
 global counter
 lock.acquire()
 try:
   while counter < 1000:
     counter += 1
     print("Worker A is incrementing counter to {}".format(counter))
     sleepTime = random.randint(0,1)
     time.sleep(sleepTime)
 finally:
   lock.release()
def workerB():
 global counter
 lock.acquire()
 try:
   while counter > -1000:
     counter -= 1
     print("Worker B is decrementing counter to {}".format(counter))
     sleepTime = random.randint(0,1)
     time.sleep(sleepTime)
 finally:
   lock.release()
def main():
 t0 = time.time()
 thread1 = threading.Thread(target=workerA)
 thread2 = threading.Thread(target=workerB)
 thread1.start()
 thread2.start()
 thread1.join()
 thread2.join()
 t1 = time.time()
 print("Execution Time {}".format(t1-t0))
if __name__ == '__main__':
 main()

分解说明

在前面的代码中,我们添加了一个非常简单的锁原语,它封装了我们两个工作函数中的 while 循环。当线程启动时,它们都会竞相获取锁以便执行目标,并尝试将计数器增加到 1,000 或 -1,000,而不必与其他线程竞争。只有在一个线程完成其目标并释放锁之后,另一个线程才能获取该锁并尝试增加或减少计数器。

前面的代码执行速度会非常慢,因为它主要用于演示目的。如果删除 while 循环中的 time.sleep() 调用,那么您应该会注意到这段代码几乎会立即执行。

深入了解 RLocks(重入锁)

总而言之,重入锁,或称 RLocks,是一种同步原语,其工作方式与标准锁原语非常相似,但如果一个线程已经拥有它,它可以多次被该线程获取。

示例

import threading
import time
class myWorker():
def __init__(self):
  self.a = 1
  self.b = 2
  self.Rlock = threading.RLock()
 def modifyA(self):
  with self.Rlock:
    print("Modifying A : RLock Acquired:
    {}".format(self.Rlock._is_owned()))
    print("{}".format(self.Rlock))
    self.a = self.a + 1
    time.sleep(5)
def modifyB(self):
  with self.Rlock:
    print("Modifying B : RLock Acquired:
    {}".format(self.Rlock._is_owned()))
    print("{}".format(self.Rlock))
    self.b = self.b - 1
    time.sleep(5)
def modifyBoth(self):
  with self.Rlock:
    print("Rlock acquired, modifying A and B")
    print("{}".format(self.Rlock))
    self.modifyA()
    self.modifyB()
  print("{}".format(self.Rlock))
workerA = myWorker()
workerA.modifyBoth()

分解说明

在前面的代码中,我们看到了一个 RLock 在单线程程序中工作的典型示例。我们定义了一个名为 myWorker 的类,该类具有四个函数:构造函数(用于初始化我们的 Rlock 和 a、b 变量)。

然后我们继续定义两个函数,它们分别修改 a 和 b。它们都首先使用 with 语句获取类的 Rlock,然后执行对我们内部变量的任何必要修改。

最后,我们有一个 modifyBoth 函数,它执行初始的 Rlock 获取,然后调用 modifyAmodifyB 函数。

在每一步,我们都会打印出 Rlock 的状态。我们看到,在 modifyBoth 函数中获取 Rlock 后,它的所有者被设置为主线程,并且其计数器增加到一。当我们下次调用 modifyA 时,RLock 的计数器再次增加一,在进行必要的计算后,modifyA 会释放 Rlock。在 modifyA 函数释放 Rlock 后,我们看到计数器减至 1,然后立即被我们的 modifyB 函数再次增加到 2。

最后,当 modifyB 完成执行时,它会释放 Rlock,然后我们的 modifyBoth 函数也会释放。当我们最后一次打印 Rlock 对象时,我们看到所有者已设置为 0,并且计数也设置为 0。只有在这个时间点,另一个线程理论上才能获得此锁。

输出

输出将如下所示:

$ python3.6 04_rlocks.py
    Rlock acquired, modifying A and B
    <locked
_thread.RLock object owner=140735793988544 count=1 at 
    0x10296e6f0>
    Modifying A : RLock Acquired: True
    <locked
_thread.RLock object owner=140735793988544 count=2 at 
    0x10296e6f0>
    <locked
_thread.RLock object owner=140735793988544 count=1 at 
    0x10296e6f0>
    Modifying B : RLock Acquired: True
    <locked
_thread.RLock object owner=140735793988544 count=2 at 
    0x10296e6f0>
    <unlocked
_thread.RLock object owner=0 count=0 at 0x10296e6f0>

RLocks 与普通锁的对比

如果我们尝试使用传统的锁原语执行相同的先前程序,那么您应该会注意到程序实际上从未到达执行 modifyA() 函数的地步。我们的程序将基本上进入一种死锁状态,因为我们没有实现允许我们的线程继续执行的释放机制。这在以下代码示例中有所体现:

import threading
import time
class myWorker():
def __init__(self):
  self.a = 1
  self.b = 2
  self.lock = threading.Lock()
 def modifyA(self):
  with self.lock:
    print("Modifying A : RLock Acquired:
{}".format(self.lock._is_owned()))
    print("{}".format(self.lock))
    self.a = self.a + 1
    time.sleep(5)
def modifyB(self):
  with self.lock:
    print("Modifying B : Lock Acquired:
{}".format(self.lock._is_owned()))
    print("{}".format(self.lock))
    self.b = self.b - 1
    time.sleep(5)
def modifyBoth(self):
  with self.lock:
    print("lock acquired, modifying A and B")
    print("{}".format(self.lock))
    self.modifyA()
    print("{}".format(self.lock))
    self.modifyB()
  print("{}".format(self.lock))
workerA = myWorker()
workerA.modifyBoth()

RLocks 本质上允许我们在递归地获得某种线程安全性,而无需在整个代码中实现复杂的获取和释放锁逻辑。它们使我们能够编写更简单的代码,这些代码更容易理解,因此在代码投入生产后更容易维护。

要更深入地了解其他 Python 同步原语,并学习编写高效并发 Python 程序的其他技术和技巧,请参阅 Packt Publishing 的书籍《 Learning Concurrency in Python》,作者是 Elliot Forbes

© . All rights reserved.