阻塞
调用结果返回之前, 当前线程会被挂起(如遇到IO操作). 函数只有在得到结果之后才会将阻塞的线程激活.
非阻塞
和阻塞的概念相对应, 指在不能立即得到结果之前也会立刻返回, 同时该函数不会阻塞当前线程
同步
在一个功能调用时, 在没有得到结果之前, 该调用就不会返回
异步
和同步相对, 当一个异步功能调用发出后, 调用者不能立刻得到结果.
总结
同步与异步针对的是函数/任务的调用方式: 同步就是当一个进程发起一个函数(任务)调用的时候, 一直等到函数(任务)完成, 而进程继续处于激活状态. 而异步情况下是当一个进程发起一个函数(任务)调用的时候, 不会等函数返回, 而是继续往下执行, 函数返回的时候通过状态, 通知, 事件等方式通知进程任务完成
阻塞与非阻塞针对的是进程或线程: 阻塞是当请求不能满足的时候就将进程挂起, 而非阻塞则不会阻塞当前进程
同步调用 异步调用
同步调用
apply 一个累计1亿次的任务, 该调用会一直等待, 知道任务返回结果为止, 但并未阻塞住(即便是被抢走CPU的执行权限, 那也是处于就绪状态)
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import os
import random
def task(i):
print(f"{os.getpid()}开始任务")
time.sleep(random.randint(1,3))
print(f"{os.getpid()}任务结束")
return i
if __name__ == '__main__':
pool = ProcessPoolExecutor()
for i in range(10):
obj = pool.submit(task, i)
print(f"任务结果:{obj.result()}")
# obj 是一个动态对象, 返回的当前的对象的状态, 有可能运行中, 可能就绪阻塞, 还可能是结束了
# obj.result() 必须等到这个任务完成后, 返回了结果之后, 再执行下一个任务
pool.shutdown()
# shutdown 让主进程等待进程池中所有的子进程都结束任务之后再执行, 类似join
# 在上一个进程池没有完成所有的任务之前, 不允许添加新的任务
# 一个任务是通过一个函数实现的, 任务完成了他的返回值就是函数的返回值
print("===主")
异步调用
发起异步调用后, 并不会等待任务结束才返回, 相反, 会立即获取一个临时结果(并不是最终的结果, 可能是封装好的一个对象)
from concurrent.futures import ProcessPoolExecutor
import time
import os
import random
def task(i):
print(f"{os.getpid()}开始任务")
time.sleep(random.randint(1,3))
print(f"{os.getpid()}任务结束")
return i
if __name__ == '__main__':
pool = ProcessPoolExecutor()
for i in range(10):
obj = pool.submit(task, i)
pool.shutdown()
print("===主")
异步调用如何取结果
统一回收结果
不能马上收到任何一个已经完成的任务的返回值, 只能等到所有的任务全部结束统一回收
from concurrent.futures import ProcessPoolExecutor
import time
import os
import random
def task(i):
print(f"{os.getpid()}开始任务")
time.sleep(random.randint(1,3))
print(f"{os.getpid()}任务结束")
return i
if __name__ == '__main__':
lst = []
pool = ProcessPoolExecutor()
for i in range(10):
obj = pool.submit(task, i)
lst.append(obj)
pool.shutdown()
print(lst)
for i in lst:
print(i.result())
print("===主")
完成任务后立刻返回结果
异步调用 + 回调函数
异步调用 + 回调函数
浏览器
工作原理:
向服务端发送一个请求, 服务端验证你的请求, 如果正确, 给你的浏览器返回一个文件, 浏览器接收到文件, 将文件里面的代码渲染成你看到的漂亮美丽的样子.
爬虫
利用代码模拟一个浏览器, 进行浏览器的工作流程得到一堆源代码.
对源代码进行数据清洗得到我想要的数据
import requests
ret = requests.get("http://www.baidu.com")
if ret.status_code == 200:
print(ret.text)
回调函数
版本一
import requests
def task(url):
ret = requests.get(url)
if ret.status_code == 200:
return ret.text
def parse(content):
return len(content)
if __name__ == '__main__':
url_list = ['http://www.baidu.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.taobao.com',
'https://www.cnblogs.com/jin-xin/articles/7459977.html',
'https://www.luffycity.com/',
'https://www.cnblogs.com/jin-xin/articles/9811379.html',
'https://www.cnblogs.com/jin-xin/articles/11245654.html',
'https://www.sina.com.cn/']
pool = ThreadPoolExecutor(4)
obj_list = []
for url in url_list:
obj = pool.submit(task, url)
obj_list.append(obj)
pool.shutdown()
for res in obj_list:
print(parse(res.result()))
print("==主")
线程池设置4 个线程, 异步发起10 个任务, 每个任务是通过网页获取源码, 并发执行, 最后统一用列表回收10 个任务, 串行着分析源码
缺点:
1. 异步发出10个任务, 并发的执行, 但是统一的接收所有的任务的返回值(效率低, 不能实时的获取结果)
2. 分析结果流程是串行, 影响效率
针对版本一的缺点 2 , 改进, 让串行编程并发或者并行
版本二
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import random
import os
import requests
def task(url):
ret = requests.get(url)
if ret.status_code == 200:
return ret.text
def parse(content):
return len(content)
if __name__ == '__main__':
url_list = ['http://www.baidu.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.JD.com',
'http://www.taobao.com',
'https://www.cnblogs.com/jin-xin/articles/7459977.html',
'https://www.luffycity.com/',
'https://www.cnblogs.com/jin-xin/articles/9811379.html',
'https://www.cnblogs.com/jin-xin/articles/11245654.html',
'https://www.sina.com.cn/']
pool = ThreadPoolExecutor(4)
obj_list = []
for url in url_list:
obj = pool.submit(task, url)
obj_list.append(obj)
pool.shutdown()
for res in obj_list:
print(parse(res.result()))
print("==主")
线程池设置4 个线程, 异步发起10个任务, 每个任务是通过网页获取源码 + 数据分析, 并发执行, 最后将所有的结果展示出来.
耦合性增强了
并发执行任务, 次任务最好是IO阻塞, 才能发挥最大的效果
版本三
基于异步调用回收所有任务的结果, 做到实时回收, 并发执行任务每个任务只是处理 IO 阻塞的, 不能增加新的功能
from concurrent.futures import ThreadPoolExecutor
import requests
def task(url):
ret = requests.get(url)
if ret.status_code == 200:
return ret.text
def parse(obj):
print(len(obj.result()))
if __name__ == '__main__':
url_lst = ["http://www.baidu.com",
"http://www.JD.com",
"http://www.JD.com",
"http://www.JD.com",
"http://www.taobao.com",
"https://www.cnblogs.com/jin-xin/articles/7459977.html",
"https://www.luffycity.com/",
"https://www.cnblogs.com/jin-xin/articles/9811379.html",
"https://www.cnblogs.com/jin-xin/articles/11245654.html",
"https://www.sina.com.cn/"]
pool = ThreadPoolExecutor(4)
for url in url_lst:
obj = pool.submit(task, url)
obj.add_done_callback(parse)
当线程池设置4个线程, 异步发起10个任务, 每个任务是通过网页获取源码, 并发执行, 当一个任务完成之后, 将parse这个分析代码的任务交由剩余的空闲的线程去执行, 你这个线程继续去处理其他任务.
进程池 + 回调函数
回调函数由主进程去执行
线程池 + 回调函数
回到函数由空闲的线程去执行
小结:
异步站在发布任务的角度
回调站在接收结果的角度, 按顺序接收每个任务的结果, 进行下一步处理
异步处理的IO类型
回调处理非IO