# 如何提升requests模块爬取数据的效率?
- 多进程或多线程(不建议) 太耗费资源
- 线程池或进程池(适当使用)
- 单线程 + 异步协程(推荐)
# 线程池使用案例
# 梨视频 下载作业
import random
from lxml import etree
from multiprocessing.dummy import Pool # 线程
import requests
import re
url = 'https://www.pearvideo.com/category_3'
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.81 Safari/537.36'}
page_text = requests.get(url=url, headers=headers).text
tree = etree.HTML(page_text)
all_video = tree.xpath('//ul[@id="listvideoListUl"]/li/div/a/@href')
mp4_list = []
for video in all_video:
video_url = 'https://www.pearvideo.com/%s' % video
page_video = requests.get(url=video_url, headers=headers).text
tree = etree.HTML(page_video)
name1 = tree.xpath('//*[@id="detailsbd"]/div[1]/div[2]/div/div[1]/h1/text()')[0]
mp4_url = re.findall('srcUrl="(.*?)",vdoUrl', page_video, re.S)[0]
mp4_list.append(mp4_url)
pool = Pool(4) # 将耗时严重的任务异步处理,实例化一个线程池对象
# 视频二进制流获取
def mp4_request(url):
return requests.get(url=url, headers=headers).content
# 数据持久化存储
def mp4_save(mp4_data):
name = str(random.randint(0,9999))+'.mp4' # 随机生成name
with open("./%s.mp4" % name, 'wb') as f:
f.write(mp4_data)
print(name, ',download ok')
mp4_data_list = pool.map(mp4_request, mp4_list) # 获取二进制流
pool.map(mp4_save, mp4_data_list) # data持久化存储
print('Task is OK!') # 任务结束的提醒
pool.close() #关闭线程池
# 下面是 带真实名字的版本
from lxml import etree
from multiprocessing.dummy import Pool
import requests
import re
url = 'https://www.pearvideo.com/category_3'
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.81 Safari/537.36'}
page_text = requests.get(url=url, headers=headers).text
tree = etree.HTML(page_text)
all_video = tree.xpath('//ul[@id="listvideoListUl"]/li/div/a/@href')
mp4_list = []
for video in all_video:
video_url = 'https://www.pearvideo.com/%s' % video
page_video = requests.get(url=video_url, headers=headers).text
tree = etree.HTML(page_video)
name = tree.xpath('//*[@id="detailsbd"]/div[1]/div[2]/div/div[1]/h1/text()')[0]
mp4_url = re.findall('srcUrl="(.*?)",vdoUrl', page_video, re.S)[0]
mp4_list.append({name:mp4_url})
mp4_list = mp4_list[2:]
print(mp4_list)
pool = Pool(4) # 将耗时的任务异步处理,实例化一个线程池对象
# 视频二进制流获取
def mp4_request(url):
return {list(url.keys())[0]:requests.get(url=list(url.values())[0], headers=headers).content}
# 数据持久化存储
def mp4_save(mp4_data):
with open("./%s.mp4" % list(mp4_data.keys())[0], 'wb') as f:
f.write(list(mp4_data.values())[0])
print(list(mp4_data.keys())[0], ',download ok')
mp4_data_list = pool.map(mp4_request, mp4_list) # 获取二进制流
pool.map(mp4_save, mp4_data_list) # data持久化存储
print('Task is OK!')
# -- 下面内容都是异步的内容 --
# 基本使用
# 异步轮询的执行
import asyncio
async def hello(name):
print('hello to:',name)
c = hello('zc')#调用 返回协程对象<coroutine协程 object hello at 0x0000000005EDDE08>
# 创建一个事件循环对象
loop = asyncio.get_event_loop()
# 将协程对象注册到事件循环中,然后启动事件循环对象
loop.run_until_complete(c) # 输出hello to: zc
# task 的使用 单任务协程
import asyncio
async def hello(name):
print('hello to:',name)
c = hello('zc')
# 创建一个事件循环对象
loop = asyncio.get_event_loop()
# 就协程进行进一步封装,封装到了task对象中
task = loop.create_task(c)
print(task)
loop.run_until_complete(task)
print(task)
# future 的使用
import asyncio
async def hello(name):
print('hello to:',name)
c = hello('zc')
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(c)
print(task)
loop.run_until_complete(task)
print(task)
# furure 绑定回调
import asyncio
def callback(task): # 回调函数
print('I am callback', task.result()) #接收task的return
async def hello(name):
print('hello to:', name)
return name
c = hello('zc') #创建协程对象
loop = asyncio.get_event_loop() # 创建loop实例
task = asyncio.ensure_future(c) # 任务对象
print(task)
task.add_done_callback(callback) # 添加要执行的回调函数
loop.run_until_complete(task) # 当任务设定完成开始执行
print(task)
# 单线程+多任务异步协程
# 支持异步的网络请求的模块 pip install aiohttp
import asyncio
import aiohttp
import time
async def get_page(url):
async with aiohttp.ClientSession() as session:
async with await session.get(url=url) as response:
page_text = await response.text() # read() 二进制形式的响应数据,json()
print('响应数据:',page_text)
# print('ok %s'%url)
start = time.time()
urls = [
'http://127.0.0.1:5000/bobo',
'http://127.0.0.1:5000/jay',
'http://127.0.0.1:5000/tom',
]
tasks = [] #任务列表 放置多个任务对象
loop = asyncio.get_event_loop()
for url in urls:
c = get_page(url) #创建协程对象
task = asyncio.ensure_future(c) #创建任务对象
tasks.append(task) #添加到任务列表里面
loop.run_until_complete(asyncio.wait(tasks)) #将多个任务对象对应的列表注册到事件循环中
print('总耗时',time.time()-start)
# -- 下面是输出结果 --
# downloading http://127.0.0.1:5000/bobo
# downloading http://127.0.0.1:5000/jay
# downloading http://127.0.0.1:5000/tom
# 下载 ok http://127.0.0.1:5000/bobo
# 下载 ok http://127.0.0.1:5000/jay
# 下载 ok http://127.0.0.1:5000/tom
# 总耗时 2.0021142959594727
# 基于python的flask框架实现的简单的Web服务器,代码:
from flask import Flask
import time
app = Flask(__name__)
@app.route('/bobo')
def index_bobo():
time.sleep(2)
return 'Hello bobo'
@app.route('/jay')
def index_jay():
time.sleep(2)
return 'Hello jay'
@app.route('/tom')
def index_tom():
time.sleep(2)
return 'Hello tom'
if __name__ == '__main__':
app.run(threaded=True)
# 真实网站请求的 高性能异步IO
import asyncio
import aiohttp
import time
async def get_page(url):
async with aiohttp.ClientSession() as session:
async with await session.get(url=url) as response:
page_text = await response.text() # read() 二进制形式的响应数据,json()
# print('响应数据:',page_text)
print('ok %s'%url)
start = time.time()
urls = [
'https://baidu.com',
'https://y.qq.com',
'https://www.taobao.com',
]
tasks = [] #任务列表 放置多个任务对象
loop = asyncio.get_event_loop()
for url in urls:
c = get_page(url)
task = asyncio.ensure_future(c)
tasks.append(task)
# 将多个任务对象对应的列表注册到事件循环中
loop.run_until_complete(asyncio.wait(tasks))
print('总耗时',time.time()-start)
0 and False => 0
0 or False => False