队列 — 线程安全的FIFO实现
queue
模块提供了一个适合多线程编程的先入、先出(FIFO)数据结构。它可以用来安全地传递生产者和消费者线程之间的消息或其他数据。由于线程安全,多线程可以安全地处理同一个Queue实例。
基本的FIFO队列
Queue类实现基本的先进先出容器
put()
-- 从队尾添加元素
get()
-- 从队首删除元素,并返回该元素
import queue
q = queue.Queue()
for i in range(5):
q.put(i)
while not q.empty():
print(q.get(), end=' ')
print()
这个例子使用一个线程来说明元素添加与删除的顺序相同,即先进先出。 结果:
0 1 2 3 4
一个小例子:击鼓传炸弹
代码实现:
import queue
import random
q = queue.Queue()
nameList = ['A', 'B', 'C', 'D', 'E', 'F']
for name in nameList:
q.put(name)
while not q.qsize() == 1:
num = int(random.uniform(0, 20))
print(f'num={num}', end=' ')
while num: # 随机转动num次
q.put(q.get())
num -= 1
outName = q.get()
print(f'outName={outName}')
print(f'winner={q.get()}')
运行结果:
num=2 outName=C
num=6 outName=E
num=7 outName=D
num=0 outName=F
num=11 outName=B
winner=A
LIFO队列
与队列的标准FIFO实现不同,LifoQueue使用后进先出顺序(通常与栈相关联)。
import queue
q = queue.LifoQueue()
for i in range(5):
q.put(i)
while not q.empty():
print(q.get(), end=' ')
print()
结果:
4 3 2 1 0
优先级队列 Priority Queue
有时候,队列中的项目的处理顺序需要基于这些项的特征,而不仅仅是它们被创建或添加到队列中的顺序。例如,来自工资部门的打印作业可能优先于开发人员想要打印的代码清单。PriorityQueue使用队列内容的排序顺序来决定要检索哪一项。
import functools
import queue
import threading
@functools.total_ordering
class Job:
def __init__(self, priority, description):
self.priority = priority
self.description = description
print('New job:', description)
return
def __eq__(self, other):
try:
return self.priority == other.priority
except AttributeError:
return NotImplemented
def __lt__(self, other):
try:
return self.priority < other.priority
except AttributeError:
return NotImplemented
q = queue.PriorityQueue()
q.put(Job(3, 'Mid-level job'))
q.put(Job(10, 'Low-level job'))
q.put(Job(1, 'Important job'))
def process_job(q):
while True:
next_job = q.get()
print('Processing job:', next_job.description)
q.task_done()
workers = [
threading.Thread(target=process_job, args=(q,)),
threading.Thread(target=process_job, args=(q,)),
]
for w in workers:
w.setDaemon(True)
w.start()
q.join()
结果:
New job: Mid-level job
New job: Low-level job
New job: Important job
Processing job: Important job
Processing job: Mid-level job
Processing job: Low-level job
一点解释 :
task_done()
意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。
即每一个由put()调用入队的任务都有一个对应的task_done()调用。
join()
阻塞调用线程,直到队列中的所有任务被处理掉。
只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。
put(item[, block[, timeout]])
将item放入队列中。
- 如果可选的参数block为True且timeout为空对象(默认的情况,阻塞调用,无超时)。
- 如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无空空间可用,抛出Full异常(带超时的阻塞调用)。
- 如果block为False,如果有空闲空间可用将数据放入队列,否则立即抛出Full异常
其非阻塞版本为put_nowait等同于put(item, False)
get([block[, timeout]])
从队列中移除并返回一个数据。block跟timeout参数同put方法
其非阻塞方法为get_nowait()相当与get(False)
empty()
如果队列为空,返回True,反之返回False