concurrent.futures模块提供了高度封装的异步调用接口,它内部有关的两个池
ThreadPoolExecutor:线程池,提供异步调用,其基础就是老版的Pool
ProcessPoolExecutor: 进程池,提供异步调用
方法
ProcessPoolExecutor(n):n表示池里面存放多少个进程,之后的连接最大就是n的值
submit(fn,*args,**kwargs) 异步提交任务 map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作 shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作 wait=True,等待池内所有任务执行完毕回收完资源后才继续,--------》默认 wait=False,立即返回,并不会等待池内的任务执行完毕 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 submit和map必须在shutdown之前
result(timeout=None) #取得结果 add_done_callback(fn) #回调函数
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time,random,os def task(n): print('%s is running'% os.getpid()) time.sleep(random.randint(1,3)) return n**2 def handle(res): res=res.result() print("handle res %s"%res) if __name__ == '__main__': # #同步调用 # pool=ProcessPoolExecutor(8) # # for i in range(13): # pool.submit(task, i).result() #变成同步调用,串行了,等待结果 # # pool.shutdown(wait=True) #关门等待所有进程完成 # pool.shutdown(wait=False)#默认wait就等于True # # pool.submit(task,3333) #shutdown后不能使用submit命令 # # print('主') #异步调用 pool=ProcessPoolExecutor(8) for i in range(13): obj=pool.submit(task,i) obj.add_done_callback(handle) #这里用到了回调函数 pool.shutdown(wait=True) #关门等待所有进程完成 print('主')
#提交任务的两种方式 #同步调用:提交完任务后,就在原地等待,等待任务结束,拿到任务的返回值,才能继续下一行代码,导致程序串行执行 #异步调用+回调机制:提交完任务后,不在原地等待,任务一旦执行完毕就会触发回调函数的执行,程序是并发执行 #同步有可能是计算任务而在等待 #ProcessPoolExcutor基于pool开发的 #进程的执行状态 #阻塞:遇到i/o进入的一种状态,等待 #非阻塞:
from concurrent.futures import ThreadPoolExecutor from urllib import request from threading import current_thread import time def get(url): print('%s get %s'%(current_thread().getName(),url)) response=request.urlopen(url) time.sleep(2) # print(response.read().decode('utf-8')) return{'url':url,'content':response.read().decode('utf-8')} def parse(res): res=res.result() print('parse:[%s] res:[%s]'%(res['url'],len(res['content']))) # get('http://www.baidu.com') if __name__ == '__main__': pool=ThreadPoolExecutor(2) urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://www.openstack.org', 'https://www.openstack.org', 'https://www.openstack.org', 'https://www.openstack.org', 'https://www.openstack.org', ] for url in urls: pool.submit(get,url).add_done_callback(parse)
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(11): # future=executor.submit(task,i) executor.map(task,range(1,12)) #map取代了for+submit
回调 略