zoukankan      html  css  js  c++  java
  • python 进程队列实现生产者消费者模型

    一. 队列

      引入队列 : from multiprocess import Queue,JoinableQueue(继承自Queue,所以可以使用Queue的方法)

      创建队列 : Queue( maxsize )  #maxsize为队列允许的最大对列数,省略则不限制大小

      实例化队列的具体使用方法 :

        q = Queue()

        q.get(  ,biock , timeout   ) : 取出队列中的某一项.如果队列为空,此方法将阻塞,知道队列中有东西可取.block用于控制阻塞行为,默认为True,如果设置为False,将不会等待,直接引发Queue.Empty异常. timeout是可选超时时间,用于阻塞,如果在设定的时间间隔内队列内还是没有东西可取,将引发Queue.Empty异常.

        q.get_nowait() : 如果队列为空,不阻塞等待,直接引发Queue.Empty异常

        q.put(  item , block ,timeout ) : 将item放入队列.如果队列已满,此方法将阻塞,直到队列中有空间放下item. block控制阻塞,默认为True,如果设置为False,将引发Queue.Full异常. timeout指定在阻塞模式中等待可用空间的时间长短,超时后将引发Queue.Full异常.

        q.put_nowait() : 如果对列已满,不阻塞等待,直接引发Queue.Full异常.

        q.qsize() : 返回队列目前项目的正确数量. 此函数的结果不可靠,因为再返回结果和在稍后程序中使用结果之间,对列可能添加或删除了某一个项目.

        q.empty() : 判断对列是否为空,如果空返回True. 判断结果不可靠,在返回和使用结果之间,对列就有可能已经加入了新的项目.

        q.full() : 判断对列是否已满,如果已满返回True,结果不可靠,理由同上.

        q.close() : 关闭队列,防止对列加入更多的数据.关闭队列不会给队列使用者返回任何数据结束信号或异常.

      实例化队列具体用法 :

        j = JoinableQueue( maxsize )  #maxsize为队列允许的最大对列数,省略则不限制大小

        j.task_done() : 用于消费者,标识每取出队列中一个数据,就会给join一个标识

        j.join() : 生产者使用此方法进行阻塞,直到队列中所有数据均被处理完成.相当于一个计数器,消费者没消费一个数据,join就会接收task_done发来的一个标识,直到消费者返回所有标识时,join就知道所有数据已经全部被消费掉了.

       

    二. 生产者消费者模型(队列)

      主要是为了消除耦合性

      即供需不平衡问题. 此模型将生产者生产的产品放到一个独立的空间中,消费者在空间内取产品消费,这样生产者和消费者就没有了直接的供求关系.

      生产者消费者模型代码一(单个生产者消费者) : 

    from multiprocessing import Process,Queue
    
    def xiaofei(q,name):
        while 1:
            x = q.get()#取出队列中的内容
            if x:#判断取出的额内容是不是空,不是空打印
                print('%s消费了%s'%(name,x))
            else:#取出的内容是空,直接退出循环
                break
    def shengchan(q,name):
        for i in range(10):
            p = '%s 生产了xx%s号'%(name,i)
            q.put(p)
            print(p)
        q.put(None)#当所有产品都被生产完之后给队列传一个空,用于取出时判断
    if __name__ == '__main__':
        q = Queue(6)#实例化队列,并且设置队列最大长度为6
        s = Process(target=shengchan , args=(q,'哈哈'))#开启生产者进程
        s.start()
        x = Process(target=xiaofei , args=(q,''))#开启消费者进程
        x.start()

      生产者消费者模型2(多个生产者消费者,将生产结束标识放在父进程) :

    from multiprocessing import Queue,Process
    
    def xiaofei(q,name):
        while 1:
            x = q.get()
            if x:
                print('%s使用了%s'% (name,x))
            else:
                break
    
    def shengchan(q,name):
        for i in range(10):
            p = '%s生产了xx%s号' %(name,i)
            q.put(p)
    
    if __name__ == '__main__':
        q = Queue()
        s = Process(target=shengchan, args=(q,''))
        s1 = Process(target=shengchan, args=(q,''))
        x = Process(target=xiaofei , args=(q,''))
        x1 = Process(target=xiaofei , args=(q,''))
        s.start()
        s1.start()
        x.start()
        x1.start()
        s.join()
        s1.join()
        q.put(None)
        q.put(None)

      生产者消费者模型3(JoinableQueue模块) :

    from multiprocessing import JoinableQueue,Process
    def xiaofei(j,name):
        while 1:
            j1 = j.get()
            print('%s使用了%s'% (name,j1))
            j.task_done()#消费者没消费一次,便标识一下返回给生产者join,用于计数
    def shengchan(j,name):
        for i in range(10):
            p = '%s 生产了娃娃 %s 号'%(name,i)
            j.put(p)
        j.join()#记录下生产者生产的总数.接收task_done返回的标识以计算消费者消费的产品数量,直到消费进程全部将产品消费完
    
    if __name__ == '__main__':
        j = JoinableQueue()
        s = Process(target=shengchan,args=(j,''))
        x = Process(target=xiaofei,args=(j,''))
        x.daemon = True#将消费方法设置为守护进程,用来结束进程
        s.start()
        x.start()
        s.join()#将生产进程设置为何主进程同步,等待生产进程结束后主进程结束

      

  • 相关阅读:
    hdu 4614 线段树 二分
    cf 1066d 思维 二分
    lca 最大生成树 逆向思维 2018 徐州赛区网络预赛j
    rmq学习
    hdu 5692 dfs序 线段树
    dfs序介绍
    poj 3321 dfs序 树状数组 前向星
    cf 1060d 思维贪心
    【PAT甲级】1126 Eulerian Path (25分)
    【PAT甲级】1125 Chain the Ropes (25分)
  • 原文地址:https://www.cnblogs.com/dong-/p/9520659.html
Copyright © 2011-2022 走看看