zoukankan      html  css  js  c++  java
  • python-queue学习

            Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。

    来看下官方文档说明:

    queue 模块实现了多生产者、多消费者队列。这特别适用于消息必须安全地在多线程间交换的线程编程。模块中的 Queue 类实现了所有所需的锁定语义。

    模块实现了三种类型的队列,它们的区别仅仅是条目取回的顺序。在 FIFO 队列中,先添加的任务先取回。在 LIFO 队列中,最近被添加的条目先取回(操作类似一个堆栈)。优先级队列中,条目将保持排序( 使用 heapq 模块 ) 并且最小值的条目第一个返回。

    在内部,这三个类型的队列使用锁来临时阻塞竞争线程;然而,它们并未被设计用于线程的重入性处理。

    此外,模块实现了一个 "简单的" FIFO 队列类型, SimpleQueue ,这个特殊实现为小功能在交换中提供额外的保障。

    queue 模块定义了下列类和异常:

    class queue.Queue(maxsize=0)

    Constructor for a FIFO queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

    class queue.LifoQueue(maxsize=0)

    LIFO 队列构造函数。 maxsize 是个整数,用于设置可以放入队列中的项目数的上限。当达到这个大小的时候,插入操作将阻塞至队列中的项目被消费掉。如果 maxsize 小于等于零,队列尺寸为无限大。

    class queue.PriorityQueue(maxsize=0)

    优先级队列构造函数。 maxsize 是个整数,用于设置可以放入队列中的项目数的上限。当达到这个大小的时候,插入操作将阻塞至队列中的项目被消费掉。如果 maxsize 小于等于零,队列尺寸为无限大。

    最小值先被取出( 最小值条目是由 sorted(list(entries))[0] 返回的条目)。条目的典型模式是一个以下形式的元组: (priority_number, data) 。

    如果 data 元素没有可比性,数据将被包装在一个类中,忽略数据值,仅仅比较优先级数字 :

    from dataclasses import dataclass, field
    from typing import Any
    
    @dataclass(order=True)
    class PrioritizedItem:
        priority: int
        item: Any=field(compare=False)
    class queue.SimpleQueue

    无界的 FIFO 队列构造函数。简单的队列,缺少任务跟踪等高级功能。

    3.7 新版功能.

    exception queue.Empty

    对空的 Queue 对象,调用非阻塞的 get() (or get_nowait()) 时,引发的异常。

    exception queue.Full

    对满的 Queue 对象,调用非阻塞的 put() (or put_nowait()) 时,引发的异常。

    Queue对象

    队列对象 (QueueLifoQueue, 或者 PriorityQueue) 提供下列描述的公共方法。

    Queue.qsize()

    返回队列的大致大小。注意,qsize() > 0 不保证后续的 get() 不被阻塞,qsize() < maxsize 也不保证 put() 不被阻塞。

    Queue.empty()

    如果队列为空,返回 True ,否则返回 False 。如果 empty() 返回 True ,不保证后续调用的 put() 不被阻塞。类似的,如果 empty() 返回 False ,也不保证后续调用的 get() 不被阻塞。

    Queue.full()

    如果队列是满的返回 True ,否则返回 False 。如果 full() 返回 True 不保证后续调用的 get() 不被阻塞。类似的,如果 full() 返回 False 也不保证后续调用的 put() 不被阻塞。

    Queue.put(itemblock=Truetimeout=None)

    将 item 放入队列。如果可选参数 block 是 true 并且 timeout 是 None (默认),则在必要时阻塞至有空闲插槽可用。如果 timeout 是个正数,将最多阻塞 timeout 秒,如果在这段时间没有可用的空闲插槽,将引发 Full 异常。反之 (block 是 false),如果空闲插槽立即可用,则把 item 放入队列,否则引发 Full 异常 ( 在这种情况下,timeout 将被忽略)。

    Queue.put_nowait(item)

    相当于 put(item, False) 。

    Queue.get(block=Truetimeout=None)

    从队列中移除并返回一个项目。如果可选参数 block 是 true 并且 timeout 是 None (默认值),则在必要时阻塞至项目可得到。如果 timeout 是个正数,将最多阻塞 timeout 秒,如果在这段时间内项目不能得到,将引发 Empty 异常。反之 (block 是 false) , 如果一个项目立即可得到,则返回一个项目,否则引发 Empty 异常 (这种情况下,timeout 将被忽略)。

    POSIX系统3.0之前,以及所有版本的Windows系统中,如果 block 是 true 并且 timeout 是 None , 这个操作将进入基础锁的不间断等待。这意味着,没有异常能发生,尤其是 SIGINT 将不会触发 KeyboardInterrupt 异常。

    Queue.get_nowait()

    相当于 get(False) 。

    提供了两个方法,用于支持跟踪 排队的任务 是否 被守护的消费者线程 完整的处理。

    Queue.task_done()

    表示前面排队的任务已经被完成。被队列的消费者线程使用。每个 get() 被用于获取一个任务, 后续调用 task_done() 告诉队列,该任务的处理已经完成。

    如果 join() 当前正在阻塞,在所有条目都被处理后,将解除阻塞(意味着每个 put() 进队列的条目的 task_done() 都被收到)。

    如果被调用的次数多于放入队列中的项目数量,将引发 ValueError 异常 。

    Queue.join()

    阻塞至队列中所有的元素都被接收和处理完毕。

    当条目添加到队列的时候,未完成任务的计数就会增加。每当消费者线程调用 task_done() 表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候, join() 阻塞被解除。

    下面练习看下:

    pro_q = Queue(maxsize=5)
    
    
    def producer():
        while True:
            for i in range(100):
                pro_q.put(i)
                print("生产一个新队列{0}".format(i))
                print("生产者先进先出队列:{0}".format(pro_q.queue))
                print("队列是否为空{0},队列大小{1},队列是否填满{2}".format(
                    pro_q.empty(), pro_q.qsize(), pro_q.full()))
                print("休息2秒")
                time.sleep(2)
    
    
    def consumer():
        while True:
            ret = pro_q.get()
            print("消费一个队列{0}".format(ret))
            print("休息1秒")
            time.sleep(1)
    
    
    if __name__ == "__main__":
        producer()
        consumer()

    当maxsize=5时,就阻塞了:

    生产一个新队列0
    生产者先进先出队列:deque([0])
    队列是否为空False,队列大小1,队列是否填满False
    休息2秒
    生产一个新队列1
    生产者先进先出队列:deque([0, 1])
    队列是否为空False,队列大小2,队列是否填满False
    休息2秒
    生产一个新队列2
    生产者先进先出队列:deque([0, 1, 2])
    队列是否为空False,队列大小3,队列是否填满False
    休息2秒
    生产一个新队列3
    生产者先进先出队列:deque([0, 1, 2, 3])
    队列是否为空False,队列大小4,队列是否填满False
    休息2秒
    生产一个新队列4
    生产者先进先出队列:deque([0, 1, 2, 3, 4])
    队列是否为空False,队列大小5,队列是否填满True
    休息2秒
    pro_q = Queue(maxsize=5)
    
    
    def producer():
        while True:
            for i in range(100):
                pro_q.put(i,timeout=3)
                print("生产一个新队列{0}".format(i))
                print("生产者先进先出队列:{0}".format(pro_q.queue))
                print("队列是否为空{0},队列大小{1},队列是否填满{2}".format(
                    pro_q.empty(), pro_q.qsize(), pro_q.full()))
                print("休息2秒")
                time.sleep(2)
    
    
    def consumer():
        while True:
            ret = pro_q.get()
            print("消费一个队列{0}".format(ret))
            print("休息1秒")
            time.sleep(1)
    
    
    if __name__ == "__main__":
        producer()
        consumer()
    配置timeout
    生产一个新队列0
    生产者先进先出队列:deque([0])
    队列是否为空False,队列大小1,队列是否填满False
    休息2秒
    生产一个新队列1
    生产者先进先出队列:deque([0, 1])
    队列是否为空False,队列大小2,队列是否填满False
    休息2秒
    生产一个新队列2
    生产者先进先出队列:deque([0, 1, 2])
    队列是否为空False,队列大小3,队列是否填满False
    休息2秒
    生产一个新队列3
    生产者先进先出队列:deque([0, 1, 2, 3])
    队列是否为空False,队列大小4,队列是否填满False
    休息2秒
    生产一个新队列4
    生产者先进先出队列:deque([0, 1, 2, 3, 4])
    队列是否为空False,队列大小5,队列是否填满True
    休息2秒
    Traceback (most recent call last):
      File "d:/data/python/zabbixapiweb/project/project/test15.py", line 68, in <module>
        producer()
      File "d:/data/python/zabbixapiweb/project/project/test15.py", line 50, in producer
        pro_q.put(i,timeout=3)
      File "c:usersadministratorappdatalocalprogramspythonpython38libqueue.py", line 147, in put
        raise Full
    queue.Full
    就会抛异常
    pro_q = Queue(maxsize=5)
    
    
    def producer():
        while True:
            for i in range(100):
                pro_q.put(i,block=False,timeout=3)
                print("生产一个新队列{0}".format(i))
                print("生产者先进先出队列:{0}".format(pro_q.queue))
                print("队列是否为空{0},队列大小{1},队列是否填满{2}".format(
                    pro_q.empty(), pro_q.qsize(), pro_q.full()))
                print("休息2秒")
                time.sleep(2)
    
    
    def consumer():
        while True:
            ret = pro_q.get()
            print("消费一个队列{0}".format(ret))
            print("休息1秒")
            time.sleep(1)
    
    
    if __name__ == "__main__":
        producer()
        consumer()
    当block=False,则直接抛异常
    生产一个新队列0
    生产者先进先出队列:deque([0])
    队列是否为空False,队列大小1,队列是否填满False
    休息2秒
    生产一个新队列1
    生产者先进先出队列:deque([0, 1])
    队列是否为空False,队列大小2,队列是否填满False
    休息2秒
    生产一个新队列2
    生产者先进先出队列:deque([0, 1, 2])
    队列是否为空False,队列大小3,队列是否填满False
    休息2秒
    生产一个新队列3
    生产者先进先出队列:deque([0, 1, 2, 3])
    队列是否为空False,队列大小4,队列是否填满False
    休息2秒
    生产一个新队列4
    生产者先进先出队列:deque([0, 1, 2, 3, 4])
    队列是否为空False,队列大小5,队列是否填满True
    休息2秒
    Traceback (most recent call last):
      File "d:/data/python/zabbixapiweb/project/project/test15.py", line 68, in <module>
        producer()
      File "d:/data/python/zabbixapiweb/project/project/test15.py", line 50, in producer
        pro_q.put(i,block=False,timeout=3)
      File "c:usersadministratorappdatalocalprogramspythonpython38libqueue.py", line 136, in put
        raise Full
    queue.Full
    View Code

    使用多线程看看效果:

    pro_q = Queue(maxsize=5)
    
    
    def producer():
        while True:
            for i in range(10):
                pro_q.put(i)
                print("生产一个新队列{0}".format(i))
                print("生产者先进先出队列:{0}".format(pro_q.queue))
                print("队列是否为空{0},队列大小{1},队列是否填满{2}".format(
                    pro_q.empty(), pro_q.qsize(), pro_q.full()))
                print("生产者休息2秒")
                time.sleep(2)
            break
    
    
    def consumer():
        while True:
            ret = pro_q.get()
            print("消费一个队列{0}".format(ret))
            print("任务处理状态{0}".format(pro_q.task_done))
            print("消费者休息1秒")
            time.sleep(1)
    
    
    if __name__ == "__main__":
        Thread1 = Thread(target=producer)
        Thread2 = Thread(target=consumer)
        Thread1.start()
        Thread2.start()
    生产一个新队列0
    生产者先进先出队列:deque([0])
    队列是否为空False,队列大小1,队列是否填满False
    生产者休息2秒
    消费一个队列0
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x00000151CDE1BDC0>>
    消费者休息1秒
    生产一个新队列1
    生产者先进先出队列:deque([])
    消费一个队列1
    队列是否为空True,队列大小0,队列是否填满False
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x00000151CDE1BDC0>>
    生产者休息2秒
    消费者休息1秒
    生产一个新队列2
    生产者先进先出队列:deque([])
    消费一个队列2
    队列是否为空True,队列大小0,队列是否填满False
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x00000151CDE1BDC0>>
    生产者休息2秒
    消费者休息1秒
    生产一个新队列3
    生产者先进先出队列:deque([])
    队列是否为空True,队列大小0,队列是否填满False
    生产者休息2秒
    消费一个队列3
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x00000151CDE1BDC0>>
    消费者休息1秒
    生产一个新队列4
    生产者先进先出队列:deque([])
    队列是否为空True,队列大小0,队列是否填满False
    生产者休息2秒
    消费一个队列4
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x00000151CDE1BDC0>>
    消费者休息1秒
    生产一个新队列5
    生产者先进先出队列:deque([])
    消费一个队列5
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x00000151CDE1BDC0>>
    消费者休息1秒
    队列是否为空True,队列大小0,队列是否填满False
    生产者休息2秒
    生产一个新队列6
    消费一个队列6
    生产者先进先出队列:deque([])
    队列是否为空True,队列大小0,队列是否填满False
    生产者休息2秒
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x00000151CDE1BDC0>>
    消费者休息1秒
    生产一个新队列7
    消费一个队列7
    生产者先进先出队列:deque([])
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x00000151CDE1BDC0>>
    队列是否为空True,队列大小0,队列是否填满False
    消费者休息1秒
    生产者休息2秒
    生产一个新队列8
    消费一个队列8
    生产者先进先出队列:deque([])
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x00000151CDE1BDC0>>
    消费者休息1秒
    队列是否为空True,队列大小0,队列是否填满False
    生产者休息2秒
    生产一个新队列9
    生产者先进先出队列:deque([])
    队列是否为空True,队列大小0,队列是否填满False
    生产者休息2秒
    消费一个队列9
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x00000151CDE1BDC0>>
    消费者休息1秒
    执行结果

    把生产者和消费者的休息时间对调

    pro_q = Queue(maxsize=5)
    
    
    def producer():
        while True:
            for i in range(10):
                pro_q.put(i)
                print("生产一个新队列{0}".format(i))
                print("生产者先进先出队列:{0}".format(pro_q.queue))
                print("队列是否为空{0},队列大小{1},队列是否填满{2}".format(
                    pro_q.empty(), pro_q.qsize(), pro_q.full()))
                print("生产者休息2秒")
                time.sleep(1)
            break
    
    
    def consumer():
        while True:
            ret = pro_q.get()
            print("消费一个队列{0}".format(ret))
            print("任务处理状态{0}".format(pro_q.task_done))
            print("消费者休息1秒")
            time.sleep(2)
    
    
    if __name__ == "__main__":
        Thread1 = Thread(target=producer)
        Thread2 = Thread(target=consumer)
        Thread1.start()
        Thread2.start()
    View Code
    生产一个新队列0
    生产者先进先出队列:deque([0])
    消费一个队列0
    队列是否为空True,队列大小0,队列是否填满False
    生产者休息2秒
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x0000018E7B0CBDC0>>
    消费者休息1秒
    生产一个新队列1
    生产者先进先出队列:deque([1])
    队列是否为空False,队列大小1,队列是否填满False
    生产者休息2秒
    消费一个队列1
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x0000018E7B0CBDC0>>
    消费者休息1秒
    生产一个新队列2
    生产者先进先出队列:deque([2])
    队列是否为空False,队列大小1,队列是否填满False
    生产者休息2秒
    生产一个新队列3
    生产者先进先出队列:deque([2, 3])
    队列是否为空False,队列大小2,队列是否填满False
    生产者休息2秒
    消费一个队列2
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x0000018E7B0CBDC0>>
    消费者休息1秒
    生产一个新队列4
    生产者先进先出队列:deque([3, 4])
    队列是否为空False,队列大小2,队列是否填满False
    生产者休息2秒
    生产一个新队列5
    生产者先进先出队列:deque([3, 4, 5])
    队列是否为空False,队列大小3,队列是否填满False
    生产者休息2秒
    消费一个队列3
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x0000018E7B0CBDC0>>
    消费者休息1秒
    生产一个新队列6
    生产者先进先出队列:deque([4, 5, 6])
    队列是否为空False,队列大小3,队列是否填满False
    生产者休息2秒
    生产一个新队列7
    生产者先进先出队列:deque([4, 5, 6, 7])
    队列是否为空False,队列大小4,队列是否填满False
    生产者休息2秒
    消费一个队列4
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x0000018E7B0CBDC0>>
    消费者休息1秒
    生产一个新队列8
    生产者先进先出队列:deque([5, 6, 7, 8])
    队列是否为空False,队列大小4,队列是否填满False
    生产者休息2秒
    生产一个新队列9
    生产者先进先出队列:deque([5, 6, 7, 8, 9])
    队列是否为空False,队列大小5,队列是否填满True
    生产者休息2秒
    消费一个队列5
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x0000018E7B0CBDC0>>
    消费者休息1秒
    消费一个队列6
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x0000018E7B0CBDC0>>
    消费者休息1秒
    消费一个队列7
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x0000018E7B0CBDC0>>
    消费者休息1秒
    消费一个队列8
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x0000018E7B0CBDC0>>
    消费者休息1秒
    消费一个队列9
    任务处理状态<bound method Queue.task_done of <queue.Queue object at 0x0000018E7B0CBDC0>>
    消费者休息1秒
    执行结果
  • 相关阅读:
    PHP 命令行参数解析工具类
    【日本软件外包】设计书中常用到的文型
    php沙盒测试 http://sandbox.onlinephpfunctions.com/ SQL语句格式化 https://www-atl.blog.so-net.ne.jp/2015-02-08
    intra-mart
    maven安装和eclipse集成
    MyEclipse破解
    pdf 中画虚线
    方法名同类名相同如果没有__construct,会被当做构造函数。
    ESA2GJK1DH1K微信小程序篇: 源码使用注意事项和程序优化
    GPRS(Air202) Lua开发: OLED显示二维码,信号强度,电池电量
  • 原文地址:https://www.cnblogs.com/laonicc/p/14451741.html
Copyright © 2011-2022 走看看