zoukankan      html  css  js  c++  java
  • 在爬虫中使用单线程异步协程,包含单任务和多任务,以及数据解析使用回调函数

    aiohttp简介

    aiohttp可以实现单线程并发IO操作,用他来代替非异步模块request来发送请求,请求中的ua,headers,和参数都可以添加,添加方法如下:

    环境安装

    pip install aiohttp

    aiohttp使用

    1.发起请求

    复制代码
    async def fetch():
        async with aiohttp.ClientSession() as session:
            async with session.get('https://www.baidu.com') as resposne:
                print(await resposne.text())
    
    loop = asyncio.get_event_loop()
    tasks = [fetch(),]
    loop.run_until_complete(asyncio.wait(tasks))
    复制代码

    2.添加请求参数的方法:

    复制代码
    params = {'key': 'value', 'page': 10}
    async def fetch():
        async with aiohttp.ClientSession() as session:
            async with session.get('https://www.baidu.com/s',params=params) as resposne:
                print(await resposne.url)
    
    loop = asyncio.get_event_loop()
    tasks = [fetch(),]
    loop.run_until_complete(asyncio.wait(tasks))
    复制代码

    3.UA伪装的添加方法:

    复制代码
    url = 'http://httpbin.org/user-agent'
    headers = {'User-Agent': 'test_user_agent'}
    
    async def fetch():
        async with aiohttp.ClientSession() as session:
            async with session.get(url,headers=headers) as resposne:
                print(await resposne.text())
    
    loop = asyncio.get_event_loop()
    tasks = [fetch(),]
    loop.run_until_complete(asyncio.wait(tasks))
    复制代码

    4.自定义cookies的方法:

    复制代码
    url = 'http://httpbin.org/cookies'
    cookies = {'cookies_name': 'test_cookies'}
    
    async def fetch():
        async with aiohttp.ClientSession() as session:
            async with session.get(url,cookies=cookies) as resposne:
                print(await resposne.text())
              
    
    loop = asyncio.get_event_loop()
    tasks = [fetch(),]
    loop.run_until_complete(asyncio.wait(tasks))
    复制代码

    5.post请求参数

    复制代码
    url = 'http://httpbin.org'
    payload = {'username': 'zhang', 'password': '123456'}
    async def fetch():
        async with aiohttp.ClientSession() as session:
            async with session.post(url, data=payload) as resposne:
                print(await resposne.text())
    
    loop = asyncio.get_event_loop()
    tasks = [fetch(), ]
    loop.run_until_complete(asyncio.wait(tasks))
    复制代码

    6.设置代理

    复制代码
    url = "http://python.org"
    async def fetch():
        async with aiohttp.ClientSession() as session:
            async with session.get(url, proxy="http://some.proxy.com") as resposne:
            print(resposne.status)
    
    loop = asyncio.get_event_loop()
    tasks = [fetch(), ]
    loop.run_until_complete(asyncio.wait(tasks))
    复制代码

    异步IO处理

    复制代码
    # 环境安装:pip install aiohttp
    # 使用该模块中的ClientSession
    import requests
    import asyncio
    import time
    import aiohttp
    
    start = time.time()
    urls = [
        'http://127.0.0.1:5000/tiger','http://127.0.0.1:5000/jay','http://127.0.0.1:5000/tom',
        'http://127.0.0.1:5000/tiger', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',
        'http://127.0.0.1:5000/tiger', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',
        'http://127.0.0.1:5000/tiger', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',
    
    ]
    
    async def get_page(url):
        async with aiohttp.ClientSession() as session:
            #get()、post():
            #headers,params/data,proxy='http://ip:port'
            async with await session.get(url) as response:
                #text()返回字符串形式的响应数据
                #read()返回的二进制形式的响应数据
                #json()返回的就是json对象
                #注意:获取响应数据操作之前一定要使用await进行手动挂起
                page_text = await response.text()
                print(page_text)
    
    tasks = []
    
    for url in urls:
        c = get_page(url)
        task = asyncio.ensure_future(c)
        tasks.append(task)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    
    end = time.time()
    
    print('总耗时:',end-start)
    复制代码
    # 使用aiohttp替代requests模块
    import time
    import asyncio
    import aiohttp
    
    async def get_page(url):
        async with aiohttp.ClientSession() as session:
            # 只要有耗时就会有阻塞,就得使用await进行挂起操作
            async with await session.get(url=url) as response:
                page_text = await response.text() # 二进制read()/json()
                print('响应数据', page_text)
    
    start = time.time()
    urls = [
        'http://127.0.0.1:5000/tiger',
        'http://127.0.0.1:5000/jay',
        'http://127.0.0.1:5000/tom',
    ]
    loop = asyncio.get_event_loop()
    
    tasks = []
    for url in urls:
        cone = get_page(url)
        task = asyncio.ensure_future(cone)
        tasks.append(task)
    
    loop.run_until_complete(asyncio.wait(tasks))
    print('总耗时: ', time.time()-start)
    支持异步的网络请求模块: aiohttp

    在这里我们将请求库由 requests 改成了 aiohttp,通过 aiohttp 的 ClientSession 类的 get() 方法进行请求,结果如下:

    复制代码
    Hello tom
    Hello jay
    Hello tiger
    Hello tiger
    Hello jay
    Hello tiger
    Hello tom
    Hello jay
    Hello jay
    Hello tom
    Hello tom
    Hello tiger
    总耗时: 2.037203073501587
    复制代码

    成功了!我们发现这次请求的耗时由 6秒变成了 2 秒,耗时直接变成了原来的 1/3。

    代码里面我们使用了 await,后面跟了 get() 方法,在执行这五个协程的时候,如果遇到了 await,那么就会将当前协程挂起,转而去执行其他的协程,直到其他的协程也挂起或执行完毕,再进行下一个协程的执行。

    开始运行时,时间循环会运行第一个 task,针对第一个 task 来说,当执行到第一个 await 跟着的 get() 方法时,它被挂起,但这个 get() 方法第一步的执行是非阻塞的,挂起之后立马被唤醒,所以立即又进入执行,创建了 ClientSession 对象,接着遇到了第二个 await,调用了 session.get() 请求方法,然后就被挂起了,由于请求需要耗时很久,所以一直没有被唤醒,好第一个 task 被挂起了,那接下来该怎么办呢?事件循环会寻找当前未被挂起的协程继续执行,于是就转而执行第二个 task 了,也是一样的流程操作,直到执行了第五个 task 的 session.get() 方法之后,全部的 task 都被挂起了。所有 task 都已经处于挂起状态,那咋办?只好等待了。3 秒之后,几个请求几乎同时都有了响应,然后几个 task 也被唤醒接着执行,输出请求结果,最后耗时,3 秒!

    怎么样?这就是异步操作的便捷之处,当遇到阻塞式操作时,任务被挂起,程序接着去执行其他的任务,而不是傻傻地等着,这样可以充分利用 CPU 时间,而不必把时间浪费在等待 IO 上。

    可见,使用了异步协程之后,我们几乎可以在相同的时间内实现成百上千倍次的网络请求,把这个运用在爬虫中,速度提升可谓是非常可观了。

    如何实现数据解析--任务的绑定回调机制(完整的协程过程)

    复制代码
    import time
    import asyncio
    import aiohttp
    
    # 回调函数: 主要用来解析响应数据
    def callback(task):
        print('This is callback')
        # 获取响应数据
        page_text = task.result()
        print("接下来就可以在回调函数中实现数据解析")
    
    async def get_page(url):
        async with aiohttp.ClientSession() as session:
            # 只要有耗时就会有阻塞,就得使用await进行挂起操作
            async with await session.get(url=url) as response:
                page_text = await response.text() # 二进制read()/json()
                print('响应数据', page_text)
                return page_text
    
    start = time.time()
    urls = [
        'http://127.0.0.1:5000/tiger',
        'http://127.0.0.1:5000/jay',
        'http://127.0.0.1:5000/tom',
    ]
    #第一步产生事件循环对象 loop = asyncio.get_event_loop()
    #任务列表 tasks = [] for url in urls: cone = get_page(url)
      #第二步将协程函数对象放进任务中 task = asyncio.ensure_future(cone) # 给任务对象绑定回调函数用于解析响应数据 task.add_done_callback(callback)
      #第三步将所有的任务添加到任务列表中 tasks.append(task) #第四步运行事件循环对象,asyncio.wait()来实现多任务自动在循环中运行 loop.run_until_complete(asyncio.wait(tasks)) print('总耗时: ', time.time()-start)
  • 相关阅读:
    uni-app开发经验分享九: 组件传值
    uni-app开发经验分享八: 实现微信APP支付的全过程详解
    CSS3+JS完美实现放大镜模式
    原生JS拖拽
    LeetCode 797. 所有可能的路径
    面试题 02.01. 移除重复节点
    LeetCode 139. 单词拆分
    LeetCode 1436. 旅行终点站
    LeetCode 16. 最接近的三数之和
    Hadoop 3.13在Ubuntu20.04上的单机伪分布式配置
  • 原文地址:https://www.cnblogs.com/caiwenjun/p/11761736.html
Copyright © 2011-2022 走看看