zoukankan      html  css  js  c++  java
  • Python学习笔记18:标准库之多进程(multiprocessing包)

    我们能够使用subprocess包来创建子进程。但这个包有两个非常大的局限性:
    1) 我们总是让subprocess执行外部的程序,而不是执行一个Python脚本内部编写的函数。
    2) 进程间仅仅通过管道进行文本交流。
    以上限制了我们将subprocess包应用到更广泛的多进程任务。
    这种比較实际是不公平的,由于subprocessing本身就是设计成为一个shell,而不是一个多进程管理包。



    一 threading和multiprocessing

    multiprocessing包是Python中的多进程管理包。
    与threading.Thread类似。它能够利用multiprocessing.Process对象来创建一个进程。
    该进程能够执行在Python程序内部编写的函数。
    该Process对象与Thread对象的使用方法同样,也有start(), run(), join()的方法。
    此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象能够像多线程那样,通过參数传递给各个进程),用以同步进程。其使用方法与threading包中的同名类一致。
    所以。multiprocessing的非常大一部份与threading使用同一套API,仅仅只是换到了多进程的情境。




    但在使用这些共享API的时候,我们要注意下面几点:
    1)在UNIX平台上,当某个进程终结之后。该进程须要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。


    所以。有必要对每一个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,因为仅仅有一个进程。所以不存在此必要性。
    2)multiprocessing提供了threading包中没有的IPC(比方Pipe和Queue),效率上更高。


    应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (由于它们占领的不是用户进程的资源)。
    3)多进程应该避免共享资源。

    在多线程中,我们能够比較easy地共享资源,比方使用全局变量或者传递參数。
    在多进程情况下。因为每一个进程有自己独立的内存空间。以上方法并不合适。此时我们能够通过共享内存和Manager的方法来共享资源。
    但这样做提高了程序的复杂度,并由于同步的须要而减少了程序的效率。Process.PID中保存有PID,假设进程还没有start()。则PID为None。




    我们能够从以下的程序中看到Thread对象和Process对象在使用上的相似性与结果上的不同。各个线程和进程都做一件事:打印PID。
    但问题是,全部的任务在打印的时候都会向同一个标准输出(stdout)输出。这样输出的字符会混合在一起,无法阅读。
    使用Lock同步,在一个任务输出完毕之后,再同意还有一个任务输出,能够避免多个任务同一时候向终端输出。




    # Similarity and difference of multi thread vs. multi process
     
    import os
    import threading
    import multiprocessing
     
    # worker function
    def worker(sign, lock):
        lock.acquire()
        print(sign, os.getpid())
        lock.release()
     
    # Main
    print('Main:',os.getpid())
     
    # Multi-thread
    record = []
    lock  = threading.Lock()
    for i in range(5):
        thread = threading.Thread(target=worker,args=('thread',lock))
        thread.start()
        record.append(thread)
     
    for thread in record:
        thread.join()
     
    # Multi-process
    record = []
    lock = multiprocessing.Lock()
    for i in range(5):
        process = multiprocessing.Process(target=worker,args=('process',lock))
        process.start()
        record.append(process)
     
    for process in record:
        process.join()

    全部Thread的PID都与主程序同样,而每一个Process都有一个不同的PID。

    二 Pipe和Queue

    管道PIPE和消息队列message queue,multiprocessing包中有Pipe类和Queue类来分别支持这两种IPC机制。Pipe和Queue能够用来传送常见的对象。


    1) Pipe能够是单向(half-duplex)。也能够是双向(duplex)。


    我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默觉得双向)。
    一个进程从PIPE一端输入对象,然后被PIPE还有一端的进程接收,单向管道仅仅同意管道一端的进程输入,而双向管道则同意从两端输入。


    以下的程序展示了Pipe的使用:

    # Multiprocessing with Pipe
     
    import multiprocessing as mul
     
    def proc1(pipe):
        pipe.send('hello')
        print('proc1 rec:',pipe.recv())
     
    def proc2(pipe):
        print('proc2 rec:',pipe.recv())
        pipe.send('hello, too')
     
    # Build a pipe
    pipe = mul.Pipe()
     
    # Pass an end of the pipe to process 1
    p1   = mul.Process(target=proc1, args=(pipe[0],))
    
    
    # Pass the other end of the pipe to process 2
    p2   = mul.Process(target=proc2, args=(pipe[1],))
    
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    这里的Pipe是双向的。
    Pipe对象建立的时候,返回一个含有两个元素的表。每一个元素代表Pipe的一端(Connection对象)。
    我们对Pipe的某一端调用send()方法来传送对象,在还有一端使用recv()来接收。


    2) Queue与Pipe相类似,都是先进先出的结构。但Queue同意多个进程放入,多个进程从队列取出对象。
    Queue使用mutiprocessing.Queue(maxsize)创建。maxsize表示队列中能够存放对象的最大数量。
    以下的程序展示了Queue的使用:
    import os
    import multiprocessing
    import time
    #==================
    # input worker
    def inputQ(queue):
        info = str(os.getpid()) + '(put):' + str(time.time())
        queue.put(info)
     
    # output worker
    def outputQ(queue,lock):
        info = queue.get()
        lock.acquire()
        print (str(os.getpid()) + '(get):' + info)
        lock.release()
    #===================
    # Main
    record1 = []   # store input processes
    record2 = []   # store output processes
    lock  = multiprocessing.Lock()    # To prevent messy print
    queue = multiprocessing.Queue(3)
     
    # input processes
    for i in range(10):
        process = multiprocessing.Process(target=inputQ,args=(queue,))
        process.start()
        record1.append(process)
     
    # output processes
    for i in range(10):
        process = multiprocessing.Process(target=outputQ,args=(queue,lock))
        process.start()
        record2.append(process)
     
    for p in record1:
        p.join()
     
    queue.close()  # No more object will come, close the queue
     
    for p in record2:
        p.join()

    一些进程使用put()在Queue中放入字符串,这个字符串中包括PID和时间。
    还有一些进程从Queue中取出,并打印自己的PID以及get()的字符串。


    三 进程池

    进程池 (Process Pool)能够创建多个进程。这些进程就像是随时待命的士兵,准备运行任务(程序)。

    一个进程池中能够容纳多个待命的士兵。


    比方以下的程序:

    import multiprocessing as mul
     
    def f(x):
        return x**2
     
    pool = mul.Pool(5)
    rel  = pool.map(f,[1,2,3,4,5,6,7,8,9,10])
    print(rel)

    我们创建了一个容许5个进程的进程池 (Process Pool) 。Pool执行的每一个进程都执行f()函数。
    我们利用map()方法,将f()函数作用到表的每一个元素上。这与built-in的map()函数类似。仅仅是这里用5个进程并行处理。
    假设进程执行结束后,还有须要处理的元素。那么的进程会被用于又一次执行f()函数。除了map()方法外。Pool还有以下的经常用法。


    1)apply_async(func,args)从进程池中取出一个进程运行func。args为func的參数。
    它将返回一个AsyncResult的对象。你能够对该对象调用get()方法以获得结果。


    2)close()进程池不再创建新的进程
    3)join()wait进程池中的所有进程。

    必须对Pool先调用close()方法才干join。


    四 共享资源

    多进程共享资源必定会带来进程间相互竞争。而这样的竞争又会造成race condition,我们的结果有可能被竞争的不确定性所影响。
    但假设须要,我们依旧能够通过共享内存和Manager对象这么做。


    1 共享内存

    依据共享内存(shared memory)的原理,这里给出用Python实现的样例:
    import multiprocessing
     
    def f(n, a):
        n.value   = 3.14
        a[0]      = 5
     
    num   = multiprocessing.Value('d', 0.0)
    arr   = multiprocessing.Array('i', range(10))
     
    p = multiprocessing.Process(target=f, args=(num, arr))
    p.start()
    p.join()
     
    print num.value
    print arr[:]

    这里我们实际上仅仅有主进程和Process对象代表的进程。
    我们在主进程的内存空间中创建共享的内存,也就是Value和Array两个对象。对象Value被设置成为双精度数(d), 并初始化为0.0。
    而Array则类似于C中的数组,有固定的类型(i, 也就是整数)。在Process进程中,我们改动了Value和Array对象。
    回到主程序。打印出结果,主程序也看到了两个对象的改变,说明资源确实在两个进程之间共享。


     

    2 Manager

    Manager对象类似于服务器与客户之间的通信 (server-client),与我们在Internet上的活动非常类似。


    我们用一个进程作为server,建立Manager来真正存放资源。其他的进程能够通过參数传递或者依据地址来訪问Manager,建立连接后。操作server上的资源。


    在防火墙同意的情况下,我们全然能够将Manager运用于多计算机。从而模仿了一个真实的网络情境。
    以下的样例中,我们对Manager的使用类似于shared memory。但能够共享更丰富的对象类型。

    import multiprocessing
     
    def f(x, arr, l):
        x.value = 3.14
        arr[0] = 5
        l.append('Hello')
     
    server = multiprocessing.Manager()
    x    = server.Value('d', 0.0)
    arr  = server.Array('i', range(10))
    l    = server.list()
     
    proc = multiprocessing.Process(target=f, args=(x, arr, l))
    proc.start()
    proc.join()
     
    print(x.value)
    print(arr)
    print(l)

    Manager利用list()方法提供了表的共享方式。
    实际上你能够利用dict()来共享词典,Lock()来共享threading.Lock(注意,我们共享的是threading.Lock。而不是进程的mutiprocessing.Lock。

    后者本身已经实现了进程共享)等。
    这样Manager就同意我们共享很多其它样的对象。

  • 相关阅读:
    shell中的 echo命令
    shell中的运算符
    shell中的替换
    shell中的元字符
    springcloud-sleuth的使用
    springcloud-sleuth之zipkun运行和概念介绍
    springcloud-sleuth是什么
    消息总线(bus)和消息驱动(stream)的区别
    springcloud-stream之持久化
    springcloud-stream之消费者重复消费
  • 原文地址:https://www.cnblogs.com/blfbuaa/p/6784482.html
Copyright © 2011-2022 走看看