  • Implementing concurrency with Selenium

    for loop and multithreading + Selenium

    Example 1

    for loop

    # -*- coding: utf-8 -*-
    
    """
    Datetime: 2019/6/22
    Author: Zhang Yafei
    Description: 
    """
    import time
    
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    from concurrent.futures import ThreadPoolExecutor
    import functools
    
    
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument('--disable-gpu')
    
    
    def timeit(func):
        """
        Decorator: measure the execution time of a function
        :param func:
        :return:
        """
    
        @functools.wraps(func)
        def inner(*args, **kwargs):
            start = time.time()
            ret = func(*args, **kwargs)
            end = time.time() - start
            if end < 60:
                print(f'Time elapsed: {round(end, 2)}s')
            else:
                minutes, sec = divmod(end, 60)
                print(f'Time elapsed: {round(minutes)}m {round(sec, 2)}s')
            return ret
    
        return inner
    
    
    class PolicyUrlDownload(object):
        """ Download policy data """
    
        def __init__(self, url, pages_num, output_file, a_xpath, headless: bool=True):
            self.url_list = [url.format(page) for page in range(1, pages_num+1)]
            self.output_file = output_file
            self.a_xpath = a_xpath
            if headless:
                self.driver = webdriver.Chrome(options=chrome_options)
            else:
                self.driver = webdriver.Chrome()
    
        def start(self, page, url):
            with open(self.output_file, mode='a', encoding='utf-8') as file:
                print(f"make request to {url}")
                self.driver.get(url)
                titles = self.driver.find_elements_by_xpath(self.a_xpath)
                for title in titles:
                    href = title.get_attribute('href')
                    file.write(f'{page}\t{href}\n')
                print(f'{url} download completed')
    
        def run(self):
            for page, url in enumerate(self.url_list):
                self.start(page+1, url)
            self.driver.close()
    
    
    @timeit
    def main(setting):
        policy_data = PolicyUrlDownload(**setting)
        policy_data.run()
    
    
    if __name__ == '__main__':
        start_time = time.time()
        print('######################## Start download #########################')
    
        # Download page URLs for multiple configurations
        settings = [
            {
                'output_file': '药品供应保障综合的管理.txt',
                'url': 'http://cdsip.nhfpc.gov.cn/work/0-{}.html',
                'pages_num': 8,
                'a_xpath': '//div[@id="active0"]/ul/li/a'
            },
            {
                'output_file': '药品供应保障综合的管理.txt',
                'url': 'http://cdsip.nhfpc.gov.cn/policy/0-{}-0.html',
                'pages_num': 9,
                'a_xpath': '//div[@class="infoContent box-body"]/ul/li/a'
            }
        ]
    
    
        for setting in settings:
            main(setting)
            
        print('Download complete, total time ', round(time.time() - start_time, 2), 's')

    Result

    Download complete, total time  28.46 s

    Multithreading

    # -*- coding: utf-8 -*-
    
    """
    Datetime: 2019/6/22
    Author: Zhang Yafei
    Description: 
    """
    import time
    
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    from concurrent.futures import ThreadPoolExecutor
    import functools
    
    
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument('--disable-gpu')
    
    
    def timeit(func):
        """
        Decorator: measure the execution time of a function
        :param func:
        :return:
        """
    
        @functools.wraps(func)
        def inner(*args, **kwargs):
            start = time.time()
            ret = func(*args, **kwargs)
            end = time.time() - start
            if end < 60:
                print(f'Time elapsed: {round(end, 2)}s')
            else:
                minutes, sec = divmod(end, 60)
                print(f'Time elapsed: {round(minutes)}m {round(sec, 2)}s')
            return ret
    
        return inner
    
    
    class PolicyUrlDownload(object):
        """ Download policy data """
    
        def __init__(self, url, pages_num, output_file, a_xpath, headless: bool=True):
            self.url_list = [url.format(page) for page in range(1, pages_num+1)]
            self.output_file = output_file
            self.a_xpath = a_xpath
            if headless:
                self.driver = webdriver.Chrome(options=chrome_options)
            else:
                self.driver = webdriver.Chrome()
    
        def start(self, page, url):
            with open(self.output_file, mode='a', encoding='utf-8') as file:
                print(f"make request to {url}")
                self.driver.get(url)
                titles = self.driver.find_elements_by_xpath(self.a_xpath)
                for title in titles:
                    href = title.get_attribute('href')
                    file.write(f'{page}\t{href}\n')
                print(f'{url} download completed')
    
        def run(self):
            for page, url in enumerate(self.url_list):
                self.start(page+1, url)
            self.driver.close()
    
    
    @timeit
    def main(setting):
        policy_data = PolicyUrlDownload(**setting)
        policy_data.run()
    
    
    if __name__ == '__main__':
        start_time = time.time()
        print('######################## Start download #########################')
    
        # Download page URLs for multiple configurations
        settings = [
            {
                'output_file': '药品供应保障综合的管理.txt',
                'url': 'http://cdsip.nhfpc.gov.cn/work/0-{}.html',
                'pages_num': 8,
                'a_xpath': '//div[@id="active0"]/ul/li/a'
            },
            {
                'output_file': '药品供应保障综合的管理.txt',
                'url': 'http://cdsip.nhfpc.gov.cn/policy/0-{}-0.html',
                'pages_num': 9,
                'a_xpath': '//div[@class="infoContent box-body"]/ul/li/a'
            }
        ]
        with ThreadPoolExecutor() as pool:
            pool.map(main, settings)
    
        print('Download complete, total time ', round(time.time() - start_time, 2), 's')

    Result

    Time elapsed: 18.04s
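
    Note: because the pool is used as a context manager, leaving the with ThreadPoolExecutor() as pool: block calls pool.shutdown(wait=True), so the final total-time print runs only after both settings have finished. Each call to main builds its own PolicyUrlDownload and therefore its own Chrome driver, which is what makes the two configurations safe to run in parallel.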

    Example 2

    Sequential execution

    # -*- coding: utf-8 -*-
    import os
    import time
    from concurrent.futures import ThreadPoolExecutor
    from hashlib import md5
    
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    from selenium.webdriver.chrome.service import Service
    import numpy as np
    
    
    class PolicyPageDownload(object):
        """ Download policy data """
    
        def __init__(self, file, dir_name, url_list):
            self.file = file
            self.dir_name = dir_name
            self.urls = url_list
            self.chrome_options = Options()
            self.chrome_options.add_argument("--headless")
            self.chrome_options.add_argument('--disable-gpu')
            self.driver = webdriver.Chrome(options=self.chrome_options)
            # self.driver = webdriver.Chrome()
    
        def start(self, url):
            """
            Download a single page
            :param url:
            :return:
            """
            self.driver.get(url)
            response = self.driver.page_source
            print(f'make request to {url}')
            file_name = md5(bytes(url, encoding='utf-8')).hexdigest() + '.html'
            with open(f'{self.dir_name}/{file_name}', 'w', encoding='utf-8') as file:
                file.write(response)
            print(f'{url} download completed')
    
        def run(self):
            """ Entry point """
            for url in self.urls:
                self.start(url)
            self.driver.quit()
    
    
    def filter_urls(dir_name, urls):
        """
        Filter out URLs whose pages have already been downloaded
        :param urls:
        :return:
        """
        encode_urls = [md5(bytes(url, encoding='utf-8')).hexdigest() + '.html' for url in urls]
        has_file = [file for file in os.listdir(dir_name) if os.path.getsize(os.path.join(dir_name, file)) > 0]
        encode_urls = set(encode_urls) - set(has_file)
        down_urls = list(
            filter(lambda url: md5(bytes(url, encoding='utf-8')).hexdigest() + '.html' in encode_urls, urls))
        print(f'Total: {len(set(urls))}\tdownloaded: {len(set(has_file))}\tstill to download: {len(encode_urls)}')
        return down_urls
    
    def run(url_list):
        policy = PolicyPageDownload(url_list=url_list, **setting)
        policy.run()
    
    def main(file, dir_name):
        if not os.path.exists(dir_name):
            os.mkdir(dir_name)
        with open(file, 'r', encoding='utf-8') as inputfile:
            urls = [line.strip().split('\t')[1] for line in inputfile]
        if os.path.exists(dir_name):
            urls = filter_urls(dir_name, urls)
    
        run(urls)
    
    if __name__ == '__main__':
        start_time = time.time()
        setting = {
            'file': '药品供应保障综合的管理.txt',
            'dir_name': '药品供应保障综合的管理'
            }
        main(**setting)
    
        print('Download complete, total time ', round(time.time() - start_time, 2), 's')

    Multithreading

    # -*- coding: utf-8 -*-
    import os
    import time
    from concurrent.futures import ThreadPoolExecutor
    from hashlib import md5
    
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    from selenium.webdriver.chrome.service import Service
    import numpy as np
    
    
    class PolicyPageDownload(object):
        """ Download policy data """
    
        def __init__(self, file, dir_name, url_list):
            self.file = file
            self.dir_name = dir_name
            self.urls = url_list
            self.chrome_options = Options()
            self.chrome_options.add_argument("--headless")
            self.chrome_options.add_argument('--disable-gpu')
            self.driver = webdriver.Chrome(options=self.chrome_options)
            # self.driver = webdriver.Chrome()
    
        def start(self, url):
            """
            Download a single page
            :param url:
            :return:
            """
            self.driver.get(url)
            response = self.driver.page_source
            print(f'make request to {url}')
            file_name = md5(bytes(url, encoding='utf-8')).hexdigest() + '.html'
            with open(f'{self.dir_name}/{file_name}', 'w', encoding='utf-8') as file:
                file.write(response)
            print(f'{url} download completed')
    
        def run(self):
            """ Entry point """
            for url in self.urls:
                self.start(url)
            self.driver.quit()
    
    
    def filter_urls(dir_name, urls):
        """
        Filter out URLs whose pages have already been downloaded
        :param urls:
        :return:
        """
        encode_urls = [md5(bytes(url, encoding='utf-8')).hexdigest() + '.html' for url in urls]
        has_file = [file for file in os.listdir(dir_name) if os.path.getsize(os.path.join(dir_name, file)) > 0]
        encode_urls = set(encode_urls) - set(has_file)
        down_urls = list(
            filter(lambda url: md5(bytes(url, encoding='utf-8')).hexdigest() + '.html' in encode_urls, urls))
        print(f'Total: {len(set(urls))}\tdownloaded: {len(set(has_file))}\tstill to download: {len(encode_urls)}')
        return down_urls
    
    def run(url_list):
        policy = PolicyPageDownload(url_list=url_list, **setting)
        policy.run()
    
    def main(file, dir_name):
        if not os.path.exists(dir_name):
            os.mkdir(dir_name)
        with open(file, 'r', encoding='utf-8') as inputfile:
            urls = [line.strip().split('\t')[1] for line in inputfile]
        if os.path.exists(dir_name):
            urls = filter_urls(dir_name, urls)
    
        with ThreadPoolExecutor() as pool:
            pool.map(run, np.array_split(urls, 4))
    
    
    if __name__ == '__main__':
        start_time = time.time()
        setting = {
            'file': '药品供应保障综合的管理.txt',
            'dir_name': '药品供应保障综合的管理'
            }
        main(**setting)
    
        print('Download complete, total time ', round(time.time() - start_time, 2), 's')

    Run results

    #  50 URLs, for loop:       download complete, total time  48.62 s
    # 150 URLs, for loop:       total time  150.22 s
    # 150 URLs, multithreaded:  total time  80.84 s
    
    
    • Conclusion: creating a WebDriver is expensive, so create it once and reuse it as much as possible; for concurrent work, threads cannot share a single driver, so each thread must create its own.
    • Usage tips: create multiple threads, ideally as many as there are CPU cores, and give each thread its own driver (see the sketch below).
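
    A minimal sketch of that pattern (my own illustration, not code from the original post): each worker thread lazily creates one headless Chrome driver through threading.local() and reuses it for every URL it handles; the pool size is set to the CPU count, and all drivers are quit only after the pool has shut down. The helper names get_driver and fetch are assumptions for illustration.

    # -*- coding: utf-8 -*-
    # Hedged sketch: one headless Chrome driver per worker thread, reused for
    # every URL that thread processes. Helper names are illustrative only.
    import os
    import threading
    from concurrent.futures import ThreadPoolExecutor

    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options

    chrome_options = Options()
    chrome_options.add_argument('--headless')
    chrome_options.add_argument('--disable-gpu')

    thread_local = threading.local()  # each thread gets its own .driver attribute
    drivers = []                      # keep references so every driver can be quit at the end


    def get_driver():
        """Create a driver the first time the current thread asks for one, then reuse it."""
        if not hasattr(thread_local, 'driver'):
            thread_local.driver = webdriver.Chrome(options=chrome_options)
            drivers.append(thread_local.driver)  # list.append is thread-safe under the GIL
        return thread_local.driver


    def fetch(url):
        driver = get_driver()
        driver.get(url)
        return url, driver.title


    if __name__ == '__main__':
        urls = ['http://cdsip.nhfpc.gov.cn/work/0-{}.html'.format(page) for page in range(1, 9)]
        # one worker per CPU core, as suggested above
        with ThreadPoolExecutor(max_workers=os.cpu_count()) as pool:
            for url, title in pool.map(fetch, urls):
                print(url, title)
        # quit the drivers only after the pool has finished all of its work
        for driver in drivers:
            driver.quit()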

      

  • Original article: https://www.cnblogs.com/zhangyafei/p/11075243.html