1、阻塞与非阻塞指的是程序的两种运行状态:
阻塞:遇到I/O,程序停在原地,并立即释放CPU资源
非阻塞(就绪态或运行态):没有遇到I/O,或者通过某种手段让程序即便是遇到I/O也不会停在原地,力求尽可能多占用CPU资源
2、同步与异步指的是提交任务的两种方式:
同步调用:提交完任务后,就在原地等待,直到任务运行完毕后,拿到任务的返回值,才能继续执行下一行代码
异步调用:提交完任务后,不需要原地等待而是继续执行其他任务,结果通过提交任务时绑定的回调函数来处理
(进程池,线程池)异步+回调机制
# Synchronous-style demo: a process pool downloads the pages, then the main
# process waits for every task to finish and parses the results one by one.
import os
import random
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

import requests


def get(url):
    """Download ``url`` inside a pool worker.

    Returns the response body when the status code is 200, otherwise ``None``.
    """
    print('%s GET %s' % (os.getpid(), url))
    response = requests.get(url)
    time.sleep(random.randint(1, 3))  # simulate an artificial I/O wait
    if response.status_code == 200:
        # print('%s 下载长度 %s' % (os.getpid(),len(response.text)))
        return response.text
    return None


def parse(res):
    """Print the length of a downloaded page.

    ``res`` is the value returned by ``get``; a falsy value (``None`` from a
    non-200 response) is treated as an empty page.
    """
    if not res:
        # Without this guard len(None) raises TypeError for a failed download.
        res = ''
    print('%s 解析结果为: %s' % (os.getpid(), len(res)))


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        # 'https://www.python.org/',
        # 'https://www.sina.com/',
        # 'https://www.tmall.com/',
        # 'https://www.qq.com/',
        # 'https://www.oldboyedu.com/',
    ]

    # Leaving the with-block waits for all submitted tasks, exactly like
    # pool.shutdown(wait=True).
    with ProcessPoolExecutor(4) as pool:
        objs = [pool.submit(get, url) for url in urls]

    # Every download has finished by now; parsing runs serially in the
    # main process.
    for obj in objs:
        parse(obj.result())

    print('主进程', os.getpid())

"""
结果:
6912 GET https://www.baidu.com/
5320 GET https://www.baidu.com/
6228 GET https://www.baidu.com/
716 GET https://www.baidu.com/
5320 GET https://www.baidu.com/
6912 GET https://www.baidu.com/
704 解析结果为: 2443
704 解析结果为: 2443
704 解析结果为: 2443
704 解析结果为: 2443
704 解析结果为: 2443
704 解析结果为: 2443
主进程 704

存在问题:
1、任务返回值不能得到及时处理,必须得等到所有任务都运行完毕才能统一进行处理
2、解析的过程是串行执行的,如果解析一次需要花费2s,解析6次需要花费12s
"""
# Coupled variant: each pool worker downloads a page and parses it itself,
# so no result has to travel back to the main process.
import os
import random
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

import requests


def get(url):
    """Download ``url`` and, on a 200 response, parse it in the same worker.

    Nothing is returned; the parsed length is printed by ``parse``.
    """
    print('%s GET %s' % (os.getpid(), url))
    response = requests.get(url)
    time.sleep(random.randint(1, 3))  # simulate an artificial I/O wait

    if response.status_code == 200:
        # Do the parsing right here: this saves the overhead of handing the
        # page over to a separate parsing process.
        parse(response.text)


def parse(res):
    """Print the length of ``res``, treating a falsy value as an empty page."""
    if not res:
        res = ''
    print('%s 解析结果为: %s' % (os.getpid(), len(res)))


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        # 'https://www.python.org/',
        # 'https://www.sina.com/',
        # 'https://www.tmall.com/',
        # 'https://www.qq.com/',
        # 'https://www.oldboyedu.com/',
    ]

    with ProcessPoolExecutor(4) as pool:
        for url in urls:
            pool.submit(get, url)

        # Printed right after submission, without waiting for the workers;
        # the pool is only joined when the with-block exits.
        print('主进程', os.getpid())

