逐步发现 Python - 使用 Python 队列 - 01 - 向队列添加元素
如何在 Python 中向队列添加元素
引言
这是 Python 系列文章的第一篇,旨在解释 Python Queue 类的用途及其在实际项目中的应用。
队列数据结构
队列数据结构以线性顺序存储元素,并支持 2 种操作:push
和 pop
。
push
在队列末尾(最后一个元素)插入一个元素。pop
移除队列的第一个元素并将其返回给调用者。
您可以从一个空队列开始,然后 push
2 个元素(先是 elem1
,然后是 elem2
),接着 pop
两次来获取这两个元素。Queue
结构以 FIFO(先进先出)的顺序访问元素。理论上,该结构可以包含无限数量的元素,但实际上会受到内存限制,因此我们通常会为创建的队列设置 maxsize
限制。
Python 队列实现
有趣的是,Python 使用相同的通用名称(queue),并且在同一个包中,实现了队列数据结构(FIFO 顺序)、堆栈数据结构(LIFO 顺序)和优先队列数据结构(根据参数和 push 时的比较规则进行排序的顺序)。
引用
- class
queue.Queue
(maxsize=0)FIFO(先进先出)队列的构造函数。maxsize 是一个整数,用于设置可以放入队列的项目的上限。一旦达到此大小,插入操作将阻塞,直到队列中的项目被消耗。如果 maxsize 小于或等于零,则队列大小是无限的。
- class
queue.LifoQueue
(maxsize=0)LIFO(后进先出)队列的构造函数。maxsize 是一个整数,用于设置可以放入队列的项目的上限。一旦达到此大小,插入操作将阻塞,直到队列中的项目被消耗。如果 maxsize 小于或等于零,则队列大小是无限的。
- class
queue.PriorityQueue
(maxsize=0)优先队列的构造函数。maxsize 是一个整数,用于设置可以放入队列的项目的上限。一旦达到此大小,插入操作将阻塞,直到队列中的项目被消耗。如果 maxsize 小于或等于零,则队列大小是无限的。
价值最低的条目会先被检索(价值最低的条目是
sorted(list(entries))[0]
返回的条目)。条目的典型模式是元组形式:(priority_number, data)
。
目前我将专注于队列数据结构(FIFO 顺序)。
队列有什么用?
我对使用 Python 实现的队列数据结构的附加属性特别感兴趣。
- 它们是线程安全的。多个线程可以尝试同时访问队列(例如,从队列中 pop 一个元素)。此属性意味着队列将确保在任何单一时间点只有一个线程可以获取队列进行操作。调用线程可以根据需要执行阻塞式或带超时的 pop/push 操作。这简化了复杂工作流程的实现,因为我们不需要编写同步原语来处理并发执行。
- 它们有 maxsize:我们可以通过指定队列的最大元素数量 N(N>0)来限制插入线程的吞吐量。尝试将第 N+1 个元素插入到 maxsize 为 N 的队列(队列已满)的线程将阻塞或失败,具体取决于插入的类型。
除了并发性,我还对使用 Python 队列来
- 解耦处理。从生产者线程,我可以将项目放入队列,供另一个线程中的独立任务处理,然后继续生产者线程的活动,而无需等待处理任务完成。
- 序列化处理。我可以让多个线程生成要处理的元素,并将它们放入队列以供处理。
在这两种情况下,队列都可以作为有限大小(maxsize)的缓冲区进行处理。
队列代码示例
向队列添加元素的方法
来自 Python 文档
引用
存在一个不阻塞的插入方法
Queue.put_nowait(x)
此调用等同于...
Queue.put(x,block=False)
示例 1:向队列添加元素(非阻塞)
这是向队列添加元素的示例代码。
main()
函数包含程序的逻辑。创建了一个队列(test_queue
),其 maxsize
为 5,限制了可以插入其中的元素数量。
我创建了一个命名线程来循环插入元素。线程方法 add_items
接收两个参数:要插入的队列和要插入的总元素数(循环中的索引)。对于此测试,元素是整数,但它们也可以是任何 Python 类型。我已经在代码中引入了线程,因为我打算从一开始就展示线程安全队列与线程之间的交互。我还计划使用需要线程的其他功能来扩展实现。
代码启动了用于插入元素的线程,然后我们 join 该线程,在程序退出前等待其完成。该线程使用 put
调用(Python 中 push
操作的命名)插入 100 个元素来填充队列(test_queue
)。默认情况下,put
方法使用阻塞插入和无限超时。使用 timeout
和 block
参数可以更改此行为。
import queue
import threading
import time
# Discovering Python step by step - Using Python Queues - 01 - Queues - Sample 01
def add_items(processing_queue, num_items):
for x in range(num_items):
time.sleep(.1)
processing_queue.put(x)
print("Adding item: {}".format(x))
print("Processing queue size: {}. Remaining tasks: {}".format(processing_queue.qsize(), processing_queue.unfinished_tasks))
def main():
test_queue = queue.Queue(5)
t_add = threading.Thread(target=add_items, args=(test_queue, 100), name="add_items_thread")
t_add.start()
t_add.join()
if __name__ == "__main__":
tick = time.time()
main()
print("Execution time: {} seconds.".format(round(time.time() - tick, 2)))
每次插入后,我都会打印队列的大小(其中的元素数量)以及未完成的任务数(我们将在本系列后续文章中详细讨论)。此示例中的队列大小为五个元素。在我的计算机上,执行结果如下序列日志:
引用添加项:0
处理队列大小:1。剩余任务:1
添加项:1
处理队列大小:2。剩余任务:2
添加项:2
处理队列大小:3。剩余任务:3
添加项:3
处理队列大小:4。剩余任务:4
添加项:4
处理队列大小:5。剩余任务:5
可以看到,在插入五个元素后,队列已满。put
调用是阻塞的,并带有无限超时。由于没有人从队列中提取元素,下一个 put
调用将被阻塞,等待有机会插入下一个元素。
示例 2:向队列添加元素(非阻塞)
如果我们使 put
非阻塞
def add_items(processing_queue, num_items):
for x in range(num_items):
time.sleep(.1)
processing_queue.put(x, block=False)
print("Adding item: {}".format(x))
print("Processing queue size: {}.
Remaining tasks: {}".format(processing_queue.qsize(),
processing_queue.unfinished_tasks))
... 我们会看到一个 queue.Full
异常
引用添加项:0
处理队列大小:1。剩余任务:1
添加项:1
处理队列大小:2。剩余任务:2
添加项:2
处理队列大小:3。剩余任务:3
添加项:3
处理队列大小:4。剩余任务:4
添加项:4
处理队列大小:5。剩余任务:5
线程 add_items_thread 中的异常
Traceback (most recent call last)
File "C:\dev\python36\lib\threading.py", line 916, in _bootstrap_inner
self.run()
File "C:\dev\python36\lib\threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "D:\Documents\Personal\Sources\PythonScripts\Blog\Python\Queues\Article 1\Queues - Article 01.py", line 12, in AddItems
processing_queue.put(x, block=False)
File "C:\dev\python36\lib\queue.py", line 130, in put
raise Full
queue.Full执行时间:6.1 秒。
回顾
我创建了两个使用 put
在队列中进行插入的代码示例:一个启用了阻塞(默认),另一个未启用阻塞,以查看不同结果。由于我们没有从队列中提取元素,这会导致。
- 阻塞 put:队列在插入 5 个元素后阻塞,因为其
maxsize
为 5。 - 非阻塞 put:当超出队列的五个元素限制时,代码会抛出异常(
queue.Full
)。
接下来是什么?
我们开始得非常慢,因为向队列插入元素是一项简单的任务,但当我们向代码中添加额外的线程和功能时,事情会变得复杂起来。我们将继续我系列中的下一篇文章,添加一个从队列中提取元素的处理线程,我们将看到队列中的元素数量和未完成的任务数量会发生什么变化。
历史
- 2020 年 6 月 4 日:初始版本