zoukankan      html  css  js  c++  java
  • 使用concurrent.futures模块并发,实现进程池、线程池

    concurrent.futures介绍

    future 是一种对象,表示异步执行的操作。这个概念是 concurrent.futures模块和asyncio包的基础。

    Python标准库为我们提供了threading和multiprocessing模块编写相应的异步多线程/多进程代码。

    concurrent.futures 模块是Python3.2 引入的,对于Python2x 版本,Python2.5 以上的版本可以安装 futures 包来使用这个模块,使用命令pip install futures安装即可。

    concurrent.futures主要使用的就是两个类,多线程:ThreadPoolExecutor多进程:ProcessPoolExecutor;这两个类都是抽象Executor类的子类,都继承了相同的接口。

    从Python3.4起,标准库中有两个为Future的类:concurrent.futures.Future 和 asyncio.Future。这两个类作用相同:两个Future类的实例都表示可能已经完成或未完成的延迟计算

    concurrent.futures模块的基础是Exectuor,Executor是一个抽象类,它不能被直接使用。但是它提供的两个子类ThreadPoolExecutor和ProcessPoolExecutor却是非常有用,顾名思义两者分别被用来创建线程池和进程池的代码。我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。

    concurrent.futures模块提供了高度封装的异步调用接口
    ThreadPoolExecutor:线程池,提供异步调用
    ProcessPoolExecutor: 进程池,提供异步调用

    基本方法

    ●submit(fn, *args, **kwargs)
    异步提交任务
    
    ●map(func, *iterables, timeout=None, chunksize=1) 
    取代for循环submit的操作
    
    ●shutdown(wait=True) 
    相当于进程池的pool.close()+pool.join()操作
    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
    
    ●result(timeout=None)
    取得结果
    
    ●add_done_callback(fn)
    回调函数

    进程池

    # 用法
    import os
    import time
    from concurrent.futures import  ProcessPoolExecutor
    
    def task(n):
        print('%s is runing' % os.getpid())
        time.sleep(2)
        return n ** 2
    
    if __name__ == '__main__':
    
        executor = ProcessPoolExecutor(max_workers=3) #不填则默认为cpu的个数
        futures = []
        for i in range(10):
            future = executor.submit(task, i)#submit()方法返回的是一个future实例,要得到结果需要用obj.result()
            futures.append(future)
        executor.shutdown() #默认为True类似用from multiprocessing import Pool实现进程池中的close及join一起的作用
        print('+++++++++++++++++++=>')
        print([obj.result() for obj in futures])
    

    上面方法也可写成下面的方法

    import os
    import time
    from concurrent.futures import  ProcessPoolExecutor
    
    def task(n):
        print('%s is runing' % os.getpid())
        time.sleep(2)
        return n ** 2
    
    if __name__ == '__main__':
    
        start = time.time()
        with ProcessPoolExecutor() as p:   #类似打开文件,可省去.shutdown()
            future_tasks = [p.submit(task, i) for i in range(10)]
        print('=' * 30)
        print([obj.result() for obj in future_tasks])

    线程池

    注意:

      在Windows上创建进程池必须在if __name__ == '__main__':中,否则会报错

    原因:

      这是 Windows 上多进程的实现问题。在 Windows 上,子进程会自动 import 启动它的这个文件,而在 import 的时候是会执行这些语句的。如果你这么写的话就会无限递归创建子进程报错。但是在multiprocessing.Process的源码中是对子进程再次产生子进程是做了限制的,是不允许的,于是出现如上的错误提示。所以必须把创建子进程的部分用那个 if 判断保护起来,import 的时候 name 不是 main ,就不会递归运行了

    import os
    import time
    import threading
    from concurrent.futures import ThreadPoolExecutor
    
    
    def task(n):
        print('%s:%s is running' % (threading.currentThread().getName(), os.getpid()))
        time.sleep(2)
        return n**2
    
    
    if __name__ == '__main__':
        executor = ThreadPoolExecutor(max_workers = 6)  # 不填则默认为cpu的个数*5
        futures = []
        start = time.time()
        for i in range(10):
            obj = executor.submit(task, i)
            futures.append(obj)
        executor.shutdown()
        print('=' * 30)
        print([obj.result() for obj in futures])
        print(time.time() - start)
    

    上面方法也可写成下面的方法

    import os
    import time
    import threading
    from concurrent.futures import ThreadPoolExecutor
    
    
    def task(n):
        print('%s:%s is running' % (threading.currentThread().getName(), os.getpid()))
        time.sleep(2)
        return n**2
    
    
    if __name__ == '__main__':
    
        start = time.time()
        with ThreadPoolExecutor(max_workers=6) as executor:   #类似打开文件,可省去.shutdown()
            future_tasks = [executor.submit(task, i) for i in range(10)]
        print('=' * 30)
        print([obj.result() for obj in future_tasks])
        print(time.time() - start)
    

    回调函数

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import requests
    import os
    import time
    from threading import currentThread
    def get_page(url):
        print('%s:<%s> is getting [%s]' %(currentThread().getName(),os.getpid(),url))
        response=requests.get(url)
        time.sleep(2)
        return {'url':url,'text':response.text}
    def parse_page(res):  #此处的res是一个p.submit获得的一个future对象,不是结果
        res=res.result()  #res.result()拿到的才是对应的结果
        print('%s:<%s> parse [%s]' %(currentThread().getName(),os.getpid(),res['url']))
        with open('db.txt','a') as f:
            parse_res='url:%s size:%s
    ' %(res['url'],len(res['text']))
            f.write(parse_res)
    if __name__ == '__main__':
        # p=ProcessPoolExecutor()
        p=ThreadPoolExecutor()
        urls = [
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
        ]
    
        for url in urls:
            # multiprocessing.pool_obj.apply_async(get_page,args=(url,),callback=parse_page)
            p.submit(get_page, url).add_done_callback(parse_page) #与之前的回调函数拿到的结果不同,这里拿到的是前面submit方法执行完后返回的对象,要.result才能拿到对应的结果
        p.shutdown()
        print('主',os.getpid())
    

    map方法

    和内置函数map差不多的用法,这个方法返回一个map(func, *iterables)迭代器,迭代器中的回调执行返回的结果有序的。

    以下是通过concurrent.futures模块下类ThreadPoolExecutor和ProcessPoolExecutor实例化的对象的map方法实现进程池、线程池

    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import ThreadPoolExecutor
    import os
    import time
    
    
    def task(n):
        print('%s is running' % os.getpid())
        time.sleep(2)
        return n**2
    
    
    if __name__ == '__main__':
        # executor=ProcessPoolExecutor()
        executor = ThreadPoolExecutor()
        start = time.time()
        obj = executor.map(task, range(10))
        executor.shutdown()
        print('=' * 30)
        print(list(obj))
        print(time.time() - start)

    简化

    1、with ThreadPoolExecutor() as executor: #类似打开文件,可省去.shutdown()

    2、executor.map(task,range(1,12)) #map取代了for+submit

    with ProcessPoolExecutor(max_workers=10) as executor:
        executor.map(print_hello, range(10))

    参考:https://docs.python.org/zh-cn/dev/library/concurrent.futures.html

  • 相关阅读:
    mysql--连接查询(内外连接)
    Mysql--select基础查询
    Mysql--数据定义语言(DDL)
    Mysql--数据操作语言(DML)
    java--String、StringBuilder、StringBuffer的解析和比较?
    Java--equals和 == 的比较和equals()、HashCode()的重写
    Mysql--数据类型
    Mysql--约束
    SpringCloud版本说明
    springBoot 发送邮件
  • 原文地址:https://www.cnblogs.com/luxiaojun/p/11636058.html
Copyright © 2011-2022 走看看