zoukankan      html  css  js  c++  java
  • 【Python之路Day11】网络篇之生产者消费者模型

    在并发编程中使用生产者和消费者模式能够解决绝大多数并发的问题。该模式通过平衡生产线成和消费者线程的工作能力来提高程序的整体处理数据的速度。

    为什么要使用生产者和消费者模式?

    在线程的世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

    什么是生产者消费者模式?

    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者之间彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔该阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。

    以上概念来自 --InfoQ

    上述我们说的容器,就是一个消息队列,在Python中,Queue模块实现了多生产者、多消费者队列。

    队列的使用场景在现实中应用太广泛了,比如说:

    • 提高并发
    • 流量削峰
    • 程序解耦
    • ...

    Queue模块实现了三类队列:

    • FIFO(First In First Out), 先进先出, 最早加入的任务会被先得到;
    • LIFO(Last In First Out), 后进先出,最后加入的任务会被最早先得到(就像栈一样).
    • 队列优先级,权重,任务被保持有序,拥有最小值的任务(优先级最高) 被最先得到。

    Queue 模块定义了下列的类和异常:

    class Queue.Queue(maxsize=0)
    构造一个FIFO队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

    class Queue.LifoQueue(maxsize=0)
    构造一个LIFO队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

    class Queue.PriorityQueue(maxsize=0)

    构造一个优先队列。maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

    拥有最小值的任务会被最先得到(sorted(list(entries))[0]的返回值即为拥有最小值的任务)。任务的典型模式就是如(priority_number, data)这样的元组。

    exception Queue.Empty
    在空的Queue对象上调用非阻塞的get()(或者get_nowait())会抛出此异常。

    exception Queue.Full
    在满的Queue对象上调用非阻塞的put()(或者put_nowait())会抛出此异常。

    Queue对象的方法:

    Queue.qsize()
    返回队列的近似大小。注意,队列大小大于0并不保证接下来的get()调用不会被阻塞,队列大小小于maxsize也不保证接下来的put()调用不会被阻塞。

    Queue.empty()
    如果队列为空返回True,否则返回False。如果empty()返回True并不保证接下来的put()调用不会被阻塞。类似的,如果empty()返回False也不能保证接下来的get()调用不会被阻塞。

    Queue.full()
    如果队列是满的返回True,否则返回False。如果full()返回True并不能保证接下来的get()调用不会被阻塞。类似的,如果full()返回False并不能保证接下来的put()调用不会被阻塞。

    Queue.put(item[, block[, timeout]])
    将item放入队列中。如果可选的参数block为真且timeout为空对象(默认的情况,阻塞调用,无超时),如有必要(比如队列满),阻塞调用线程,直到有空闲槽可用。如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无空闲槽可用,抛出Full异常(带超时的阻塞调用)。如果block为假,如果有空闲槽可用将数据放入队列,否则立即抛出Full异常(非阻塞调用,timeout被忽略)。

    Queue.put_nowait(item)
    等同于put(item, False)(非阻塞调用)。

    Queue.get([block[, timeout]])
    从队列中移除并返回一个数据。如果可选的参数block为真且timeout为空对象(默认的情况,阻塞调用,无超时),阻塞调用进程直到有数据可用。如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无数据可用,抛出Empty异常(带超时的阻塞调用)。如果block为假,如果有数据可用返回数据,否则立即抛出Empty异常(非阻塞调用,timeout被忽略)。

    Queue.get_nowait()
    等同于get(False)(非阻塞调用)。

    为了跟踪入队任务被消费者线程完全的处理掉,Queue对象提供了两个额外的方法。

    Queue.task_done()
    意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。

    如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。

    如果该方法被调用的次数多于被放入队列中的任务的个数,ValueError异常会被抛出。

    Queue.join()
    阻塞调用线程,直到队列中的所有任务被处理掉。

    只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。

    import queue
    
    q = queue.Queue()  #先进先出
    q.put(1)    #往队列里放三个值
    q.put(2)
    q.put(3)
    
    res = q.qsize()  #查看队列大小
    print(res)
    #q.get(timeout=3)   #如果队列已经空了,那么默认会阻塞, 可以指定值block为false, 不阻塞直接报错.
    
    print('第一次取值',q.get() )   #取出队列的一个值
    q.task_done()
    
    print('第二次取值:',q.get())
    q.task_done()
    
    print('第三次取值:',q.get())
    q.task_done()  #每取出一个需要告诉join
    q.join()       #阻塞调用线程,知道队列中的所有任务被处理掉.
    print('此时队列剩余:',q.qsize())

    代码执行结果:

    3
    第一次取值 1
    第二次取值: 2
    第三次取值: 3
    此时队列剩余: 0
    import queue
    
    q = queue.LifoQueue() #后进先出
    q.put(1)    #往队列里放三个值
    q.put(2)
    q.put(3)
    
    res = q.qsize()  #查看队列大小
    print(res)
    #q.get(timeout=3)   #如果队列已经空了,那么默认会阻塞, 可以指定值block为false, 不阻塞直接报错.
    
    print('第一次取值',q.get() )   #取出队列的一个值
    q.task_done()
    
    print('第二次取值:',q.get())
    q.task_done()
    
    print('第三次取值:',q.get())
    q.task_done()  #每取出一个需要告诉join
    q.join()       #阻塞调用线程,知道队列中的所有任务被处理掉.
    print('此时队列剩余:',q.qsize())
    
    #代码执行结果:
    3
    第一次取值 3
    第二次取值: 2
    第三次取值: 1
    此时队列剩余: 0
    import queue
    
    q = queue.PriorityQueue()  #优先级队列
    q.put((2,1))    #往队列里放三个值, 传入的时候需要传入一个元组, 第一个值是优先级,数字越小,优先级越高
    q.put((3,2))
    q.put((1,3))
    
    res = q.qsize()  #查看队列大小
    print(res)
    #q.get(timeout=3)   #如果队列已经空了,那么默认会阻塞, 可以指定值block为false, 不阻塞直接报错.
    
    print('第一次取值',q.get() )   #取出队列的一个值
    q.task_done()
    
    print('第二次取值:',q.get())
    q.task_done()
    
    print('第三次取值:',q.get())
    q.task_done()  #每取出一个需要告诉join
    q.join()       #阻塞调用线程,知道队列中的所有任务被处理掉.
    print('此时队列剩余:',q.qsize())
    
    
    #代码执行结果:
    3
    第一次取值 (1, 3)   #数字越小,优先级越高
    第二次取值: (2, 1)
    第三次取值: (3, 2)
    此时队列剩余: 0
  • 相关阅读:
    前端
    Spring AOP知识点整理
    【转载】spring aop 面试考点
    【转载】MDC 是什么?
    【转载】在分布式项目中,每个服务器的日志从产生,到集中到一个统一日志平台的流程是什么,中间都有那些过程?
    【转载】门面日志如何自动发现日志组件
    【转载】ArrayList从源码看扩容实现
    【原创】Ajax实现方式
    【转载】servlet与springMVC的差别
    【转载】serlvet
  • 原文地址:https://www.cnblogs.com/dubq/p/5688921.html
Copyright © 2011-2022 走看看