zoukankan      html  css  js  c++  java
  • 并发编程之 multiprocessing 和 concurrent.futures(二)

    1. multiprocessing

    Python 实现多进程的模块最常用的是multiprocessing,此外还有multiprocess、pathos、concurrent.futures、pp、parallel、pprocess等模块。

    1.1 multiprocessing.Process

    multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
    

    参数

    • group: 为预留参数
    • target:子进程要执行的目标函数
    • name:线程名称
    • args、kwargs:参数,args 必须是元组
    • deamonbool 值:表示是否为守护进程

    实例

    # coding=utf-8
    import multiprocessing
    import time
    
    
    def run(a):
        time.sleep(5)
        print(a)
        return a * a
    
    
    if __name__ == '__main__':
        p = multiprocessing.Process(target=run, args=(123456,))
        p.start()	# 运行进程实例
        p.join()    # 阻塞主进程,当子进程结束后,才会继续执行主进程
        print(123)
    

    1.2 multiprocessing.Pool

    创建多个子进程最好是采用进程池 multiprocessing.Pool

    multiprocessing.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
    

    参数

    • processes:进程数量,如果 processesNone那么使用 os.cpu_count()返回的数量
    • initializer: 如果 initializer不是 None,那么每一个工作进程在开始的时候会调用initializer(*initargs)
    • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild 默认是None,意味着只要Pool存在工作进程就会一直存活
    • context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context

    创建子进程的几种方式

    • apply():同步阻塞执行,上一个子进程结束后才能进行下一个子进程(不推荐)
    • apply_async():异步非阻塞执行,每个子进程都是异步执行的(并行)(推荐)
    • map():同步阻塞
    • map_async():异步非阻塞
    • imap():内存不够用可以采用此种方式,速度慢于 map()
    • imap_unorderedimap() 的无序版本(不会按照调用顺序返回,而是按照结束顺序返回),返回迭代器实例

    1.2.1 apply

    同步阻塞执行,上一个子进程结束后才能进行下一个子进程

    apply(func, args=(), kwds={}, callback=None, error_callback=None) 
    

    1.2.2 apply_async

    异步非阻塞执行,每个子进程都是异步执行的(并行),异步执行指的是一批子进程并行执行,且子进程完成一个,就新开始一个,而不必等待同一批其他进程完成

    # callback 回调,error_back 错误回调
    apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
    

    示例

    # coding=utf-8
    
    import multiprocessing
    
    
    def callback(result):
        """回调函数"""
        with open("result.txt", "a+", encoding="utf-8") as f:
            f.write(str(result) + "
    ")
    
    
    def run(num):
        return num * num
    
    
    if __name__ == '__main__':
        pool = multiprocessing.Pool(6)
        for i in range(1000):
            pool.apply_async(run, args=(i,), callback=callback)
            
            # # 如有多个参数,可传一个 iterable
            # pool.apply_async(run, args=([i, 123, 456]), callback=callback)
    
        pool.close()
        pool.join()
    

    1.2.3 map

    若子进程有返回值,且需集中处理,建议采用此种方式(但是它是同步阻塞的):

    # iterable 可迭代类型,将 iterable 中每个元素作为参数应用到 func 函数中,返回 list
    map(func, iterable, chunksize=None)
    

    1.2.4 map_async

    map 的异步非阻塞版本,返回 MapResult 实例,使用 get() 方法,获取结果(list 方法):

    map_async(func, iterable, chunksize=None, callback=None, error_callback=None)
    

    apply_async 与 map_async 对比

    # coding=utf-8
    
    import multiprocessing
    import time
    
    
    def run(a):
        return a * a
    
    
    data = []
    
    
    def my_callback(result):
        data.append(result)
    
    
    if __name__ == '__main__':
        st = time.time()
        pool = multiprocessing.Pool(6)
    
        # 总耗时:0.4497215747833252
        future = pool.map_async(run, range(20000))
        print(future.get())  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    
        # # 总耗时:3.019148111343384
        # for i in range(20000):
        #     pool.apply_async(run, args=(i,), callback=my_callback)
        # 
        # print(data)
    
        pool.close()
        pool.join()
        print(f"总耗时:{time.time() - st}")
    

    结论

    • map_asyncapply_async 速度快
    • 若想统一处理结果,map_asyncapply_async 更方便

    1.2.5 imap 和 imap_unordered

    内存不够可以采用 imap 方式,map 的迭代器版本,返回迭代器实例,速度远慢于 map,但是堆内存需求小。

    imap_unorderedimap 的无序版本

    imap(func, iterable, chunksize=1)
    imap_unordered(func, iterable, chunksize=1)
    

    实例:

    # coding=utf-8
    
    import multiprocessing
    import time
    
    
    def run(a):
        return a * a
    
    
    data = []
    
    
    def my_callback(result):
        data.append(result)
    
    
    if __name__ == '__main__':
        st = time.time()
        pool = multiprocessing.Pool(6)
    
        # # 总耗时:0.4497215747833252
        # future = pool.map_async(run, range(20000))
        # print(future.get())  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    
        # # 总耗时:3.019148111343384
        # for i in range(20000):
        #     pool.apply_async(run, args=(i,), callback=my_callback)
        #
        # print(data)
    
        future = pool.imap(run, range(20000))   # 总耗时:4.171960115432739
        print(future)
        for i in future:
            print(i)
    
        pool.close()
        pool.join()
        print(f"总耗时:{time.time() - st}")  # 总耗时:0.4497215747833252
    

    1.2.6 starmap 和 starmap_async

    starmap 可以使子进程活动接收多个参数,而 map 只能接收一个参数:

    # 子进程活动 func允许包含多个参数,也即iterable的每个元素也是iterable(其每个元素作为func的参数),返回结果组成的 list
    starmap(func, iterable, chunksize=None)
    
    # 异步并行版本,返回 MapResult 实例,get() 方法可以获取结果组成的 list
    starmap_async(func, iterable, chunksize=None, callback=None, error_callback=None)
    
    # 使用方式
    pool.starmap_async(f, ((a0, b0), (a1, b1), ...)).get()
    

    1.3 进程间通信(数据共享)

    每个进程是相互独立的,内存无法共享,实现进程间数据共享的方式有:

    • multiprocessing.Value(typecode_or_type, *args, lock=True):共享单个数据,共享内存
    • multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True):共享数组,共享内存
    • multiprocessing.Manager() :共享进程,支持多种数据结构的数据共享

    Manager 支持的类型有:list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,ValueArray不仅可以在本地进程间共享,甚至可以在多客户端实现网络共享,不过 Manager占用资源较大。

    1、共享 dict

    # coding=utf-8
    
    # 多个进程将数据添加到字典 dd 中
    
    import multiprocessing
    
    
    def run(d, k, v):
        d[k] = v
    
    
    if __name__ == '__main__':
        pool = multiprocessing.Pool(6)
        manager = multiprocessing.Manager()
    
        dd = manager.dict()
    
        for i in range(20):
            future = pool.apply_async(run, args=(dd, i, i * i))
    
        pool.close()
        pool.join()
    
        print(dict(dd))
        
    # 运行结果
    {0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 8: 64, 7: 49, 9: 81, 10: 100, 11: 121, 12: 144, 13: 169, 14: 196, 15: 225, 16: 256, 17: 289, 18: 324, 19: 361}
    

    2、管理队列,并让不同的进程可以访问它:

    import multiprocessing
    
    
    def worker(name, que):
        que.put("%d is done" % name)
    
    
    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=3)
        m = multiprocessing.Manager()
        q = m.Queue()
    
        for i in range(20):
            pool.apply_async(worker, (i, q))
    
        pool.close()
        pool.join()
    
    # coding=utf-8
    
    import multiprocessing
    
    
    def write(name, que):
        que.put("%d is done" % name)
        print(f'{name} write done!')
    
    
    def read(que):
        while not que.empty():
            val = que.get(True)
            print('read===>: ', val)
    
            # while True:
            #     if not que.empty():
            #         val = que.get(True)
            #         print('read===>: ', val)
    
    
    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=3)
        m = multiprocessing.Manager()
        q = m.Queue()
    
        for i in range(20):
            pool.apply_async(write, (i, q))
    
        p1 = multiprocessing.Process(target=read, args=(q,))
        p1.start()
        p1.join()
    
        pool.close()
        pool.join()
    

    注意:在操作共享对象元素时,除了赋值操作,其他的方法都作用在共享对象的拷贝上,并不会对共享对象生效。比如:dic['k'] = []; dic['k'].append(x),将不会修改 dic 的内容

    1.4 进程间通信(数据传递)

    • 队列
      • multiprocessing.Queue(maxsize=0) :建立共享的队列实例
      • multiprocessing.JoinableQueue(maxsize=0):建立可阻塞的队列实例
    • 管道
      • multiprocessing.Pipe(duplex=True):建立一对管道对象,用于在两个进程之间传递数据

    参考文章:python并行计算(上):multiprocessing、multiprocess模块

    2. concurrent.futures 模块

    concurrent.futures3.2 中引入的新模块,它为异步执行可调用对象提供了高层接口,分为两类:

    • ThreadPoolExecutor:多线程编程
    • ProcessPoolExecutor:多进程编程

    两者实现了同样的接口,这些接口由抽象类 Executor 定义;这个模块提供了两大类型:

    • Executor:执行器,用于管理工作池
    • Future:管理工作计算出的结果

    2.1 concurrent.futures.Executor 类

    提供了一系列方法,可以用于异步执行调用,定义的方法有:

    # 调用对象执行,fn(*args, **kwargs),返回 Future 对象,可用 future.result() 获取执行结果
    submit(fn, *args, **kwargs)
    
    # 异步执行 func,并支持多次并发调用,返回一个迭代器
    # timeout 秒数可以是浮点数或者整数,如果设置为 None 或者不指定,则不限制等待时间
    # ProcessPoolExecutor 这个方法将 iterables 划分为多块,作为独立的任务提交到进程池(不是 1)可显著提升性能,ThreadPoolExecutor,chunksize 不起作用
    map(func, *iterables, timeout=None, chunksize=1)
    
    # 告诉当执行器 executor 在当前所有等待的 future 对象运行完毕后,应该释放执行器用到的所有资源
    # True 会等待所有 future 执行完毕,且 executor 的资源都释放完会才会返回,False 会立即返回,executor 的资源会在 future 执行完后释放
    shutdown(wait=True)
    

    2.2 ThreadPoolExecutor

    ThreadPoolExecutor + requests 并发执行

    # coding=utf-8
    
    import time
    import requests
    from concurrent.futures import ThreadPoolExecutor, as_completed
    
    
    def fetch(req_url):
        r = requests.get(req_url)
        return r.json()['args']['a']
    
    
    if __name__ == '__main__':
        start = time.time()
        numbers = range(12)
        url = 'http://httpbin.org/get?a={}'
    
        # submit() 方式
        with ThreadPoolExecutor(max_workers=3) as executor:
            # task_list = [executor.submit(fetch(url.format(n))) for n in range(12)]
            task_list = [executor.submit(fetch, url.format(n)) for n in range(12)]
    
            for future in as_completed(task_list):
                print(future.result())
                # data = future.result()        # 总耗时:2.903249740600586
                # print(data)
    
        ## map() 方式
        # with ThreadPoolExecutor(max_workers=3) as executor:
        #     future = executor.map(fetch, (url.format(n) for n in range(12)))
        #
        # for result in future:
        #     print(result)
    
        print(f'总耗时:{time.time() - start}')  # 总耗时:2.630300760269165
    

    实测 submit 未按顺序返回结果

    2.3 ProcessPoolExecutor

    ProcessPoolExecutor 使用进程池来异步执行调用,适合计算密集型任务,方法参数:

    concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
    

    示例:

    # coding=utf-8
    
    
    from concurrent.futures import ProcessPoolExecutor, as_completed
    
    
    def fib(n):
        if n <= 2:
            return 1
    
        return fib(n - 1) + fib(n - 2)
    
    
    if __name__ == '__main__':
        numbers = range(20)
        with ProcessPoolExecutor(max_workers=3) as executor:
            # # map 方式
            # for num, result in zip(numbers, executor.map(fib, numbers)):
            #     print(f"{num}====>{result}")
    
            # submit 方式
            work_dict = {executor.submit(fib, i): i for i in numbers}
            for future in as_completed(work_dict):
                num = work_dict[future]
                try:
                    data = future.result()
                except Exception as e:
                    print(e)
                else:
                    print(f"fib({num} = {data})")
    

    2.4 Future 类

    Future 类封装了可调用对象的异步执行,由 Executor.submit() 产生,有如下方法:

    • cancel() :尝试取消调用,如果该调用正在执行中,无法取消,本方法返回 False,其他情况下调用会被取消,并返回 True;只有当任务提交了还没执行才可以通过这种方式取消
    • cancelled(): 如果调用已经被成功取消,返回 True
    • running() :如果调用正在执行,无法被取消,则返回 True
    • done() :如果调用成功被取消或者已经执行完毕,返回 True
    • result(timeout=None): 返回调用的返回值。如果调用还没有完成,则最多等待 timeout 秒。如果 timeout 秒之后还没有完成,抛出 concurrent.futures.TimeoutError``。timeout 可以为整数或者浮点数。如果不指定或者为 None,则不限制等待时间。如果 future 在完成之前被取消了,会抛出 CancelledError 异常,如果调用抛出异常,这个方法会抛出同样的异常。同时它也会阻塞直到任务完成,获取被取消
    • exception(timeout=None) :返回被调用抛出的异常,如果调用还没有执行完毕,则最多等待 timeout 秒。如果 timeout 秒之后还没有完成,抛出 concurrent.futures.TimeoutErrortimeout 可以为整数或者浮点数。如果不指定或者为 None,则不限制等待时间。
      如果 future 在完成之前被取消了,会抛出 CancelledError 异常,如果调用完成并且没有抛出异常,返回 None
    • add_done_callback(fn):为 future 附加可调用对象 fn,当 future 运行完毕或者被取消时,它会被用作 fn 的唯一参数,并调用 fn。可调用对象按照添加顺序依次调用,并且总是在添加时所处进程的一个线程内调用它。如果该可调用对象抛出了属于 Exception 子类的异常,它会被记录并忽略。如果它抛出了属于 BaseException 子类的异常,该行为未定义。
      如果 future 已经完成或者已经取消,fn 会被立即调用

    通过 add_done_callack() 获取返回值和捕获异常

    concurrent.futuresthread.ProcessPoolExecutor 线程池中的 worker 引发异常的时候,并不会直接向上抛起异常,而是需要主线程通过调用 concurrent.futures.Future.exception(timeout=None) 方法主动获取 worker 的异常:

    # coding=utf-8
    
    
    from concurrent.futures import ProcessPoolExecutor, as_completed
    
    
    def fib(n):
        if n <= 2:
            return 1
    
        return fib(n - 1) + fib(n - 2)
    
    
    def call_back(future):
        """
        回调(可获取多进程返回值、错误)
        :param future: future 对象
        :return:
        """
        # 获取错误信息
        worker_exception = future.exception()
        if worker_exception:
            print(worker_exception)
        
        # 获取返回值
        print(future.result())
    
    
    def test(n):
        if n % 2 == 0:
            n / 0	# 发生异常
    
        return n * 2
    
    
    if __name__ == '__main__':
        numbers = range(20)
        with ProcessPoolExecutor(max_workers=3) as executor:
            # # map 方式
            # for num, result in zip(numbers, executor.map(fib, numbers)):
            #     print(f"{num}====>{result}")
    
            # submit 方式
            # work_dict = {executor.submit(fib, i): i for i in numbers}
            # for future in as_completed(work_dict):
            #     num = work_dict[future]
            #     try:
            #         data = future.result()
            #     except Exception as e:
            #         print(e)
            #     else:
            #         print(f"fib({num} = {data})")
    
            # 其他方法
            for i in numbers:
                executor.submit(test, i).add_done_callback(call_back)
    

    实测 map() 方式提交的会触发异常,submit() 方式需要通过 add_done_callback() 主动捕获异常!

    参考文章

    • https://blog.csdn.net/jpch89/article/details/87643972
    • https://blog.csdn.net/makingLJ/article/details/98084973

    3. 实例

    3.1 多进程(池)向同一文件写入数据

    # coding=utf-8
    
    """
    Function:回调函数解决多进程向同一文件写入数据
    """
    import multiprocessing
    
    
    def callback(result):
        """回调函数"""
        with open("result.txt", "a+", encoding="utf-8") as f:
            f.write(str(result) + "
    ")
    
    
    def run(num):
        return num * num
    
    
    if __name__ == '__main__':
        pool = multiprocessing.Pool(6)
        for i in range(1000):
            pool.apply_async(run, args=(i,), callback=callback)
    
        pool.close()
        pool.join()
    
  • 相关阅读:
    hdu 2199 Can you solve this equation? 二分
    STL 学习代码~
    hdu 1551 Cable master 二分
    fzu 1564 Combination 组合数是否包含因数
    fafu 1079 帮冬儿忙 组合数包含因数个数
    soj 3290 Distribute The Apples I 组合数对素数取余
    fzu 2020 组合 组合数对素数取余
    hdu 1969 Pie 二分
    hdu 2141 Can you find it? 二分
    hdu 2899 Strange fuction 二分
  • 原文地址:https://www.cnblogs.com/midworld/p/14614634.html
Copyright © 2011-2022 走看看