1. 阻塞与非阻塞
执行的角度:
阻塞:阻塞调用是指调用结果返回之前,当前线程会被挂起(如遇到io操作)。函数只有在得到结果之后才会将阻塞的线程激活。
非阻塞:程序没有遇到IO阻塞,或者程序遇到IO,通过某种方式,让CPU强行运行程序。
2. 同步与异步
发布的角度:
同步调用:在发出一个任务时,自任务开始运行直到结束(可能遇到IO),只有返回一个返回值之后,才会发出下一个任务。
异步调用:一次发布多个任务,然后就直接执行下一行代码。不用等待结束。
shutdown:
- 让主进程等待进程池中所有的子进程都结束后,再执行。
2. 在上一个进程池没有完成所有的任务之前,不允许添加新任务。
一个任务是通过一个函数实现的,任务完成了,它的返回值就是函数的返回值。
2.1 异步调用
# 异步调用
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import random
import os
def task(i):
print(f'{os.getpid()}开始任务')
time.sleep(random.randint(1,2))
print(f'{os.getpid()}任务结束')
return i
if __name__ == '__main__':
pool = ProcessPoolExecutor() # 4个进程
for i in range(10):
pool.submit(task, i)
pool.shutdown(wait=True)
print("===主")
2.2 同步调用
result() :将异步调用变成同步,必须等到任务完成,返回结果后,再执行下一个。能够接收返回值。
# 同步调用
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import random
import os
def task(i):
print(f'{os.getpid()}开始任务')
time.sleep(random.randint(1,2))
print(f'{os.getpid()}任务结束')
return i
if __name__ == '__main__':
pool = ProcessPoolExecutor()
for i in range(10):
obj = pool.submit(task, i) # 动态对象,能够返回当前对象的状态, running、peeding、finishhed三种状态。
print(f'任务结果:{obj.result()}') # 将返回的结果打印
pool.shutdown(wait=True)
print("===主")
2.3 异步调用回收的第一种方式
统一回收结果,不能够马上接收任何一个返回值(实时)。
# 异步调用 接收返回值的第一种方式:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os
def task(i):
print(f'{os.getpid()}开始任务')
time.sleep(random.randint(1,3))
print(f'{os.getpid()}任务结束')
return i
if __name__ == '__main__':
# 异步调用
pool = ProcessPoolExecutor() # 4个进程
l1 = []
for i in range(10):
obj = pool.submit(task,i)
l1.append(obj) # 将对象放入容器中。
pool.shutdown(wait=True)
print(l1)
for i in l1:
print(i.result())
print('===主')
3. 异步调用+回调函数
3.1 requests模块
浏览器原理:向服务端发送一个请求,服务端验证,如果是正确的,会返回一个文件;浏览器接收到文件,将文件里面的代码渲染后展示到屏幕上。
爬虫:
1.利用代码模拟浏览器发送伪装的请求,会得到一堆源代码;
2.对源代码进行数据清洗得到想要的数据。
3.2 异步调用回收的第二种方式
要求:
- 要做到实时回收结果,第一种方式没有实时。
- 并发执行任务,只是用来处理IO阻塞的(爬虫也是阻塞),不能增加新的功能(清洗数据功能),否则容易造成耦合。
解决:增加回调函数。
回调函数:按顺序接收每个任务的结果,进行下一步的数据处理。 对象. add_done_callback(方法名)
异步处理IO类型,回调函数处理非IO,才可用异步 + 调用。
线程 + 回调:将处理的数据交给空闲的线程去执行;
进程 + 回调:将处理的数据交给主进程去执行。
from concurrent.futurse import ThreadPoolExecutor
import requests
def task(url):
"""模拟爬取多个源代码,有IO操作"""
response = requests.get(url)
if response.status_code == 200:
return response.text
def parse(obj):
"""模拟对数据进行分析,一般没有IO"""
print(len(obj.result()))
if __name__ == '__main__':
url_list = [
'http://www.baidu.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.taobao.com',
'https://www.cnblogs.com/jin-xin/articles/7459977.html',
'https://www.cnblogs.com/yzm1017/',
'https://www.cnblogs.com/jin-xin/articles/9811379.html',
'https://www.cnblogs.com/jin-xin/articles/11245654.html',
'https://www.sina.com.cn/']
pool = ThreadPoolExecutor(4) # 4个线程
for url in url_list:
obj = pool.submit(task, url)
obj.add_done_callback(parse) # 回调函数
"""
线程池设置4个线程, 异步发起10个任务,每个任务是通过网页获取源码, 并发执行,
当一个任务完成之后,将parse这个分析代码的任务交由剩余的空闲的线程去执行,你这个线程继续去处理其他任务。
"""
"""
2381
99306
99306
2708
21634
47067
99306
143930
571258
48110
"""