zoukankan      html  css  js  c++  java
  • Python3 并发 | concurrent.futures

    # -*- coding: utf-8 -*-
    from concurrent.futures import ThreadPoolExecutor, wait, as_completed, ALL_COMPLETED
    import logging
    import random
    import queue
    import time
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - [line:%(lineno)d] %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    fh = logging.FileHandler(r'spider.log', encoding='utf-8')
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    
    
    class TPE(ThreadPoolExecutor):
        """
        并行任务
        
        # 官方文档 https://docs.python.org/zh-cn/3.7/library/concurrent.futures.html
        """
    
        def __init__(self, max_workers=None, thread_name_prefix=''):
            super().__init__(max_workers, thread_name_prefix)
    
    
        def run(self):
            self.num_list = [i for i in range(0, 100)]
            num = self.submit(self.manage_task)
    
            # 设置任务列表大小
            group_size = 10
    
            while self.num_list:
                # # # 方式一
                # 每次 10 个任务
                result = self.map(self.query_task, self.num_list[:group_size])
                for result in result:
                    logger.info(f"result | {result}")
    
                # # # 方式二
                # 重新定义最大线程数 5
                # with ThreadPoolExecutor(max_workers=5) as executor:
                #     # 列表模式
                #     futures = [executor.submit(self.query_task, f"{n}") for n in self.num_list[:group_size]]
                #     # 等待 futures 中的任务全部结束
                #     wait(futures, return_when=ALL_COMPLETED)
                #     for future in as_completed(futures):
                #         logger.info(future.result())
    
                #     # 字典模式
                #     futures = {executor.submit(self.query_task, n): n for n in self.num_list[:group_size]}
                #     for future in as_completed(futures):
                #         infos = futures[future]
                #         data = future.result()
                #         logger.info(data)
    
                # # # 方式三
                # 既然都继承了直接 submit
                # futures = [self.submit(self.query_task, f"{n}") for n in self.num_list[:group_size]]
                # # 等待 futures 中的任务全部结束
                # wait(futures, return_when=ALL_COMPLETED)
                # for future in as_completed(futures):
                #     logger.info(future.result())
    
                # 重新赋值,切割列表
                self.num_list = self.num_list[group_size:]
                logger.info(self.num_list)
                time.sleep(1)
    
        def query_task(self, num):
            """
            任务
            :param num:
            :return:
            """
            logger.info(f"query task | {num}")
            time.sleep(1)
            result = {}
            result['succeed'] = True
            result['num'] = num
            return result
    
        def manage_task(self):
            while True:
                if not self.num_list:
                    return True
    
                n = random.randint(-1, 999)
                self.num_list.append(n)
                logger.info(f"add new task | {n}")
                time.sleep(1.5)
    
    
    if __name__ == '__main__':
        # 设置最大并发数
        t = TPE(max_workers=10)
        t.run()
    
    

    使用队列

    # -*- coding: utf-8 -*-
    from concurrent.futures import ThreadPoolExecutor
    import logging
    import random
    import queue
    import time
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - [line:%(lineno)d] %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    fh = logging.FileHandler(r'queue.log', encoding='utf-8')
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    
    class TPE(ThreadPoolExecutor):
        def __init__(self, max_workers=None, thread_name_prefix=''):
            super().__init__(max_workers, thread_name_prefix)
            self.proxy_queue = queue.Queue(maxsize=0)
            
            self.domain_list = []
    
        def run(self):
            # 开始程序, 添加 10 个值
            [self.proxy_queue.put(random.randint(1, 99)) for i in range(10)]
            self.submit(self.manage_queue)
            time.sleep(0.1)
            self.submit(self.talk)
            
        def talk(self):
             while not self.proxy_queue.empty():
                # 取值
                for i in range(3):
                    logger.info(f"获取值:{self.proxy_queue.get()} | 队列: {self.proxy_queue.qsize()}")
                print()
                time.sleep(2.1)
        
        def manage_queue(self):
            while True:
                if self.proxy_queue.empty():
                    return True
                
                # 队列添加值
                n = random.randint(1, 99)
                self.proxy_queue.put(n)
                logger.info(f"添加值:{n}")
                time.sleep(0.9)
    
    if __name__ == '__main__':
        t = TPE(max_workers=5)
        t.run()
    
  • 相关阅读:
    Enigmatic Partition【二阶差分】-2020牛客暑期多校8
    Tetrahedron【几何】-2020杭电多校5
    Set1【组合数】-2020杭电多校5
    Paperfolding【组合数】-2020杭电多校5
    并发编程学习总结(二、AQS实现类总结)
    并发编程学习笔记(三十五、线程池源码五,submit方法分析)
    并发编程学习笔记(三十四、线程池源码四,execute方法分析)
    并发编程学习笔记(三十三、线程池源码三,线程池状态)
    并发编程学习笔记(三十二、线程池源码二,ThreadPoolExecutor构造函数)
    并发编程学习笔记(三十一、线程池源码一,工作线程Worker)
  • 原文地址:https://www.cnblogs.com/zl158218/p/13495611.html
Copyright © 2011-2022 走看看