zoukankan      html  css  js  c++  java
  • 进程池的同步与异步用法Pool

    进程池的同步,如下程序:

    from multiprocessing import Pool
    import time
    import os
    
    
    def func(n):
        print('start 进程 %s'%n, os.getpid())
        time.sleep(1)
        print('end 进程 %s'%n, os.getpid())
    
    
    if __name__ == "__main__":
        pool = Pool()  # 如果进程池不加数字,默认值为往前CPU的数量一样
        for i in range(10):
            pool.apply(func, args=(i,))  

    查看结果:

    start 进程 0 21896
    end 进程 0 21896
    start 进程 1 27040
    end 进程 1 27040
    start 进程 2 23408
    end 进程 2 23408
    start 进程 3 29704
    end 进程 3 29704
    start 进程 4 31384
    end 进程 4 31384
    start 进程 5 2140
    end 进程 5 2140
    start 进程 6 25780
    end 进程 6 25780
    start 进程 7 29488
    end 进程 7 29488
    start 进程 8 21896
    end 进程 8 21896
    start 进程 9 27040
    end 进程 9 27040

    可以发现,变成了进程变成了同步得了,运行特别慢。

    apply方法的作用就是将进程池的进程变成同步的。

    将进程池变成异步的,使用apply_async就可以了,如下:

    from multiprocessing import Pool
    import time
    import os
    
    
    def func(n):
        print('start 进程 %s'%n, os.getpid())
        time.sleep(1)
        print('end 进程 %s'%n, os.getpid())
    
    
    if __name__ == "__main__":
        pool = Pool()
        for i in range(10):
            pool.apply_async(func, args=(i,))

    但是发现结果什么都没有,因为主进程没有等子进程的结束就执行完了。

    如下优化,使用join就行了:

    from multiprocessing import Pool
    import time
    import os
    
    
    def func(n):
        print('start 进程 %s'%n, os.getpid())
        time.sleep(1)
        print('end 进程 %s'%n, os.getpid())
    
    
    if __name__ == "__main__":
        pool = Pool()
        for i in range(10):
            pool.apply_async(func, args=(i,))
        pool.close()  # close()保证的是没有新的任务进入进程池
        pool.join()  # 这里的join检测的是子进程里面的任务结束,而不是子进程的结束,因为进程池里面的进程不会结束,永远活着,被用完之后会被还到进程池里面。

    看结果:

    start 进程 0 22364
    start 进程 1 25820
    start 进程 2 27688
    start 进程 3 26840
    start 进程 4 31100
    start 进程 5 27032
    start 进程 6 21452
    start 进程 7 25592
    end 进程 0 22364
    start 进程 8 22364
    end 进程 1 25820
    start 进程 9 25820
    end 进程 2 27688
    end 进程 3 26840
    end 进程 4 31100
    end 进程 5 27032
    end 进程 6 21452
    end 进程 7 25592
    end 进程 8 22364
    end 进程 9 25820

    值得注意的是,不加pool.close(),直接加pool.join()是会报错的,因为进程池里面的进程用完之后不会结束,而是被还到进程池了,因此这里的join检测的是没有任务再进入进程池了,而不是检测子进程的结束。所以要保证没有任务进入进程池,进程池就不会接收到任务,所以pool.close()的作用就是结束进程池接收任务,就是我的任务执行完毕,且没有新的任务进来,这是就被join检测到了。

     另一种问题如下:

    from multiprocessing import Pool
    import time
    
    
    def func(n):
        time.sleep(0.5)
        return n*n
    
    
    if __name__ == "__main__":
        pool = Pool(5)
        for i in range(10):
            ret = pool.apply(func, args=(i,))
            print(ret)

    打印结果:

    0
    1
    4
    9
    16
    25
    36
    49
    64
    81  # 每隔0.5秒打印一个结果

    看下一段代码,使用apply_async:

    from multiprocessing import Pool
    import time
    
    
    def func(n):
        time.sleep(0.5)
        return n*n
    
    
    if __name__ == "__main__":
        pool = Pool(5)
        for i in range(10):
            ret = pool.apply_async(func, args=(i,))
            print(ret)

    看打印结果:

    <multiprocessing.pool.ApplyResult object at 0x0000023993E70AC8>
    <multiprocessing.pool.ApplyResult object at 0x0000023993E70B70>
    <multiprocessing.pool.ApplyResult object at 0x0000023993E70CF8>
    <multiprocessing.pool.ApplyResult object at 0x0000023993E70DA0>
    <multiprocessing.pool.ApplyResult object at 0x0000023993E70E80>
    <multiprocessing.pool.ApplyResult object at 0x0000023993E70F28>
    <multiprocessing.pool.ApplyResult object at 0x0000023993E70FD0>
    <multiprocessing.pool.ApplyResult object at 0x0000023993E810B8>
    <multiprocessing.pool.ApplyResult object at 0x0000023993E81160>
    <multiprocessing.pool.ApplyResult object at 0x0000023993E81208>

    是一个个对象,并且是同时打印出来的。

    优化如下,使用get()方法获取值:

    from multiprocessing import Pool
    import time
    
    
    def func(n):
        time.sleep(0.5)
        return n*n
    
    
    if __name__ == "__main__":
        pool = Pool(5)
        for i in range(10):
            ret = pool.apply_async(func, args=(i,))
            print(ret.get())

    看打印结果:

    0
    1
    4
    9
    16
    25
    36
    49
    64
    81

    值出来的,但是你会发现一个现象,值是每隔0.5秒一个一个出来的,不是异步吗???

    这里要注意for循环将任务传入如进程池时,

        for i in range(10):
            ret = pool.apply_async(func, args=(i,))  # for循环传入任务
            print(ret.get())

    执行到print(ret.get()),这里的get需要获取值,但是这是没有值,get就会让程序出现阻塞直到等到值,所以for循环的每一步都要等到值才会继续下一个for循环,就出现了不是异步的情况。

    改进方法:

    from multiprocessing import Pool
    import time
    
    
    def func(n):
        time.sleep(0.5)
        return n*n
    
    
    if __name__ == "__main__":
        pool = Pool(5)
        l_list = []
        for i in range(10):
            ret = pool.apply_async(func, args=(i,))
            l_list.append(ret)
        for m in l_list:
            print(m.get())

    这时候你会发现五个五个的出现。

    结束!

  • 相关阅读:
    Mitmproxy使用教程for mac
    Flink重启策略
    Flink中的Time
    Flink的分布式缓存
    8-Flink中的窗口
    10-Flink集群的高可用(搭建篇补充)
    Flink-Kafka-Connector Flink结合Kafka实战
    15-Flink实战项目之实时热销排行
    16-Flink-Redis-Sink
    17-Flink消费Kafka写入Mysql
  • 原文地址:https://www.cnblogs.com/aaronthon/p/9840488.html
Copyright © 2011-2022 走看看