zoukankan      html  css  js  c++  java
  • 高性能异步爬虫

    异步IO

    所谓「异步 IO」,就是你发起一个 IO阻塞 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知。

    举个例子,同步相当于打电话通知,异步相当于发微信通知。

    实现异步IO的方式

    单线程+异步协程实现异步IO操作

    异步协程用法

    从 Python 3.4 开始,Python 中加入了协程的概念,但这个版本的协程还是以生成器对象为基础的,在 Python 3.5 则增加了 async/await,使得协程的实现更加方便。首先我们需要了解下面几个概念:


      • event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足某些条件的时候,函数就会被循环执行。程序是按照设定的顺序从头执行到尾,运行的次数也是完全按照设定。当在编写异步程序时,必然其中有部分程序的运行耗时是比较久的,需要先让出当前程序的控制权,让其在背后运行,让另一部分的程序先运行起来。当背后运行的程序完成后,也需要及时通知主程序已经完成任务可以进行下一步操作,但这个过程所需的时间是不确定的,需要主程序不断的监听状态,一旦收到了任务完成的消息,就开始进行下一步。loop就是这个持续不断的监视器。


      • coroutine:中文翻译叫协程,在 Python 中常指代为协程对象类型,我们可以将协程对象注册到事件循环中,它会被事件循环调用。我们可以使用 async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。


      • task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。


      • future:代表将来执行或还没有执行的任务,实际上和 task 没有本质区别。


    我们还需要了解 async/await 关键字,它是从 Python 3.5 才出现的,专门用于定义协程。其中,async 定义一个协程,await 用来挂起阻塞方法的执行。

    定义一个协程

    import asyncio
    async def execute(x):
        print('Number:', x)
    coroutine = execute(1)
    print('Coroutine:', coroutine)
    print('After calling execute')
    loop = asyncio.get_event_loop()
    loop.run_until_complete(coroutine)
    print('After calling loop')
    
    -----------------
    输出结果
    Coroutine: 
    After calling execute
    Number: 1
    After calling loop

    我们引入了 asyncio 这个包,这样我们才可以使用 async 和 await,然后我们使用 async 定义了一个 execute() 方法,方法接收一个数字参数,方法执行之后会打印这个数字。

    随后我们直接调用了这个方法,然而这个方法并没有执行,而是返回了一个 coroutine 协程对象。随后我们使用 get_event_loop() 方法创建了一个事件循环 loop,并调用了 loop 对象的 run_until_complete() 方法将协程注册到事件循环 loop 中,然后启动。最后我们才看到了 execute() 方法打印了输出结果。

    可见,async 定义的方法就会变成一个无法直接执行的 coroutine 对象,必须将其注册到事件循环中才可以执行。

    task的使用:

     task,它是对 coroutine 对象的进一步封装,它里面相比 coroutine 对象多了运行状态,比如 running、finished 等,我们可以用这些状态来获取协程对象的执行情况。

    在上面的例子中,当我们将 coroutine 对象传递给 run_until_complete() 方法的时候,实际上它进行了一个操作就是将 coroutine 封装成了 task 对象,我们也可以显式地进行声明

    import asyncio
    async def execute(x):
        print('Number:', x)
        return x
    
    coroutine = execute(1)
    print('Coroutine:', coroutine)
    print('After calling execute')
    
    loop = asyncio.get_event_loop()
    task = loop.create_task(coroutine)
    print('Task:', task)
    loop.run_until_complete(task)
    print('Task:', task)
    print('After calling loop')
    
    ------------------
    输出结果
    Coroutine: 
    After calling execute
    Task: >
    Number: 1
    Task:  result=1>
    After calling loop

     ensure_future()的使用

    定义 task 对象还有一种方式,就是直接通过 asyncio 的 ensure_future() 方法,返回结果也是 task 对象,这样的话我们就可以不借助于 loop 来定义,即使我们还没有声明 loop 也可以提前定义好 task 对象,写法如下:

    import asyncio
    
    async def execute(x):
        print('Number:', x)
        return x
    
    coroutine = execute(1)
    print('Coroutine:', coroutine)
    print('After calling execute')
    
    task = asyncio.ensure_future(coroutine)
    print('Task:', task)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(task)
    print('Task:', task)
    print('After calling loop')

    绑定回调

    也可以为某个 task 绑定一个回调方法,来看下面的例子:

    import asyncio
    import requests
    
    async def request():
        url = 'https://www.baidu.com'
        status = requests.get(url).status_code
        return status
    
    def callback(task):
        print('Status:', task.result())
    
    coroutine = request()
    task = asyncio.ensure_future(coroutine)
    task.add_done_callback(callback)
    print('Task:', task)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(task)
    print('Task:', task)
    
    
    ---------------------
    输出
    Task:  cb=[callback() at demo.py:11]>
    Status: 
    Task:  result=>

    我们定义了一个 request() 方法,请求了百度,返回状态码,但是这个方法里面我们没有任何 print() 语句。随后我们定义了一个 callback() 方法,这个方法接收一个参数,是 task 对象,

    然后调用 print() 方法打印了 task 对象的结果。这样我们就定义好了一个 coroutine 对象和一个回调方法,我们现在希望的效果是,当 coroutine 对象执行完毕之后,就去执行声明的 callback() 方法。

    那么它们二者怎样关联起来呢?很简单,只需要调用 add_done_callback() 方法即可,我们将 callback() 方法传递给了封装好的 task 对象,这样当 task 执行完毕之后就可以调用 callback() 方法了,

    同时 task 对象还会作为参数传递给 callback() 方法,调用 task 对象的 result() 方法就可以获取返回结果了。

    多任务协程

    想执行多次请求应该怎么办呢?我们可以定义一个 task 列表,然后使用 asyncio 的 wait() 方法即可执行。

    import asyncio
    import time
    
    
    async def request(url):
        print('正在下载', url)
        # 在异步协程中如果出现了同步模块相关的代码,那么就无法实现异步。
        # time.sleep(2)
        # 当在asyncio中遇到阻塞操作必须进行手动挂起
        await asyncio.sleep(2)
        print('下载完毕', url)
        return url
    
    # 回调函数
    def callback_func(task):
        print('callback ->' + task.result())  # callback对应函数的返回值
    
    
    start = time.time()
    urls = [
        'www.baidu.com',
        'www.sogou.com',
        'www.goubanjia.com'
    ]
    # 任务列表:存放多个任务对象
    stasks = []
    for url in urls:
        c = request(url)
        task = asyncio.ensure_future(c)
        # 将回调函数绑定到对象中
        task.add_done_callback(callback_func)
        stasks.append(task)
    # 创建事件循环对象
    loop = asyncio.get_event_loop()
    
    # 需要将任务列表封装到wait中
    loop.run_until_complete(asyncio.wait(stasks))
    print(time.time() - start)
    
    -----
    输出:
    正在下载 www.baidu.com
    正在下载 www.sogou.com
    正在下载 www.goubanjia.com
    下载完毕 www.baidu.com
    下载完毕 www.sogou.com
    下载完毕 www.goubanjia.com
    callback ->www.baidu.com
    callback ->www.sogou.com
    callback ->www.goubanjia.com
    2.003114700317383

    高性能异步爬虫---aiohttp

    aiohttp可以实现单线程并发IO操作,

    安装pip install aiohttp

    aiohttp使用

    发起请求

    async def fetch():
        async with aiohttp.ClientSession() as session:
            async with session.get('https://www.baidu.com') as resposne:
                print(await resposne.text())
    loop = asyncio.get_event_loop()
    tasks = [fetch(),]
    loop.run_until_complete(asyncio.wait(tasks))

    添加请求参数

    params = {'key': 'value', 'page': 10}
    async def fetch():
        async with aiohttp.ClientSession() as session:
            async with session.get('https://www.baidu.com/s',params=params) as resposne:
                print(await resposne.url)
    loop = asyncio.get_event_loop()
    tasks = [fetch(),]
    loop.run_until_complete(asyncio.wait(tasks))

    UA伪装

    url = 'http://httpbin.org/user-agent'
    headers = {'User-Agent': 'test_user_agent'}
    async def fetch():
        async with aiohttp.ClientSession() as session:
            async with session.get(url,headers=headers) as resposne:
                print(await resposne.text())
    

    自定义cookies

    url = 'http://httpbin.org/cookies'
    cookies = {'cookies_name': 'test_cookies'}
    async def fetch():
        async with aiohttp.ClientSession() as session:
            async with session.get(url,cookies=cookies) as resposne:
                print(await resposne.text())
    

    post请求参数

    url = 'http://httpbin.org'
    payload = {'username': 'zhang', 'password': '123456'}
    async def fetch():
        async with aiohttp.ClientSession() as session:
            async with session.post(url, data=payload) as resposne:
                print(await resposne.text())

    设置代理

    url = "http://python.org"
    async def fetch():
        async with aiohttp.ClientSession() as session:
            async with session.get(url, proxy="http://some.proxy.com") as resposne:
            print(resposne.status)

    异步IO处理

    # 环境安装:pip install aiohttp
    # 使用该模块中的ClientSession
    import requests
    import asyncio
    import time
    import aiohttp
    
    
    async def get_page(url):
        async with aiohttp.ClientSession() as session:
            # get()、post():
            # headers,params/data,proxy='http://ip:port'
            async with await session.get(url) as response:
                # text()返回字符串形式的响应数据
                # read()返回的二进制形式的响应数据
                # json()返回的就是json对象
                # 注意:获取响应数据操作之前一定要使用await进行手动挂起
                page_text = await response.text()
                # print(page_text)
            return page_text
    
    # 回调函数
    
    
    def callback_func(task):
        print('callback ->' + task.result()[0:50])  # callback对应函数的返回值
    
    
    start = time.time()
    urls = [
        'https://www.baidu.com/',
        'https://www.sogou.com/',
        'https://www.douban.com/'
    ]
    # 任务列表:存放多个任务对象
    stasks = []
    for url in urls:
        c = get_page(url)
        task = asyncio.ensure_future(c)
        # 将回调函数绑定到对象中
        task.add_done_callback(callback_func)
        stasks.append(task)
    loop = asyncio.get_event_loop()
    
    # 需要将任务列表封装到wait中
    loop.run_until_complete(asyncio.wait(stasks))
    print(time.time() - start)
  • 相关阅读:
    Cocos2d-x之Vector<T>
    Cocos2d-x之Array
    Cocos2d-x之Value
    Cocos2d-x之String
    Cocos2d-x中使用的数据容器类
    Cocos2d-x之Action
    Cocos2d-x之定时器
    Cocos2d-x之MessageBox
    Cocos2d-x之Log输出机制
    Cocos2d-x之事件处理机制
  • 原文地址:https://www.cnblogs.com/xiao-apple36/p/12240094.html
Copyright © 2011-2022 走看看