zoukankan      html  css  js  c++  java
  • python之多进程multiprocessing模块

    process类介绍

    multiprocessing 模块官方说明文档

    Process 类用来描述一个进程对象。创建子进程的时候,只需要传入一个执行函数和函数的参数即可完成 Process 示例的创建。

    python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。

    multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。

    • star() 方法启动进程。
    • join() 方法实现进程间的同步,等待所有进程退出。
    • close() 用来阻止多余的进程涌入进程池 Pool 造成进程阻塞。
    multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
    • target 是函数名字,需要调用的函数
    • args 函数需要的参数,以 tuple 的形式传入

    创建子进程方式一:

    rom multiprocessing import Process
    import time
    def f(name):
        time.sleep(2)
        print('hello', name)
     
    if __name__ == '__main__':
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()
    

    创建子进程方式二: 

    from multiprocessing import Process
    import time
    class MyProcess(Process):
        def __init__(self,name):
            super().__init__()
            self.name=name
    
        def run(self):
            print('task <%s> is runing' % self.name)
            time.sleep(2)
            print('task <%s> is done' % self.name)
    
    if __name__ == '__main__':
        p=MyProcess('egon')
        p.start()
    
        print('主')
    

       注意:run方法是必须去重写的。

    查看进程父子进程的进程号,示例:

    from multiprocessing import Process
    import os
     
    def info(title):
        print(title)
        print('module name:', __name__)
        print('parent process:', os.getppid())
        print('process id:', os.getpid())
        print("
    
    ")
     
    def f(name):
        info('33[31;1mfunction f33[0m')
        print('hello', name)
     
    if __name__ == '__main__':
        info('33[32;1mmain process line33[0m')
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()
    

     进程间通信

    • 先要声明一点,这里所说的进程间通信指的是具有父子关系的进程间通信机制,如果两个进程间没有任何关系,这里的机制是无法实现的。

    Queues

    使用方法跟threading里的queue差不多

    from multiprocessing import Process, Queue
     
    def f(q):
        q.put([42, None, 'hello'])
     
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        print(q.get())    # prints "[42, None, 'hello']"
        p.join()
    

     Pipes

    常用来在两个进程间通信,两个进程分别位于管道的两端。

    multiprocessing.Pipe([duplex])
    

     示例一:

    rom multiprocessing import Process, Pipe
    
    def send(pipe):
        pipe.send(['spam'] + [42, 'egg'])   # send 传输一个列表
        pipe.close()
    
    if __name__ == '__main__':
        (con1, con2) = Pipe()                            # 创建两个 Pipe 实例
        sender = Process(target=send, args=(con1, ))     # 函数的参数,args 一定是实例化之后的 Pip 变量,不能直接写 args=(Pip(),)
        sender.start()                                   # Process 类启动进程
        print("con2 got: %s" % con2.recv())              # 管道的另一端 con2 从send收到消息
        con2.close()                                     # 关闭管道
    

     结果:

    con2 got: ['spam', 42, 'egg']
    

     示例二:

    from multiprocessing import Process, Pipe
    
    def talk(pipe):
        pipe.send(dict(name='Bob', spam=42))            # 传输一个字典
        reply = pipe.recv()                             # 接收传输的数据
        print('talker got:', reply)
    
    if __name__ == '__main__':
        (parentEnd, childEnd) = Pipe()                  # 创建两个 Pipe() 实例,也可以改成 conf1, conf2
        child = Process(target=talk, args=(childEnd,))  # 创建一个 Process 进程,名称为 child
        child.start()                                   # 启动进程
        print('parent got:', parentEnd.recv())          # parentEnd 是一个 Pip() 管道,可以接收 child Process 进程传输的数据
        parentEnd.send({x * 2 for x in 'spam'})         # parentEnd 是一个 Pip() 管道,可以使用 send 方法来传输数据
        child.join()                                    # 传输的数据被 talk 函数内的 pip 管道接收,并赋值给 reply
        print('parent exit')
    

       结果:

    parent got: {'name': 'Bob', 'spam': 42}
    talker got: {'ss', 'aa', 'pp', 'mm'}
    parent exit
    

     Managers

    A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

    A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array. For example:

    from multiprocessing import Process, Manager
     
    def f(d, l):
        d[1] = '1'
        d['2'] = 2
        d[0.25] = None
        l.append(1)
        print(l)
     
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
     
            l = manager.list(range(5))
            p_list = []
            for i in range(10):
                p = Process(target=f, args=(d, l))
                p.start()
                p_list.append(p)
            for res in p_list:
                res.join()
     
            print(d)
            print(l)
    

    进程池

    在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,10几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,这时候进程池Pool发挥作用的时候就到了。

    Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。这里有一个简单的例子:

    #!/usr/bin/env python
    # _*_ coding utf-8 _*_
    #Author: aaron
    
    from multiprocessing import Process, Pool
    import time, os
    
    
    def Foo(i):
        time.sleep(5)
        print('in process[Foo]', os.getpid())
        return i + 100
    
    
    def Bar(arg):  # 父进程去执行,而不是子进程调用
        print('-->exec done:', arg)
        print('in process[Bar]', os.getpid())
    
    
    if __name__ == '__main__':
        pool = Pool(5)  # 允许进程池里同时放入5个进程 其他多余的进程处于挂起状态
    
        for i in range(10):
            pool.apply_async(func=Foo, args=(i,), callback=Bar)  
            # pool.apply(func=Foo, args=(i,))  
    
        print('end:', os.getpid())
        pool.close()  # close() 必须在join()前被调用
        pool.join()   # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
    
    • pool.apply_async()用来向进程池提交目标请求。
    • pool.join()是用来等待进程池中的worker进程执行完毕,防止主进程在worker进程结束前结束。但pool.join()必须使用在pool.close()或者pool.terminate()之后。
    • close()terminate()的区别在于close()会等待池中的worker进程执行结束再关闭pool,而terminate()则是直接关闭。
    • result.successful()表示整个调用执行的状态,如果还有worker没有执行完,则会抛出AssertionError异常。    
    • 利用multiprocessing下的Pool可以很方便的同时自动处理几百或者上千个并行操作,脚本的复杂性也大大降低.
  • 相关阅读:
    《代码阅读方法与实践》阅读笔记之二
    《代码阅读方法与实践》阅读笔记一
    专业实训题目需求分析
    阅读计划
    第二阶段Sprint10
    第二阶段Sprint9
    第二阶段Sprint8
    第二阶段Sprint7
    第二阶段个人工作总结(8)
    第二阶段个人工作总结(7)
  • 原文地址:https://www.cnblogs.com/cjaaron/p/9175828.html
Copyright © 2011-2022 走看看