zoukankan      html  css  js  c++  java
  • 爬虫—使用协程构建高性能爬虫

    使用协程构建高性能爬虫

    一、简介

      在执行一些 IO 密集型任务的时候,程序常常会因为等待 IO 而阻塞。比如在网络爬虫中,如果我们使用 requests 库来进行请求的话,如果网站响应速度过慢,程序一直在等待网站响应,最后导致其爬取效率是非常非常低的。为了解决这类问题,本文就来探讨一下 Python 中异步协程来加速的方法,此种方法对于 IO 密集型任务非常有效。如将其应用到网络爬虫中,爬取效率甚至可以成倍地提升。本文使用 async/await 来实现,需要 Python 3.5 及以上版本。

    二、概念介绍

    1.阻塞

      程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续干别的事情,则称该程序在该操作上是阻塞的。阻塞形式有:网络 I/O 阻塞、磁盘 I/O 阻塞、用户输入阻塞等。包括 CPU 切换上下文时,多核 CPU 则正在执行上下文切换操作的核不可被利用。

    2.非阻塞

      程序在等待某操作过程中,自身不被阻塞,可以继续运行干别的事情,则称该程序在该操作上是非阻塞的。非阻塞的存在是因为阻塞存在,正因为某个操作阻塞导致的耗时与效率低下,我们才要把它变成非阻塞,以提高效率。

    3.同步

      不同程序单元为了完成某个任务,在执行过程中需靠某种通信方式以协调一致,称这些程序单元是同步执行的。例如购物系统中更新商品库存,需要用“锁”作为通信信号,让不同的更新请求强制排队顺序执行,那更新库存的操作是同步的。同步意味着有序。

    4.异步

      不同程序单元之间过程中无需通信协调,也能完成任务的方式,不相关的程序单元之间可以是异步的。爬虫下载网页。调度程序调用下载程序后,即可调度其他任务,而无需与该下载任务保持通信以协调行为。不同网页的下载、保存等操作都是无关的,也无需相互通知协调。这些异步操作的完成时刻并不确定,异步是无序的。

    5.协程

      Coroutine,又称微线程,纤程,协程是一种用户态的轻量级线程。拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入时,就相当于进入上一次调用的状态。协程本质上是个单进程,协程相对于多进程来说,无需线程上下文切换的开销,无需原子操作锁定及同步的开销,编程模型也非常简单。

      网络爬虫场景下,我们发出一个请求之后,需要等待一定的时间才能得到响应,但其实在这个等待过程中,程序可以干许多其他的事情,等到响应得到之后才切换回来继续处理,这样可以充分利用 CPU 和其他资源,这就是异步协程的优势

    6.异步协程asyncio

      Python 中使用协程最常用的库莫过于 asyncio。  

    • event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件发生的时候,就会调用对应的处理方法。
    • coroutine:中文翻译叫协程,在 Python 中常指代为协程对象类型,我们可以将协程对象注册到时间循环中,它会被事件循环调用。我们可以使用 async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。
    • task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。
    • future:代表将来执行或没有执行的任务的结果,实际上和 task 没有本质区别。

      另外我们还需要了解 async/await 关键字,它是从 Python 3.5 才出现的,专门用于定义协程。其中,async 定义一个协程,await 用来挂起阻塞方法的执行。

    三、代码实现

    1.首先我们使用Flask简单的实现一个服务器

      如果没有安装 Flask 的话可以执行如下命令安装:

    pip3 install flask

      然后编写服务器代码如下:

    from flask import Flask
    import time
    
    app = Flask(__name__)
    
    
    @app.route('/')
    def index():
        # 模拟耗时io
        time.sleep(2)
        return 'hello'
    
    
    if __name__ == '__main__':
        # 启动多线程模式
        app.run(threaded=True)

      这里我们定义了一个 Flask 服务,主入口是 index() 方法,方法里面先调用了 sleep() 方法休眠 2 秒,然后接着再返回结果,也就是说,每次请求这个接口至少要耗时 2 秒,这样我们就模拟了一个慢速的服务接口。

    2.使用asyncio进行测试

    import asyncio
    import requests
    import time
    
    start = time.time()
    
    
    async def get(url):
        return requests.get(url)
    
    
    async def request():
        url = 'http://127.0.0.1:5000'
        print('Waiting for ', url)
        response = await get(url)
        print('Get response from ', url, 'Result:', response.text)
    
    
    tasks = [asyncio.ensure_future(request()) for _ in range(5)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    
    end = time.time()
    print(
        'Cost time:', end - start
    )

      在这里我们还是创建了五个 task,然后将 task 列表传给 wait() 方法并注册到时间循环中执行

      输出结果:

    Waiting for  http://127.0.0.1:5000
    Get response from  http://127.0.0.1:5000 Result: hello
    Waiting for  http://127.0.0.1:5000
    Get response from  http://127.0.0.1:5000 Result: hello
    Waiting for  http://127.0.0.1:5000
    Get response from  http://127.0.0.1:5000 Result: hello
    Waiting for  http://127.0.0.1:5000
    Get response from  http://127.0.0.1:5000 Result: hello
    Waiting for  http://127.0.0.1:5000
    Get response from  http://127.0.0.1:5000 Result: hello
    Cost time: 10.043976068496704

      可以发现和正常的请求并没有什么两样,依然还是顺次执行的,耗时 10 秒,平均一个请求耗时 2 秒,说好的异步处理呢?其实,要实现异步处理,我们得先要有挂起的操作,当一个任务需要等待 IO 结果的时候,可以挂起当前任务,转而去执行其他任务,这样我们才能充分利用好资源,上面方法都是一本正经的串行走下来,连个挂起都没有,怎么可能实现异步?想太多了。要实现异步,接下来我们再了解一下 await 的用法,使用 await 可以将耗时等待的操作挂起,让出控制权。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕

      仅仅将涉及 IO 操作的代码封装到 async 修饰的方法里面是不可行的!我们必须要使用支持异步操作的请求方式才可以实现真正的异步,所以这里就需要 aiohttp 派上用场了。

    3.使用aiohttp

      aiohttp 是一个支持异步请求的库,利用它和 asyncio 配合我们可以非常方便地实现异步请求操作。

      安装方式如下:

    pip3 install aiohttp

      官方文档链接为:https://aiohttp.readthedocs.io/,它分为两部分,一部分是 Client,一部分是 Server,详细的内容可以参考官方文档。

      

    import aiohttp
    import asyncio
    import time
    
    start = time.time()
    
    
    async def get(url):
        session = aiohttp.ClientSession()         # 实例化Clientsession()对象
        response = await session.get(url)         # 支持get(),post(),params/data,proxy='..'等参数
        result = await response.text()            # text()字符串,json()json类型,read()二进制
        await session.close()                     # 关闭资源,使用with语句可以自动释放
        return result
    
    
    
    
    async def request():
        url = 'http://127.0.0.1:5000'
        print('Waiting fro ', url)
        # result = await get(url)
        result = await get_w(url)
        print('Get response from ', url, 'Result:', result)
    
    
    tasks = [asyncio.ensure_future(request()) for _ in range(5)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    
    end = time.time()
    print('Cost time:', end - start)

      输出结果如下:

    Waiting fro  http://127.0.0.1:5000
    Waiting fro  http://127.0.0.1:5000
    Waiting fro  http://127.0.0.1:5000
    Waiting fro  http://127.0.0.1:5000
    Waiting fro  http://127.0.0.1:5000
    Get response from  http://127.0.0.1:5000 Result: hello
    Get response from  http://127.0.0.1:5000 Result: hello
    Get response from  http://127.0.0.1:5000 Result: hello
    Get response from  http://127.0.0.1:5000 Result: hello
    Get response from  http://127.0.0.1:5000 Result: hello
    Cost time: 2.012542963027954

      开始运行时,时间循环会运行第一个 task,针对第一个 task 来说,当执行到第一个 await 跟着的 get() 方法时,它被挂起,但这个 get() 方法第一步的执行是非阻塞的,挂起之后立马被唤醒,所以立即又进入执行,创建了 ClientSession 对象,接着遇到了第二个 await,调用了 session.get() 请求方法,然后就被挂起了,由于请求需要耗时很久,所以一直没有被唤醒,好第一个 task 被挂起了,那接下来该怎么办呢?事件循环会寻找当前未被挂起的协程继续执行,于是就转而执行第二个 task 了,也是一样的流程操作,直到执行了第五个 task 的 session.get() 方法之后,全部的 task 都被挂起了。所有 task 都已经处于挂起状态,那咋办?只好等待了。2 秒之后,几个请求几乎同时都有了响应,然后几个 task 也被唤醒接着执行,输出请求结果,最后耗时,2秒!

      上面的代码也可以配合with使用,

    # 使用with语句
    async def get_w(rul):
        async with aiohttp.ClientSession() as session:
            async with await  session.get(rul) as response:
                result = await response.text()
                return result

    4.与多进程进行结合使用aiomultiprocess,Python3.6以上版本适用

      安装方式:

    pip3 install aiomultiprocess
    import asyncio
    import aiohttp
    import time
    from aiomultiprocess import Pool
    
    start = time.time()
    
    async def get(url):
        session = aiohttp.ClientSession()
        response = await session.get(url)
        result = await response.text()
        session.close()
        return result
    
    async def request():
        url 'http://127.0.0.1:5000'
        urls = [url for _ in range(100)]
        async with Pool() as pool:
            result = await pool.map(get, urls)
            return result
    
    coroutine = request()
    task = asyncio.ensure_future(coroutine)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(task)
    
    end = time.time()
    print('Cost time:', end - start)

      当然最后的耗时结果其实和异步是差不多的

      做爬取的时候遇到的情况千变万化,一方面我们使用异步协程来防止阻塞,另一方面我们使用 multiprocessing 来利用多核成倍加速,节省时间其实还是非常可观的。

  • 相关阅读:
    Java集合知识点小结
    集合类说明及区别
    微信翻译机器人实战
    细说:域名与SEO存在什么关系
    我为PHP摇旗呐喊!
    每个程序员都该知道的10大编码原则
    浏览器加载模式:window.onload和$(document).ready()
    修改Mysql编码集
    java实现LIS算法,出操队形问题
    [笔记]: 前向星 标签: 数据结构存储 2017-05-25 09:13 35人阅读 评论(0) 收藏
  • 原文地址:https://www.cnblogs.com/zivli/p/11657116.html
Copyright © 2011-2022 走看看