"""
答案:
主进程 7488
7620 GET https://www.baidu.com/
7732 GET https://www.baidu.com/
7264 GET https://www.baidu.com/
7212 GET https://www.baidu.com/
7264 解析结果为: 2443
7264 GET https://www.baidu.com/
7212 解析结果为: 2443
7212 GET https://www.baidu.com/
7620 解析结果为: 2443
7732 解析结果为: 2443
7212 解析结果为: 2443
7264 解析结果为: 2443

存在问题:
1、任务返回值不能得到及时处理,必须得等到所有任务都运行完毕才能统一进行处理
2、解析的过程是串行执行的,如果解析一次需要花费2s,解析6次需要花费12s

解决方案:
在每个下载的进程中增加解析工作(无形中将两个工作耦合在一个进程中)
"""
# 进程池
# import requests
# import os
# import time,random
# from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
#
# def get(url):
#     print('%s GET %s' % (os.getpid(),url))
#     response = requests.get(url)
#     time.sleep(random.randint(1,3)) # 假设人为I/O等待时间
#
#     if response.status_code == 200:
#         # 干解析的活
#         return response.text
#
# def parse(obj):
#     res = obj.result()
#     print('%s 解析结果为: %s' % (os.getpid(),len(res)))
#
# if __name__ == '__main__':
#     urls = [
#         'https://www.baidu.com/',
#         'https://www.baidu.com/',
#         'https://www.baidu.com/',
#         'https://www.baidu.com/',
#         'https://www.baidu.com/',
#         'https://www.baidu.com/',
#         'https://www.baidu.com/',
#         'https://www.baidu.com/',
#         'https://www.baidu.com/',
#         # 'https://www.python.org/',
#         # 'https://www.sina.com/',
#         # 'https://www.tmall.com/',
#         # 'https://www.qq.com/',
#         # 'https://www.oldboyedu.com/',
#     ]
#
#     pool = ProcessPoolExecutor(4)
#
#
#     for url in urls:
#         obj = pool.submit(get,url) # 主进程向进程池提交任务,返回代表该任务的 Future 对象
#         obj.add_done_callback(parse) # 为 Future 绑定回调函数:任务执行完毕后,会自动把该 Future 对象(而不是返回值)作为参数传给 parse 函数,parse 内部再用 obj.result() 取出结果
#
#     print('主进程',os.getpid())
#
#
#
# """
# 结果:
# 主进程 4888
# 5628 GET https://www.baidu.com/
# 7428 GET https://www.baidu.com/
# 4128 GET https://www.baidu.com/
# 1804 GET https://www.baidu.com/
# 4128 GET https://www.baidu.com/
# 4888 解析结果为: 2443
# 1804 GET https://www.baidu.com/
# 4888 解析结果为: 2443
# 5628 GET https://www.baidu.com/
# 4888 解析结果为: 2443
# 7428 GET https://www.baidu.com/
# 4888 解析结果为: 2443
# 4128 GET https://www.baidu.com/
# 4888 解析结果为: 2443
# 4888 解析结果为: 2443
# 4888 解析结果为: 2443
# 4888 解析结果为: 2443
# 4888 解析结果为: 2443
#
#
# 分析:
# 解开之前下载和分析两个工作的耦合,让进程池里的子进程进行下载工作,把下载工作执行完成后的结果回调交给主进程进行分析工作
# """
# Thread pool + done-callback: pool threads download the pages and a callback
# bound to each Future parses the result as soon as that download finishes.
import os
import random
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread

import requests


def get(url):
    """Download ``url`` inside a pool thread.

    Returns the response body when the status code is 200, otherwise ``None``.
    """
    print('%s GET %s' % (current_thread().name, url))
    response = requests.get(url)
    time.sleep(random.randint(1, 3))  # simulate an artificial I/O wait

    if response.status_code == 200:
        # Parsing is left to the callback; only hand back the page text.
        return response.text
    return None


def parse(obj):
    """Done-callback for a download task.

    ``obj`` is the finished Future (not the page itself), so the page text
    is fetched with ``obj.result()``.
    """
    # get() yields None for a non-200 response; fall back to an empty page
    # so len() does not raise TypeError.
    res = obj.result() or ''
    print('%s 解析结果为: %s' % (current_thread().name, len(res)))


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        'https://www.baidu.com/',
        # 'https://www.python.org/',
        # 'https://www.sina.com/',
        # 'https://www.tmall.com/',
        # 'https://www.qq.com/',
        # 'https://www.oldboyedu.com/',
    ]

    # Swap in ProcessPoolExecutor(4) to run the same demo with processes.
    with ThreadPoolExecutor(4) as pool:
        for url in urls:
            # submit() hands the task to the pool and returns a Future.
            future = pool.submit(get, url)
            # The Future itself is passed to parse once the task is done.
            future.add_done_callback(parse)

        # Printed right after submission, without waiting for the downloads;
        # the pool is only joined when the with-block exits.
        print('主进程', current_thread().name)

"""
结果:

ThreadPoolExecutor-0_0 GET https://www.baidu.com/
ThreadPoolExecutor-0_1 GET https://www.baidu.com/
ThreadPoolExecutor-0_2 GET https://www.baidu.com/
ThreadPoolExecutor-0_3 GET https://www.baidu.com/
主进程 MainThread
ThreadPoolExecutor-0_2 解析结果为: 2443
ThreadPoolExecutor-0_2 GET https://www.baidu.com/
ThreadPoolExecutor-0_0 解析结果为: 2443
ThreadPoolExecutor-0_0 GET https://www.baidu.com/
ThreadPoolExecutor-0_1 解析结果为: 2443
ThreadPoolExecutor-0_1 GET https://www.baidu.com/
ThreadPoolExecutor-0_3 解析结果为: 2443
ThreadPoolExecutor-0_3 GET https://www.baidu.com/
ThreadPoolExecutor-0_2 解析结果为: 2443
ThreadPoolExecutor-0_2 GET https://www.baidu.com/
ThreadPoolExecutor-0_0 解析结果为: 2443
ThreadPoolExecutor-0_1 解析结果为: 2443
ThreadPoolExecutor-0_3 解析结果为: 2443
ThreadPoolExecutor-0_2 解析结果为: 2443


分析:
解开之前下载和分析两个工作的耦合,让线程池里的线程进行下载工作;与进程池不同,线程池的回调不是交给主线程,而是由完成该下载任务的那个线程自己执行分析工作(见上面结果中"解析结果为"前面的线程名)
"""
http://python.jobbole.com/87743/