zoukankan      html  css  js  c++  java
  • python之multiprocessing多进程

    multiprocessing 充分利用cpu多核
    一般情况下cpu密集使用进程池,IO密集使用线程池。python下想要充分利用多核CPU,就用多进程。

    Process 类
    Process 类用来描述一个进程对象。创建子进程的时候,只需要传入一个执行函数和函数的参数即可完成 Process 示例的创建。
    star() 方法启动进程,
    join() 方法实现进程间的同步,等待所有进程退出。
    close() 用来阻止多余的进程涌入进程池 Pool 造成进程阻塞。


    multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
    group: 线程组,目前还没有实现,库引用中提示必须是None;
    target 是函数名字,需要调用的函数
    args 函数需要的参数,以 tuple 的形式传入


    实例方法:
      is_alive():返回进程是否在运行。
      join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
      start():进程准备就绪,等待CPU调度
      run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
      terminate():不管任务是否完成,立即停止工作进程
    属性:
      authkey
      daemon:和线程的setDeamon功能一样
      exitcode(进程在运行时为None、如果为–N,表示被信号N结束)
      name:进程名字。
      pid:进程号。

    列子一:

    import multiprocessing
    import os
    
    def run_proc(name):
        print('Child process {0} {1} Running '.format(name, os.getpid()))
    
    if __name__ == '__main__':
        print('Parent process {0} is Running'.format(os.getpid()))
        for i in range(5):
            p = multiprocessing.Process(target=run_proc, args=(str(i),))
            print('process start')
            p.start()
        p.join()
        print('Process close')
    [python@master test]$ python3 a.py 
    Parent process 12665 is Running
    process start
    process start
    process start
    Child process 0 12666 Running 
    process start
    process start
    Child process 2 12668 Running 
    Child process 1 12667 Running 
    Child process 3 12669 Running 
    Child process 4 12670 Running 
    Process close

    列子二:

    #coding=utf-8
    
    import multiprocessing
    
    def do(n) :
     name = multiprocessing.current_process().name
     print(name,'starting')
     print ("worker ", n)
    
    if __name__ == '__main__' :
     for i in range(5) :
        p = multiprocessing.Process(target=do, args=(i,))
        p.start()
        p.join()
        print ("Process end.")
    [python@master test]$ python3 b.py 
    Process-1 starting
    worker  0
    Process end.
    Process-2 starting
    worker  1
    Process end.
    Process-3 starting
    worker  2
    Process end.
    Process-4 starting
    worker  3
    Process end.
    Process-5 starting
    worker  4
    Process end.

    Pool类

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。进程池设置最好等于CPU核心数量
    构造方法:

    Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
    processes :使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量。
    initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
    maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
    context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context

    实例方法:
      apply(func[, args[, kwds]]):同步进程池
      apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :异步进程池
      close() : 关闭进程池,阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
      terminate() : 结束工作进程,不在处理未完成的任务
      join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。

    pool.join()必须使用在pool.close()或者pool.terminate()之后。其中close()跟terminate()的区别在于:
    close()会等待池中的worker进程执行结束再关闭pool,而terminate()则是直接关闭。
    异步进程池

    #每次循环将会用空闲出来的子进程去调用目录--异步
    #不等待只要进程池的位置空出来就立刻补上

    # coding:utf-8
    from  multiprocessing import Pool
    import time
    
    def Foo(i):
        time.sleep(2)
        return i + 100
    def Bar(arg):
        print("callback"+str(arg))
    if __name__ == '__main__':
        t_start=time.time()
        pool = Pool(5)
        for i in range(10):
            pool.apply_async(func=Foo, args=(i,), callback=Bar)#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        pool.close()
        pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
        pool.terminate()
        t_end=time.time()
        t=t_end-t_start
        print ('the program time is :%s' %t)
    [python@master test]$ python3 c.py 
    callback100
    callback103
    callback101
    callback102
    callback104
    callback107
    callback106
    callback109
    callback105
    callback108
    the program time is :4.0822553634643555

    同步进程池

    #阻塞式的请求 自加阻塞  顺序结构
    #必须要在进程池中没有进程的的时候 才会有新进程进入进程池

    # -*- coding:utf-8 -*-
    from  multiprocessing import Process, Pool
    import time
    
    def Foo(i):
        time.sleep(1)
        print (i + 100)
    if __name__ == '__main__':
        t_start=time.time()
        pool = Pool(5)   #定义一个进程池,最大的进程数量
        for i in range(10):
            pool.apply(Foo, (i,))
        pool.close()
        pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
        t_end=time.time()
        t=t_end-t_start
        print('the program time is :%s' %t)
    [python@master test]$ python3 d.py 
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    the program time is :10.224181175231934

    正确使用get()方法获取结果

    # coding:utf-8
    from  multiprocessing import Pool
    import time
    
    def Foo(i):
        time.sleep(2)
        return i + 100
    
    def Bar(arg):
        print('callback'+str(arg))
    
    if __name__ == '__main__':
        res_list=[]
        t_start=time.time()
        pool = Pool(5)
    
        for i in range(10):
           res = pool.apply_async(func=Foo, args=(i,), callback=Bar)#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
           res_list.append(res)
        pool.close()
        pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
        for res in res_list:
            print(res.get())
        pool.terminate()
        t_end=time.time()
        t=t_end-t_start
        print ('the program time is :%s' %t)
    [python@master test]$ python3 e.py 
    callback101
    callback100
    callback102
    callback104
    callback103
    callback105
    callback109
    callback106
    callback107
    callback108
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    the program time is :4.145965099334717

    进程数据共享

    进程各自持有一份数据,默认无法共享数据

    # coding:utf-8
    from multiprocessing import Process
    li = []
    def foo(i):
        li.append(i)
        print ('say hi', li)
    if __name__ == '__main__':
    
        for i in range(10):
            p = Process(target=foo, args=(i,))
            p.start()
    
        print ('ending', li)
    [python@master test]$ python3 a.py 
    say hi [0]
    say hi [2]
    say hi [3]
    say hi [5]
    say hi [1]
    say hi [6]
    ending []
    say hi [7]
    say hi [4]
    say hi [9]
    say hi [8]

    方法一(使用Array):

    from multiprocessing import Process, Array
    
    def f(a):
        for i in range(len(a)):
            a[i] = -a[i]
    
    if __name__ == '__main__':
        arr = Array('i', range(10))
        p = Process(target=f, args=(arr,))
        p.start()
        p.join()
    
        print(arr[:])
    [python@master test]$ python3 b.py 
    [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

    方法二(使用Manager):

    Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持。

    from multiprocessing import Process, Manager
    
    
    def f(d, l):
        d[1] = '1'
        d['2'] = 2
        d[0.25] = None
        l.reverse()
    
    
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
            l = manager.list(range(10))
    
            p = Process(target=f, args=(d, l))
            p.start()
            p.join()
    
            print(d)
            print(l)
    [python@master test]$ python3 c.py 
    {1: '1', '2': 2, 0.25: None}
    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

     使用多个进程池:

    #coding: utf-8
    import multiprocessing
    import os, time, random
    
    def Lee():
        print ("
    Run task Lee-%s" %(os.getpid())) #os.getpid()获取当前的进程的ID
        start = time.time()
        time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
        end = time.time()
        print ('Task Lee, runs %0.2f seconds.' %(end - start))
    
    def Marlon():
        print ("
    Run task Marlon-%s" %(os.getpid()))
        start = time.time()
        time.sleep(random.random() * 40)
        end=time.time()
        print ('Task Marlon runs %0.2f seconds.' %(end - start))
    
    def Allen():
        print ("
    Run task Allen-%s" %(os.getpid()))
        start = time.time()
        time.sleep(random.random() * 30)
        end = time.time()
        print ('Task Allen runs %0.2f seconds.' %(end - start))
    
    def Frank():
        print ("
    Run task Frank-%s" %(os.getpid()))
        start = time.time()
        time.sleep(random.random() * 20)
        end = time.time()
        print ('Task Frank runs %0.2f seconds.' %(end - start))
            
    if __name__=='__main__':
        function_list=  [Lee, Marlon, Allen, Frank] 
        print ("parent process %s" %(os.getpid()))
    
        pool=multiprocessing.Pool(4)
        for func in function_list:
            pool.apply_async(func)     #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中
    
        print ('Waiting for all subprocesses done...')
        pool.close()
        pool.join()    #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
        print ('All subprocesses done.')
        
    [python@master test]$ python3 e.py 
    parent process 20714
    Waiting for all subprocesses done...
    
    Run task Lee-20715
    
    Run task Marlon-20716
    
    Run task Allen-20718
    
    Run task Frank-20717
    Task Lee, runs 3.18 seconds.
    Task Frank runs 11.47 seconds.
    Task Allen runs 23.24 seconds.
    Task Marlon runs 38.09 seconds.
    All subprocesses done.
  • 相关阅读:
    江苏高考的程序题
    visio使用小记
    debian+postfix+dovecot+squirrelmail安装配置笔记
    System.Net.Mail
    DBHelper
    朝发白帝城
    《计算机网络》复习题2010
    mvc3在view中获取是否有验证错误
    Validation failed for one or more entities. See 'EntityValidationErrors' property for more details.
    ASP.NET MVC3 Model验证总结
  • 原文地址:https://www.cnblogs.com/hello-wei/p/10168544.html
Copyright © 2011-2022 走看看