zoukankan      html  css  js  c++  java
  • Python multiprocessing使用详解

    multiprocessing包是Python中的多进程管理包。
    与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。
    该进程可以运行在Python程序内部编写的函数。
    该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。
    此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。
    所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

    但在使用这些共享API的时候,我们要注意以下几点:

    • 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
    • multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
    • 多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。

    Process.PID中保存有PID,如果进程还没有start(),则PID为None

    我们可以从下面的程序中看到Thread对象和Process对象在使用上的相似性与结果上的不同。各个线程和进
    程都做一件事:打印PID。但问题是,所有的任务在打印的时候都会向同一个标准输出(stdout)输出。这样输出的字符会混合在一起,无法阅读。使用Lock同步,在一个任务输出完成之后,再允许另一个任务输出,可以避免多个任务同时向终端输出

    import os
    import threading
    import multiprocessing
    
    # Main
    print('Main:', os.getpid())
    
    # worker function
    def worker(sign, lock):
        lock.acquire()
        print(sign, os.getpid())
        lock.release()
    
    
    # Multi-thread
    record = []
    lock = threading.Lock()
    
    # Multi-process
    record = []
    lock = multiprocessing.Lock()
    
    if __name__ == '__main__':
        for i in range(5):
            thread = threading.Thread(target=worker, args=('thread', lock))
            thread.start()
            record.append(thread)
    
        for thread in record:
            thread.join()
        
        for i in range(5):
            process = multiprocessing.Process(target=worker, args=('process', lock))
            process.start()
            record.append(process)
        
        for process in record:
            process.join()
    
    Main: 10012
    thread 10012
    thread 10012
    thread 10012
    thread 10012
    thread 10012
    Main: 6052
    process 6052
    Main: 8080
    Main: 4284
    Main: 7240
    process 8080
    process 4284
    process 7240
    Main: 10044
    process 10044
    

    Pipe和Queue

    正如我们在Linux多线程中介绍的管道PIPE和消息队列message queue,multiprocessing包中有Pipe类和Queue类来分别支持这两种IPC机制。Pipe和Queue可以用来传送常见的对象。

    1. Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。

    下面的程序展示了Pipe的使用:

    import multiprocessing as mul
    
    
    def proc1(pipe):
        pipe.send('hello')
        print('proc1 rec:', pipe.recv())
    
    
    def proc2(pipe):
        print('proc2 rec:', pipe.recv())
        pipe.send('hello, too')
    
    
    # Build a pipe
    pipe = mul.Pipe()
    if __name__ == '__main__':
        # Pass an end of the pipe to process 1
        p1 = mul.Process(target=proc1, args=(pipe[0],))
        # Pass the other end of the pipe to process 2
        p2 = mul.Process(target=proc2, args=(pipe[1],))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
    
    proc2 rec: hello
    proc1 rec: hello, too
    

    这里的Pipe是双向的。

    Pipe对象建立的时候,返回一个含有两个元素的表,每个元素代表Pipe的一端(Connection对象)。我们对Pipe的某一端调用send()方法来传送对象,在另一端使用recv()来接收。

    1. Queue与Pipe相类似,都是先进先出的结构。但Queue允许多个进程放入,多个进程从队列取出对象。Queue使用mutiprocessing.Queue(maxsize)创建,maxsize表示队列中可以存放对象的最大数量。

    下面的程序展示了Queue的使用:

    import os
    import multiprocessing
    import time
    #==================
    # input worker
    def inputQ(queue):
        info = str(os.getpid()) + '(put):' + str(time.time())
        queue.put(info)
    
    # output worker
    def outputQ(queue,lock):
        info = queue.get()
        lock.acquire()
        print (str(os.getpid()) + ' get: ' + info)
        lock.release()
    #===================
    # Main
    record1 = []   # store input processes
    record2 = []   # store output processes
    lock  = multiprocessing.Lock()    # To prevent messy print
    queue = multiprocessing.Queue(3)
    
    if __name__ == '__main__':
        # input processes
        for i in range(10):
            process = multiprocessing.Process(target=inputQ,args=(queue,))
            process.start()
            record1.append(process)
        
        # output processes
        for i in range(10):
            process = multiprocessing.Process(target=outputQ,args=(queue,lock))
            process.start()
            record2.append(process)
        
        for p in record1:
            p.join()
        
        queue.close()  # No more object will come, close the queue
        
        for p in record2:
            p.join()
    
    8572 get: 6300(put):1555486924.3676226
    8136 get: 3464(put):1555486924.412625
    9576 get: 9660(put):1555486924.5126307
    6936 get: 5064(put):1555486924.5976355
    10652 get: 8688(put):1555486924.5976355
    6992 get: 10988(put):1555486924.7526445
    6548 get: 6836(put):1555486924.7456443
    3504 get: 7284(put):1555486924.7666454
    8652 get: 4960(put):1555486924.8536503
    10868 get: 460(put):1555486924.8606508
    

    一些进程使用put()在Queue中放入字符串,这个字符串中包含PID和时间。另一些进程从Queue中取出,并打印自己的PID以及get()的字符串。

    进程池

    进程池 (Process Pool)可以创建多个进程。这些进程就像是随时待命的士兵,准备执行任务(程序)。一个进程池中可以容纳多个待命的进程。

    import multiprocessing as mul
    
    
    def f(x):
        return x ** 2
    
    
    if __name__ == '__main__':
        pool = mul.Pool(5)
        rel = pool.map(f, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
        print(rel)
    
    [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
    

    我们创建了一个容许5个进程的进程池 (Process Pool) 。Pool运行的每个进程都执行f()函数。我们利用map()方法,将f()函数作用到表的每个元素上。这与built-in的map()函数类似,只是这里用5个进程并行处理。如果进程运行结束后,还有需要处理的元素,那么的进程会被用于重新运行f()函数。除了map()方法外,Pool还有下面的常用方法。

    apply_async(func,args) 从进程池中取出一个进程执行func,args为func的参数。它将返回一个AsyncResult的对象,你可以对该对象调用get()方法以获得结果。

    close() 进程池不再创建新的进程

    join() wait进程池中的全部进程。必须对Pool先调用close()方法才能join。

    共享内存

    实例代码:

    import multiprocessing
    
    # Value/Array
    def func1(a, arr):
        a.value = 3.14
        for i in range(len(arr)):
            arr[i] = 0
        a.value = 0
    
    if __name__ == '__main__':
        num = multiprocessing.Value('d', 1.0)  # num=0
        arr = multiprocessing.Array('i', range(10))  # arr=range(10)
        p = multiprocessing.Process(target=func1, args=(num, arr))
        p.start()
        p.join()
        print (num.value)
        print (arr[:])
    
    
    0.0
    [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    

    这里我们实际上只有主进程和Process对象代表的进程。我们在主进程的内存空间中创建共享的内存,也就是Value和Array两个对象。对象Value被设置成为双精度数(d), 并初始化为1.0。而Array则类似于C中的数组,有固定的类型(i, 也就是整数)。在Process进程中,我们修改了Value和Array对象。回到主程序,打印出结果,主程序也看到了两个对象的改变,说明资源确实在两个进程之间共享。

    Manager

    Manager是通过共享进程的方式共享数据。
    Manager管理的共享数据类型有:Value、Array、dict、list、Lock、Semaphore等等,同时Manager还可以共享类的实例对象。
    实例代码:

    from multiprocessing import Process,Manager
    def func1(shareList,shareValue,shareDict,lock):
        with lock:
            shareValue.value+=1
            shareDict[1]='1'
            shareDict[2]='2'
            for i in xrange(len(shareList)):
                shareList[i]+=1
    
    if __name__ == '__main__':
        manager=Manager()
        list1=manager.list([1,2,3,4,5])
        dict1=manager.dict()
        array1=manager.Array('i',range(10))
        value1=manager.Value('i',1)
        lock=manager.Lock()
        proc=[Process(target=func1,args=(list1,value1,dict1,lock)) for i in xrange(20)]
        for p in proc:
            p.start()
        for p in proc:
            p.join()
        print list1
        print dict1
        print array1
        print value1
    
    [21, 22, 23, 24, 25]
    {1: '1', 2: '2'}
    array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
    Value('i', 21)
    
  • 相关阅读:
    gojs常用API (中文文档)
    webpack的安装
    win10如何将wps设置成默认应用
    gojs常用API-画布操作
    Access中替代case when的方法 .
    C++ 11 中的右值引用
    形参前的&&啥意思?
    【C语言学习笔记】字符串拼接的3种方法 .
    java项目打jar包
    教你用DrawLayout 实现Android 侧滑菜单
  • 原文地址:https://www.cnblogs.com/gmhappy/p/11863960.html
Copyright © 2011-2022 走看看