zoukankan      html  css  js  c++  java
  • Python之路(第四十篇)进程池

     

    一、进程池

    进程池也是通过事先划分一块系统资源区域,这组资源区域在服务器启动时就已经创建和初始化,用户如果想创建新的进程,可以直接取得资源,从而避免了动态分配资源(这是很耗时的)。

    线程池内子进程的数目一般在3~10个之间,当有新的任务来到时,主进程将通过某种方式选择进程池中的某一个子进程来为之服务。相比于动态创建子进程,选择一个已经存在的子进程的代价显得小得多(进程开启过多,效率反而会下降,开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)。

    Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

    multiprocess.Pool模块

      Pool([numprocess  [,initializer [, initargs]]]):创建进程池
      numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值,如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程
      initializer:是每个工作进程启动时要执行的可调用对象,默认为None
      initargs:是要传给initializer的参数组
    

      

    主要方法

      p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
      ​
      ​
      p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
      '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,
    将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''
         
      p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
      ​
      P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
      ​
      map():Pool类中的map方法,与内置的map函数用法行为基本一致,即针对每个参数都进行func()处理,它会使进程阻塞直到结果一起整体返回。
      ​
      注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。
      ​
      terminal():结束工作进程,不在处理未处理的任务。
      ​
      join():主进程阻塞等待子进程的退出,join方法必须在close或terminate之后使用。
    

      

    例子1

      
      from multiprocessing import Pool
      import time
      import os
      ​
      def func1(n):
          print("子进程%s开始%s"%(n,os.getpid()))
          time.sleep(2)
          print("子进程%s结束%s"%(n,os.getpid()))
          return n
      ​
      def func2(n):
          for i in range(10):
              print(n+2)
      ​
      if __name__ == "__main__":
          p = Pool(5) #5个进程,默认是cpu的核心数
          res = p.map(func1,range(10))  #10个任务,参数必须是可迭代的
          print(res)  #返回值是带所有子进程的结果的列表
      # 默认异步的执行任务,且自带close和join
      #
      # p.map(funcname,iterable)     默认异步的执行任务,且自带close和join
    

      

    例子2

      
      from multiprocessing import Pool
      import time
      import os
      ​
      def func(n):
          print("进程池中的进程开始%s"%n,os.getpid())
          time.sleep(2)
          print("进程池中的进程结束%s"%n,os.getpid())
      ​
      ​
      if __name__ == "__main__":
          pool = Pool(5) 
          for i in range(20):
              # pool.apply(func,args=(i,))  #同步的执行
              pool.apply_async(func,args=(i,)) #异步的执行
          pool.close() #结束进程池提交任务
          pool.join() # 感知进程池中的任务执行结束
     
    

      

    进程池的返回值

    方法apply_async()和map_async()的返回值是AsyncResul的实例obj对象。

    实例具有以下方法

    obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发异常。如果远程操作中引发了异常,它将在调用此方法时再次被引发。

    obj.ready():如果调用完成,返回True

    obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常

    obj.wait([timeout]):等待结果变为可用。

    obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数

    例子1

      from multiprocessing import Pool
      import time
      ​
      def func(n):
          print("进程%s"%n)
          time.sleep(2)
          return n*n
      ​
      if __name__ == "__main__":
          p = Pool(5)  #5个进程
          res_l = []
          for i in range(10): #
              # res = p.apply(func,args=(i,))  #同步
              # print(res)
              # res_l.append(res)
      ​
              res = p.apply_async(func,args=(i,))  #异步
              # print(res.get()) #注意异步时提交返回值需要用get()方法获取,get()方法自带join()效果,
              #如果在这里打印则会出现和同步时一样的效果
              res_l.append(res)
          # 同步
          # print(res_l)
      ​
          # 异步
          for i in res_l:
              print(i.get())
    

      

    执行结果:同步时是一个接一个的慢慢返回结果,异步时很快的返回整个结果,可能会出现顺序乱的情况

    例子2

      
      import time
      from multiprocessing import Pool
      def func(i):
          time.sleep(0.5)
          return i*i
      ​
      if __name__ == '__main__':
          p = Pool(5)
          ret = p.map(func,range(10))
          print(ret)
    

      

    执行结果:整体一起输出,map()自带join()效果,因此会阻塞,然后整体一起输出结果

    进程池的回调函数

    可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数。

    简单的说就是进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数。

    我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

    简单例子

      
      from multiprocessing import Pool
      import time
      import os
      ​
      def func1(n):
          print("子进程",os.getpid())
          time.sleep(1)
          return n*n
      ​
      ​
      def func2(m):
          print("回调函数",os.getpid())
          print("回调函数",m)
      ​
      if __name__ == "__main__":
          p = Pool(5)
          for i in range(10):
              p.apply_async(func1,args=(i,),callback=func2)
          p.close()
          p.join()
          print("主进程",os.getpid())
          
          # 从执行结果可以看出执行回调函数的进程是主进程
    

      

    回调函数最多的在爬虫程序过程中使用的较多,爬取网页用子进程,处理数据用回调函数。爬虫:即网络爬虫,可以简单的理解为爬取网页源码的程序,在python中可以使用requests模块实现。

    例子1

      
      import requests
      from multiprocessing import Pool
      ​
      def get(url):
          responds = requests.get(url)
          if responds.status_code == 200:
              return url,responds.content.decode("utf-8")
      ​
      ​
      def call_back(args):
          url,content = args
          print(url,len(content))
      ​
      if __name__ == "__main__":
          url_li = ["https://www.bitmain.com/",
                    "https://www.feixiaohao.com/",
                    "https://www.jinse.com/",
                    "http://www.avalonminer.shop/",
                    "http://www.innosilicon.com/html/product/index.html"
                    ]
          p = Pool(4)
          for url in url_li:
              p.apply_async(get,args=(url,),callback=call_back)
          p.close()
          p.join()
      ​
      # 在爬虫爬取网页过程中,主要耗时间的是爬取的过程,
      # 假设这里的call_back()是放在子进程中执行,则耗费了更多的时间,
      # 与此同时主进程一直是空闲的,这里的call_back()放在主进程执行,节省了程序执行的时间
    

      

    例子2

    import re
    from urllib.request import urlopen
    import requests
    from multiprocessing import Pool
    
    def get_page(url,pattern):
        response=urlopen(url).read().decode('utf-8')  #这里拿到的是有格式的网页源码
        return pattern,response   # 正则表达式编译结果 网页内容
    
    def get(url,pattern):
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                     'Chrome/51.0.2704.63 Safari/537.36'}
        responds = requests.get(url,headers=headers,timeout=30)
        res = responds.content.decode("utf-8") #这里拿到的是简化版的无格式的网页源码
        return pattern,res
    
    
    def parse_page(info):
        pattern,page_content=info
        print("网页内容长度",len(page_content))
        # print(page_content)
        res=re.findall(pattern,page_content)
        for item in res:
            dic={
                'index':item[0].strip(),
                'title':item[1].strip(),
                'actor':item[2].strip(),
                'time':item[3].strip(),
            }
            print(dic)
    
    if __name__ == '__main__':
        regex = r'<dd>.*?<.*?class="board-index.*?>(d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
        pattern1=re.compile(regex,re.S)
        url_dic={'http://maoyan.com/board/7':pattern1}
        p=Pool()
        res_l=[]
        for url,pattern in url_dic.items():
            # res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
            res=p.apply_async(get,args=(url,pattern),callback=parse_page)
            res_l.append(res)
    
        for i in res_l:
            i.get()
    

      

  • 相关阅读:
    准备改进回复功能
    今天的任务
    日历已加上
    web.config中globalization设置的问题
    Request获取url信息的各种方法比较
    增加了高级评论功能
    如何修改日历的CSS
    推荐有关MasterPages的三篇文章
    如何定制日历控件显示的星期文字
    FreeTextBox的问题终于解决了
  • 原文地址:https://www.cnblogs.com/Nicholas0707/p/10854058.html
Copyright © 2011-2022 走看看