zoukankan      html  css  js  c++  java
  • 爬虫之线程&协程&异步

    线程池

    • 导包:from multiprocessing.dummy import Pool
    • 回调函数异步将可迭代对象中的元素进行某种操作
      • 注意事项:callback必须有一个参数,且只能有一个参数
    • 异步主要是被应用在耗时的操作
    from multiprocessing.dummy import Pool
    
    pool = Pool(3)  # 实例化线程池对象,3是线程池的最大线程数
    # 参数1:回调函数(只是函数名,不加括号);参数2:列表
    # 参数1会接收参数2列表中的某一个元素,回调函数可以对该列表元素进行某种操作
    pool.map(callback,list)
    

    测试:同步&异步效率

    搭建一个flask,自己启动服务,测试执行时间

    • 新建一个server.py
    from flask import Flask, render_template
    import time
    
    app = Flask(__name__)
    
    
    @app.route('/xx')
    def index_1():
        time.sleep(2)
        return render_template('test.html')
    
    
    @app.route('/yy')
    def index_2():
        time.sleep(2)
        return render_template('test.html')
    
    
    @app.route('/oo')
    def index_3():
        time.sleep(2)
        return render_template('test.html')
    
    
    if __name__ == '__main__':
        app.run(debug=True)
    
    • 新建一个templates文件夹,在该文件夹下创建一个HTML文件,我写的是test.html,随便写点数据
    <html lang="en">
    <head>
        <meta charset="UTF-8"/>
        <title>测试</title>
    </head>
    <body>
    <div>
        <p>百里守约</p>
    </div>
    <div class="song">
        <p>李清照</p>
        <p>王安石</p>
        <p>苏轼</p>
        <p>柳宗元</p>
        <a href="http://www.song.com/" title="赵匡胤" target="_self">
            <span>this is span</span>
            宋朝是最强大的王朝,不是军队的强大,而是经济很强大,国民都很有钱</a>
        <a href="" class="du">总为浮云能蔽日,长安不见使人愁</a>
        <img src="http://www.baidu.com/meinv.jpg" alt=""/>
    </div>
    <div class="tang">
        <ul>
            <li><a href="http://www.baidu.com" title="qing">清明时节雨纷纷,路上行人欲断魂,借问酒家何处有,牧童遥指杏花村</a></li>
            <li><a href="http://www.163.com" title="qin">秦时明月汉时关,万里长征人未还,但使龙城飞将在,不教胡马度阴山</a></li>
            <li><a href="http://www.126.com" id="qi">岐王宅里寻常见,崔九堂前几度闻,正是江南好风景,落花时节又逢君</a></li>
            <li><a href="http://www.sina.com" class="du">杜甫</a></li>
            <li><a href="http://www.dudu.com" class="du">杜牧</a></li>
            <li><b>杜小月</b></li>
            <li><i>度蜜月</i></li>
            <li><a href="http://www.haha.com" id="feng">凤凰台上凤凰游,凤去台空江自流,吴宫花草埋幽径,晋代衣冠成古丘</a></li>
        </ul>
    </div>
    </body>
    </html>
    

    同步&异步执行时间

    import requests
    from bs4 import BeautifulSoup
    import time
    # 线程池模块
    from multiprocessing.dummy import Pool
    
    urls = [
        'http://127.0.0.1:5000/xx',
        'http://127.0.0.1:5000/yy',
        'http://127.0.0.1:5000/oo',
    ]
    
    # 数据的爬取,返回爬取到的页面源码数据
    def get_request(url):
        page_text = requests.get(url=url).text
        return page_text
    
    # 数据的解析,返回标签的文本
    def parse(page_text):
        soup = BeautifulSoup(page_text, 'lxml')
        return soup.select('#feng')[0].text
    
    # 同步代码
    if __name__ == '__main__':
        start = time.time()
        for url in urls:
            page_text = get_request(url)
            text_data = parse(page_text)
            print(text_data)
        print(time.time() - start)
    """
    执行结果:
    凤凰台上凤凰游,凤去台空江自流,吴宫花草埋幽径,晋代衣冠成古丘
    凤凰台上凤凰游,凤去台空江自流,吴宫花草埋幽径,晋代衣冠成古丘
    凤凰台上凤凰游,凤去台空江自流,吴宫花草埋幽径,晋代衣冠成古丘
    6.056272029876709
    """
    
    # 异步代码
    if __name__ == '__main__':
        start = time.time()
        pool = Pool(3)  # 实例化线程池对象
        # 参数1:回调函数(只是函数名,不加括号);参数2:列表
        # 参数1会接收参数2列表中的某一个元素,回调函数可以对该列表元素进行某种操作
        page_text_list = pool.map(get_request,urls)
        text_data = pool.map(parse,page_text_list)
        for i in text_data:
            print(i)
        print(time.time() - start)
    """
    执行结果:
    凤凰台上凤凰游,凤去台空江自流,吴宫花草埋幽径,晋代衣冠成古丘
    凤凰台上凤凰游,凤去台空江自流,吴宫花草埋幽径,晋代衣冠成古丘
    凤凰台上凤凰游,凤去台空江自流,吴宫花草埋幽径,晋代衣冠成古丘
    2.0537397861480713
    
    不用for循环速度能提升0.01秒左右
    """
    

    综上所述:异步代码执行效率显著提高

    案例:基于线程池爬取梨视频

    • 思路分析
      • 爬取到视频详情页对应的url,存储到一个可迭代对象中
      • 再次发送请求获取视频详情页真正的视频地址
        • 注意:视频详情页的video是js代码动态生成的,需要用到正则解析
      • 写一个callback,获取视频的二进制文件,持久化存储
    import requests
    from lxml import etree
    from multiprocessing.dummy import Pool
    import re
    import os
    
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'
    }
    
    # 梨视频财富板块的地址
    main_url = 'https://www.pearvideo.com/category_3'
    # 解析出该板块下视频详情页的src
    main_page_text = requests.get(url=main_url, headers=headers).text
    tree = etree.HTML(main_page_text)
    li_list = tree.xpath('//*[@id="listvideoListUl"]/li')
    # 线程池
    video_urls = []
    for li in li_list:
        # 视频详情页的具体地址和视频标题
        detail_url = "https://www.pearvideo.com/" + li.xpath('./div/a/@href')[0]
        name = li.xpath('./div/a/div[2]/text()')[0]
        # 对详情页发起请求
        page_text = requests.get(url=detail_url, headers=headers).text
        # 视频详情页的video是js代码动态生成的,使用正则解析
        ex = 'srcUrl="(.*?)",vdoUrl='
        video_url = re.findall(ex, page_text, re.S)[0]  # 返回的是列表类型
        dic = {
            'url': video_url,
            'name': name,
        }
        video_urls.append(dic)
    
    # 回调函数
    def get_video(url):
        # 对视频地址发请求,将二进制文件持久化存储
        video_data = requests.get(url=url['url'], headers=headers).content
        file_name = "./video/" + url['name'] + ".mp4"
        with open(file_name, 'wb') as f:
            f.write(video_data)
            print(url['name'], "下载完毕!")
    
    # 创建存储视频的文件夹
    dir_name = 'video'
    if not os.path.exists(dir_name):
        os.mkdir(dir_name)
    # 实例化线程池
    pool = Pool(4)
    pool.map(get_video, video_urls)
    

    单线程+多任务的异步协程

    asyncio(重点)

    特殊函数

    • 如果一个函数的定义被async关键字修饰后,则该函数是一个特殊函数。
    • 特殊之处:
      • 该函数被调用后,函数内部的实现语句不会被立即执行。
      • 该函数会返回一个协程对象

    协程

    • 协程就是一个对象。当特殊函数被调用后,该函数就会返回一个协程对象。

    • 协程对象 == 特殊函数

      import asyncio
      from time import sleep
      
      async def get_request(url):
          print('正在请求:', url)
          sleep(2)
          print('请求成功:', url)
          return '666'
      # 返回一个协程对象
      g = get_request("https://www,qq.com")
      

    任务对象

    • 就是对协程对象的进一步封装(就是一个高级的协程对象)

    • 任务对象 == 协程对象 == 特殊函数(表示某个固定形式的任务)

      asyncio.ensure_future(协程对象)
      
      task = asyncio.ensure_future(g)
      
      # g:协程对象
      
    • 绑定回调:

      # 定义一个task的回调函数
      def callback(task):
          task.result()	# 表示的是当前任务对象对应的特殊函数的返回值
          print("I'm callback:", task)
      
      task.add_done_callback(funcName)
      
      # task:任务对象
      # funcName:回调函数的名称
      
      • funcName这个回调函数必须要带一个参数,这个参数表示的就是当前的任务对象
        • 参数.result():表示的就是当前任务对象对应的特殊函数的返回值

    事件循环对象

    • 创建事件循环对象

    • 需要将任务对象注册到该事件循环对象中

      # 创建事件循环对象
      loop = asyncio.get_event_loop()
      # 将任务对象注册/装载到事件循环对象中,然后需要启动循环对象
      loop.run_until_complete(task)  # 用于装载且启动事件循环
      
      # task:任务对象
      

    等待

    await:当阻塞操作结束后让loop回头执行阻塞之后的代码。

    挂起

    asyncio.wait():将当前的任务对象交出cpu的使用权。

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    
    asyncio.wait   # 挂起操作
    tasks	# 任务对象列表
    

    重点注意事项

    • 在特殊函数实现内部不可以出现不支持异步的模块代码,否则会中断异步效果

    aiohttp(重点)

    • requests:不支持异步,不可以出现在特殊函数内部。

    • aiohttp:支持异步的网络请求模块,和asyncio一起使用

      • pip install aiohttp
    • 代码的编写

      • 写出基本架构
      import asyncio
      import aiohttp
      
      # 基于aiohttp实现异步的网络请求
      async def get_requests(url):
          # 实例化了一个请求对象
          with aiohttp.ClientSession() as aio:
              # with aio.get/post(url=url,headers=headers,data/params,proxy='http://ip:prot') as response:
              with aio.get(url=url) as response:
                  # text() 获取字符串形式的响应数据
                  # read() 获取bytes类型的响应数据
                  page_text = await response.text()
                  return page_text
      
      • 细节补充(代码参照完整代码)
        • 在每一个with前加上async关键字
        • 在每一个阻塞操作前加上await关键字
    • 完整代码

      import asyncio
      import aiohttp
      
      # 基于aiohttp实现异步的网络请求
      async def get_requests(url):
          # 实例化了一个请求对象
          async with aiohttp.ClientSession() as aio:
              # with aio.get/post(url=url,headers=headers,data/params,proxy='http://ip:prot') as response:
              async with await aio.get(url=url) as response:
                  # text() 获取字符串形式的响应数据
                  # read() 获取bytes类型的响应数据
                  page_text = await response.text()
                  return page_text
      

    单任务协程操作

    import asyncio
    from time import sleep
    
    async def get_request(url):
        print('正在请求:', url)
        sleep(2)
        print('请求成功:', url)
        return '666'
    
    # 定义一个task的回调函数
    def callback(task):
        print("I'm callback:", task)
    
    # 返回一个协程对象
    g = get_request("https://www,qq.com")
    
    # 创建一个任务对象
    task = asyncio.ensure_future(g)
    """
    
    # 给任务对象绑定回调函数
    task.add_done_callback(callback)
    
    # 创建事件循环对象
    loop = asyncio.get_event_loop()
    # 将任务对象注册/装载到事件循环对象中,然后需要启动循环对象
    loop.run_until_complete(task)  # 用于装载且启动事件循环
    """
    执行结果:
    正在请求: www,qq.com
    正在请求: www,qq.com
    """
    

    多任务协程操作

    import asyncio
    import time
    
    start = time.time()
    async def get_request(url):
        print('正在请求:', url)
        # await 当阻塞操作结束后让loop回头执行阻塞之后的代码
        await asyncio.sleep(2)
        print('请求成功:', url)
        return '666'
    
    urls = [
        'http://127.0.0.1:5000/xx',
        'http://127.0.0.1:5000/yy',
        'http://127.0.0.1:5000/oo',
    ]
    tasks = []
    for url in urls:
        c = get_request(url)
        task = asyncio.ensure_future(c)
        tasks.append(task)
    
    loop = asyncio.get_event_loop()
    # 将任务列表注册到事件循环的时候一定要将任务列表进行挂起操作
    # asyncio.wait()  挂起操作,将当前的任务对象交出cpu的使用权
    loop.run_until_complete(asyncio.wait(tasks))
    print('总耗时:', time.time() - start)
    

    单线程&多任务异步爬虫

    基于Flask自测

    • 测试代码在上述测试:同步&异步效率,按照上述步骤启动项目;然后运行下方代码。
    import asyncio
    import time
    import aiohttp
    from lxml import etree
    
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'
    }
    
    urls = [
        'http://127.0.0.1:5000/xx',
        'http://127.0.0.1:5000/yy',
        'http://127.0.0.1:5000/oo',
    ]
    
    start = time.time()
    
    """
    # 发起请求,获取响应数据(不可以实现异步)
    async def get_requests(url):
        # requests是不支持异步的模块
        page_text = requests.get(url).text
        return page_text
    """
    
    async def get_requests(url):
        """
        基于aiohttp实现异步的网络请求
        :param url: 
        :return: 
        """
        # 实例化了一个请求对象
        async with aiohttp.ClientSession() as aio:
            # with aio.get/post(url=url,headers=headers,data/params,proxy='http://ip:prot') as response:
            async with await aio.get(url=url) as response:
                # text() 获取字符串形式的响应数据
                # read() 获取bytes类型的响应数据
                page_text = await response.text()
                return page_text
    
    def parse(task):
        """
        定义回调函数
        :param task:
        :return:
        """
        page_text = task.result()  # 获取特殊函数的返回值(请求到的页面源码数据)
        tree = etree.HTML(page_text)
        content = tree.xpath('//*[@id="feng"]/text()')[0]
        print(content)
    
    tasks = []
    for url in urls:
        c = get_requests(url)
        task = asyncio.ensure_future(c)
        task.add_done_callback(parse)
        tasks.append(task)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    print('总耗时:', time.time() - start)
    

    案例:基于单线程多任务异步爬取梨视频

    • 思路上述案例:基于线程池爬取梨视频
    import asyncio
    import time
    import aiohttp
    from lxml import etree
    import re
    import os
    import requests
    
    # time模块是为了测试爬取视频的耗时
    start = time.time()
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'
    }
    # 梨视频财富板块的地址
    main_url = 'https://www.pearvideo.com/category_3'
    main_page_text = requests.get(url=main_url, headers=headers).text
    tree = etree.HTML(main_page_text)
    li_list = tree.xpath('//*[@id="listvideoListUl"]/li')
    urls = []  # [{'url': video_url,'name': name},{}...]
    for li in li_list:
        detail_url = "https://www.pearvideo.com/" + li.xpath('./div/a/@href')[0]
        name = li.xpath('./div/a/div[2]/text()')[0]
        page_text = requests.get(url=detail_url, headers=headers).text
        # 视频详情页的video是js代码动态生成的
        ex = 'srcUrl="(.*?)",vdoUrl='
        video_url = re.findall(ex, page_text, re.S)[0]  # 返回的是列表类型
        dic = {
            'url': video_url,
            'name': name,
        }
        urls.append(dic)
    
    # 基于aiohttp实现异步的网络请求
    async def get_requests(url):
        # 实例化了一个请求对象
        async with aiohttp.ClientSession() as aio:
            # with aio.get/post(url=url,headers=headers,data/params,proxy='http://ip:prot') as response:
            async with await aio.get(url=url['url'], headers=headers) as response:
                # text() 获取字符串形式的响应数据
                # read() 获取bytes类型的响应数据
                page_read = await response.read()
                dic = {
                    "page_read": page_read,
                    "name": url['name']
                }
                return dic
    
    
    def parse(task):
        """
        定义回调函数
        :param task:
        :return:
        """
        dic_info = task.result()  # 获取特殊函数的返回值(请求到的页面源码数据)
        file_name = "./video/" + dic_info["name"] + ".mp4"
        with open(file_name, 'wb') as f:
            f.write(dic_info['page_read'])
            print(dic_info["name"], "下载完毕!")
    
    tasks = []
    for url in urls:
        c = get_requests(url)
        task = asyncio.ensure_future(c)
        task.add_done_callback(parse)
        tasks.append(task)
    
    dir_name = 'video'
    if not os.path.exists(dir_name):
        os.mkdir(dir_name)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    print('总耗时:', time.time() - start)
    
    
  • 相关阅读:
    Python+fiddler(基于Cookie绕过验证码自动登录)
    Python+selenium(警告框处理)
    Python+selenium(多表单、多窗口切换)
    Python+selenium(定位一组元素)
    Python+selenium登录测试
    【转载】python format遇上花括号{}
    【转载】判断当前使用的编译器及操作系统
    动态库的创建,隐式加载和显式加载
    Google C++单元测试框架GoogleTest---AdvancedGuide(译文)
    三次样条插值 cubic spline interpolation
  • 原文地址:https://www.cnblogs.com/Golanguage/p/12535623.html
Copyright © 2011-2022 走看看