65.9K
CodeProject 正在变化。 阅读更多。
Home

一步步探索Python - 使用Python队列 - 02 - 从队列中提取元素。另:任务

starIconstarIconstarIcon
emptyStarIcon
starIcon
emptyStarIcon

3.29/5 (5投票s)

2020年6月15日

CPOL

18分钟阅读

viewsIcon

11823

如何在 Python 中从队列中提取元素

背景

上一篇关于队列的文章中,我描述了向队列添加元素的过程。我们创建了一个插入线程,并检查了当队列中有元素和没有元素时,从单独的线程中填充队列时发生的情况。当队列已满时,添加元素的线程会停止。在本文中,我们将继续修改代码,以添加从队列中提取元素进行处理的功能。我还会介绍队列任务的概念。

从队列中提取元素

与插入项一样,有一个Python方法可以从队列中提取项(弹出元素),称为get

来自Python文档

引用
Queue.get(block=True, timeout=None)

从队列中移除并返回一个项。如果可选参数block为true且timeoutNone(默认值),则在有可用项之前进行阻塞。如果timeout为正数,则最多阻塞timeout秒,并在规定时间内没有可用项时引发Empty异常。否则(block为false),如果有一个项可立即用,则返回该项,否则引发Empty异常(此情况下timeout被忽略)。

在3.0之前的POSIX系统以及Windows的所有版本中,如果block为true且timeoutNone,此操作将在底层锁上进入不可中断等待。这意味着不会发生任何异常,特别是SIGINT不会触发KeyboardInterrupt

与插入一样,有一个用于元素提取的非阻塞方法

Queue.get_nowait()

此调用等同于...

Queue.get(block=False)

在这种情况下,引发的异常是Queue.Empty,当无法从队列中提取元素时。

我的建议是使用阻塞方式进行get,并为该调用提供适当的超时(以秒为单位),以避免不可中断等待的问题。为了简化起见,我将在以后的文章中遵循自己的建议。

添加队列元素处理

我对上一篇关于队列的文章中的代码做了一些更改

  • 我在代码中添加了一个新的命名线程用于元素提取和处理(消费者线程)。该线程的关联方法是process_items,它接受将从中提取项进行处理的队列作为参数。
  • 我修改了日志,以便我们可以识别日志记录的线程。
  • 我将插入的项数从100改为10,以减少看到程序结束所需的时间。
  • 我将主线程join到消费者线程,以等待其完成。

目前,我们在程序中使用的实际处理代码并不真正相关。我们将通过在处理线程上执行sleep来模拟处理元素所花费的时间。处理方法包含一个无限循环(while True),以确保线程在可能的情况下继续提取元素。我将在稍后改进这一点,以添加一些方法来以可控的方式停止此线程。

新代码如下

import queue
import threading
import time

def add_items(processing_queue, num_items):
    for x in range(num_items):
        processing_queue.put(x)
        print("Adding item: {}".format(x))
        time.sleep(.1)    
        print("add_items - Processing queue size: {}. Remaining tasks: {}".format(processing_queue.qsize(), processing_queue.unfinished_tasks))

def process_items(processing_queue):
    while True:
        # Blocking get, no timeout
        x = processing_queue.get()
        # Processing happens here
        print("Processing item: {}".format(x))
        # simulate processing time. Much slower than insertion
        time.sleep(1)
        processing_queue.task_done()
        print("process_items - Processing queue size: {}. Remaining tasks: {}".format(processing_queue.qsize(), processing_queue.unfinished_tasks))

def main():
    test_queue = queue.Queue(5)

    # insertion thread
    t_add = threading.Thread(target=add_items, 
            args=(test_queue, 10), name="add_items_thread")
    t_add.start()

    # processing thread (new)
    t_process = threading.Thread(target=process_items, args=(test_queue,), 
                name="process_items_thread")
    t_process.start()

    t_add.join()
    t_process.join()
    
if __name__ == "__main__":
    main()

在我的电脑上,执行结果是以下日志序列

引用

