65.9K
CodeProject 正在变化。 阅读更多。
Home

逐步发现 Python - 使用 Python 队列 - 01 - 向队列添加元素

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.17/5 (4投票s)

2020 年 6 月 4 日

CPOL

7分钟阅读

viewsIcon

13609

如何在 Python 中向队列添加元素

引言

这是 Python 系列文章的第一篇,旨在解释 Python Queue 类的用途及其在实际项目中的应用。

队列数据结构

队列数据结构以线性顺序存储元素,并支持 2 种操作:pushpop

  • push 在队列末尾(最后一个元素)插入一个元素。
  • pop 移除队列的第一个元素并将其返回给调用者。

您可以从一个空队列开始,然后 push 2 个元素(先是 elem1,然后是 elem2),接着 pop 两次来获取这两个元素。Queue 结构以 FIFO(先进先出)的顺序访问元素。理论上,该结构可以包含无限数量的元素,但实际上会受到内存限制,因此我们通常会为创建的队列设置 maxsize 限制。

Python 队列实现

有趣的是,Python 使用相同的通用名称(queue),并且在同一个包中,实现了队列数据结构(FIFO 顺序)、堆栈数据结构(LIFO 顺序)和优先队列数据结构(根据参数和 push 时的比较规则进行排序的顺序)。

来自关于队列构造函数的文档

引用
class queue.Queue(maxsize=0)

FIFO(先进先出)队列的构造函数。maxsize 是一个整数,用于设置可以放入队列的项目的上限。一旦达到此大小,插入操作将阻塞,直到队列中的项目被消耗。如果 maxsize 小于或等于零,则队列大小是无限的。

class queue.LifoQueue(maxsize=0)

LIFO(后进先出)队列的构造函数。maxsize 是一个整数,用于设置可以放入队列的项目的上限。一旦达到此大小,插入操作将阻塞,直到队列中的项目被消耗。如果 maxsize 小于或等于零,则队列大小是无限的。

class queue.PriorityQueue(maxsize=0)

优先队列的构造函数。maxsize 是一个整数,用于设置可以放入队列的项目的上限。一旦达到此大小,插入操作将阻塞,直到队列中的项目被消耗。如果 maxsize 小于或等于零,则队列大小是无限的。

价值最低的条目会先被检索(价值最低的条目是 sorted(list(entries))[0] 返回的条目)。条目的典型模式是元组形式:(priority_number, data)

目前我将专注于队列数据结构(FIFO 顺序)。

队列有什么用?

我对使用 Python 实现的队列数据结构的附加属性特别感兴趣。

  • 它们是线程安全的。多个线程可以尝试同时访问队列(例如,从队列中 pop 一个元素)。此属性意味着队列将确保在任何单一时间点只有一个线程可以获取队列进行操作。调用线程可以根据需要执行阻塞式或带超时的 pop/push 操作。这简化了复杂工作流程的实现,因为我们不需要编写同步原语来处理并发执行。
  • 它们有 maxsize:我们可以通过指定队列的最大元素数量 N(N>0)来限制插入线程的吞吐量。尝试将第 N+1 个元素插入到 maxsize 为 N 的队列(队列已满)的线程将阻塞或失败,具体取决于插入的类型。

除了并发性,我还对使用 Python 队列来

  • 解耦处理。从生产者线程,我可以将项目放入队列,供另一个线程中的独立任务处理,然后继续生产者线程的活动,而无需等待处理任务完成。
  • 序列化处理。我可以让多个线程生成要处理的元素,并将它们放入队列以供处理。

在这两种情况下,队列都可以作为有限大小(maxsize)的缓冲区进行处理。

队列代码示例

向队列添加元素的方法

来自 Python 文档

引用
Queue.put(item, block=True, timeout=None)

item 放入队列。如果可选参数 block 为 true 且 timeoutNone(默认值),则在必要时阻塞直到有空闲槽可用。如果 timeout 是正数,则最多阻塞 timeout 秒,并在该时间内没有可用空闲槽时引发 Full 异常。否则(block 为 false),如果立即有空闲槽可用,则将项目放入队列,否则引发 Full 异常(此时忽略 timeout)。

存在一个不阻塞的插入方法

Queue.put_nowait(x)

此调用等同于...

Queue.put(x,block=False)

示例 1:向队列添加元素(非阻塞)

这是向队列添加元素的示例代码。

