本文链接:https://www.cnblogs.com/tujia/p/13637535.html
背景:python 队列 queue.Queue 或 multiprcessing.Queue 或其他队列在写入队列或从队列中读取元素时,都有可能会发生线程阻塞。
下面来说一下阻塞的类型,然后怎么避免阻塞~
一、阻塞的类型
队列的阻塞分为:入队(put)时的阻塞、出队(get)时的阻塞、整体(join)的阻塞(消费的阻塞)
二、入队的阻塞
import queue def 入队阻塞(): q = queue.Queue(maxsize=3) for i in range(4): q.put('任务' + str(i+1)) print('Finished') if __name__ == '__main__': 入队阻塞()
注:因为定义的队列的 maxsize=3,但 put 了4个元素进队列,第4个元素将无法 put 进队列,发生阻塞;注意:就算不设置 maxsize,电脑的内存也是有限的,队列也是会满的。当队列已满,做 put 操作时,一样会发生阻塞。
正确的处理方法:
import queue def 入队阻塞(): q = queue.Queue(maxsize=3) for i in range(4): try: q.put('任务' + str(i+1), block=True, timeout=3) except queue.Full: print('任务%d: 队列已满,写入失败' % (i+1)) print('Finished') if __name__ == '__main__': 入队阻塞()
注:设置 timeout 超时时间,并捕捉 queue.Full 异常;设置tomeout一样会阻塞线程,但timeout之后,可以继续操行程序。如果不想使用 timeout 选项,也可以直接设置 block(阻塞) 为 False,或者直接使用 q.put_nowait 方法(注意:当队列已满的时候 ,put_nowait 一样会触发 queue.Full 异常)
三、出队的阻塞
import queue def 出队阻塞(): q = queue.Queue(maxsize=3) for i in range(3): try: q.put_nowait('任务' + str(i+1)) except queue.Full: print('full') for i in range(4): task = q.get() print(task) print('Finished') if __name__ == '__main__': 出队阻塞()
注:队列里只有3个元素,但get了4次。第4次get的时候,不会返回空,而是会发生阻塞。
正确的处理方法:
import queue def 出队阻塞(): q = queue.Queue(maxsize=3) for i in range(3): try: q.put_nowait('任务' + str(i+1)) except queue.Full: print('full') for i in range(4): try: task = q.get(block=True, timeout=3) print(task) except queue.Empty: print('队列为空,get失败') print('Finished') if __name__ == '__main__': 出队阻塞()
注:设置 timeout 超时时间,并捕捉 queue.Empty 异常;设置tomeout一样会阻塞线程,但timeout之后,可以继续操行程序。如果不想使用 timeout 选项,也可以直接设置 block(阻塞) 为 False,或者直接使用 q.get_nowait 方法(注意:当队列为空的时候 ,get_nowait 一样会触发 queue.Empty 异常)
四、消费阻塞(正确来说,应该是未消费完时的阻塞)
import queue def 消费阻塞(): q = queue.Queue(maxsize=3) for i in range(3): try: q.put_nowait('任务' + str(i+1)) except queue.Full: print('full') for i in range(2): try: task = q.get(block=True, timeout=3) print(task) q.task_done() except queue.Empty: print('队列为空,get失败') # 阻塞队列 q.join() print('Finished') if __name__ == '__main__': 消费阻塞()
注:队列里设置了3个任务,但只调用了两次 task_done(标记两个任务已完成),还有一个任务未处理,队列将阻塞至第三个任务被消费(标志为 task_done)
上面说完了各种阻塞,下面来说一下阻塞作用~~
五、入队阻塞的作用
很明显,当我要做入队操作时,如果队列已满时,我不会说马上掉头就走,而是会等一下,看有没有人出队,然后,我就可以挤上去了。这就是入队阻塞的作用。
例如异步(asyncio)或多线程(Thread)操作同一个队列(queue),下面看一下使用 asyncio 异步操作 Queue 的例子:
import time import queue import asyncio def get_now(): return time.strftime('%X') # 入队 async def qput(q): for i in range(5): # 每1秒写入一个元素 await asyncio.sleep(1) try: await q.put(i) print('%s: %d 入队' % (get_now(), i)) except queue.Full: print('Full') # 出队 async def qget(q): for i in range(5): # 每2秒消费一个元素 await asyncio.sleep(2) try: item = await q.get() print('%s: %d 出队' % (get_now(), item)) except queue.Empty: print('Empty') async def main(): q = asyncio.Queue(maxsize=3) print('%s: Start' % get_now()) await asyncio.gather(qput(q), qget(q)) print('%s: Finished' % get_now()) if __name__ == '__main__': asyncio.run(main())
运行结果大概是这样:
六、出队阻塞的作用
出队阻塞和入队阻塞是一样的。假设你是一个包工头,看到应聘的队列里没有人,不要着急着马上走啊,等一下可能就有人过来应聘了。这就是 get 阻塞的作用。
下面来看一下 asyncio 异步操作 queue 的例子:
import time import queue import asyncio import random def get_now(): return time.strftime('%X') # 招工 async def 招工(q): print('包工头:招人了喂,管吃管喝、五险一金~') worker_count = 0 for i in range(q.maxsize): try: # 就等10秒 worker = await asyncio.wait_for(q.get(), timeout=10) worker_count = worker_count + 1 print('%s: 面试【%s】,通过/入职' % (get_now(), worker)) except asyncio.TimeoutError: # 10秒内都没人来,直接提前下班了 print('包工头:唉,都没人来应聘,今天只能提前下班了~') exit(0) print('包工头:招够了,可以下班了~~') # 应聘 async def 应聘(q): workers = ['张三', '李四', '王五', '赵六', '陈七'] for name in workers: # 不定时有人来应聘。注:时间要控制到10秒内,10秒内都没人来,包工头就要提前下班了 await asyncio.sleep(random.randint(1, 10)) try: await q.put(name) print('%s: 【%s】 去应聘了' % (get_now(), name)) except queue.Full: print('Full') async def main(): # 上级给任务了,要招够5个人 q = asyncio.Queue(maxsize=5) print('%s: Start' % get_now()) await asyncio.gather(招工(q), 应聘(q)) print('%s: Finished' % get_now()) if __name__ == '__main__': asyncio.run(main())
运行结果大概是这样:
注:须要注意一下,asyncio 操作 queue 时,不能用原生的 queue.Queue,要用 asyncio.Queue
参考链接:
https://docs.python.org/zh-cn/3.7/library/queue.html
https://docs.python.org/zh-cn/3.7/library/asyncio-task.html
https://docs.python.org/zh-cn/3.7/library/asyncio-queue.html
本文链接:https://www.cnblogs.com/tujia/p/13637535.html
完。