添加项:0
处理项:0
add_items - 处理队列大小:0。剩余任务:1
添加项:1
add_items - 处理队列大小:1。剩余任务:2
添加项:2
add_items - 处理队列大小:2。剩余任务:3
添加项:3
add_items - 处理队列大小:3。剩余任务:4
添加项:4
add_items - 处理队列大小:4。剩余任务:5
添加项:5
add_items - 处理队列大小:5。剩余任务:6
process_items - 处理队列大小:5。剩余任务:5
处理项:1
添加项:6
add_items - 处理队列大小:5。剩余任务:6
process_items - 处理队列大小:5。剩余任务:5
处理项:2
添加项:7
add_items - 处理队列大小:5。剩余任务:6
process_items - 处理队列大小:5。剩余任务:5
处理项:3
添加项:8
add_items - 处理队列大小:5。剩余任务:6
process_items - 处理队列大小:5。剩余任务:5
添加项:9
处理项:4
add_items - 处理队列大小:5。剩余任务:6
process_items - 处理队列大小:5。剩余任务:5
处理项:5
process_items - 处理队列大小:4。剩余任务:4
处理项:6
process_items - 处理队列大小:3。剩余任务:3
处理项:7
process_items - 处理队列大小:2。剩余任务:2
处理项:8
process_items - 处理队列大小:1。剩余任务:1
处理项:9
process_items - 处理队列大小:0。剩余任务:0

<此时程序因处理线程而暂停>

 

您会看到,随着消费者线程的可用,生产者线程能够插入超过第5个元素。生产者线程能够相当快地插入前5个元素(约每秒10个),但从那时起,它不得不接受由消费者线程确定的节奏(每秒1个),因为队列一直保持已满。所有项都已生产并消费。消费者线程是使用无限循环创建的,我们必须对此进行更正,以便在所有工作完成后,处理线程能够正确停止。

在此之前,我想介绍任务的概念。

任务机制

如果您观察process_items方法的代码,我在1秒处理代码模拟sleep的末尾添加了这一行

processing_queue.task_done()

在此以及上一篇文章中,我也打印了“unfinished_tasks”的数量。要理解为什么我使用task_done调用,我们需要再次查看Python文档

引用
Queue.task_done()

指示以前放入队列的任务已完成。供队列消费者线程使用。对于每个用于获取任务的get(),后续调用task_done()表示队列已完成对任务的处理。

如果join()当前正在阻塞,它将在所有项都处理完毕后恢复(这意味着对于已放入队列的每个项,都已接收到一个task_done()调用)。

如果调用次数多于放入队列的项数,则引发ValueError

使用Python队列的主要原因是允许多个生产者和消费者以线程安全的方式从队列中插入/提取元素。生产者插入元素,消费者提取它们。消费者应该对这些项执行任务,进行处理。

Python队列实现提供了一些计数器管理,与插入队列的项数量有关

  • 项数:当项被插入和提取时(使用putget)的实际项数。队列强制项数达到数量限制(队列maxsize),因此它知道队列何时为空或已满。
  • 未完成任务数:与项数类似,但有一个细微差别:当我们向队列中插入项时,未完成任务数会增加,但当我们从队列中提取项时,未完成任务数保持不变。只有在调用Queue.task_finished时才会减少。这迫使我们在代码中确保消费者线程在处理完项后,每个提取的元素都调用一次task_finished方法,而不是在此之前。还要确保没有未捕获的异常会跳过对该方法的调用。未完成任务的最大值可计算为
    引用

    未完成任务的最大数量 = (“队列maxsize值”) + min(“正在提取和处理元素的线程数”,“队列maxsize值”)

在我们的例子中,我们有生产者线程插入了5个项,以及1个消费者线程提取项,所以我们有5 + min(1,5) = 6,这是未完成任务的最大数量,与前面显示的日志一致。

Python队列实现使用锁线程机制来强制对计数器进行独占访问,并确保计数信息以线程安全的方式进行修改。getputtask_done方法使用相同的对象。在我们的代码中,我们需要通过调用Queue.task_done()来通知队列类一项任务(消费者线程中的项处理)已完成。这样在Python中实现,我们可以使用queue对象作为中心位置,在线程之间进行通信,包括生产者线程、消费者线程以及希望等待队列中任务完成的线程。

通过检查未完成任务数来停止消费者线程

检查未完成任务数