main() 函数包含程序的逻辑。创建了一个队列(test_queue),其 maxsize 为 5,限制了可以插入其中的元素数量。

我创建了一个命名线程来循环插入元素。线程方法 add_items 接收两个参数:要插入的队列和要插入的总元素数(循环中的索引)。对于此测试,元素是整数,但它们也可以是任何 Python 类型。我已经在代码中引入了线程,因为我打算从一开始就展示线程安全队列与线程之间的交互。我还计划使用需要线程的其他功能来扩展实现。

代码启动了用于插入元素的线程,然后我们 join 该线程,在程序退出前等待其完成。该线程使用 put 调用(Python 中 push 操作的命名)插入 100 个元素来填充队列(test_queue)。默认情况下,put 方法使用阻塞插入和无限超时。使用 timeoutblock 参数可以更改此行为。

import queue
import threading
import time

# Discovering Python step by step - Using Python Queues - 01 - Queues - Sample 01

def add_items(processing_queue, num_items):
    for x in range(num_items):
        time.sleep(.1)
        processing_queue.put(x)
        print("Adding item: {}".format(x))
        print("Processing queue size: {}. Remaining tasks: {}".format(processing_queue.qsize(), processing_queue.unfinished_tasks))

def main():
    test_queue = queue.Queue(5)
    t_add = threading.Thread(target=add_items, args=(test_queue, 100), name="add_items_thread")
    t_add.start()
    t_add.join()

if __name__ == "__main__":
    tick =  time.time()
    main()
    print("Execution time: {} seconds.".format(round(time.time() - tick, 2)))

每次插入后,我都会打印队列的大小(其中的元素数量)以及未完成的任务数(我们将在本系列后续文章中详细讨论)。此示例中的队列大小为五个元素。在我的计算机上,执行结果如下序列日志:

引用

添加项:0
处理队列大小:1。剩余任务:1
添加项:1
处理队列大小:2。剩余任务:2
添加项:2
处理队列大小:3。剩余任务:3
添加项:3
处理队列大小:4。剩余任务:4
添加项:4
处理队列大小:5。剩余任务:5

可以看到,在插入五个元素后,队列已满。put 调用是阻塞的,并带有无限超时。由于没有人从队列中提取元素,下一个 put 调用将被阻塞,等待有机会插入下一个元素。

示例 2:向队列添加元素(非阻塞)

如果我们使 put 非阻塞

def add_items(processing_queue, num_items):
    for x in range(num_items):
        time.sleep(.1)
        processing_queue.put(x, block=False)
        print("Adding item: {}".format(x))
        print("Processing queue size: {}. 
        Remaining tasks: {}".format(processing_queue.qsize(), 
        processing_queue.unfinished_tasks))

... 我们会看到一个 queue.Full 异常

引用

添加项:0
处理队列大小:1。剩余任务:1
添加项:1
处理队列大小:2。剩余任务:2
添加项:2
处理队列大小:3。剩余任务:3
添加项:3
处理队列大小:4。剩余任务:4
添加项:4
处理队列大小:5。剩余任务:5
线程 add_items_thread 中的异常
Traceback (most recent call last)
File "C:\dev\python36\lib\threading.py", line 916, in _bootstrap_inner
self.run()
File "C:\dev\python36\lib\threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "D:\Documents\Personal\Sources\PythonScripts\Blog\Python\Queues\Article 1\Queues - Article 01.py", line 12, in AddItems
processing_queue.put(x, block=False)
File "C:\dev\python36\lib\queue.py", line 130, in put
raise Full
queue.Full

执行时间:6.1 秒。

回顾

我创建了两个使用 put 在队列中进行插入的代码示例:一个启用了阻塞(默认),另一个未启用阻塞,以查看不同结果。由于我们没有从队列中提取元素,这会导致。

  • 阻塞 put:队列在插入 5 个元素后阻塞,因为其 maxsize 为 5。
  • 非阻塞 put:当超出队列的五个元素限制时,代码会抛出异常(queue.Full)。

接下来是什么?

我们开始得非常慢,因为向队列插入元素是一项简单的任务,但当我们向代码中添加额外的线程和功能时,事情会变得复杂起来。我们将继续我系列中的下一篇文章,添加一个从队列中提取元素的处理线程,我们将看到队列中的元素数量和未完成的任务数量会发生什么变化。

历史

  • 2020 年 6 月 4 日:初始版本
© . All rights reserved.