zoukankan      html  css  js  c++  java
  • python之多进程multiprocessing模块

    process类介绍

    multiprocessing 模块官方说明文档

    Process 类用来描述一个进程对象。创建子进程的时候,只需要传入一个执行函数和函数的参数即可完成 Process 示例的创建。

    python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。

    multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。

    • star() 方法启动进程。
    • join() 方法实现进程间的同步,等待所有进程退出。
    • close() 用来阻止多余的进程涌入进程池 Pool 造成进程阻塞。
    multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
    • target 是函数名字,需要调用的函数
    • args 函数需要的参数,以 tuple 的形式传入

    创建子进程方式一:

    rom multiprocessing import Process
    import time
    def f(name):
        time.sleep(2)
        print('hello', name)
     
    if __name__ == '__main__':
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()
    

    创建子进程方式二: 

    from multiprocessing import Process
    import time
    class MyProcess(Process):
        def __init__(self,name):
            super().__init__()
            self.name=name
    
        def run(self):
            print('task <%s> is runing' % self.name)
            time.sleep(2)
            print('task <%s> is done' % self.name)
    
    if __name__ == '__main__':
        p=MyProcess('egon')
        p.start()
    
        print('主')
    

       注意:run方法是必须去重写的。

    查看进程父子进程的进程号,示例:

    from multiprocessing import Process
    import os
     
    def info(title):
        print(title)
        print('module name:', __name__)
        print('parent process:', os.getppid())
        print('process id:', os.getpid())
        print("
    
    ")
     
    def f(name):
        info('33[31;1mfunction f33[0m')
        print('hello', name)
     
    if __name__ == '__main__':
        info('33[32;1mmain process line33[0m')
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()
    

     进程间通信

    • 先要声明一点,这里所说的进程间通信指的是具有父子关系的进程间通信机制,如果两个进程间没有任何关系,这里的机制是无法实现的。

    Queues

    使用方法跟threading里的queue差不多

    from multiprocessing import Process, Queue
     
    def f(q):
        q.put([42, None, 'hello'])
     
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        print(q.get())    # prints "[42, None, 'hello']"
        p.join()
    

     Pipes

    常用来在两个进程间通信,两个进程分别位于管道的两端。

    multiprocessing.Pipe([duplex])
    

     示例一:

    rom multiprocessing import Process, Pipe
    
    def send(pipe):
        pipe.send(['spam'] + [42, 'egg'])   # send 传输一个列表
        pipe.close()
    
    if __name__ == '__main__':
        (con1, con2) = Pipe()                            # 创建两个 Pipe 实例
        sender = Process(target=send, args=(con1, ))     # 函数的参数,args 一定是实例化之后的 Pip 变量,不能直接写 args=(Pip(),)
        sender.start()                                   # Process 类启动进程
        print("con2 got: %s" % con2.recv())              # 管道的另一端 con2 从send收到消息
        con2.close()                                     # 关闭管道
    

     结果:

    con2 got: ['spam', 42, 'egg']
    

     示例二:

    from multiprocessing import Process, Pipe
    
    def talk(pipe):
        pipe.send(dict(name='Bob', spam=42))            # 传输一个字典
        reply = pipe.recv()                             # 接收传输的数据
        print('talker got:', reply)
    
    if __name__ == '__main__':
        (parentEnd, childEnd) = Pipe()                  # 创建两个 Pipe() 实例,也可以改成 conf1, conf2
        child = Process(target=talk, args=(childEnd,))  # 创建一个 Process 进程,名称为 child
        child.start()                                   # 启动进程
        print('parent got:', parentEnd.recv())          # parentEnd 是一个 Pip() 管道,可以接收 child Process 进程传输的数据
        parentEnd.send({x * 2 for x in 'spam'})         # parentEnd 是一个 Pip() 管道,可以使用 send 方法来传输数据
        child.join()                                    # 传输的数据被 talk 函数内的 pip 管道接收,并赋值给 reply
        print('parent exit')
    

       结果:

    parent got: {'name': 'Bob', 'spam': 42}
    talker got: {'ss', 'aa', 'pp', 'mm'}
    parent exit
    

     Managers

    A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

    A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array. For example:

    from multiprocessing import Process, Manager
     
    def f(d, l):
        d[1] = '1'
        d['2'] = 2
        d[0.25] = None
        l.append(1)
        print(l)
     
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
     
            l = manager.list(range(5))
            p_list = []
            for i in range(10):
                p = Process(target=f, args=(d, l))
                p.start()
                p_list.append(p)
            for res in p_list:
                res.join()
     
            print(d)
            print(l)
    

    进程池

    在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,10几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,这时候进程池Pool发挥作用的时候就到了。

    Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。这里有一个简单的例子:

    #!/usr/bin/env python
    # _*_ coding utf-8 _*_
    #Author: aaron
    
    from multiprocessing import Process, Pool
    import time, os
    
    
    def Foo(i):
        time.sleep(5)
        print('in process[Foo]', os.getpid())
        return i + 100
    
    
    def Bar(arg):  # 父进程去执行,而不是子进程调用
        print('-->exec done:', arg)
        print('in process[Bar]', os.getpid())
    
    
    if __name__ == '__main__':
        pool = Pool(5)  # 允许进程池里同时放入5个进程 其他多余的进程处于挂起状态
    
        for i in range(10):
            pool.apply_async(func=Foo, args=(i,), callback=Bar)  
            # pool.apply(func=Foo, args=(i,))  
    
        print('end:', os.getpid())
        pool.close()  # close() 必须在join()前被调用
        pool.join()   # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
    
    • pool.apply_async()用来向进程池提交目标请求。
    • pool.join()是用来等待进程池中的worker进程执行完毕,防止主进程在worker进程结束前结束。但pool.join()必须使用在pool.close()或者pool.terminate()之后。
    • close()terminate()的区别在于close()会等待池中的worker进程执行结束再关闭pool,而terminate()则是直接关闭。
    • result.successful()表示整个调用执行的状态,如果还有worker没有执行完,则会抛出AssertionError异常。    
    • 利用multiprocessing下的Pool可以很方便的同时自动处理几百或者上千个并行操作,脚本的复杂性也大大降低.
  • 相关阅读:
    sublime text 4 vim 插件配置
    ssh-keygen 的使用
    distribution transaction solution
    bilibili 大数据 视频下载 you-get
    Deepin 20.2.1 安装 MS SQL 2019 容器版本
    【转】使用Linux下Docker部署MSSQL并加载主机目录下的数据库
    【转】You Can Now Use OneDrive in Linux Natively Thanks to Insync
    dotnet 诊断工具安装命令
    Linux 使用 xrandr 设置屏幕分辨率
    【转】CentOS 7.9 2009 ISO 官方原版镜像下载
  • 原文地址:https://www.cnblogs.com/cjaaron/p/9175828.html
Copyright © 2011-2022 走看看