zoukankan      html  css  js  c++  java
  • python之multiprocessing多进程

    multiprocessing 充分利用cpu多核
    一般情况下cpu密集使用进程池,IO密集使用线程池。python下想要充分利用多核CPU,就用多进程。

    Process 类
    Process 类用来描述一个进程对象。创建子进程的时候,只需要传入一个执行函数和函数的参数即可完成 Process 示例的创建。
    star() 方法启动进程,
    join() 方法实现进程间的同步,等待所有进程退出。
    close() 用来阻止多余的进程涌入进程池 Pool 造成进程阻塞。


    multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
    group: 线程组,目前还没有实现,库引用中提示必须是None;
    target 是函数名字,需要调用的函数
    args 函数需要的参数,以 tuple 的形式传入


    实例方法:
      is_alive():返回进程是否在运行。
      join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
      start():进程准备就绪,等待CPU调度
      run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
      terminate():不管任务是否完成,立即停止工作进程
    属性:
      authkey
      daemon:和线程的setDeamon功能一样
      exitcode(进程在运行时为None、如果为–N,表示被信号N结束)
      name:进程名字。
      pid:进程号。

    列子一:

    import multiprocessing
    import os
    
    def run_proc(name):
        print('Child process {0} {1} Running '.format(name, os.getpid()))
    
    if __name__ == '__main__':
        print('Parent process {0} is Running'.format(os.getpid()))
        for i in range(5):
            p = multiprocessing.Process(target=run_proc, args=(str(i),))
            print('process start')
            p.start()
        p.join()
        print('Process close')
    [python@master test]$ python3 a.py 
    Parent process 12665 is Running
    process start
    process start
    process start
    Child process 0 12666 Running 
    process start
    process start
    Child process 2 12668 Running 
    Child process 1 12667 Running 
    Child process 3 12669 Running 
    Child process 4 12670 Running 
    Process close

    列子二:

    #coding=utf-8
    
    import multiprocessing
    
    def do(n) :
     name = multiprocessing.current_process().name
     print(name,'starting')
     print ("worker ", n)
    
    if __name__ == '__main__' :
     for i in range(5) :
        p = multiprocessing.Process(target=do, args=(i,))
        p.start()
        p.join()
        print ("Process end.")
    [python@master test]$ python3 b.py 
    Process-1 starting
    worker  0
    Process end.
    Process-2 starting
    worker  1
    Process end.
    Process-3 starting
    worker  2
    Process end.
    Process-4 starting
    worker  3
    Process end.
    Process-5 starting
    worker  4
    Process end.

    Pool类

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。进程池设置最好等于CPU核心数量
    构造方法:

    Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
    processes :使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量。
    initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
    maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
    context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context

    实例方法:
      apply(func[, args[, kwds]]):同步进程池
      apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :异步进程池
      close() : 关闭进程池,阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
      terminate() : 结束工作进程,不在处理未完成的任务
      join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。

    pool.join()必须使用在pool.close()或者pool.terminate()之后。其中close()跟terminate()的区别在于:
    close()会等待池中的worker进程执行结束再关闭pool,而terminate()则是直接关闭。
    异步进程池

    #每次循环将会用空闲出来的子进程去调用目录--异步
    #不等待只要进程池的位置空出来就立刻补上

    # coding:utf-8
    from  multiprocessing import Pool
    import time
    
    def Foo(i):
        time.sleep(2)
        return i + 100
    def Bar(arg):
        print("callback"+str(arg))
    if __name__ == '__main__':
        t_start=time.time()
        pool = Pool(5)
        for i in range(10):
            pool.apply_async(func=Foo, args=(i,), callback=Bar)#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        pool.close()
        pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
        pool.terminate()
        t_end=time.time()
        t=t_end-t_start
        print ('the program time is :%s' %t)
    [python@master test]$ python3 c.py 
    callback100
    callback103
    callback101
    callback102
    callback104
    callback107
    callback106
    callback109
    callback105
    callback108
    the program time is :4.0822553634643555

    同步进程池

    #阻塞式的请求 自加阻塞  顺序结构
    #必须要在进程池中没有进程的的时候 才会有新进程进入进程池

    # -*- coding:utf-8 -*-
    from  multiprocessing import Process, Pool
    import time
    
    def Foo(i):
        time.sleep(1)
        print (i + 100)
    if __name__ == '__main__':
        t_start=time.time()
        pool = Pool(5)   #定义一个进程池,最大的进程数量
        for i in range(10):
            pool.apply(Foo, (i,))
        pool.close()
        pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
        t_end=time.time()
        t=t_end-t_start
        print('the program time is :%s' %t)
    [python@master test]$ python3 d.py 
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    the program time is :10.224181175231934

    正确使用get()方法获取结果

    # coding:utf-8
    from  multiprocessing import Pool
    import time
    
    def Foo(i):
        time.sleep(2)
        return i + 100
    
    def Bar(arg):
        print('callback'+str(arg))
    
    if __name__ == '__main__':
        res_list=[]
        t_start=time.time()
        pool = Pool(5)
    
        for i in range(10):
           res = pool.apply_async(func=Foo, args=(i,), callback=Bar)#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
           res_list.append(res)
        pool.close()
        pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
        for res in res_list:
            print(res.get())
        pool.terminate()
        t_end=time.time()
        t=t_end-t_start
        print ('the program time is :%s' %t)
    [python@master test]$ python3 e.py 
    callback101
    callback100
    callback102
    callback104
    callback103
    callback105
    callback109
    callback106
    callback107
    callback108
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    the program time is :4.145965099334717

    进程数据共享

    进程各自持有一份数据,默认无法共享数据

    # coding:utf-8
    from multiprocessing import Process
    li = []
    def foo(i):
        li.append(i)
        print ('say hi', li)
    if __name__ == '__main__':
    
        for i in range(10):
            p = Process(target=foo, args=(i,))
            p.start()
    
        print ('ending', li)
    [python@master test]$ python3 a.py 
    say hi [0]
    say hi [2]
    say hi [3]
    say hi [5]
    say hi [1]
    say hi [6]
    ending []
    say hi [7]
    say hi [4]
    say hi [9]
    say hi [8]

    方法一(使用Array):

    from multiprocessing import Process, Array
    
    def f(a):
        for i in range(len(a)):
            a[i] = -a[i]
    
    if __name__ == '__main__':
        arr = Array('i', range(10))
        p = Process(target=f, args=(arr,))
        p.start()
        p.join()
    
        print(arr[:])
    [python@master test]$ python3 b.py 
    [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

    方法二(使用Manager):

    Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持。

    from multiprocessing import Process, Manager
    
    
    def f(d, l):
        d[1] = '1'
        d['2'] = 2
        d[0.25] = None
        l.reverse()
    
    
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
            l = manager.list(range(10))
    
            p = Process(target=f, args=(d, l))
            p.start()
            p.join()
    
            print(d)
            print(l)
    [python@master test]$ python3 c.py 
    {1: '1', '2': 2, 0.25: None}
    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

     使用多个进程池:

    #coding: utf-8
    import multiprocessing
    import os, time, random
    
    def Lee():
        print ("
    Run task Lee-%s" %(os.getpid())) #os.getpid()获取当前的进程的ID
        start = time.time()
        time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
        end = time.time()
        print ('Task Lee, runs %0.2f seconds.' %(end - start))
    
    def Marlon():
        print ("
    Run task Marlon-%s" %(os.getpid()))
        start = time.time()
        time.sleep(random.random() * 40)
        end=time.time()
        print ('Task Marlon runs %0.2f seconds.' %(end - start))
    
    def Allen():
        print ("
    Run task Allen-%s" %(os.getpid()))
        start = time.time()
        time.sleep(random.random() * 30)
        end = time.time()
        print ('Task Allen runs %0.2f seconds.' %(end - start))
    
    def Frank():
        print ("
    Run task Frank-%s" %(os.getpid()))
        start = time.time()
        time.sleep(random.random() * 20)
        end = time.time()
        print ('Task Frank runs %0.2f seconds.' %(end - start))
            
    if __name__=='__main__':
        function_list=  [Lee, Marlon, Allen, Frank] 
        print ("parent process %s" %(os.getpid()))
    
        pool=multiprocessing.Pool(4)
        for func in function_list:
            pool.apply_async(func)     #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中
    
        print ('Waiting for all subprocesses done...')
        pool.close()
        pool.join()    #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
        print ('All subprocesses done.')
        
    [python@master test]$ python3 e.py 
    parent process 20714
    Waiting for all subprocesses done...
    
    Run task Lee-20715
    
    Run task Marlon-20716
    
    Run task Allen-20718
    
    Run task Frank-20717
    Task Lee, runs 3.18 seconds.
    Task Frank runs 11.47 seconds.
    Task Allen runs 23.24 seconds.
    Task Marlon runs 38.09 seconds.
    All subprocesses done.
  • 相关阅读:
    【BZOJ 4151 The Cave】
    【POJ 3080 Blue Jeans】
    【ZBH选讲·树变环】
    【ZBH选讲·拍照】
    【ZBH选讲·模数和】
    【CF Edu 28 C. Four Segments】
    【CF Edu 28 A. Curriculum Vitae】
    【CF Edu 28 B. Math Show】
    【CF Round 439 E. The Untended Antiquity】
    【CF Round 439 C. The Intriguing Obsession】
  • 原文地址:https://www.cnblogs.com/hello-wei/p/10168544.html
Copyright © 2011-2022 走看看