zoukankan      html  css  js  c++  java
  • python之路_并发编程之多进程3

    一、生产者消费模型补充

      总结:

        ---生产者消费者模型程序中两种角色:①负责生产数据(生产者);②负责处理数据(消费者)

        ---生产者消费者模型的作用:平衡生产者与消费者之间的速度差。

        ---实现方式:生产者——>队列——>消费者

      如上篇博客内容关于生产消费模型内容,在生产者生产数据的过程结束后,即使消费者已将数据完全获取,消费者程序也不能结束,需由主进程或者生产者在结束生产程序后发送给消费者结束口令,消费者程序才会结束。但是如果出现多个消费者和多个生产者,这种情况又该如何解决?方法如下两种:

    1、根据消费者数量传送结束信号(low)

    from multiprocessing import Process,Queue
    import time
    import random
    import os
    def producer(name,q):
        for i in range(10):
            res='%s%s' %(name,i)
            time.sleep(random.randint(1, 3))
            q.put(res)
            print('%s生产了%s' %(os.getpid(),res))
    def consumer(name,q):
        while True:
            res=q.get()
            if not res:break
            print('%s吃了%s' %(name,res))
    if __name__=='__main__':
        q=Queue()
        p1=Process(target=producer,args=('包子',q))
        p2=Process(target=producer,args=('泔水',q))
        p3=Process(target=producer, args=('骨头',q))
        c1=Process(target=consumer,args=('alex',q))
        c2=Process(target=consumer,args=('egon',q))
    
    
        _p=[p1,p2,p3,c1,c2]
        for p in _p:
            p.start()
        p1.join()
        p2.join()
        p3.join()
        '''保证生产程序结束后,再发送结束信号,发送数量和消费者数量一致'''
        q.put(None)
        q.put(None)

    2、JoinableQueue队列机制

     JoinableQueue与Queue队列基本相似,但前者队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。Queue实例的对象具有的方法JoinableQueue同样具有,除此JoinableQueue还具有如下方法:

      ①q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

      ②q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常

    from multiprocessing import Process,JoinableQueue
    import time
    import random
    def producer(name,food,q):
        for i in range(10):
            res='%s%s' %(food,i)
            time.sleep(random.randint(1, 3))
            q.put(res)
            print('%s生产了%s' %(name,res))
        q.join()  #阻塞生产者进程,保证此进程结束时消费者进程已处理完其产生的数据
    def consumer(name,q):
        while True:
            res=q.get()
            if not res:break
            print('%s吃了%s' %(name,res))
            q.task_done()
    if __name__=='__main__':
        q=JoinableQueue()
        p1=Process(target=producer,args=(1,'包子',q))
        p2=Process(target=producer,args=(2,'泔水',q))
        p3 = Process(target=producer, args=(3,'骨头', q))
        c1=Process(target=consumer,args=('alex',q))
        c2=Process(target=consumer,args=('egon',q))
        '''守护进程保证主进程结束时,守护进程也立即结束'''
        c1.daemon=True
        c2.daemon=True
    
        _p=[p1,p2,p3,c1,c2]
        for p in _p:
            p.start()
        p1.join()
        p2.join()
        p3.join()

    二、共享内存(Manager)

      展望未来,基于消息传递的并发编程是大势所趋,即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合,通过消息队列交换数据。这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中,进程间通信应该尽量避免使用本节所讲的共享数据的方式。

    from multiprocessing import Process,Lock,Manager
    def work(d,lock):
        '''加锁是保证数据被修改时是逐一进行的,避免多个进程同时拿到一个起始数据,导致数据混乱'''
        with lock:
            d['count']-=1
    if __name__=='__main__':
        lock=Lock()
        m=Manager()
        d=m.dict({'count':100})                  #也可以对其他数据类型进行性共享,如list
        _p=[]
        for i in range(10):
            p=Process(target=work,args=(d,lock))
            p.start()
            _p.append(p)
        for p in _p:
            p.join()
        print(d)                                 #{'count': 90}

    三、进程池(Pool)

      多进程是实现并发的主要手段之一,但是通常会有如下问题:a.很明显需要并发执行的任务通常要远大于核数;b.一个操作系统不可能无限开启进程,通常有几个核就开几个进程;c.进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)。

      Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。执行任务的进程数始终未进程池中指定的那几个。

    1、同步调用(apply)

      同步调用:提交完任务后,在原地等待任务结束,一旦结束可以立刻拿到结果。

    from multiprocessing import Pool
    import time,os
    import random
    def work(n):
        print('%s is working' %(os.getpid()))
        time.sleep(random.randint(1,3))
        return n**2
    
    
    if __name__=='__main__':
        p=Pool(4)                                   #从无到有开启4个进程,而且一直是这4个进程
    for i in range(10):
    res
    =p.apply(work,args=(i,)) #等进程执行完,并能得到结果,然后才开启下一个进程,相当于串行
    print(res)

    2、异步调用(apply_async) 

      异步调用:提交完任务后,不会在原地等待任务结束,会继续提交下一次任务,等到所有任务都结束后,才能get结果。

    from multiprocessing import Pool
    import time,os
    import random
    def work(n):
        print('%s is working' %(os.getpid()))
        time.sleep(random.randint(1,3))
        return n**2
    
    
    if __name__=='__main__':
        p=Pool(4)                              #从无到有开启4个进程,而且一直是这4个进程
        result=[]
        for i in range(10):
            obj=p.apply_async(work,args=(i,))  #提交任务,不会在原地等结果
            result.append(obj)
    '''异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了'''
        p.close()
        p.join()
        for obj in result:
            print(obj.get())
    '''使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get'''

    四、回调函数

      进程池执行完一个获得数据的进程,即刻要求通知主进程拿去解析数据。主进程调用一个函数去处理,这个函数便被称为回调函数,要求进程池进程的结果为回调函数的参数。

      爬虫实例:

    from multiprocessing import Pool,Process
    import requests
    import os
    import time,random
    def get(url):
        print('%s GET %s' %(os.getpid(),url))
        response=requests.get(url)
        time.sleep(random.randint(1,3))
        if response.status_code == 200:
            print('%s DONE %s' % (os.getpid(), url))
            return {'url':url,'text':response.text}
    
    def parse(dic):
        print('%s PARSE %s' %(os.getpid(),dic['url']))
        time.sleep(1)
        res='%s:%s
    ' %(dic['url'],len(dic['text']))
        with open('db.txt','a') as f:
            f.write(res)
    
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
        ]
        p=Pool(2)
        for url in urls:
            p.apply_async(get,args=(url,),callback=parse) #主进程负责干回调函数的活
        p.close()
        p.join()

      我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数。

  • 相关阅读:
    错误网络异常:android.os.NetworkOnMainThreadException
    方法服务android学习笔记42_服务的生命周期
    芯片软件随想录(关于核心技术)
    数组比特【编程珠玑】如何优化程序打印出小于100000的素数
    宋体函数Java乔晓松oracle分组函数
    调试客户端windbg远程调试方法
    方法对象Hibernate联合主键
    文件运行跟踪 root.sh 的 执行过程
    移植交叉编译pcDuino + busybox 成功搭建最小linux系统
    方法定制iOS学习笔记8UITableView的定制
  • 原文地址:https://www.cnblogs.com/seven-007/p/7657378.html
Copyright © 2011-2022 走看看