线程池
- 导包:
from multiprocessing.dummy import Pool
- 回调函数异步将可迭代对象中的元素进行某种操作
- 注意事项:callback必须有一个参数,且只能有一个参数
- 异步主要是被应用在耗时的操作
from multiprocessing.dummy import Pool
pool = Pool(3) # 实例化线程池对象,3是线程池的最大线程数
# 参数1:回调函数(只是函数名,不加括号);参数2:列表
# 参数1会接收参数2列表中的某一个元素,回调函数可以对该列表元素进行某种操作
pool.map(callback,list)
测试:同步&异步效率
搭建一个flask,自己启动服务,测试执行时间
- 新建一个
server.py
from flask import Flask, render_template
import time
app = Flask(__name__)
@app.route('/xx')
def index_1():
time.sleep(2)
return render_template('test.html')
@app.route('/yy')
def index_2():
time.sleep(2)
return render_template('test.html')
@app.route('/oo')
def index_3():
time.sleep(2)
return render_template('test.html')
if __name__ == '__main__':
app.run(debug=True)
- 新建一个
templates
文件夹,在该文件夹下创建一个HTML文件,我写的是test.html
,随便写点数据
<html lang="en">
<head>
<meta charset="UTF-8"/>
<title>测试</title>
</head>
<body>
<div>
<p>百里守约</p>
</div>
<div class="song">
<p>李清照</p>
<p>王安石</p>
<p>苏轼</p>
<p>柳宗元</p>
<a href="http://www.song.com/" title="赵匡胤" target="_self">
<span>this is span</span>
宋朝是最强大的王朝,不是军队的强大,而是经济很强大,国民都很有钱</a>
<a href="" class="du">总为浮云能蔽日,长安不见使人愁</a>
<img src="http://www.baidu.com/meinv.jpg" alt=""/>
</div>
<div class="tang">
<ul>
<li><a href="http://www.baidu.com" title="qing">清明时节雨纷纷,路上行人欲断魂,借问酒家何处有,牧童遥指杏花村</a></li>
<li><a href="http://www.163.com" title="qin">秦时明月汉时关,万里长征人未还,但使龙城飞将在,不教胡马度阴山</a></li>
<li><a href="http://www.126.com" id="qi">岐王宅里寻常见,崔九堂前几度闻,正是江南好风景,落花时节又逢君</a></li>
<li><a href="http://www.sina.com" class="du">杜甫</a></li>
<li><a href="http://www.dudu.com" class="du">杜牧</a></li>
<li><b>杜小月</b></li>
<li><i>度蜜月</i></li>
<li><a href="http://www.haha.com" id="feng">凤凰台上凤凰游,凤去台空江自流,吴宫花草埋幽径,晋代衣冠成古丘</a></li>
</ul>
</div>
</body>
</html>
同步&异步执行时间
import requests
from bs4 import BeautifulSoup
import time
# 线程池模块
from multiprocessing.dummy import Pool
urls = [
'http://127.0.0.1:5000/xx',
'http://127.0.0.1:5000/yy',
'http://127.0.0.1:5000/oo',
]
# 数据的爬取,返回爬取到的页面源码数据
def get_request(url):
page_text = requests.get(url=url).text
return page_text
# 数据的解析,返回标签的文本
def parse(page_text):
soup = BeautifulSoup(page_text, 'lxml')
return soup.select('#feng')[0].text
# 同步代码
if __name__ == '__main__':
start = time.time()
for url in urls:
page_text = get_request(url)
text_data = parse(page_text)
print(text_data)
print(time.time() - start)
"""
执行结果:
凤凰台上凤凰游,凤去台空江自流,吴宫花草埋幽径,晋代衣冠成古丘
凤凰台上凤凰游,凤去台空江自流,吴宫花草埋幽径,晋代衣冠成古丘
凤凰台上凤凰游,凤去台空江自流,吴宫花草埋幽径,晋代衣冠成古丘
6.056272029876709
"""
# 异步代码
if __name__ == '__main__':
start = time.time()
pool = Pool(3) # 实例化线程池对象
# 参数1:回调函数(只是函数名,不加括号);参数2:列表
# 参数1会接收参数2列表中的某一个元素,回调函数可以对该列表元素进行某种操作
page_text_list = pool.map(get_request,urls)
text_data = pool.map(parse,page_text_list)
for i in text_data:
print(i)
print(time.time() - start)
"""
执行结果:
凤凰台上凤凰游,凤去台空江自流,吴宫花草埋幽径,晋代衣冠成古丘
凤凰台上凤凰游,凤去台空江自流,吴宫花草埋幽径,晋代衣冠成古丘
凤凰台上凤凰游,凤去台空江自流,吴宫花草埋幽径,晋代衣冠成古丘
2.0537397861480713
不用for循环速度能提升0.01秒左右
"""
综上所述:异步代码执行效率显著提高
案例:基于线程池爬取梨视频
- 思路分析
- 爬取到视频详情页对应的url,存储到一个可迭代对象中
- 再次发送请求获取视频详情页真正的视频地址
- 注意:视频详情页的video是js代码动态生成的,需要用到正则解析
- 写一个callback,获取视频的二进制文件,持久化存储
import requests
from lxml import etree
from multiprocessing.dummy import Pool
import re
import os
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'
}
# 梨视频财富板块的地址
main_url = 'https://www.pearvideo.com/category_3'
# 解析出该板块下视频详情页的src
main_page_text = requests.get(url=main_url, headers=headers).text
tree = etree.HTML(main_page_text)
li_list = tree.xpath('//*[@id="listvideoListUl"]/li')
# 线程池
video_urls = []
for li in li_list:
# 视频详情页的具体地址和视频标题
detail_url = "https://www.pearvideo.com/" + li.xpath('./div/a/@href')[0]
name = li.xpath('./div/a/div[2]/text()')[0]
# 对详情页发起请求
page_text = requests.get(url=detail_url, headers=headers).text
# 视频详情页的video是js代码动态生成的,使用正则解析
ex = 'srcUrl="(.*?)",vdoUrl='
video_url = re.findall(ex, page_text, re.S)[0] # 返回的是列表类型
dic = {
'url': video_url,
'name': name,
}
video_urls.append(dic)
# 回调函数
def get_video(url):
# 对视频地址发请求,将二进制文件持久化存储
video_data = requests.get(url=url['url'], headers=headers).content
file_name = "./video/" + url['name'] + ".mp4"
with open(file_name, 'wb') as f:
f.write(video_data)
print(url['name'], "下载完毕!")
# 创建存储视频的文件夹
dir_name = 'video'
if not os.path.exists(dir_name):
os.mkdir(dir_name)
# 实例化线程池
pool = Pool(4)
pool.map(get_video, video_urls)
单线程+多任务的异步协程
asyncio
(重点)
特殊函数
- 如果一个函数的定义被async关键字修饰后,则该函数是一个特殊函数。
- 特殊之处:
- 该函数被调用后,函数内部的实现语句不会被立即执行。
- 该函数会返回一个协程对象
协程
-
协程就是一个对象。当特殊函数被调用后,该函数就会返回一个协程对象。
-
协程对象 == 特殊函数
import asyncio from time import sleep async def get_request(url): print('正在请求:', url) sleep(2) print('请求成功:', url) return '666' # 返回一个协程对象 g = get_request("https://www,qq.com")
任务对象
-
就是对协程对象的进一步封装(就是一个高级的协程对象)
-
任务对象 == 协程对象 == 特殊函数(表示某个固定形式的任务)
asyncio.ensure_future(协程对象) task = asyncio.ensure_future(g) # g:协程对象
-
绑定回调:
# 定义一个task的回调函数 def callback(task): task.result() # 表示的是当前任务对象对应的特殊函数的返回值 print("I'm callback:", task) task.add_done_callback(funcName) # task:任务对象 # funcName:回调函数的名称
funcName
这个回调函数必须要带一个参数,这个参数表示的就是当前的任务对象参数.result()
:表示的就是当前任务对象对应的特殊函数的返回值
事件循环对象
-
创建事件循环对象
-
需要将任务对象注册到该事件循环对象中
# 创建事件循环对象 loop = asyncio.get_event_loop() # 将任务对象注册/装载到事件循环对象中,然后需要启动循环对象 loop.run_until_complete(task) # 用于装载且启动事件循环 # task:任务对象
等待
await
:当阻塞操作结束后让loop回头执行阻塞之后的代码。
挂起
asyncio.wait()
:将当前的任务对象交出cpu的使用权。
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
asyncio.wait # 挂起操作
tasks # 任务对象列表
重点注意事项
- 在特殊函数实现内部不可以出现不支持异步的模块代码,否则会中断异步效果
aiohttp
(重点)
-
requests
:不支持异步,不可以出现在特殊函数内部。 -
aiohttp
:支持异步的网络请求模块,和asyncio
一起使用pip install aiohttp
-
代码的编写
- 写出基本架构
import asyncio import aiohttp # 基于aiohttp实现异步的网络请求 async def get_requests(url): # 实例化了一个请求对象 with aiohttp.ClientSession() as aio: # with aio.get/post(url=url,headers=headers,data/params,proxy='http://ip:prot') as response: with aio.get(url=url) as response: # text() 获取字符串形式的响应数据 # read() 获取bytes类型的响应数据 page_text = await response.text() return page_text
- 细节补充(代码参照完整代码)
- 在每一个
with
前加上async
关键字 - 在每一个阻塞操作前加上
await
关键字
- 在每一个
-
完整代码
import asyncio import aiohttp # 基于aiohttp实现异步的网络请求 async def get_requests(url): # 实例化了一个请求对象 async with aiohttp.ClientSession() as aio: # with aio.get/post(url=url,headers=headers,data/params,proxy='http://ip:prot') as response: async with await aio.get(url=url) as response: # text() 获取字符串形式的响应数据 # read() 获取bytes类型的响应数据 page_text = await response.text() return page_text
单任务协程操作
import asyncio
from time import sleep
async def get_request(url):
print('正在请求:', url)
sleep(2)
print('请求成功:', url)
return '666'
# 定义一个task的回调函数
def callback(task):
print("I'm callback:", task)
# 返回一个协程对象
g = get_request("https://www,qq.com")
# 创建一个任务对象
task = asyncio.ensure_future(g)
"""
# 给任务对象绑定回调函数
task.add_done_callback(callback)
# 创建事件循环对象
loop = asyncio.get_event_loop()
# 将任务对象注册/装载到事件循环对象中,然后需要启动循环对象
loop.run_until_complete(task) # 用于装载且启动事件循环
"""
执行结果:
正在请求: www,qq.com
正在请求: www,qq.com
"""
多任务协程操作
import asyncio
import time
start = time.time()
async def get_request(url):
print('正在请求:', url)
# await 当阻塞操作结束后让loop回头执行阻塞之后的代码
await asyncio.sleep(2)
print('请求成功:', url)
return '666'
urls = [
'http://127.0.0.1:5000/xx',
'http://127.0.0.1:5000/yy',
'http://127.0.0.1:5000/oo',
]
tasks = []
for url in urls:
c = get_request(url)
task = asyncio.ensure_future(c)
tasks.append(task)
loop = asyncio.get_event_loop()
# 将任务列表注册到事件循环的时候一定要将任务列表进行挂起操作
# asyncio.wait() 挂起操作,将当前的任务对象交出cpu的使用权
loop.run_until_complete(asyncio.wait(tasks))
print('总耗时:', time.time() - start)
单线程&多任务异步爬虫
基于Flask自测
- 测试代码在上述
测试:同步&异步效率
,按照上述步骤启动项目;然后运行下方代码。
import asyncio
import time
import aiohttp
from lxml import etree
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'
}
urls = [
'http://127.0.0.1:5000/xx',
'http://127.0.0.1:5000/yy',
'http://127.0.0.1:5000/oo',
]
start = time.time()
"""
# 发起请求,获取响应数据(不可以实现异步)
async def get_requests(url):
# requests是不支持异步的模块
page_text = requests.get(url).text
return page_text
"""
async def get_requests(url):
"""
基于aiohttp实现异步的网络请求
:param url:
:return:
"""
# 实例化了一个请求对象
async with aiohttp.ClientSession() as aio:
# with aio.get/post(url=url,headers=headers,data/params,proxy='http://ip:prot') as response:
async with await aio.get(url=url) as response:
# text() 获取字符串形式的响应数据
# read() 获取bytes类型的响应数据
page_text = await response.text()
return page_text
def parse(task):
"""
定义回调函数
:param task:
:return:
"""
page_text = task.result() # 获取特殊函数的返回值(请求到的页面源码数据)
tree = etree.HTML(page_text)
content = tree.xpath('//*[@id="feng"]/text()')[0]
print(content)
tasks = []
for url in urls:
c = get_requests(url)
task = asyncio.ensure_future(c)
task.add_done_callback(parse)
tasks.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print('总耗时:', time.time() - start)
案例:基于单线程多任务异步爬取梨视频
- 思路上述
案例:基于线程池爬取梨视频
import asyncio
import time
import aiohttp
from lxml import etree
import re
import os
import requests
# time模块是为了测试爬取视频的耗时
start = time.time()
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'
}
# 梨视频财富板块的地址
main_url = 'https://www.pearvideo.com/category_3'
main_page_text = requests.get(url=main_url, headers=headers).text
tree = etree.HTML(main_page_text)
li_list = tree.xpath('//*[@id="listvideoListUl"]/li')
urls = [] # [{'url': video_url,'name': name},{}...]
for li in li_list:
detail_url = "https://www.pearvideo.com/" + li.xpath('./div/a/@href')[0]
name = li.xpath('./div/a/div[2]/text()')[0]
page_text = requests.get(url=detail_url, headers=headers).text
# 视频详情页的video是js代码动态生成的
ex = 'srcUrl="(.*?)",vdoUrl='
video_url = re.findall(ex, page_text, re.S)[0] # 返回的是列表类型
dic = {
'url': video_url,
'name': name,
}
urls.append(dic)
# 基于aiohttp实现异步的网络请求
async def get_requests(url):
# 实例化了一个请求对象
async with aiohttp.ClientSession() as aio:
# with aio.get/post(url=url,headers=headers,data/params,proxy='http://ip:prot') as response:
async with await aio.get(url=url['url'], headers=headers) as response:
# text() 获取字符串形式的响应数据
# read() 获取bytes类型的响应数据
page_read = await response.read()
dic = {
"page_read": page_read,
"name": url['name']
}
return dic
def parse(task):
"""
定义回调函数
:param task:
:return:
"""
dic_info = task.result() # 获取特殊函数的返回值(请求到的页面源码数据)
file_name = "./video/" + dic_info["name"] + ".mp4"
with open(file_name, 'wb') as f:
f.write(dic_info['page_read'])
print(dic_info["name"], "下载完毕!")
tasks = []
for url in urls:
c = get_requests(url)
task = asyncio.ensure_future(c)
task.add_done_callback(parse)
tasks.append(task)
dir_name = 'video'
if not os.path.exists(dir_name):
os.mkdir(dir_name)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print('总耗时:', time.time() - start)