停止消费者线程循环的最简单方法是在队列中的所有任务都完成后,使用Queue.unfinished_tasks。如果此队列对象属性的值大于零,则表示仍有任务需要完成,我们应该继续尝试从队列中提取项。

我们将修改process_items方法中的代码,在检测到队列已完成所有任务时停止循环

def process_items(processing_queue):
    while processing_queue.unfinished_tasks > 0:
        # Blocking get, no timeout
        x = processing_queue.get()
        # Processing happens here
        print("Processing item: {}".format(x))
        # simulate processing time. Much slower than insertion
        time.sleep(1)
        processing_queue.task_done()
        print("process_items - Processing queue size: {}. 
        Remaining tasks: {}".format(processing_queue.qsize(), 
        processing_queue.unfinished_tasks))

这样,生成的代码就会按预期运行

引用

添加项:0
处理项:0
add_items - 处理队列大小:0。剩余任务:1
添加项:1
add_items - 处理队列大小:1。剩余任务:2
添加项:2
add_items - 处理队列大小:2。剩余任务:3
添加项:3
add_items - 处理队列大小:3。剩余任务:4
添加项:4
add_items - 处理队列大小:4。剩余任务:5
添加项:5
add_items - 处理队列大小:5。剩余任务:6
process_items - 处理队列大小:5。剩余任务:5
处理项:1
添加项:6
add_items - 处理队列大小:5。剩余任务:6
process_items - 处理队列大小:5。剩余任务:5
处理项:2
添加项:7
add_items - 处理队列大小:5。剩余任务:6
process_items - 处理队列大小:5。剩余任务:5
处理项:3
添加项:8
add_items - 处理队列大小:5。剩余任务:6
process_items - 处理队列大小:5。剩余任务:5
处理项:4
添加项:9
add_items - 处理队列大小:5。剩余任务:6
process_items - 处理队列大小:5。剩余任务:5
处理项:5
process_items - 处理队列大小:4。剩余任务:4
处理项:6
process_items - 处理队列大小:3。剩余任务:3
处理项:7
process_items - 处理队列大小:2。剩余任务:2
处理项:8
process_items - 处理队列大小:1。剩余任务:1
处理项:9
process_items - 处理队列大小:0。剩余任务:0

<程序现在已正确完成,无限循环不再导致暂停>

这段代码看起来可行。

如果生产者太慢,未完成任务数会归零

乍一看,检查未完成任务数供消费者线程知道何时完成所有工作似乎是个好主意。但是,在某些情况下,如果生产者线程不能始终保持队列中有项,消费者线程会过早停止。如果项的生产速率远低于消费者的处理速率,消费者线程在某个时刻会看到一个空队列,并且unfinished_tasks的值为零。相对插入/提取速率会影响未完成任务计数如何达到零,使得此值不足以标记完成。

让我们看看这种情况。我将修改生产者和消费者线程循环中的sleep周期:我将使生产者线程每1秒生成一个项,并使消费者快得多,以便在0.1秒内处理一个项。我将添加更多日志记录,以确切了解线程何时停止。

import queue
import threading
import time

def add_items(processing_queue, num_items):
    for x in range(num_items):
        processing_queue.put(x)
        print("Adding item: {}".format(x))
        time.sleep(1)
        print("add_items - Processing queue size: {}. Remaining tasks: {}".format(processing_queue.qsize(), processing_queue.unfinished_tasks))
    print("add_items - Exiting thread")

def process_items(processing_queue):
    while processing_queue.unfinished_tasks > 0:
        # Blocking get, no timeout
        x = processing_queue.get()
        # Processing happens here
        print("Processing item: {}".format(x))
        # simulate processing time. Much slower than insertion
        time.sleep(0.1)
        processing_queue.task_done()
        print("process_items - Processing queue size: {}. Remaining tasks: {}".format(processing_queue.qsize(), processing_queue.unfinished_tasks))
    print("process_items - Exiting thread")

def main():
    test_queue = queue.Queue(5)

    # insertion thread
    t_add = threading.Thread(target=add_items, args=(test_queue, 10), name="add_items_thread")
    t_add.start()

    # processing thread (new)
    t_process = threading.Thread(target=process_items, args=(test_queue,), name="process_items_thread")
    t_process.start()

    t_add.join()
    t_process.join()
    print("main - Finished")
    

