zoukankan      html  css  js  c++  java
  • 爬虫的高效率解决方式--单线程+异步协程, 线程池爬虫

    标准的异步协程http请求的模板,asyncio, aiohttp

    import asyncio
    import aiohttp
    #在实现该函数的时候,其函数实现内部不可以出现非异步模块的代码
    async def request(url):
       async with aiohttp.ClientSession() as s:
           async with await s.get(url=url) as response:
                page_text = await response.text()
    
                return page_text
    
    def callback(task):
        print(task.result())
    def callback1(task):
        print(task.result())
    #事件循环对象:
    loop = asyncio.get_event_loop()
    c = request('https://www.baidu.com')
    c1 = request('https://www.sogou.com')
    task = asyncio.ensure_future(c)
    task.add_done_callback(callback)
    
    task1 = asyncio.ensure_future(c1)
    task1.add_done_callback(callback1)
    
    tasks = [task1,task]
    loop.run_until_complete(asyncio.wait(tasks))
    View Code

    使用线程池的异步爬虫

      对比一下同步爬取和异步爬取的时间, 使用time.sleep()模拟请求访问的延迟.

    1.同步请求

    from time import sleep
    import time
    from multiprocessing.dummy import Pool
    def request(url):
        print('正在请求:',url)
        sleep(2)
        print('下载成功:',url)
    start = time.time()
    urls = ['www.baidu.com','www.sogou.com','www.goubanjia.com']
    
    for url in urls:
        request(url)
    print(time.time()-start)

    2.使用线程池

    from time import sleep
    import time
    from multiprocessing.dummy import Pool
    def request(url):
        print('正在请求:',url)
        sleep(2)
        print('下载成功:',url)
    start = time.time()
    urls = ['www.baidu.com','www.sogou.com','www.goubanjia.com']
    
    pool = Pool(3)
    pool.map(request,urls)
    
    print(time.time()-start)

    • 示例: 使用线程池爬取梨视频中的视频数据
      • 社会分类下的前5个视频
    from multiprocessing.dummy import Pool
    import requests
    from lxml import etree
    import re
    import random
    
    url = 'https://www.pearvideo.com/category_1'
    
    headers = {
        "User-Agent":'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'
    }
    
    response = requests.get(url=url,headers=headers).text
    tree = etree.HTML(response)
    li_list = tree.xpath('//*[@id="listvideoListUl"]/li')
    video_list = []
    for li in li_list:
        video = li.xpath('./div/a/@href')[0]
        detail_url = 'https://www.pearvideo.com/' + video
        detail_page_text = requests.get(url=detail_url,headers=headers).text
        ret = re.findall(',srcUrl="(.*?)",',detail_page_text,re.S)[0]
        print(ret)
        video_list.append(ret)
       
    def get_video_data(url):
        return requests.get(url=url,headers=headers).content
    
    
    def save_video(data):
        name = str(random.randint(0,9999)) + '.mp4'
        with open(name,'wb') as f:
            f.write(data)
        print(name,"下载成功")
    
    # 四个视频的二进制数据组成的列表
    all_video_data_list = pool.map(get_video_data,video_list)
    pool = Pool(4)
    pool.map(save_video,all_video_data_list)
    viewcode

    协程基础

      异步的效率比较高,而线程的开销比进程的小,协程的开销又比线程的小,所以在爬虫时,推荐采用单线程+异步协程的方式,可以提高很大的效率

    • event_loop:事件循环,相当于一个无限循环,我们可以把一些特殊函数注册(放置)到这个事件循环上,当满足某些条件的时候,函数就会被循环执行。程序是按照设定的顺序从头执行到尾,运行的次数也是完全按照设定。当在编写异步程序时,必然其中有部分程序的运行耗时是比较久的,需要先让出当前程序的控制权,让其在背后运行,让另一部分的程序先运行起来。当背后运行的程序完成后,也需要及时通知主程序已经完成任务可以进行下一步操作,但这个过程所需的时间是不确定的,需要主程序不断的监听状态,一旦收到了任务完成的消息,就开始进行下一步。loop就是这个持续不断的监视器。
    • coroutine:中文翻译叫协程,在 Python 中常指代为协程对象类型,我们可以将协程对象注册到事件循环中,它会被事件循环调用。我们可以使用 async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。
    • task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。
    • future:代表将来执行或还没有执行的任务,实际上和 task 没有本质区别。

       另外我们还需要了解 async/await 关键字,它是从 Python 3.6 才出现的,专门用于定义协程。其中,async 定义一个协程,await 用来挂起阻塞方法的执行。官方文档

    为了测试以下两部分代码,我们可以自己搭建一个服务器用来保证阻塞时间,使用 flask简易搭建

    from flask import Flask
    import time
    
    app = Flask(__name__)
    
    
    @app.route('/test1')
    def index_bobo():
        time.sleep(2)
        return 'Hello test1'
    
    @app.route('/test2')
    def index_jay():
        time.sleep(2)
        return 'Hello test2'
    
    @app.route('/test3')
    def index_tom():
        time.sleep(2)
        return 'Hello test3'
    
    if __name__ == '__main__':
        app.run(threaded=True)
    flaskServer

     1.协程的基本使用,asynico模块

    import asyncio
    async def request(url):
        print('正在请求:',url)
        print('下载成功:',url)
    
    c = request('www.baidu.com')
    
    #实例化一个事件循环对象
    loop = asyncio.get_event_loop()
    #创建一个任务对象,将协程对象封装到了该对象中
    # task = loop.create_task(c)
    #另一种形式实例化任务对象的方法
    task = asyncio.ensure_future(c)
    
    print(task)
    
    #将协程对象注册到事件循环对象中,并且我们需要启动事件循环对象
    loop.run_until_complete(task)
    
    print(task)

    2.给任务对象绑定回调函数

    import asyncio
    
    async def request(url):
        print('正在请求:',url)
        print('下载成功:',url)
        return url
    
    #回调函数必须有一个参数:task
    #task.result():任务对象中封装的协程对象对应的特殊函数内部的返回值
    def callbak(task):
        print('this is callback!')
        print(task.result())
    
    c = request('www.baidu.com')
    
    #给任务对象绑定一个回调函数
    task = asyncio.ensure_future(c)
    task.add_done_callback(callbak)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(task)

    3.多任务异步协程

      这里注意,当使用time.sleep()模拟阻塞时,会报错,原因是异步协程中,不可以出现不支持异步的相关代码,而且协程的函数前要加关键字await,否则会报错

    from time import sleep
    import asyncio
    import time
    urls = ['www.baidu.com','www.sogou.com','www.goubanjia.com']
    start = time.time()
    async def request(url):
        print('正在请求:',url)
        #在多任务异步协程实现中,不可以出现不支持异步的相关代码。
        # sleep(2)
        await asyncio.sleep(2)
        print('下载成功:',url)
    
    loop = asyncio.get_event_loop()
    #任务列表:放置多个任务对象
    tasks = []
    for url in urls:
        c = request(url)
        task = asyncio.ensure_future(c)
        tasks.append(task)
    
    loop.run_until_complete(asyncio.wait(tasks))
    
    print(time.time()-start)

    4.多任务异步协程应用在爬虫当中

       当我们简单应用到使用requests的模块爬虫程序中时,发现并没有提高效果,是因为requests模块不支持异步操作,所以我们使用aiohttp进行请求

    requests模块不支持异步

    所以应该使用aiohttp进行请求访问

    import requests
    import asyncio
    import time
    import aiohttp
    #单线程+多任务异步协程
    urls = [
        'http://127.0.0.1:5000/test1',
        'http://127.0.0.1:5000/test2',
        'http://127.0.0.1:5000/test3'
    ]
    #代理操作:
    #async with await s.get(url,proxy="http://ip:port") as response:
    async def get_pageText(url):
       async with aiohttp.ClientSession() as s:
          async with await s.get(url) as response:
               page_text = await response.text()
                # 借助于回调函数进行响应数据的解析操作
               return page_text
    #封装回调函数用于数据解析
    def parse(task):
        #1.获取响应数据
        page_text = task.result()
        print(page_text+',即将进行数据解析!!!')
        #解析操作写在该位置
    
    start = time.time()
    tasks = []
    for url in urls:
        c = get_pageText(url)
        task = asyncio.ensure_future(c)
        #给任务对象绑定回调函数用于数据解析
        task.add_done_callback(parse)
        tasks.append(task)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    
    print(time.time()-start)
  • 相关阅读:
    [uoj173]鏖战表达式
    [cf1168E]Xor Permutations
    [cf578F]Mirror Box
    [cf1261F]Xor-Set
    [loj2506]tree
    [atARC068F]Solitaire
    [atARC066F]Contest with Drinks Hard
    [cf1270I]Xor on Figures
    [cf516D]Drazil and Morning Exercise
    无题
  • 原文地址:https://www.cnblogs.com/robertx/p/10951219.html
Copyright © 2011-2022 走看看