  • concurrent.futures in Python 3: a direct interface to multithreading and multiprocessing (thread pools since Python 3.2)

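    The Python standard library provides the threading and multiprocessing modules for writing multithreaded and multiprocess code. Since Python 3.2 it also provides the concurrent.futures module, whose two classes, ThreadPoolExecutor and ProcessPoolExecutor, offer a higher-level abstraction over threading and multiprocessing and give direct support for writing thread pools and process pools.
    The two building blocks of concurrent.futures are Executor and Future.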

    Executor

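    Executor is an abstract class and cannot be used directly. It defines the basic methods for concrete asynchronous executors.
    ThreadPoolExecutor and ProcessPoolExecutor inherit from Executor and are used to create thread pools and process pools, respectively.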

    import time  # needed by map() below for its timeout handling
    
    
    class Executor(object):
        """This is an abstract base class for concrete asynchronous executors."""
    
        def submit(self, fn, *args, **kwargs):
            raise NotImplementedError()
    
        def map(self, fn, *iterables, timeout=None):
            if timeout is not None:
                end_time = timeout + time.time()
    
            fs = [self.submit(fn, *args) for args in zip(*iterables)]
            def result_iterator():
                try:
                    for future in fs:
                        if timeout is None:
                            yield future.result()
                        else:
                            yield future.result(end_time - time.time())
                finally:
                    for future in fs:
                        future.cancel()
            return result_iterator()
    
        def shutdown(self, wait=True):
            pass
    
        def __enter__(self):
            return self
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            self.shutdown(wait=True)
            return False
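
    As a rough illustration of the interface above, the sketch below checks that both concrete pools are subclasses of Executor and shows that the with statement is shorthand for calling shutdown(wait=True) yourself. The task function double is a made-up name used only for this example.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def double(x):
    # Hypothetical task used only for illustration.
    return x * 2


# Both concrete pools implement the abstract Executor interface.
print(issubclass(ThreadPoolExecutor, Executor))   # True
print(issubclass(ProcessPoolExecutor, Executor))  # True

# Explicit lifecycle: create the pool, submit work, then shut it down.
pool = ThreadPoolExecutor(max_workers=2)
explicit = pool.submit(double, 21).result()
pool.shutdown(wait=True)

# Equivalent lifecycle: __exit__ calls shutdown(wait=True) for us.
with ThreadPoolExecutor(max_workers=2) as pool:
    managed = pool.submit(double, 21).result()

print(explicit, managed)  # 42 42
```

    The with form is usually the safer choice, because the pool is shut down even if the body raises.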
    

      

    The submit() method

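    Executor defines the submit() method, which schedules a callable to run as a task and returns a Future instance that represents that call.
    The following example shows how submit() is used with a thread pool or process pool.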

    # coding: utf-8
    
    from concurrent.futures import ThreadPoolExecutor
    import time
    
    
    def return_future(msg):
        time.sleep(3)
        return msg
    
    
    # Create a thread pool with two worker threads
    pool = ThreadPoolExecutor(max_workers=2)
    
    # Submit two tasks to the pool
    f1 = pool.submit(return_future, 'hello')
    f2 = pool.submit(return_future, 'world')
    
    print(f1.done())
    time.sleep(3)
    print(f2.done())
    
    print(f1.result())
    print(f2.result())
    

      

    Rewriting this for a process pool is simple: replace ThreadPoolExecutor with ProcessPoolExecutor. To submit several tasks, call submit() in a loop.
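
    A minimal sketch of both points, assuming a made-up task square and a made-up helper run_all: the helper submits tasks in a loop and works with either executor class. With a process pool the task must be a module-level function so that it can be pickled, and the pool should be created under an `if __name__ == "__main__":` guard.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def square(n):
    # Hypothetical task; module-level so a process pool can pickle it.
    return n * n


def run_all(executor_cls, numbers, max_workers=2):
    # Hypothetical helper: submit one task per number, then collect results.
    with executor_cls(max_workers=max_workers) as pool:
        futures = [pool.submit(square, n) for n in numbers]
        # Iterating the list in submission order keeps results aligned
        # with the inputs.
        return [f.result() for f in futures]


if __name__ == "__main__":
    print(run_all(ThreadPoolExecutor, [1, 2, 3]))   # [1, 4, 9]
    print(run_all(ProcessPoolExecutor, [1, 2, 3]))  # [1, 4, 9]
```

    Only the class passed in changes; the submit() and result() calls are identical for both pools.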

    The map() method

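    Besides submit(), Executor also provides map(). It works like the built-in map(func, *iterables), except that the calls run concurrently; it returns an iterator that yields the results in the same order as the inputs. The following example illustrates this: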

    # coding: utf-8
    
    from concurrent.futures import ThreadPoolExecutor as Pool
    import requests
    
    URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']
    
    
    def task(url, timeout=10):
        return requests.get(url, timeout=timeout)
    
    
    pool = Pool(max_workers=3)
    results = pool.map(task, URLS)
    
    for ret in results:
        print('%s, %s' % (ret.url, len(ret.content)))
    

    Output:

    http://www.baidu.com/, 2381
    http://www.qq.com/, 252160
    http://www.sina.com.cn/, 607265
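
    The ordering guarantee can also be checked without network access. In the sketch below, slow_echo is a made-up task that sleeps for a given time; the shortest task finishes first, yet map() still yields results in input order. The second call shows that several iterables are combined element by element, as with the built-in map().

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_echo(delay):
    # Hypothetical task: sleep for `delay` seconds, then hand the value back.
    time.sleep(delay)
    return delay


delays = [0.3, 0.1, 0.2]

with ThreadPoolExecutor(max_workers=3) as pool:
    # The 0.1 s task finishes first, but map() yields in input order.
    ordered = list(pool.map(slow_echo, delays))
    # Several iterables are zipped together, like the built-in map().
    powers = list(pool.map(pow, [2, 3, 4], [3, 2, 1]))

print(ordered)  # [0.3, 0.1, 0.2]
print(powers)   # [8, 9, 4]
```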
    

      

    Future

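    A Future can be thought of as an operation that will complete at some point in the future; it is the foundation of asynchronous programming. Normally, when we perform an I/O operation such as fetching a URL (as below), the calling thread blocks until the result comes back and can do nothing else in the meantime. A Future lets us get other work done while we wait.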

    import requests
    
    # Blocks the calling thread until the whole response has arrived
    data = requests.get('http://www.baidu.com').content
    print(len(data))
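
    By contrast, here is a sketch of handing a blocking call to an executor so that the main thread can carry on. To keep it runnable offline, slow_fetch is a made-up stand-in that sleeps instead of calling requests.get().

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_fetch(name):
    # Hypothetical stand-in for a blocking I/O call such as requests.get().
    time.sleep(0.5)
    return 'payload for %s' % name


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_fetch, 'example')  # returns immediately

    # The main thread is free while the worker thread waits on "I/O".
    done_right_away = future.done()
    other_work = sum(range(1000))

    # result() blocks only now, and only if the task is still running.
    data = future.result(timeout=5)

print(done_right_away)  # False
print(other_work)       # 499500
print(data)             # payload for example
```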
    

    Future instances are created by Executor.submit(). Future provides a rich set of methods for working with the call.

    # coding: utf-8
    from concurrent.futures import ThreadPoolExecutor as Pool
    from concurrent.futures import as_completed
    import requests
    
    URLS = ['http://qq.com', 'http://sina.com', 'http://www.baidu.com', ]
    
    
    def task(url, timeout=10):
        return requests.get(url, timeout=timeout)
    
    
    with Pool(max_workers=3) as executor:
        future_tasks = [executor.submit(task, url) for url in URLS]
    
        for f in future_tasks:
            if f.running():
                print('%s is running' % str(f))
    
        for f in as_completed(future_tasks):
            try:
                ret = f.done()
                if ret:
                    f_ret = f.result()
                    print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
            except Exception as e:
                f.cancel()
                print(str(e))
    

    Output:

    <Future at 0x7fc2716e1f60 state=running> is running
    <Future at 0x7fc27136d4e0 state=running> is running
    <Future at 0x7fc27136d710 state=running> is running
    <Future at 0x7fc27136d710 state=finished returned Response>, done, result: http://www.baidu.com/, 2381
    <Future at 0x7fc2716e1f60 state=finished returned Response>, done, result: http://www.qq.com/, 252343
    <Future at 0x7fc27136d4e0 state=finished returned Response>, done, result: http://www.sina.com.cn/, 602366

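    The output shows that as_completed() does not yield futures in the order of the URLS list but in the order in which they finish. This also shows that the requests to the different URLs ran concurrently instead of blocking one another.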
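
    A common companion pattern, sketched here with a made-up task work that sleeps or fails, is to keep a dict from each future to its input. Because as_completed() yields in completion order, the dict is what lets a result or an exception be attributed to the right input.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(delay):
    # Hypothetical task: fail for a negative delay, otherwise sleep.
    if delay < 0:
        raise ValueError('negative delay: %s' % delay)
    time.sleep(delay)
    return delay


delays = [0.6, 0.1, -1, 0.3]
finished, failed = [], []

with ThreadPoolExecutor(max_workers=4) as pool:
    # Map each future back to its input so results can be attributed.
    future_to_delay = {pool.submit(work, d): d for d in delays}

    for future in as_completed(future_to_delay):
        delay = future_to_delay[future]
        try:
            finished.append(future.result())
        except ValueError:
            # result() re-raises the exception raised inside the task.
            failed.append(delay)

print(finished)  # successful results, in completion order
print(failed)    # [-1]
```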

    wait

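    The wait() function returns a named tuple containing two sets: done (the futures that completed) and not_done (those that did not). One advantage of wait() is the extra flexibility it gives: its return_when parameter accepts one of three constants, FIRST_COMPLETED, FIRST_EXCEPTION and ALL_COMPLETED, and defaults to ALL_COMPLETED.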

    # coding: utf-8
    from concurrent.futures import ThreadPoolExecutor as Pool
    from concurrent.futures import wait
    import requests
    
    URLS = ['http://qq.com', 'http://sina.com', 'http://www.baidu.com', ]
    
    
    def task(url, timeout=10):
        return requests.get(url, timeout=timeout)
    
    
    with Pool(max_workers=3) as executor:
        future_tasks = [executor.submit(task, url) for url in URLS]
    
        for f in future_tasks:
            if f.running():
                print('%s is running' % str(f))
    
        results = wait(future_tasks)
        done = results[0]
        for x in done:
            print(x)
    

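    wait() takes two optional parameters, timeout and return_when.
    timeout is the maximum number of seconds to wait before wait() returns; by default there is no limit.
    return_when decides when the call returns: with the default ALL_COMPLETED, the program blocks until every task in the pool has finished; with FIRST_COMPLETED, it returns as soon as any task finishes, without waiting for the rest.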
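
    The two parameters can be tried out offline with a sketch like the following, where nap is a made-up task that simply sleeps. The delays are chosen far enough apart that the counts in the comments should hold on any reasonably idle machine.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def nap(delay):
    # Hypothetical task: sleep for `delay` seconds, then return it.
    time.sleep(delay)
    return delay


with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(nap, d) for d in (0.1, 0.6, 0.8)]

    # Returns as soon as at least one future has finished.
    first = wait(futures, return_when=FIRST_COMPLETED)
    first_counts = (len(first.done), len(first.not_done))

    # With the default ALL_COMPLETED, timeout bounds the wait instead;
    # futures that are still running are reported in not_done.
    partial = wait(futures, timeout=0.05)
    partial_counts = (len(partial.done), len(partial.not_done))

    # No timeout: block until every future is done.
    final = wait(futures)

print(first_counts)    # (1, 2)
print(partial_counts)  # (1, 2)
print(len(final.done), len(final.not_done))  # 3 0
```

    Note that wait() does not raise when the timeout expires; it just returns whatever is done so far.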

    Reposted from

    http://blog.csdn.net/dutsoft/article/details/54728706

  • Original URL: https://www.cnblogs.com/935415150wang/p/7719597.html