if __name__ == "__main__":
    main()

执行此代码的结果是

引用

添加项:0
处理项:0
process_items - 处理队列大小:0。剩余任务:0
process_items - 退出线程
add_items - 处理队列大小:0。剩余任务:0
添加项:1
add_items - 处理队列大小:1。剩余任务:1
添加项:2
add_items - 处理队列大小:2。剩余任务:2
添加项:3
add_items - 处理队列大小:3。剩余任务:3
添加项:4
add_items - 处理队列大小:4。剩余任务:4
添加项:5
add_items - 处理队列大小:5。剩余任务:5

<此时程序因生产者线程等待队列空位而暂停>

消费者线程在处理完第一个项后退出。由于unfinished_tasks计数达到零并保持一段时间,然后生产者才有机会插入新元素。此时,消费者认为没有更多项了,然后退出,也停止了项提取。生产者线程在插入第5个项后无法插入更多元素,因为队列maxsize为5,程序将无限期地挂起。

在main()中使用Queue.join()等待队列完成

您可能想知道为什么我没有使用Python队列实现提供的Queue.join()功能,而是使用了主线程中的Thread.join()

来自文档

引用
Queue.join()

阻塞直到队列中的所有项都已获取并处理完毕。

未完成任务计数在每次将项添加到队列时增加。每当消费者线程调用task_done()表示项已检索并完成所有工作时,计数会减少。当未完成任务计数降至零时,join()将解除阻塞。

来自Queue.join()的实现

def join(self):
        '''Blocks until all items in the Queue have been gotten and processed.
        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.
        When the count of unfinished tasks drops to zero, join() unblocks.
        '''
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

因此,它会等待直到未完成任务计数达到零。如果我们用我们代码的main()方法中的队列join替换线程join,我们得到

def main():
    test_queue = queue.Queue(5)

    # insertion thread
    t_add = threading.Thread(target=add_items, args=(test_queue, 10), name="add_items_thread")
    t_add.start()

    # processing thread (new)
    t_process = threading.Thread(target=process_items, args=(test_queue,), name="process_items_thread")
    t_process.start()

    # Replaced this with Queue.join()
    # t_add.join()
    # t_process.join()
    # Replaced this with Queue.join()
    test_queue.join() # New
    print("main - Finished")

现在执行程序,我们会看到主线程过早退出,当未完成任务计数达到零时,就像之前的示例中的消费者线程一样,这是因为在执行的初始阶段,未完成任务计数(由于生产者线程中项的生成缓慢)达到了零。程序执行产生以下日志

引用

添加项:0
处理项:0
process_items - 处理队列大小:0。剩余任务:0
main - finished
process_items - 退出线程

add_items - 处理队列大小:0。剩余任务:0
添加项:1
add_items - 处理队列大小:1。剩余任务:1
添加项:2
add_items - 处理队列大小:2。剩余任务:2
添加项:3
add_items - 处理队列大小:3。剩余任务:3
添加项:4
add_items - 处理队列大小:4。剩余任务:4
添加项:5
add_items - 处理队列大小:5。剩余任务:5

<此时程序因生产者线程等待队列空位而暂停>

如果使用Queue.join(),主线程不会等待正在运行的线程完成。Queue.join()Queue.unfinished_tasks都是不可靠的完成指示器,因为未完成任务的数量取决于相对的生产/消费速度,在一般情况下不能被视为足够的完成指示器。

使用守护线程避免暂停

我们可以尝试使用守护线程来避免线程暂停问题。这是Queue类文档示例部分的一部分。这样,当主线程结束时,所有其他守护线程都会停止。我不建议在一般情况下使用它们,如果您计划使用访问队列的线程中的资源(系统句柄)。我现在解释原因。

来自文档

引用

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

此构造函数应始终使用关键字参数调用。参数是

group应为None;当实现ThreadGroup类时,为将来扩展保留。

target是可通过run()方法调用的可调用对象。默认为None,表示不调用任何内容。

name是线程名。默认情况下,会构造一个唯一的名称,格式为“Thread-N”,其中N是一个小的十进制数字。

args是目标调用的参数元组。默认为()。

kwargs是目标调用的关键字参数字典。默认为{}。

