zoukankan      html  css  js  c++  java
  • concurrent.futures模块 -----进程池 ---线程池 ---回调

    concurrent.futures模块提供了高度封装的异步调用接口,它内部有关的两个池

    ThreadPoolExecutor:线程池,提供异步调用,其基础就是老版的Pool

    ProcessPoolExecutor: 进程池,提供异步调用

    方法

    ProcessPoolExecutor(n):n表示池里面存放多少个进程,之后的连接最大就是n的值

    submit(fn,*args,**kwargs) 异步提交任务 map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作 shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作 wait=True,等待池内所有任务执行完毕回收完资源后才继续,--------》默认 wait=False,立即返回,并不会等待池内的任务执行完毕 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 submit和map必须在shutdown之前
    result(timeout
    =None) #取得结果 add_done_callback(fn) #回调函数
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import time,random,os
    
    def task(n):
        print('%s is running'% os.getpid())
        time.sleep(random.randint(1,3))
        return n**2
    def handle(res):
        res=res.result()
        print("handle res %s"%res)
    
    if __name__ == '__main__':
        # #同步调用
        # pool=ProcessPoolExecutor(8)
        #
        # for i in range(13):
        #     pool.submit(task, i).result() #变成同步调用,串行了,等待结果
        # # pool.shutdown(wait=True) #关门等待所有进程完成
        # pool.shutdown(wait=False)#默认wait就等于True
        # # pool.submit(task,3333) #shutdown后不能使用submit命令
        #
        # print('主')
    
        #异步调用
        pool=ProcessPoolExecutor(8)
        for i in range(13):
             obj=pool.submit(task,i)
             obj.add_done_callback(handle) #这里用到了回调函数
        pool.shutdown(wait=True) #关门等待所有进程完成
        print('')
    ProcessPoolExecutor
    #提交任务的两种方式
    #同步调用:提交完任务后,就在原地等待,等待任务结束,拿到任务的返回值,才能继续下一行代码,导致程序串行执行
    #异步调用+回调机制:提交完任务后,不在原地等待,任务一旦执行完毕就会触发回调函数的执行,程序是并发执行
    
    #同步有可能是计算任务而在等待
    #ProcessPoolExcutor基于pool开发的
    
    #进程的执行状态
    #阻塞:遇到i/o进入的一种状态,等待
    #非阻塞:
    进程其他说明
    from concurrent.futures import ThreadPoolExecutor
    from urllib import request
    from threading import current_thread
    import time
    
    def get(url):
        print('%s get %s'%(current_thread().getName(),url))
        response=request.urlopen(url)
        time.sleep(2)
        # print(response.read().decode('utf-8'))
        return{'url':url,'content':response.read().decode('utf-8')}
    
    def parse(res):
        res=res.result()
        print('parse:[%s] res:[%s]'%(res['url'],len(res['content'])))
    
    # get('http://www.baidu.com')
    if __name__ == '__main__':
        pool=ThreadPoolExecutor(2)
    
        urls=[
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://www.openstack.org',
            'https://www.openstack.org',
            'https://www.openstack.org',
            'https://www.openstack.org',
            'https://www.openstack.org',
    
        ]
    
        for url in urls:
            pool.submit(get,url).add_done_callback(parse)
    ThreadPoolExecutor
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    
    import os,time,random
    def task(n):
        print('%s is runing' %os.getpid())
        time.sleep(random.randint(1,3))
        return n**2
    
    if __name__ == '__main__':
    
        executor=ThreadPoolExecutor(max_workers=3)
    
        # for i in range(11):
        #     future=executor.submit(task,i)
    
        executor.map(task,range(1,12)) #map取代了for+submit
    map用法

    回调 略

  • 相关阅读:
    CF174 div1 B. Cow Program 记忆化搜索
    调整方向,思考
    国际歌法文歌词
    博客认证
    卖东西
    实际上网上社区是一个微型的社会,拿来做社会学的实验我相信非常有意思.
    关于博客应用
    转:在线工具
    关于社区气质
    定餐网
  • 原文地址:https://www.cnblogs.com/mmyy-blog/p/9453453.html
Copyright © 2011-2022 走看看