zoukankan      html  css  js  c++  java
  • 队列&生产者消费者模型

    队列

    ipc机制:进程通讯

    管道:pipe 基于共享的内存空间

    队列:pipe+锁 queue

    from multiprocessing import Process,Queue
    
    ### 案例一
    q = Queue()
    q.put('hyc')
    q.put([1,2,4])
    q.put(2)
    print(q.get())
    print(q.get())
    print(q.get())
    # q.put(5)
    # q.put(5)
    print(q.get()) # 默认就会一直等着拿值
    

    此时程序运行到这里就会阻塞

    from multiprocessing import Process,Queue
    
    ## 案例2
    q = Queue(4)
    q.put('我妻由乃')
    q.put([1,2,4])
    q.put('我妻善逸')
    q.put(2)
    
    q.put('乔碧萝')  #队列满了的情况再放值,会阻塞
    

    也是同理,已经设置了四个值,当放入第五个值时,就会阻塞

    from multiprocessing import Process,Queue
    
    q = Queue(3)
    q.put('zhao',block=True,timeout=2) #
    q.put('zhao',block=True,timeout=2) #
    q.put('zhao',block=True,timeout=2) #
    
    q.put('zhao',block=True,timeout=5) # put里的  block=True 如果满了会等待,timeout最多等待n s,如果ns还是队列还是满的就报错了
    

    在等待5秒后会报错

    from multiprocessing import Process,Queue
    
    q = Queue()
    q.put('yyyy')
    q.get()
    q.get(block=True,timeout=5) # block=True 阻塞等待,timeout最多等5s, 剩下同上
    

    取值时也是同理

    q = Queue(3)
    q.put('qwe')
    q.put('qwe')
    q.put('qwe')
    
    q.put('qwe',block=False) # 对于put来说block=False 如果队列满了就直接报错
    
    q = Queue(3)
    q.put('qwe')
    q.get()
    
    
    q.get(block=False)
    # block = Flase 拿不到不阻塞,直接报错
    

    当 block = Flase 时,timeout 就没有什么意义了,因为会直接报错

    q = Queue(1)
    q.put('123')
    # q.get()
    q.put_nowait('666') # block = False
    q.get_nowait() # block = False
    

    也会直接报错其实是和block = False一样的,不阻塞,有问题时直接报错

    生产者和消费者模型

    生产者:生产数据的任务

    消费者:处理数据的任务

    生产者和消费者的关系:生产者--队列(盆)-->消费者

    生产者可以不停的生产,达到了自己最大的生产效率,消费者可以不停的消费,也达到了自己最大的消费效率.
    生产者消费者模型大大提高了生产者生产的效率和消费者消费的效率.

    from multiprocessing import Process, Queue
    
    def producer(q,name,food):
        '''生产者'''
        for i in range(10):
            print(f'{name}生产了{food}{i}')
            res = f'{food}{i}'
            q.put(res)
        q.put(None)
    
    def consumer(q,name):
        '''消费者'''
        while True:
            res = q.get(timeout=5)
            if res is None:break
            print(f'{name}吃了{res}')
    
    if __name__ == '__main__':
        q = Queue()
        p1 = Process(target=producer,args=(q,'rocky','包子'))
        c1 = Process(target=consumer,args=(q,'成哥'))
        p1.start()
        c1.start()
    

    这就是最简单的生产者和消费者模型了,他的作用是每次生产者都会生产物品,当生产者生产完10个包子的时候,跳出for循环,最后放入一个None。随后当消费者接收到None的时候,就会被break掉。

    在一个生产者对上一个消费者的时候,这个方法是可行的。但是当多个生产者对一个消费者的时候,其中一名生产者先生产完了食物并放入None,但是第二名生产者还没有生产完的话,消费者却已经收到了第一个生产者的None。这样的话会出现问题,所以我们可以改变一下思路,把None放到外面

    from multiprocessing import Process,Queue
    import time,random
    
    def producer(q,name,food):
        '''生产者'''
        for i in range(3):
            print(f'{name}生产了{food}{i}')
            time.sleep(random.randint(1, 3))
            res = f'{food}{i}'
            q.put(res)
        # q.put(None)
    
    def consumer(q,name):
        '''消费者'''
        while True:
            res = q.get(timeout=5)
            if res is None:break
            time.sleep(random.randint(1,3))
            print(f'{name}吃了{res}')
    
    if __name__ == '__main__':
        q = Queue()
        p1 = Process(target=producer,args=(q,'rocky','包子'))
        p2 = Process(target=producer,args=(q,'mac','韭菜'))
        p3 = Process(target=producer,args=(q,'nick','蒜泥'))
        c1 = Process(target=consumer,args=(q,'成哥'))
        c2 = Process(target=consumer,args=(q,'浩南哥'))
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
        p1.join()
        p2.join()
        p3.join() # 生产者生产完毕
        q.put(None)# 几个消费者put几次
        q.put(None)
    

    但是用这种方法就会出现新的问题,因为消费者方法用的是while True,他是不会停下的。所以我们就要用到之前学过的守护进程:

    from multiprocessing import Process,Queue,JoinableQueue
    import time,random
    
    def producer(q,name,food):
        '''生产者'''
        for i in range(3):
            print(f'{name}生产了{food}{i}')
            time.sleep(random.randint(1, 3))
            res = f'{food}{i}'
            q.put(res)
        # q.put(None)
    
    def consumer(q,name):
        '''消费者'''
        while True:
            res = q.get()
            # if res is None:break
            time.sleep(random.randint(1,3))
            print(f'{name}吃了{res}')
            q.task_done() #
    
    if __name__ == '__main__':
        q = JoinableQueue()
        p1 = Process(target=producer,args=(q,'rocky','包子'))
        p2 = Process(target=producer,args=(q,'mac','韭菜'))
        p3 = Process(target=producer,args=(q,'nick','蒜泥'))
        c1 = Process(target=consumer,args=(q,'成哥'))
        c2 = Process(target=consumer,args=(q,'浩南哥'))
        p1.start()
        p2.start()
        p3.start()
        c1.daemon = True
        c2.daemon = True
        c1.start()
        c2.start()
        p1.join()
        p2.join()
        p3.join() # 生产者生产完毕
        # q.put(None)# 几个消费者put几次
        # q.put(None)
        q.join() # 分析
        # 生产者生产完毕--这是主进程最后一行代码结束--q.join()消费者已经取干净了,没有存在的意义了.
        #这是主进程最后一行代码结束,消费者已经取干净了,没有存在的意义了.守护进程的概念.
    

    joinableQueue

    from multiprocessing import Process,Queue,JoinableQueue
    
    
    q = JoinableQueue()
    
    q.put('zhao') # 放队列里一个任务
    q.put('qian')
    
    print(q.get())
    q.task_done() # 完成了一次任务
    print(q.get())
    q.task_done() # 完成了一次任务
    q.join() #计数器不为0的时候 阻塞等待计数器为0后通过
    
    # 想象成一个计数器 :put +1   task_done -1
    
  • 相关阅读:
    通过Spring使用远程访问和web服务
    mongoDB id 导出,dump,sed,count,mysql import等用法示例
    Spring属性占位符 PropertyPlaceholderConfigurer
    关于Spring管理的类如何创建对象
    spring中反射机制和注入的使用
    Spring 反射注入+全注解注入
    Spring 注解@Component,@Service,@Controller,@Repository
    @Transactional spring 配置事务 注意事项
    Spring 注解Autowired自动注入bean异常解决
    Spring事务的传播行为 @Transactional
  • 原文地址:https://www.cnblogs.com/jie9527-/p/11537474.html
Copyright © 2011-2022 走看看