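Learning Python step by step - Using Python queues - 03 - Correctly detecting queue completion
How to correctly detect, from other threads, that a Python queue's work is complete.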
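Background
In my previous article on queues, we found that in the general case the functionality Python queues provide is not enough. Along the way, I listed the minimum requirements for our program. Here they are again:
Our application...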
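- should be independent of the relative speed at which items are produced and consumed. The producer should be able to generate items at any rate, regardless of the consumption rate, and the consumer should be able to detect completion correctly. Items may take time to generate, or may be pulled from a database or a remote HTTP server. Consumer threads should be able to process items slowly or quickly without affecting the detection of completion.
- should not use daemon threads. Daemon threads cause problems when they hold external resources (files, memory, sockets, etc.), because they are stopped abruptly at interpreter shutdown and may not release those resources properly.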
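- should wait for completion before exiting, or at least wait until the producer thread has stopped producing items. We will use the Event class to synchronize the threads (signaling that a thread has finished, etc.). We will avoid calling Queue.join() in the program's main() function (we can still override the join method in our own queue implementation).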
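The last requirement can be illustrated with a minimal sketch. A non-daemon thread signals completion through a threading.Event, and main() waits for it with Thread.join() rather than Queue.join(). The producer function and the produced list are hypothetical stand-ins for real work.

```python
import threading


def producer(done_event, produced):
    # Hypothetical producer: records a few items, then signals completion
    for item in range(3):
        produced.append(item)
    done_event.set()  # tell the other threads that production is over


def main():
    done_event = threading.Event()
    produced = []
    # daemon=False is the default; it is spelled out here to match the requirement
    worker = threading.Thread(target=producer, args=(done_event, produced),
                              name="producer_thread", daemon=False)
    worker.start()
    worker.join()  # main waits for the thread itself, not for Queue.join()
    return done_event.is_set(), produced


if __name__ == "__main__":
    print(main())
```

Because the thread is not a daemon, the interpreter would wait for it at exit even without the join(). The explicit join() makes the waiting point visible in main().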
Example 1: the code from the previous article
We will start with the following code:
```python
import queue
import threading
import time


def add_items(processing_queue, num_items):
    for x in range(num_items):
        processing_queue.put(x)
        print("Adding item: {}".format(x))
        time.sleep(1)
        print("add_items - Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))
    print("add_items - Exiting thread")


def process_items(processing_queue):
    while processing_queue.unfinished_tasks > 0:
        # Blocking get, no timeout
        x = processing_queue.get()
        # Processing happens here
        print("Processing item: {}".format(x))
        # Simulate processing time. Much faster than insertion
        time.sleep(0.1)
        processing_queue.task_done()
        print("Item: {} processed.".format(x))
        print("process_items - Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))
    print("process_items - Exiting thread")


def main():
    test_queue = queue.Queue(5)
    # insertion thread
    t_add = threading.Thread(target=add_items,
                             args=(test_queue, 10), name="add_items_thread")
    t_add.start()
    # processing thread
    t_process = threading.Thread(target=process_items,
                                 args=(test_queue,), name="process_items_thread")
    t_process.start()
    t_add.join()
    t_process.join()
    print("main - Exiting thread")


if __name__ == "__main__":
    main()
```
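This code hangs because the consumer thread exits as soon as the number of unfinished tasks reaches zero, which happens too early, before all the work is done. After that, nothing takes items from the queue any more. Note that I have reverted the code to joining the threads instead of using Queue.join().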
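The early exit can be reproduced without threads. In this sketch, one item is put, taken and marked done, just as in the first iteration of the log below. unfinished_tasks is not part of the documented queue.Queue API; like the article's code, this sketch relies on the attribute as CPython implements it.

```python
import queue

q = queue.Queue(5)
q.put(0)                                # producer adds the first item
snapshot = [q.unfinished_tasks]         # one task pending
item = q.get()
q.task_done()                           # consumer finishes item 0
snapshot.append(q.unfinished_tasks)     # the counter is back to zero...
# ...even though the producer still intends to add items 1..9
should_keep_looping = q.unfinished_tasks > 0
print(snapshot, should_keep_looping)    # [1, 0] False
```

The counter only describes work already handed to the queue. It says nothing about work the producer has not yet added, which is why a second signal is needed.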
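```text
Adding item: 0
Processing item: 0
Item: 0 processed.
process_items - Processing queue size: 0. Remaining tasks: 0
process_items - Exiting thread
add_items - Processing queue size: 0. Remaining tasks: 0
Adding item: 1
add_items - Processing queue size: 1. Remaining tasks: 1
Adding item: 2
add_items - Processing queue size: 2. Remaining tasks: 2
Adding item: 3
add_items - Processing queue size: 3. Remaining tasks: 3
Adding item: 4
add_items - Processing queue size: 4. Remaining tasks: 4
Adding item: 5
add_items - Processing queue size: 5. Remaining tasks: 5
<the program stalls here: the producer thread is waiting for a free slot in the queue>
```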
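This code needs some fixes. I will improve the logging and add an event that notifies the other threads when the producer thread has finished producing.
Example 2: adding proper logging
I made some improvements to the logging so that we can follow the flow better. First, we use the standard Python library logging. I added a configuration function, configure_logging, which is called at the start of the program. It sets the log format to include the time with milliseconds and the thread name. For now, I set the minimum log level to INFO.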
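To see what the format string produces, the following sketch applies the same format to a dedicated logger that writes into an in-memory stream, so the output can be inspected. The helper make_logger and the logger name "queue_demo" are hypothetical; the article itself uses logging.basicConfig. The threadName field is filled in from the thread that emits the record, and %(threadName)-25s left-aligns the name in a 25-character column.

```python
import io
import logging
import threading

LOG_FORMAT = "%(asctime)s.%(msecs)03d|%(threadName)-25s|%(message)s"
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"


def make_logger(stream):
    # Hypothetical helper: same format as configure_logging, but on a named logger
    logger = logging.getLogger("queue_demo")
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(LOG_FORMAT, datefmt=DATE_FORMAT))
    logger.handlers = [handler]   # avoid duplicate handlers on repeated calls
    logger.propagate = False      # keep the records out of the root logger
    return logger


stream = io.StringIO()
log = make_logger(stream)
t = threading.Thread(target=log.info, args=("Adding item: 0",), name="add_items_thread")
t.start()
t.join()
log.debug("not shown: below the INFO level")
line = stream.getvalue().strip()
print(line)
```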
```python
import queue
import threading
import time
import logging


def configure_logging():
    log_format = "%(asctime)s.%(msecs)03d|%(threadName)-25s|%(message)s"
    datefmt = '%Y-%m-%d %H:%M:%S'
    logging.basicConfig(level=logging.INFO,
                        format=log_format,
                        datefmt=datefmt,
                        )


def add_items(processing_queue, num_items):
    for x in range(num_items):
        processing_queue.put(x)
        logging.info("Adding item: {}".format(x))
        time.sleep(1)
        logging.info("Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))
    logging.info("Exiting thread")


def process_items(processing_queue):
    while processing_queue.unfinished_tasks > 0:
        # Blocking get, no timeout
        x = processing_queue.get()
        # Processing happens here
        logging.info("Processing item: {}".format(x))
        # Simulate processing time. Much faster than insertion
        time.sleep(0.1)
        processing_queue.task_done()
        logging.info("Item: {} processed.".format(x))
        logging.info("Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))
    logging.info("Exiting thread")


def main():
    configure_logging()
    test_queue = queue.Queue(5)
    # insertion thread
    t_add = threading.Thread(target=add_items, args=(test_queue, 10), name="add_items_thread")
    t_add.start()
    # processing thread
    t_process = threading.Thread(target=process_items, args=(test_queue,),
                                 name="process_items_thread")
    t_process.start()
    t_add.join()
    t_process.join()
    logging.info("Exiting thread")


if __name__ == "__main__":
    main()
```
Result:
```text
2020-06-22 18:56:34.797|add_items_thread |Adding item: 0
2020-06-22 18:56:34.798|process_items_thread |Processing item: 0
2020-06-22 18:56:34.899|process_items_thread |Item: 0 processed.
2020-06-22 18:56:34.903|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:56:34.905|process_items_thread |Exiting thread
2020-06-22 18:56:35.799|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:56:35.799|add_items_thread |Adding item: 1
2020-06-22 18:56:36.801|add_items_thread |Processing queue size: 1. Remaining tasks: 1
2020-06-22 18:56:36.805|add_items_thread |Adding item: 2
2020-06-22 18:56:37.807|add_items_thread |Processing queue size: 2. Remaining tasks: 2
2020-06-22 18:56:37.808|add_items_thread |Adding item: 3
2020-06-22 18:56:38.809|add_items_thread |Processing queue size: 3. Remaining tasks: 3
2020-06-22 18:56:38.810|add_items_thread |Adding item: 4
2020-06-22 18:56:39.811|add_items_thread |Processing queue size: 4. Remaining tasks: 4
2020-06-22 18:56:39.813|add_items_thread |Adding item: 5
2020-06-22 18:56:40.815|add_items_thread |Processing queue size: 5. Remaining tasks: 5
<the program stalls here: the producer thread is waiting for a free slot in the queue>
```
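Example 3: using an Event to signal that the producer thread has stopped
Now we will use threading.Event to signal the end of the producer thread correctly. When the add_items method finishes, it signals the end of production by calling event.set(). The consumer thread should stop once the event is set and there are no unfinished tasks left.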
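The three Event calls used from here on are set(), is_set() and wait(timeout). The code below uses is_set(); isSet() is an older alias, deprecated since Python 3.10. This short sketch shows how they behave. The producer here only sleeps briefly as a hypothetical stand-in for real production.

```python
import threading
import time

done = threading.Event()


def producer():
    time.sleep(0.05)   # stand-in for producing items
    done.set()         # signal: production finished


before = done.is_set()                 # False: nothing signaled yet
timed_out = done.wait(timeout=0.001)   # returns False when the timeout expires first
t = threading.Thread(target=producer, name="add_items_thread")
t.start()
signaled = done.wait(timeout=5)        # returns True once set() has been called
t.join()
print(before, timed_out, signaled, done.is_set())   # False False True True
```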
```python
import queue
import threading
import time
import logging


def configure_logging():
    log_format = "%(asctime)s.%(msecs)03d|%(threadName)-25s|%(message)s"
    datefmt = '%Y-%m-%d %H:%M:%S'
    logging.basicConfig(level=logging.INFO,
                        format=log_format,
                        datefmt=datefmt,
                        )


def add_items(processing_queue, num_items, queue_event):
    for x in range(num_items):
        processing_queue.put(x)
        logging.info("Adding item: {}".format(x))
        time.sleep(1)
        logging.info("Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))
    queue_event.set()
    logging.info("Exiting thread")


def process_items(processing_queue, queue_event):
    while processing_queue.unfinished_tasks > 0 or not queue_event.is_set():
        # Blocking get, no timeout
        x = processing_queue.get()
        # Processing happens here
        logging.info("Processing item: {}".format(x))
        # Simulate processing time. Much faster than insertion
        time.sleep(0.1)
        processing_queue.task_done()
        logging.info("Item: {} processed.".format(x))
        logging.info("Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))
    logging.info("Exiting thread")


def main():
    configure_logging()
    test_queue = queue.Queue(5)
    work_finished_event = threading.Event()
    # insertion thread
    t_add = threading.Thread(target=add_items,
                             args=(test_queue, 10, work_finished_event), name="add_items_thread")
    t_add.start()
    # processing thread
    t_process = threading.Thread(target=process_items,
                                 args=(test_queue, work_finished_event), name="process_items_thread")
    t_process.start()
    t_add.join()
    t_process.join()
    logging.info("Exiting thread")


if __name__ == "__main__":
    main()
```
The output is:
```text
2020-06-22 18:57:36.748|add_items_thread |Adding item: 0
2020-06-22 18:57:36.751|process_items_thread |Processing item: 0
2020-06-22 18:57:36.852|process_items_thread |Item: 0 processed.
2020-06-22 18:57:36.853|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:37.754|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:37.757|add_items_thread |Adding item: 1
2020-06-22 18:57:37.759|process_items_thread |Processing item: 1
2020-06-22 18:57:37.875|process_items_thread |Item: 1 processed.
2020-06-22 18:57:37.878|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:38.767|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:38.767|add_items_thread |Adding item: 2
2020-06-22 18:57:38.767|process_items_thread |Processing item: 2
2020-06-22 18:57:38.869|process_items_thread |Item: 2 processed.
2020-06-22 18:57:38.871|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:39.769|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:39.769|add_items_thread |Adding item: 3
2020-06-22 18:57:39.770|process_items_thread |Processing item: 3
2020-06-22 18:57:39.871|process_items_thread |Item: 3 processed.
2020-06-22 18:57:39.871|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:40.771|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:40.773|add_items_thread |Adding item: 4
2020-06-22 18:57:40.775|process_items_thread |Processing item: 4
2020-06-22 18:57:40.918|process_items_thread |Item: 4 processed.
2020-06-22 18:57:40.918|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:41.815|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:41.818|add_items_thread |Adding item: 5
2020-06-22 18:57:41.820|process_items_thread |Processing item: 5
2020-06-22 18:57:41.970|process_items_thread |Item: 5 processed.
2020-06-22 18:57:41.972|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:42.868|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:42.869|add_items_thread |Adding item: 6
2020-06-22 18:57:42.870|process_items_thread |Processing item: 6
2020-06-22 18:57:42.973|process_items_thread |Item: 6 processed.
2020-06-22 18:57:42.974|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:43.872|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:43.874|add_items_thread |Adding item: 7
2020-06-22 18:57:43.874|process_items_thread |Processing item: 7
2020-06-22 18:57:44.003|process_items_thread |Item: 7 processed.
2020-06-22 18:57:44.003|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:44.876|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:44.878|add_items_thread |Adding item: 8
2020-06-22 18:57:44.879|process_items_thread |Processing item: 8
2020-06-22 18:57:45.005|process_items_thread |Item: 8 processed.
2020-06-22 18:57:45.006|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:45.880|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:45.882|add_items_thread |Adding item: 9
2020-06-22 18:57:45.883|process_items_thread |Processing item: 9
2020-06-22 18:57:46.012|process_items_thread |Item: 9 processed.
2020-06-22 18:57:46.013|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:46.886|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:57:46.888|add_items_thread |Exiting thread
<the program hangs here: the consumer thread is waiting for a new item in the queue>
```
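The consumer thread does not stop as expected. The cause is the blocking Queue.get() without a timeout. After processing the last item in the queue (item 9), the consumer thread starts the next iteration before the producer has had a chance to signal completion. It then blocks forever in Queue.get(), because no further item will ever arrive. To avoid this, we need to add a timeout to the blocking Queue.get().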
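The difference between the two forms of Queue.get() can be seen on an empty queue. Called without a timeout, get() would wait forever here. Called with timeout=0.1, it gives up after roughly that long and raises queue.Empty. The 0.1-second value is only for this illustration; the article uses 1 second.

```python
import queue
import time

q = queue.Queue(5)
start = time.monotonic()
try:
    q.get(timeout=0.1)        # waits at most about 0.1 s, then raises queue.Empty
    got_item = True
except queue.Empty:
    got_item = False
elapsed = time.monotonic() - start
# A plain q.get() here would block forever, since nothing is ever put
print(got_item)               # False
```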
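Example 4: adding a timeout to the blocking get and handling the resulting queue.Empty exception
We enforce a timeout on Queue.get(). If the queue is still empty when the timeout expires, get() raises the queue.Empty exception. We wrap the Queue.get() call with a timeout in a try/except block to handle queue.Empty. With these two changes, we should finally fix the hanging consumer thread.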
```python
import queue
import threading
import time
import logging


def configure_logging():
    log_format = "%(asctime)s.%(msecs)03d|%(threadName)-25s|%(message)s"
    datefmt = '%Y-%m-%d %H:%M:%S'
    logging.basicConfig(level=logging.INFO,
                        format=log_format,
                        datefmt=datefmt,
                        )


def add_items(processing_queue, num_items, queue_event):
    for x in range(num_items):
        processing_queue.put(x)
        logging.info("Adding item: {}".format(x))
        time.sleep(1)
        logging.info("Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))
    queue_event.set()
    logging.info("Exiting thread")


def process_items(processing_queue, queue_event):
    while processing_queue.unfinished_tasks > 0 or not queue_event.is_set():
        # Blocking get, timeout of 1 second
        try:
            x = processing_queue.get(timeout=1)
            # Processing happens here
            logging.info("Processing item: {}".format(x))
            # Simulate processing time. Much faster than insertion
            time.sleep(0.1)
            processing_queue.task_done()
            logging.info("Item: {} processed.".format(x))
            logging.info("Processing queue size: {}. Remaining tasks: {}".format(
                processing_queue.qsize(), processing_queue.unfinished_tasks))
        except queue.Empty:
            logging.debug("Queue empty")
    logging.info("Exiting thread")


def main():
    configure_logging()
    test_queue = queue.Queue(5)
    work_finished_event = threading.Event()
    # insertion thread
    t_add = threading.Thread(target=add_items,
                             args=(test_queue, 10, work_finished_event), name="add_items_thread")
    t_add.start()
    # processing thread
    t_process = threading.Thread(target=process_items,
                                 args=(test_queue, work_finished_event), name="process_items_thread")
    t_process.start()
    t_add.join()
    t_process.join()
    logging.info("Exiting thread")


if __name__ == "__main__":
    main()
```
Running the program again gives:
```text
2020-06-22 18:59:42.371|add_items_thread |Adding item: 0
2020-06-22 18:59:42.376|process_items_thread |Processing item: 0
2020-06-22 18:59:42.478|process_items_thread |Item: 0 processed.
2020-06-22 18:59:42.478|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:43.375|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:43.377|add_items_thread |Adding item: 1
2020-06-22 18:59:43.378|process_items_thread |Processing item: 1
2020-06-22 18:59:43.483|process_items_thread |Item: 1 processed.
2020-06-22 18:59:43.484|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:44.380|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:44.382|add_items_thread |Adding item: 2
2020-06-22 18:59:44.384|process_items_thread |Processing item: 2
2020-06-22 18:59:44.486|process_items_thread |Item: 2 processed.
2020-06-22 18:59:44.487|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:45.386|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:45.388|add_items_thread |Adding item: 3
2020-06-22 18:59:45.389|process_items_thread |Processing item: 3
2020-06-22 18:59:45.501|process_items_thread |Item: 3 processed.
2020-06-22 18:59:45.502|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:46.391|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:46.393|add_items_thread |Adding item: 4
2020-06-22 18:59:46.393|process_items_thread |Processing item: 4
2020-06-22 18:59:46.516|process_items_thread |Item: 4 processed.
2020-06-22 18:59:46.516|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:47.395|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:47.398|add_items_thread |Adding item: 5
2020-06-22 18:59:47.399|process_items_thread |Processing item: 5
2020-06-22 18:59:47.520|process_items_thread |Item: 5 processed.
2020-06-22 18:59:47.520|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:48.419|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:48.419|add_items_thread |Adding item: 6
2020-06-22 18:59:48.419|process_items_thread |Processing item: 6
2020-06-22 18:59:48.522|process_items_thread |Item: 6 processed.
2020-06-22 18:59:48.523|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:49.421|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:49.424|add_items_thread |Adding item: 7
2020-06-22 18:59:49.427|process_items_thread |Processing item: 7
2020-06-22 18:59:49.568|process_items_thread |Item: 7 processed.
2020-06-22 18:59:49.568|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:50.467|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:50.468|add_items_thread |Adding item: 8
2020-06-22 18:59:50.468|process_items_thread |Processing item: 8
2020-06-22 18:59:50.570|process_items_thread |Item: 8 processed.
2020-06-22 18:59:50.571|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:51.484|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:51.496|add_items_thread |Adding item: 9
2020-06-22 18:59:51.497|process_items_thread |Processing item: 9
2020-06-22 18:59:51.621|process_items_thread |Item: 9 processed.
2020-06-22 18:59:51.621|process_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:52.499|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 18:59:52.500|add_items_thread |Exiting thread
2020-06-22 18:59:52.622|process_items_thread |Queue empty
2020-06-22 18:59:52.622|process_items_thread |Exiting thread
2020-06-22 18:59:52.623|MainThread |Exiting thread
<the program now stops normally: the producer and consumer threads have finished>
```
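As you can see, Queue.get() in the consumer thread now raises queue.Empty when no item arrives before the timeout. The exception is caught, and the consumer thread moves on to the next iteration. There it finds that the completion event is set and no unfinished tasks remain, so it exits. This time, the program stops all threads correctly!
In a general solution, I may need more consumer threads. Let's see how to do that.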
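Example 5: adding consumer threads and a thread list
I modified the code to run several consumer threads. I created a list that contains every thread the program must wait for before exiting. There is now 1 producer thread and 5 consumer threads. I set the maximum queue size equal to the number of consumer threads, so that all consumer threads can be kept busy. The number of consumer threads determines the processing rate. We join all threads, starting with the producer thread, which is first in the list. The consumer thread names are now numbered, so we can tell which thread is doing what.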
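The thread-list structure can be checked on a small scale. The following sketch runs one producer and five consumers, all kept in one list, and records which consumer handled each item. It then confirms that every item was processed exactly once and that every thread finished. The sleeps and logging are removed to keep it fast. The results list, its lock and the default counts are assumptions made for the sketch.

```python
import queue
import threading


def add_items(processing_queue, done_event, num_items):
    for x in range(num_items):
        processing_queue.put(x)
    done_event.set()  # production finished


def process_items(processing_queue, done_event, results, lock):
    while processing_queue.unfinished_tasks > 0 or not done_event.is_set():
        try:
            item = processing_queue.get(timeout=0.05)
        except queue.Empty:
            continue
        with lock:  # guard the shared results list
            results.append((threading.current_thread().name, item))
        processing_queue.task_done()


def main(num_items=20, num_consumers=5):
    test_queue = queue.Queue(num_consumers)  # max size = number of consumers
    done_event = threading.Event()
    results, lock = [], threading.Lock()
    thread_list = [threading.Thread(target=add_items,
                                    args=(test_queue, done_event, num_items),
                                    name="add_items_thread")]
    for i in range(num_consumers):
        thread_list.append(threading.Thread(
            target=process_items, args=(test_queue, done_event, results, lock),
            name="process_items_thread_{0:03d}".format(i)))
    for t in thread_list:
        t.start()
    for t in thread_list:  # producer first, then the consumers
        t.join()
    return results, thread_list


if __name__ == "__main__":
    results, threads = main()
    print(sorted(item for _, item in results))
```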
```python
import queue
import threading
import time
import logging


def configure_logging():
    log_format = "%(asctime)s.%(msecs)03d|%(threadName)-25s|%(message)s"
    datefmt = '%Y-%m-%d %H:%M:%S'
    logging.basicConfig(level=logging.INFO,
                        format=log_format,
                        datefmt=datefmt,
                        )


def add_items(processing_queue, num_items, queue_event):
    for x in range(num_items):
        processing_queue.put(x)
        logging.info("Adding item: {}".format(x))
        time.sleep(1)
        logging.info("Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))
    queue_event.set()
    logging.info("Exiting thread")


def process_items(processing_queue, queue_event):
    while processing_queue.unfinished_tasks > 0 or not queue_event.is_set():
        # Blocking get, timeout of 1 second
        try:
            x = processing_queue.get(timeout=1)
            # Processing happens here
            logging.info("Processing item: {}".format(x))
            # Simulate processing time. Much faster than insertion
            time.sleep(0.1)
            processing_queue.task_done()
            logging.info("Item: {} processed.".format(x))
            logging.info("Processing queue size: {}. Remaining tasks: {}".format(
                processing_queue.qsize(), processing_queue.unfinished_tasks))
        except queue.Empty:
            logging.debug("Queue empty")
    logging.info("Exiting thread")


def main():
    max_queue_size = 5
    configure_logging()
    test_queue = queue.Queue(max_queue_size)
    work_finished_event = threading.Event()
    thread_list = list()
    # insertion thread
    t_add = threading.Thread(target=add_items,
                             args=(test_queue, 10, work_finished_event), name="add_items_thread")
    t_add.start()
    thread_list.append(t_add)
    # processing threads
    for i in range(max_queue_size):
        t_process = threading.Thread(target=process_items,
                                     args=(test_queue, work_finished_event),
                                     name="process_items_thread_{0:03d}".format(i))
        t_process.start()
        thread_list.append(t_process)
    for t in thread_list:
        t.join()
    logging.info("Exiting thread")


if __name__ == "__main__":
    main()
```
The output shows:
```text
2020-06-22 19:00:27.598|add_items_thread |Adding item: 0
2020-06-22 19:00:27.600|process_items_thread_000 |Processing item: 0
2020-06-22 19:00:27.703|process_items_thread_000 |Item: 0 processed.
2020-06-22 19:00:27.704|process_items_thread_000 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:28.603|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:28.604|add_items_thread |Adding item: 1
2020-06-22 19:00:28.605|process_items_thread_002 |Processing item: 1
2020-06-22 19:00:28.709|process_items_thread_002 |Item: 1 processed.
2020-06-22 19:00:28.709|process_items_thread_002 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:29.606|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:29.606|add_items_thread |Adding item: 2
2020-06-22 19:00:29.607|process_items_thread_004 |Processing item: 2
2020-06-22 19:00:29.709|process_items_thread_004 |Item: 2 processed.
2020-06-22 19:00:29.712|process_items_thread_004 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:30.608|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:30.610|add_items_thread |Adding item: 3
2020-06-22 19:00:30.611|process_items_thread_000 |Processing item: 3
2020-06-22 19:00:30.715|process_items_thread_000 |Item: 3 processed.
2020-06-22 19:00:30.716|process_items_thread_000 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:31.613|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:31.615|add_items_thread |Adding item: 4
2020-06-22 19:00:31.616|process_items_thread_002 |Processing item: 4
2020-06-22 19:00:31.752|process_items_thread_002 |Item: 4 processed.
2020-06-22 19:00:31.752|process_items_thread_002 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:32.617|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:32.617|add_items_thread |Adding item: 5
2020-06-22 19:00:32.618|process_items_thread_004 |Processing item: 5
2020-06-22 19:00:32.720|process_items_thread_004 |Item: 5 processed.
2020-06-22 19:00:32.721|process_items_thread_004 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:33.619|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:33.619|add_items_thread |Adding item: 6
2020-06-22 19:00:33.620|process_items_thread_000 |Processing item: 6
2020-06-22 19:00:33.722|process_items_thread_000 |Item: 6 processed.
2020-06-22 19:00:33.723|process_items_thread_000 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:34.621|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:34.623|add_items_thread |Adding item: 7
2020-06-22 19:00:34.630|process_items_thread_004 |Processing item: 7
2020-06-22 19:00:34.759|process_items_thread_004 |Item: 7 processed.
2020-06-22 19:00:34.763|process_items_thread_004 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:35.656|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:35.661|add_items_thread |Adding item: 8
2020-06-22 19:00:35.663|process_items_thread_000 |Processing item: 8
2020-06-22 19:00:35.823|process_items_thread_000 |Item: 8 processed.
2020-06-22 19:00:35.828|process_items_thread_000 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:36.709|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:36.712|add_items_thread |Adding item: 9
2020-06-22 19:00:36.713|process_items_thread_002 |Processing item: 9
2020-06-22 19:00:36.852|process_items_thread_002 |Item: 9 processed.
2020-06-22 19:00:36.853|process_items_thread_002 |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:37.715|add_items_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:00:37.720|add_items_thread |Exiting thread
2020-06-22 19:00:37.767|process_items_thread_004 |Exiting thread
2020-06-22 19:00:37.832|process_items_thread_000 |Exiting thread
2020-06-22 19:00:37.854|process_items_thread_002 |Exiting thread
2020-06-22 19:00:38.605|process_items_thread_001 |Exiting thread
2020-06-22 19:00:38.611|process_items_thread_003 |Exiting thread
2020-06-22 19:00:38.681|MainThread |Exiting thread
<the program now stops normally: the producer and consumer threads have finished>
```
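Each consumer waits for two things: the unfinished_tasks count reaching zero, and the producer's completion Event being set. So all consumer threads keep trying until no tasks are in progress and the completion event is set.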
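The exit condition shared by all consumers can be pulled out into a predicate and checked step by step. should_keep_running is a hypothetical name for the expression used in the while loops above. The third step shows why task_done() matters. An item that has been taken but not yet marked done still counts as unfinished, so the other consumers keep running while it is being processed.

```python
import queue
import threading


def should_keep_running(processing_queue, done_event):
    # Continue while tasks are still pending OR the producer has not finished
    return processing_queue.unfinished_tasks > 0 or not done_event.is_set()


q = queue.Queue()
done = threading.Event()
states = [should_keep_running(q, done)]   # no tasks, producer active -> True
q.put("item")
done.set()
states.append(should_keep_running(q, done))   # producer done, 1 task pending -> True
q.get()
states.append(should_keep_running(q, done))   # taken, task_done() not yet called -> True
q.task_done()
states.append(should_keep_running(q, done))   # producer done, no tasks left -> False
print(states)   # [True, True, True, False]
```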
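Example 6: adding a monitoring thread
I added a monitoring thread that reports the number of unfinished tasks. This let me remove the queue-size messages from the consumer and producer threads and cut down the log clutter. The monitoring thread also waits for the completion event to be signaled. It displays the remaining task count every 5 seconds, aligned to multiples of 5 seconds on the clock.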
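The monitor's sleep expression, check_period - divmod(time.time(), check_period)[1], computes how long to wait until the next multiple of the period. That is why the monitor lines in the log below fall on :30.000, :35.003 and :40.001. The sketch isolates that arithmetic in a hypothetical function so it can be checked with fixed timestamps.

```python
import time


def seconds_until_next_tick(now, period=5):
    # divmod(now, period)[1] is how far `now` is past the last multiple of `period`
    return period - divmod(now, period)[1]


print(seconds_until_next_tick(12.0))          # 3.0 -> wake up at 15
print(seconds_until_next_tick(10.0))          # 5.0 -> exactly on a tick, wait a full period
print(seconds_until_next_tick(time.time()))   # some value in (0, 5]
```

One possible variation, not used in the article, is to call queue_event.wait(timeout=...) instead of time.sleep(...). The monitor would then wake as soon as the event is set, rather than up to 5 seconds later, as happens at the end of the log. It would still need to check unfinished_tasks before exiting.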
```python
import queue
import threading
import time
import logging


def configure_logging():
    log_format = "%(asctime)s.%(msecs)03d|%(threadName)-25s|%(message)s"
    datefmt = '%Y-%m-%d %H:%M:%S'
    logging.basicConfig(level=logging.INFO,
                        format=log_format,
                        datefmt=datefmt,
                        )


def monitor_queue(processing_queue, queue_event):
    check_period = 5
    while processing_queue.unfinished_tasks > 0 or not queue_event.is_set():
        logging.info("Processing queue size: {}. Remaining tasks: {}".format(
            processing_queue.qsize(), processing_queue.unfinished_tasks))
        # Sleep until the next multiple of check_period seconds on the clock
        time.sleep(check_period - divmod(time.time(), check_period)[1])
    logging.info("Exiting thread. Remaining tasks: {}".format(
        processing_queue.unfinished_tasks))


def add_items(processing_queue, num_items, queue_event):
    for x in range(num_items):
        processing_queue.put(x)
        logging.info("Adding item: {}".format(x))
        time.sleep(1)
    queue_event.set()
    logging.info("Exiting thread")


def process_items(processing_queue, queue_event):
    while processing_queue.unfinished_tasks > 0 or not queue_event.is_set():
        # Blocking get, timeout of 1 second
        try:
            x = processing_queue.get(timeout=1)
            # Processing happens here
            logging.info("Processing item: {}".format(x))
            # Simulate processing time. Much faster than insertion
            time.sleep(0.1)
            processing_queue.task_done()
            logging.info("Item: {} processed.".format(x))
        except queue.Empty:
            logging.debug("Queue empty")
    logging.info("Exiting thread")


def main():
    max_queue_size = 5
    configure_logging()
    test_queue = queue.Queue(max_queue_size)
    work_finished_event = threading.Event()
    thread_list = list()
    # monitoring thread
    t_monitor = threading.Thread(target=monitor_queue,
                                 args=(test_queue, work_finished_event), name="monitor_queue_thread")
    t_monitor.start()
    thread_list.append(t_monitor)
    # insertion thread
    t_add = threading.Thread(target=add_items,
                             args=(test_queue, 10, work_finished_event), name="add_items_thread")
    t_add.start()
    thread_list.append(t_add)
    # processing threads
    for i in range(max_queue_size):
        t_process = threading.Thread(target=process_items,
                                     args=(test_queue, work_finished_event),
                                     name="process_items_thread_{0:03d}".format(i))
        t_process.start()
        thread_list.append(t_process)
    for t in thread_list:
        t.join()
    logging.info("Exiting thread")


if __name__ == "__main__":
    main()
```
Resulting log:
```text
2020-06-22 19:01:25.566|monitor_queue_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:01:25.568|add_items_thread |Adding item: 0
2020-06-22 19:01:25.572|process_items_thread_000 |Processing item: 0
2020-06-22 19:01:25.673|process_items_thread_000 |Item: 0 processed.
2020-06-22 19:01:26.573|add_items_thread |Adding item: 1
2020-06-22 19:01:26.575|process_items_thread_002 |Processing item: 1
2020-06-22 19:01:26.680|process_items_thread_002 |Item: 1 processed.
2020-06-22 19:01:27.577|add_items_thread |Adding item: 2
2020-06-22 19:01:27.578|process_items_thread_004 |Processing item: 2
2020-06-22 19:01:27.679|process_items_thread_004 |Item: 2 processed.
2020-06-22 19:01:28.579|add_items_thread |Adding item: 3
2020-06-22 19:01:28.579|process_items_thread_000 |Processing item: 3
2020-06-22 19:01:28.680|process_items_thread_000 |Item: 3 processed.
2020-06-22 19:01:29.580|add_items_thread |Adding item: 4
2020-06-22 19:01:29.582|process_items_thread_004 |Processing item: 4
2020-06-22 19:01:29.685|process_items_thread_004 |Item: 4 processed.
2020-06-22 19:01:30.000|monitor_queue_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:01:30.583|add_items_thread |Adding item: 5
2020-06-22 19:01:30.583|process_items_thread_002 |Processing item: 5
2020-06-22 19:01:30.686|process_items_thread_002 |Item: 5 processed.
2020-06-22 19:01:31.584|add_items_thread |Adding item: 6
2020-06-22 19:01:31.584|process_items_thread_000 |Processing item: 6
2020-06-22 19:01:31.686|process_items_thread_000 |Item: 6 processed.
2020-06-22 19:01:32.586|add_items_thread |Adding item: 7
2020-06-22 19:01:32.586|process_items_thread_004 |Processing item: 7
2020-06-22 19:01:32.688|process_items_thread_004 |Item: 7 processed.
2020-06-22 19:01:33.587|add_items_thread |Adding item: 8
2020-06-22 19:01:33.588|process_items_thread_000 |Processing item: 8
2020-06-22 19:01:33.691|process_items_thread_000 |Item: 8 processed.
2020-06-22 19:01:34.590|add_items_thread |Adding item: 9
2020-06-22 19:01:34.590|process_items_thread_004 |Processing item: 9
2020-06-22 19:01:34.694|process_items_thread_004 |Item: 9 processed.
2020-06-22 19:01:35.003|monitor_queue_thread |Processing queue size: 0. Remaining tasks: 0
2020-06-22 19:01:35.594|add_items_thread |Exiting thread
2020-06-22 19:01:35.691|process_items_thread_002 |Exiting thread
2020-06-22 19:01:35.691|process_items_thread_000 |Exiting thread
2020-06-22 19:01:35.695|process_items_thread_004 |Exiting thread
2020-06-22 19:01:36.579|process_items_thread_001 |Exiting thread
2020-06-22 19:01:36.581|process_items_thread_003 |Exiting thread
2020-06-22 19:01:40.001|monitor_queue_thread |Exiting thread. Remaining tasks: 0
2020-06-22 19:01:40.004|MainThread |Exiting thread
```
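Recap
I continued improving the program from the previous article. It now stops cleanly when no more items are available, and it uses multiple consumer threads. I also added some logging and cleaned up the program a little.
What's next?
In the next article, I will increase the number of producer threads and explain how to structure the program.
History
- 21 August 2020: initial version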