zoukankan      html  css  js  c++  java
  • (并发编程)进程IPC,生产者消费者模型,守护进程补充

    一、IPC(进程间通信)机制
    进程之间通信必须找到一种介质,该介质必须满足
    1、是所有进程共享的
    2、必须是内存空间
    附加:帮我们自动处理好锁的问题
    a、from multiprocessing import Manager(共享内存,但要自己解决锁的问题)
    b、IPC中的队列(Queue) 共享,内存,自动处理锁的问题(最常用)
    c、IPC中的管道(Pipe),共享,内存,需自己解决锁的问题
    #d. 文件,共享,硬盘,需要自己解决锁的问题
    a、用Manager
    from multiprocessing import Process,Manager,Lock
    import time
    mutex=Lock()
    def task(dic,lock):
        lock.acquire()
        temp=dic['num']
        time.sleep(0.1)
        dic['num']=temp-1
        lock.release()
    if __name__ == '__main__':
        m=Manager()
        dic=m.dict({'num':10})
        l=[]
        for i in range(10):
            p=Process(target=task,args=(dic,mutex))
            l.append(p)
            p.start()
        for p in l:
            p.join()
    print(dic)
    b、用队列Queue
    1)共享的空间
    2)是内存空间
    3)自动帮我们处理好锁定问题
    from multiprocessing import Queue
    q=Queue(3)  #设置队列中maxsize个数为三
    q.put('first')
    q.put({'second':None})
    q.put('三')
    # q.put(4)   #阻塞。不报错,程序卡在原地等待队列中清出一个值。默认blok=True
    print(q.get())
    print(q.get())
    print(q.get())
    强调:
    1、队列用来存成进程之间沟通的消息,数据量不应该过大
    2、maxsize的值超过的内存限制就变得毫无意义
                                                                 
                                                                                                                                                                                                         
    了解:
    q=Queue(3)
    q.put('first',block=False)
    q.put('second',block=False)
    q.put('third',block=False)
    q.put('fourth',block=False)  #报错 queue.Full
    q.put('first',block=True)
    q.put('second',block=True)
    q.put('third',block=True)
    q.put('fourth',block=True,timeout=3)  #等待3秒后若还进不去报错。注意timeout不能和block=False连用
    q.get(block=False)
    q.get(block=False)
    q.get(block=False)
    q.get(block=False)           #报错 queue.Empty
    q.get(block=True)
    q.get(block=True)
    q.get(block=True)
    q.get(block=True,timeout=2)    #等待2秒后还取不出东西则报错。注意timeout不能和block=False连用
    二、生产者消费者模型
    该模型中包含两类重要的角色:
    1、生产者:将负责造数据的任务比喻为生产者
    2、消费者:接收生产者造出的数据来做进一步的处理,该类任务被比喻成消费者
    实现生产者消费者模型三要素
    1、生产者
    2、消费者
    3、队列
    什么时候用该模型:
    程序中出现明显的两类任何,一类任务是负责生产,另外一类任务是负责处理生产的数据的
    该模型的好处:
    1、实现了生产者与消费者解耦和
    2、平衡了生产者的生产力与消费者的处理数据的能力

    import time
    import random
    from multiprocessing import Process,Queue
    def consumer(name,q):
        while True:
            res=q.get()
            time.sleep(random.randint(1,3))
            print('33[46m消费者===》%s 吃了 %s33[0m' %(name,res))
    def producer(name,q,food):
        for i in range(5):
            time.sleep(random.randint(1,2))
            res='%s%s' %(food,i)
            q.put(res)
            print('33[45m生产者者===》%s 生产了 %s33[0m' %(name,res))
    if __name__ == '__main__':
        #1、共享的盆
        q=Queue()#
        #2、生产者们
        p1=Process(target=producer,args=('egon',q,'包子'))
        p2=Process(target=producer,args=('刘清政',q,'泔水'))
        p3=Process(target=producer,args=('杨军',q,'米饭'))
        #3、消费者们
        c1=Process(target=consumer,args=('alex',q))
        c2=Process(target=consumer,args=('梁书东',q))
        p1.start()
        p2.start()
        p3.start()
        c1.start()
     c2.start()
    生产者消费者模型是解决问题的思路不是技术。可以用进程和队列来实现,也可以用其他的来实现。
    三、守护进程与应用
     #如果父进程将子进程设置为守护进程,那么在主进程 代码运行完毕 后守护进程就立即被回收
     注意:代码运行完 和 结束(进程死了)的区别
    import time
    import random
    from multiprocessing import Process,Queue
    方法一:
    def consumer(name,q):
        while True:
            res=q.get()
            if res is None:break
            time.sleep(random.randint(1,3))
            print('33[46m消费者===》%s 吃了 %s33[0m' %(name,res))
    def producer(name,q,food):
        for i in range(5):
            time.sleep(random.randint(1,2))
            res='%s%s' %(food,i)
            q.put(res)
            print('33[45m生产者者===》%s 生产了 %s33[0m' %(name,res))
    if __name__ == '__main__':
        #1、共享的盆
        q=Queue()
        #2、生产者们
        p1=Process(target=producer,args=('egon',q,'包子'))
        p2=Process(target=producer,args=('刘清政',q,'泔水'))
        p3=Process(target=producer,args=('杨军',q,'米饭'))
        #3、消费者们
        c1=Process(target=consumer,args=('alex',q))
        c2=Process(target=consumer,args=('梁书东',q))

        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()

        # 在生产者生产完毕后,往队列的末尾添加一个结束信号None
        p1.join()
        p2.join()
        p3.join()
        # 有几个消费者就应该放几个结束信号
        q.put(None)
      q.put(None)
    方法二:
    import time
    import random
    from multiprocessing import Process,JoinableQueue
    def consumer(name,q):
        while True:
            res=q.get()
            time.sleep(random.randint(1,3))
            print('33[46m消费者===》%s 吃了 %s33[0m' %(name,res))
            q.task_done() #拿一个减一个,与q.join()有联系
    def producer(name,q,food):
        for i in range(5):
            time.sleep(random.randint(1,2))
            res='%s%s' %(food,i)
            q.put(res)
            print('33[45m生产者者===》%s 生产了 %s33[0m' %(name,res))
    if __name__ == '__main__':
        #1、共享的盆
        q=JoinableQueue()
        #2、生产者们
        p1=Process(target=producer,args=('egon',q,'包子'))
        p2=Process(target=producer,args=('刘清政',q,'泔水'))
        p3=Process(target=producer,args=('杨军',q,'米饭'))
        #3、消费者们
        c1=Process(target=consumer,args=('alex',q))
        c2=Process(target=consumer,args=('梁书东',q))
        c1.daemon=True   # c1.daemon=True 必须在c1.start() 前
        c2.daemon=True
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
        # 确定生产者确确实实已经生产完毕
        p1.join()
        p2.join()
        p3.join()
        # 在生产者生产完毕后,拿到队列中元素的总个数,然后直到元素总数变为0,q.join()这一行代码才算运行完毕
        q.join()
        #一旦结束意味着队列确实被取空,消费者已经确确实实把数据都取干净了
    print('主进程结束')
  • 相关阅读:
    004-核心技术-netty概述、传统IO、Reactor线程模型
    003-核心技术-IO模型-NIO-基于NIO群聊示例
    002-核心技术-IO模型-NIO【Selector、Channel、Buffer】、零拷贝
    018-redis-命令合计
    【整理】js、python、java分别对url进行编码和解码
    深度 | 翟东升:写在美帝国撤军和阿富汗政权溃散之际
    修改Windows10 命令终端cmd的编码为UTF-8 && IDEA控制台输出中文乱码
    Dockerfile文件中的ENTRYPOINT,CMD命令跟k8s中command,args之间的关系
    服务器带宽,流量之间的关系
    值得收藏的下载地址
  • 原文地址:https://www.cnblogs.com/3sss-ss-s/p/9598486.html
Copyright © 2011-2022 走看看