如果不是None,daemon会显式设置线程是否为守护线程。如果为None(默认值),则守护属性从当前线程继承。

如果子类覆盖了构造函数,它必须确保在对线程执行任何其他操作之前调用基类构造函数(Thread.__init__())。

还有

引用

线程可以被标记为“守护线程”。这意味着当只剩下守护线程时,整个Python程序将退出。初始值从创建的线程继承。可以通过daemon 属性或daemon构造函数参数设置该标志。

注意

守护线程在关闭时被强制终止。它们的资源(如打开的文件、数据库事务等)可能无法正确释放。如果您希望线程正常停止,请将它们设置为非守护线程,并使用适当的信号机制,如Event

有一个“主线程”对象;它对应于Python程序中的初始控制线程。它不是守护线程。

有可能创建“虚拟线程对象”。这些是对应于“外部线程”的线程对象,外部线程是在threading模块之外启动的控制线程,例如直接从C代码启动。虚拟线程对象的功能有限;它们始终被视为活动且守护的,并且无法被join()。由于无法检测外部线程的终止,因此它们永远不会被删除。

文档的最后这段告诉我们,如果我们使用守护线程,我们的所有线程都会被中断,并且所有资源都可能被留下,或者处于不一致的状态。当您处理线程中的资源时,Python文档中的代码应被修改,因此您应避免使用守护线程。文档为我们提供了使用Event而不是使用守护线程来正确停止线程的提示。

要使线程成为非守护线程,您应该在创建线程时指定daemon=False。否则,创建的线程将是守护线程。

作为其运行方式的示例,我们可以用修改后的main()方法进行测试

def main():
    test_queue = queue.Queue(5)

    # insertion thread
    t_add = threading.Thread(target=add_items, args=(test_queue, 10), name="add_items_thread", daemon=True)
    t_add.start()

    # processing thread (new)
    t_process = threading.Thread(target=process_items, args=(test_queue,), name="process_items_thread", daemon=True)
    t_process.start()

    # Replaced this with Queue.join()
    # t_add.join()
    # t_process.join()
    # Replaced this with Queue.join()
    test_queue.join() # New
    print("main - Finished")

生成的日志

引用

添加项:0
处理项:0
process_items - 处理队列大小:0。剩余任务:0
main - finished
process_items - 退出线程

<程序退出,所有线程被杀死>

程序在处理完第一个项后立即退出。消费者线程正常完成,但我们没有看到生产者“Exiting thread”的打印。管理该线程中资源的Python代码可能没有正常运行以释放资源,并且可能。

到目前为止我们发现了什么

在对代码进行所有更改以及在本篇和上一篇文章中发现的程序中的不足之后,我认为列出我们需要在实现使用队列和线程的生产者/消费者应用程序时考虑的事项是合理的。

  • 我们使用队列的代码应独立于项生成/消耗的相对速度。应允许生产者以任何速率生成元素,无论消耗速率如何,并且消费者应能够正确检测完成。要处理的项可能需要时间来生成,或者从数据库或远程HTTP服务器中提取。消费者线程应允许缓慢处理项,或非常快速地处理,而不会影响完成状态的检测。
  • 我们使用队列的代码应避免使用守护线程。守护线程会导致打开的外部资源(文件、内存、套接字等)出现问题。
  • 应用程序应在退出前等待完成,或者至少等待生产者线程停止生成项:我们将使用Event类来处理线程之间的同步(标记线程完成等)。我们将避免在main()方法中使用Queue.join(),因为它是Python中的实现(我们仍然可以在我们自己的queue实现中覆盖join方法)。

回顾

在添加消费者线程后,我们意识到Python队列实现提供了一些检测完成所需的方​​法和值,但它们不足够,我们需要添加Python事件。使用守护线程可以解决一些问题,但如果我们的队列程序管理系统资源,可能会导致更多问题,因此我们将避免使用它们。在本系列的下一篇文章中,我将讨论如何安全地同步和停止操作队列的生产者和消费者线程,而不使用守护线程,并遵循所学的注意事项。

在我的下一篇文章中,我们将继续进行改进,以解决发现的问题。

历史

  • 2020年6月15日:初版
  • 2020年8月21日:在总结中添加了指向下一篇文章的链接
© . All rights reserved.