zoukankan      html  css  js  c++  java
  • Python3的multiprocessing多进程-示例

    Python3的multiprocessing多进程-示例

    一、概述

    由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

    multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

    但在使用这些共享API的时候,我们要注意以下几点:

    multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
    多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。

    Process.PID中保存有PID,如果进程还没有start(),则PID为None。

    window系统下,需要注意的是要想启动一个子进程,必须加上那句if name == “main”,进程相关的要写在这句下面。

    二、简单创建多进程:有两种使用方法

    1、直接传入要运行的方法:

    from multiprocessing import Process
    
    def foo(i):
        print ('say hi', i)
    
    if __name__ == '__main__':
        for i in range(10):
            p = Process(target=foo, args=(i,))
            p.start()
    ------------------------------------------
    say hi 0
    say hi 1
    say hi 2
    say hi 3
    say hi 4
    say hi 5
    say hi 6
    say hi 7
    say hi 8
    say hi 9

    2、Process继承并覆盖run()

    from multiprocessing import Process
    import time
    
    
    class MyProcess(Process):
    def __init__(self, arg):
    super(MyProcess, self).__init__()
    self.arg = arg
    
    def run(self):
    print('say hi', self.arg)
    time.sleep(1)
    
    
    if __name__ == '__main__':
    
    for i in range(10):
    p = MyProcess(i)
    p.start()

    三、Process类

    1、构造方法:

    Process([group [, target [, name [, args [, kwargs]]]]])
    • group: 线程组,目前还没有实现,库引用中提示必须是None;
    • target: 要执行的方法;
    • name: 进程名;
    • args/kwargs: 要传入方法的参数。

    2、实例方法:

    • is_alive():返回进程是否在运行。
    • join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
    • start():进程准备就绪,等待CPU调度
    • run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
    • terminate():不管任务是否完成,立即停止工作进程

    3、属性:

    • authkey
    • daemon:和线程的setDeamon功能一样
    • exitcode(进程在运行时为None、如果为–N,表示被信号N结束)
    • name:进程名字。
    • pid:进程号。

    例子一:

    from multiprocessing import Process
    
    def foo(i):
        print('say hi', i)
    
    if __name__ == '__main__':
        for i in range(10):
            p = Process(target=foo, args=(i,))
            p.start()
    ----------------------------------------------
    say hi 0
    say hi 1
    say hi 2
    say hi 3
    say hi 4
    say hi 5
    say hi 6
    say hi 7
    say hi 8
    say hi 9

    例子二:

    from multiprocessing import Process
    import time
    def foo(i):
        time.sleep(1)
        print('say hi', i)
        time.sleep(1)
    
    
    
    if __name__ == '__main__':
        p_list=[]
        for i in range(10):
            p = Process(target=foo, args=(i,))
            p.daemon=True
            p_list.append(p)
    
        for p in p_list:
            p.start()
        for p in p_list:
            p.join()
    
        print('main process end')
    ------------------------------------------------
    say hi 0
    say hi 1
    say hi 2
    say hi 3
    say hi 4
    say hi 5
    say hi 6
    say hi 7
    say hi 8
    say hi 9
    main process end

    可以看出join方法和deamon属性的用法和多线程的基本一致。

    四、Pool类

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。进程池设置最好等于CPU核心数量

    1、构造方法:

    Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
    • processes :使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量。
    • initializer:如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
    • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
    • context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool()或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context

    2、实例方法:

    • apply(func[, args[, kwds]]):同步进程池
    • apply_async(func[, args[, kwds[, callback[, error_callback]]]]):异步进程池
    • close() : 关闭进程池,阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
    • terminate() : 结束工作进程,不在处理未完成的任务
    • join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。

    例子一(异步进程池):
    pool.join()必须使用在pool.close()或者pool.terminate()之后。其中close()跟terminate()的区别在于close()会等待池中的worker进程执行结束再关闭pool,而terminate()则是直接关闭。

    from  multiprocessing import Pool
    import time
    
    
    def Foo(i):
        time.sleep(2)
        return i + 100
    
    def Bar(arg):
        print(arg)
    
    if __name__ == '__main__':
        t_start=time.time()
        pool = Pool(5)
    
        for i in range(10):
            pool.apply_async(func=Foo, args=(i,), callback=Bar)#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
    
        pool.close()
        pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
        pool.terminate()
        t_end=time.time()
        t=t_end-t_start
        print('the program time is :%s' %t)

    例子二(同步进程池):

    from  multiprocessing import Process, Pool
    import time
    
    
    def Foo(i):
        time.sleep(1)
        print(i + 100)
    
    
    if __name__ == '__main__':
        t_start=time.time()
        pool = Pool(5)
    
        for i in range(10):
            pool.apply(Foo, (i,))
    
        pool.close()
        pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
        t_end=time.time()
        t=t_end-t_start
        print('the program time is :%s' %t)

    可以看出进程同步顺序执行了,效率降低

    例子三:异步进程池使用get()方法获得进程执行结果值(错误使用get()方法获取结果)

    from  multiprocessing import Process, Pool
    import time
    
    def Foo(i):
        time.sleep(1)
        return i+100
    
    def Bar(arg):
        return arg
    
    if __name__ == '__main__':
        t_start=time.time()
        pool = Pool(5)
    
        for i in range(10):
            res = pool.apply_async(func=Foo, args=(i,), callback=Bar)#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
            print(res.get())
    
        pool.close()
        pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
        pool.terminate()
        t_end=time.time()
        t=t_end-t_start
        print('the program time is :%s' %t)
    ----------------------------------------------------------
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    the program time is :10.273606538772583

    可以看出由于每个进程的get()方法,程序变成同步执行了

    例子四(正确使用get()方法获取结果)

    from  multiprocessing import Pool
    import time
    
    
    def Foo(i):
        time.sleep(2)
        return i + 100
    
    def Bar(arg):
        return arg
    
    if __name__ == '__main__':
        res_list=[]
        t_start=time.time()
        pool = Pool(5)
    
        for i in range(10):
            res = pool.apply_async(func=Foo, args=(i,), callback=Bar)
            res_list.append(res)
    
        pool.close()
        pool.join()
        for res in res_list:
            print(res.get())
        t_end=time.time()
        t=t_end-t_start
        print('the program time is :%s' %t)
    ---------------------------------------------------------------
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    the program time is :4.311059236526489

    不能在每个进程中执行获取结果值得方式,首先将结果值存在列表里面,对列表循环再取里面的值实现异步获取。

    五、进程数据共享

    进程各自持有一份数据,默认无法共享数据;

    from multiprocessing import Process
    li = []
    
    def foo(i):
        li.append(i)
        print('say hi', li)
    if __name__ == '__main__':
    
        for i in range(10):
            p = Process(target=foo, args=(i,))
            p.start()
    
        print('ending', li)

    方法一(使用Array):

    Array(‘i’, range(10))中的‘i’参数C语言中的类型:

    ‘c’: ctypes.c_char     ‘u’: ctypes.c_wchar    ‘b’: ctypes.c_byte     ‘B’: ctypes.c_ubyte
    ‘h’: ctypes.c_short     ‘H’: ctypes.c_ushort    ‘i’: ctypes.c_int      ‘I’: ctypes.c_uint
    ‘l’: ctypes.c_long,    ‘L’: ctypes.c_ulong    ‘f’: ctypes.c_float    ‘d’: ctypes.c_double
    from multiprocessing import Process, Array
    
    def f(a):
        for i in range(len(a)):
            a[i] = -a[i]
    
    if __name__ == '__main__':
        arr = Array('i', range(10))
        p = Process(target=f, args=(arr,))
        p.start()
        p.join()
    
        print(arr[:])
    --------------------------------------------------
    [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

    方法二(使用Manager):
    Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持。

    from multiprocessing import Process, Manager
    
    
    def f(d, l):
        d[1] = '1'
        d['2'] = 2
        d[0.25] = None
        l.reverse()
    
    
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
            l = manager.list(range(10))
    
            p = Process(target=f, args=(d, l))
            p.start()
            p.join()
    
            print(d)
            print(l)
    -------------------------------------------------------------
    {1: '1', '2': 2, 0.25: None}
    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
  • 相关阅读:
    装配Bean
    百练
    东软小选拔
    俄罗斯乘法
    POJ
    ACdream
    javascript 链式作用域
    ie6/7 bug
    onreadystatechange 和 status
    瀑布流 <<转>>
  • 原文地址:https://www.cnblogs.com/lizm166/p/14658484.html
Copyright © 2011-2022 走看看