zoukankan      html  css  js  c++  java
  • Python Day 33 并发编程(生产者消费者模型,multiprocessing.Pipe管道,multiprocessing.Pool进程池)

    生产者消费者模型

    import time
    import random
    from multiprocessing import Process,Queue
    
    def consumer(q):
        while True:
            obj = q.get()
            print('消费了一个数据%s'%obj)
            time.sleep(random.randint(1,3))
    
    if __name__ == '__main__':
        q =Queue()
        p = Process(target=consumer,args=(q,))
        p.daemon = True
        p.start()
    
        for i in range(10):
            time.sleep(random.randint(1,5))
            q.put('food%s'%i)
            print('生产了一个数据food%s'%i)
    简单的生产者消费者模型
    import time
    import random
    from multiprocessing import Process, Queue
    
    
    def consumer(q):
        while True:
            obj = q.get()
            if obj == None:break
            print('消费了一个数据%s' % obj)
            time.sleep(random.randint(1, 3))
    
    def producer(q):
        for i in range(10):
            time.sleep(random.randint(1, 5))
            q.put('food%s' % i)
            print('生产了一个数据food%s' % i)
        q.put(None)
    
    
    if __name__ == '__main__':
        q = Queue()
    
        for i in range(2):
            p_consumer = Process(target=consumer, args=(q,))
            p_consumer.start()
    
    
        for i in range(2):
            p_producer = Process(target=producer, args=(q,))
            p_producer.start()
    多个生产者消费者模型
    import time
    import random
    from multiprocessing import Process, Queue
    
    
    def consumer(q):
        while True:
            obj = q.get()
            if obj == None:break
            print('消费了一个数据%s' % obj)
            time.sleep(random.randint(1, 3))
    
    def producer(q):
        for i in range(10):
            time.sleep(random.randint(1, 5))
            q.put('food%s' % i)
            print('生产了一个数据food%s' % i)
    
    
    if __name__ == '__main__':
        q = Queue()
    
        p1_consumer = Process(target=consumer, args=(q,))
        p2_consumer = Process(target=consumer, args=(q,))
        p1_consumer.start()
        p2_consumer.start()
    
        p1_producer = Process(target=producer, args=(q,))
        p2_producer = Process(target=producer, args=(q,))
        p1_producer.start()
        p2_producer.start()
        p1_producer.join()
        q.put(None)
        p2_producer.join()
        q.put(None)
    View Code

    每个进程的生产者在生产完后向队列中put一个None,消费者get时判断,收到None则退出

    multiprocessing.JoinableQueue()

    创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。 

    q = JoinableQueue()

    q.join()

    q.task_done

    from multiprocessing import Process,JoinableQueue
    import time,random,os
    def consumer(q):
        while True:
            res=q.get()
            time.sleep(random.randint(1,3))
            print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
            q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了
    
    def producer(name,q):
        for i in range(10):
            time.sleep(random.randint(1,3))
            res='%s%s' %(name,i)
            q.put(res)
            print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
        q.join() #生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。
    
    
    if __name__ == '__main__':
        q=JoinableQueue()
        #生产者们:即厨师们
        p1=Process(target=producer,args=('包子',q))
        p2=Process(target=producer,args=('骨头',q))
        p3=Process(target=producer,args=('泔水',q))
    
        #消费者们:即吃货们
        c1=Process(target=consumer,args=(q,))
        c2=Process(target=consumer,args=(q,))
        c1.daemon=True
        c2.daemon=True
    
        #开始
        p_l=[p1,p2,p3,c1,c2]
        for p in p_l:
            p.start()
    
        p1.join()
        p2.join()
        p3.join()
        print('') 
        
        #主进程等--->p1,p2,p3等---->c1,c2
        #p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
        #因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。
    JoinableQueue队列实现消费之生产者模型

    multiprocessing.Pipe管道

    #创建管道的类:
    Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
    #参数介绍:
    dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。
    #主要方法:
        conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
        conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
     #其他方法:
    conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
    conn1.fileno():返回连接使用的整数文件描述符
    conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
     
    conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
    conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    
     
    conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
    
    介绍
    管道
    (pickle实现) 可以传递更多类型数据
    from  multiprocessing import  Pipe  #(pickle实现)
    lp,rp = Pipe()
    lp.send([1,2,3,])
    print(rp.recv())
    rp.send([1,2,3,])
    print(lp.recv())
    Pipe
    from  multiprocessing import Pipe,Process
    def consumer(lp,rp):
        lp.close()
        while True:
            try:
                print(rp.recv())
            except EOFError:break
    
    if __name__ == '__main__':
        lp, rp =  Pipe()
        Process(target=consumer,args=(lp,rp)).start()
        Process(target=consumer,args=(lp,rp)).start()
        Process(target=consumer,args=(lp,rp)).start()
        Process(target=consumer,args=(lp,rp)).start()
        rp.close()
        for i in range(100):
            lp.send('food%s'%i)
        lp.close()
    Pipe例子
    应该特别注意管道端点的正确管理问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。 
    注意
    from multiprocessing import Pipe,Process
    # lp,rp = Pipe()
    # lp.send([1,2,3])
    # print(rp.recv())
    # rp.send('aa')
    # print(lp.recv())
    def consumer(lp,rp):
        # lp.close()  #不写close将不会引发EOFError,消费者一直阻塞
        while True:
            # try:
                print(rp.recv())
            # except EOFError:break
    
    if __name__ == '__main__':
        lp, rp =  Pipe()
        Process(target=consumer,args=(lp,rp)).start()
        Process(target=consumer,args=(lp,rp)).start()
        Process(target=consumer,args=(lp,rp)).start()
        Process(target=consumer,args=(lp,rp)).start()
        Process(target=consumer,args=(lp,rp)).start()
        rp.close()
        for i in range(500):
            lp.send('food%s'%i)
        lp.close()
    EOFERROR
    管道的EOFError是怎么报出来的
    管道空了,且一端关闭了
    管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。
    管道在数据管理上是不安全的,没有锁机制
    队列的实现机制 就是 管道+锁

    multiprocessing.Manager  进程之间的数据共享

    with Manager() as m:     ==    m = Manager()

    dic = m.dict({'count':100})

    #创建100个进程对dic进行减1操作,得到的结果不准确,默认没有锁,需要加锁

    from multiprocessing import Manager,Process,Lock
    def work(d,lock):
        # with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
            d['count'] -= 1
    
    if __name__ == '__main__':
        lock=Lock()
        with Manager() as m:
            # m = Manager()  #Manager(list,dict)数据不安全的需要加锁
            dic=m.dict({'count':100})
            p_l=[]
            for i in range(100):
                p=Process(target=work,args=(dic,lock))
                p_l.append(p)
                p.start()
            for p in p_l:
                p.join()
            print(dic)
    Manager

    展望未来,基于消息传递的并发编程是大势所趋

    即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合,通过消息队列交换数据。

    这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中。

    但进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题。

    以后我们会尝试使用数据库来解决现在进程之间的数据共享问题。

    multiprocessing.Pool进程池

    进程池

      cpu数量有限,同一时间启动进程数量有限,多进程适合处理高计算型程序,阻塞性不适合用多进程,会一直占用cpu

    进程的个数和cpu数量相关,一般情况设置 进程的个数 = cpu个数或cpu个数加1 ,1切换时间,2进程数量有限,多一个被调用到几率大(不是绝对的)  

    • 对于纯计算型的代码,使用进程池更好
    • 对于高IO的代码,直接多进程更好(不科学,相对的)
    • 进程池比起多进程来说,节省了开启进程回收进程资源的时间,给操作系统调度降低了难度
      • p = Pool(5)  创建一个进程池对象
      • p.apply(func=xxx,args=(123,))  同步提交任务 ,没有多进程的优势,还增加了开启和关闭进程时间
      • p.apply_async(func=xxx,args=(123,),callback=call)  异步提交任务,归还进程后立即派发 (常用) 创建时可以拿到返回值,和close join结合使用的
        • 回调函数 callback    程序执行结束后return内容,返回给callback函数,使用的是主进程的资源
      • p.map(func=wahaha,iterable=(range(101)))  接收一个任务函数,和一个iterable,节省了for循环 close(),join,是一种简便写法. 拿不到返回值
      • p.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
      • p.ready():如果调用完成,返回True
      • p.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
      • p.wait([timeout]):等待结果变为可用
      • p.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
    在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?
    
    在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。
    为什么要有进程池?进程池的概念
    import time
    import random
    from multiprocessing import Pool,Process
    def wahaha(i):
        print(i ** i)
        time.sleep(random.randint(1,5))
        return i ** i*'-'
    
    if __name__ == '__main__':
        start = time.time()
        p = Pool(5)
        for i in range(101):
            p.apply_async(func=wahaha,args=(i,))   # 异步提交了一个任务
        p.close()
        p.join()
        print(time.time() - start)
    
        start = time.time()
        p_lst = [Process(target=wahaha, args=(i,)) for i in range(101)]
        for p in p_lst:p.start()
        for p in p_lst:p.join()
        print( time.time()- start)
    
        p = Pool(5)
        for i in range(101):
            p.apply_async(func=wahaha,args=(i,))   # 异步提交了一个任务
        p.close()
        p.join()
    
        p = Pool(5)
        for i in range(101):
            p.apply(func=wahaha,args=(i,))   # 同步提交了一个任务
        p.close()
        p.join()
    
        p = Pool()
        p.map(func=wahaha,iterable=range(101))
    
        p = Pool(5)
        result_lst = []
        for i in range(101):
            r = p.apply_async(func=wahaha,args=(i,))   # 异步提交了一个任务
            result_lst.append(r)
        for r in result_lst:print(r.get())
        p.close()
        p.join()
    Pool
    进程池和多进程效率对比
    import time
    import random
    from multiprocessing import Process,Pool
    def wahaha(i):
        print(i*i)
        # time.sleep(random.randint(1,5))
        return  i * i * '-'
    
    def ret(argv):
        print(argv)
    
    if __name__ == '__main__':
        start = time.time()   #apply_async 异步
        p = Pool(5)
        for i in range(101):
            # p.apply_async(func=wahaha,args=(i,),callback=ret) #向池中添加新任务
            p.apply_async(func=wahaha,args=(i,)) #向池中添加新任务
        p.close() #停止往池中添加新任务
        p.join() # join依赖close,一个进程池必须先close(完成提交任务)再join
        print(time.time() - start)
    
        start = time.time()    #多进程执行
        p_lst = [Process(target=wahaha, args=(i,)) for i in range(101)]
        for p in p_lst:p.start()
        for p in p_lst:p.join()
        print( time.time()- start)
    
    =======================
    0.6436324119567871 #
    8.800149202346802 #
    对于纯计算型的代码 使用进程池更好
    import time
    import random
    from multiprocessing import Process,Pool
    def wahaha(i):
        print(i*i)
        time.sleep(random.randint(1,5))
        return  i * i * '-'
    
    def ret(argv):
        print(argv)
    
    if __name__ == '__main__':
        start = time.time()   #apply_async 异步
        p = Pool(5)
        for i in range(101):
            # p.apply_async(func=wahaha,args=(i,),callback=ret) #向池中添加新任务
            p.apply_async(func=wahaha,args=(i,)) #向池中添加新任务
        p.close() #停止往池中添加新任务
        p.join() # join依赖close,一个进程池必须先close(完成提交任务)再join
        print(time.time() - start)
    
        start = time.time()    #多进程执行
        p_lst = [Process(target=wahaha, args=(i,)) for i in range(101)]
        for p in p_lst:p.start()
        for p in p_lst:p.join()
        print( time.time()- start)
    
    =================================
    59.929256200790405
    17.863743543624878
    当子进程中高IO(有阻塞)时,反而不如多进程(不科学,相对)
    进程池比起多进程来说 节省了开启进程回收进程资源的时间,给操作系统调度进程降低了难度
  • 相关阅读:
    0593. Valid Square (M)
    0832. Flipping an Image (E)
    1026. Maximum Difference Between Node and Ancestor (M)
    0563. Binary Tree Tilt (E)
    0445. Add Two Numbers II (M)
    1283. Find the Smallest Divisor Given a Threshold (M)
    C Primer Plus note9
    C Primer Plus note8
    C Primer Plus note7
    C Primer Plus note6
  • 原文地址:https://www.cnblogs.com/eailoo/p/9178763.html
Copyright © 2011-2022 走看看