多线程(三)
1.同步/异步and阻塞/非阻塞
进程运行的三种状态:运行,就绪,阻塞
从进程执行的态度:
阻塞:程序运行时,遇到了IO,程序挂起,cpu被切走
非阻塞:程序没有遇到IO,或者程序遇到IO,通过某种手段,让cpu强行运行该程序
从提交任务的角度:
同步:提交一个任务,自任务开始运行直到此任务结束(可能有IO),返回一个返回值之后,再提交下一个任务
异步:一次提交多个任务,然后就直接执行下一行代码
ps:
1. 同步与异步针对的是函数/任务的调用方式:同步就是当一个进程发起一个函数(任务)调用的时候,一直等到函数(任务)完成,而进程继续处于激活状态。而异步情况下是当一个进程发起一个函数(任务)调用的时候,不会等函数返回,而是继续往下执行当,函数返回的时候通过状态、通知、事件等方式通知进程任务完成。
2. 阻塞与非阻塞针对的是进程或线程:阻塞是当请求不能满足的时候就将进程挂起,而非阻塞则不会阻塞当前进程
3.阻塞调用是指调用结果返回之前,当前线程会被挂起(如遇到io操作)。函数只有在得到结果之后才会将阻塞的线程激活。有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。
#举例:
1. 同步调用:apply一个累计1亿次的任务,该调用会一直等待,直到任务返回结果为止,但并未阻塞住(即便是被抢走cpu的执行权限,那也是处于就绪态);
2. 阻塞调用:当socket工作在阻塞模式的时候,如果没有数据的情况下调用recv函数,则当前线程就会被挂起,直到有数据为止。
2.同步调用/异步调用
同步调用
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time,random,os
def task(i):
print(f"{os.getpid()} 开始任务")
time.sleep(random.randint(1,3))
print(f"{os.getpid()} 任务结束")
return i
if __name__ == '__main__':
pool = ProcessPoolExecutor(4) #同步调用
for i in range(10):
obj = pool.submit(task,i)
#obj是一个动态对象,返回的当前对象的状态,可能运行,可能阻塞或结束
print(f'任务结果:{obj.result()}')
#obj.result() 必须等到此任务完成后,得到返回结果,在执行下一个
pool.shutdown(wait = True)
#shutdown:主进程等待进程池中的所有子进程结束任务之后,在执行,类似于join
#wait=True:在上一个进程池没有完成所有的任务之前,不允许添加新的任务
#一个任务是通过一个函数实现的,任务完成了它的返回值就是函数的返回值
print('===主')
异步调用
1.版本1
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time,random,os
def task(i):
print(f"{os.getpid()} 开始任务")
time.sleep(random.randint(1,3))
print(f"{os.getpid()} 任务结束")
return i
if __name__ == '__main__':
pool = ProcessPoolExecutor(4) # 异步调用
for i in range(10):
pool.submit(task,i)
pool.shutdown(wait =True)
print('==='主)
异步调用,返回值如何接受?这一问题未解决
2.异步调用接收结果
异步调用,统一回收结果
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time,random,os
def task(i):
print(f"{os.getpid()} 开始任务")
time.sleep(random.randint(1,3))
print(f"{os.getpid()} 任务结束")
return i
if __name__ == '__main__':
pool = ProcessPoolExecutor(4)
l1 = []
for i in range(10):
obj = pool.submit(task,i)
l1.append(obj)
pool.shutdown(wait=True)
for i in l1:
print(i.result())
print("===主")
统一回收结果:不能马上收到任何一个已经完成的任务的返回值,只能等所有的任务全部结束后统一回收
3.异步调用+回调函数
浏览器工作原理:向服务端发送一个请求,服务端验证请求,如果正确,返回一个文件,浏览器接收到文件,将文件里面的代码渲染成可视化页面
爬虫:
1.利用代码模拟一个浏览器,进行浏览器的工作流程得到网页源代码
2.对源代码进行数据清洗得到想要的数据
版本1:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import tim,random,os,requests
def task(url):
ret = requests.get(url) #模拟爬取多个源代码,一定有IO阻塞
if ret.status_code = 200:
return ret.text
def parse(content):
return len(content) #模拟对数据进行分析,一般没有IO
if __name__ == '__main__':
url_list = ['http://www.baidu.com',
'http://www.JD.com',
'http://www.taobao.com'
'http://www.sina.com',
'http://www.gitee.com',
'http://www.zhihu.com',
'http://www.7k7k.com',
'http://www.JD.com',
'http://www.hao123.com',
'http://www.4399.com'
]
pool = ThreadPoolExecutor(4) #开启线程池,并发并行的执行
obj_list = []
for url in url_list:
obj = pool.submit(task,url)
obj_list.append(obj)
pool.shutdown(wait = True)
for res in obj_list:
print(parse(res.result())) #串行,耗费时间长
版本1:
1.线程池设置4个线程,异步发出10个任务,每个任务是通过网页获取源码,并发的执行,但是统一的接收所有任务的返回值.(效率低,不能实时的获取结果)
2.分析结果流程是串行,影响效率
版本2:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import tim,random,os,requests
def task(url):
ret = requests.get(url) #模拟爬取多个源代码,一定有IO阻塞
if ret.status_code = 200:
return parse(ret.text)
def parse(content):
return len(content) #模拟对数据进行分析,一般没有IO
if __name__ == '__main__':
url_list = ['http://www.baidu.com',
'http://www.JD.com',
'http://www.taobao.com'
'http://www.sina.com',
'http://www.gitee.com',
'http://www.zhihu.com',
'http://www.7k7k.com',
'http://www.JD.com',
'http://www.hao123.com',
'http://www.4399.com'
]
pool = ThreadPoolExecutor(4)
obj_list = []
for url in url_list:
obj = pool.submit(task,url)
obj_list.append(obj)
pool.shutdown(wait = True)
for res in obj_list:
print(res.result())
版本2:
1.线程池设置4个线程,异步发起10个任务,每个任务是通过网页获取源码+数据分析,并发的执行,最后将所有结果展示出来
2.耦合性增强了,并发执行任务,此任务最好是IO阻塞,才能发挥最大的效果
版本3:
基于异步调用,回收所有任务结果:实现实时回收结果,并发执行每个任务只是处理IO阻塞的,不能增加新的功能.
异步调用+回调函数
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import tim,random,os,requests
def task(url):
ret = requests.get(url) #模拟爬取多个源代码,一定有IO阻塞
if ret.status_code = 200:
return ret.text
def parse(obj):
print(len(obj.result())) #模拟对数据进行分析,一般没有IO
if __name__ == '__main__':
url_list = ['http://www.baidu.com',
'http://www.JD.com',
'http://www.taobao.com'
'http://www.sina.com',
'http://www.gitee.com',
'http://www.zhihu.com',
'http://www.7k7k.com',
'http://www.JD.com',
'http://www.hao123.com',
'http://www.4399.com'
]
pool = ThreadPoolExecutor(4)
for url in url_list:
obj = pool.submit(task,url)
obj.add_done_callback(parse)
版本3:
线程池设置4个线程,异步发起10个任务,每个任务是通过网页获取源码,并发执行,当一个任务完成之后,将parse这个分析代码的任务交由剩余的空闲的线程去执行,此线程继续去处理其他任务
如果进程池+回调,回调函数由主进程去执行
如果线程池+回调,回调函数由空闲的线程去执行
异步与回调:
异步是站在发布任务的角度
站在接收结果的角度:回调函数(按顺序接收每个任务的结果,进行下一步处理)
异步+回调:
异步处理的IO类型
回调处理非IO类型