zoukankan      html  css  js  c++  java
  • multiprocessing 多进程模块-python

    之前使用工具是jupyter导致执行效果和网络教程不一致,使用系统的python就可以达到效果
    multiprocessing 是 Python 的标准模块,它既可以用来编写多进程,也可以用来编写多线程。如果是多线程的话,用 multiprocessing.dummy 即可,用法与 multiprocessing 基本相同.

    基础

    利用 multiprocessing.Process 对象可以创建一个进程,Process 类适合简单的进程创建,如需资源共享可以结合 multiprocessing.Queue 使用;如果想要控制进程数量,则建议使用进程池 Pool 类

    Process 介绍:
    构造方法:

    Process([group [, target [, name [, args [, kwargs]]]]])
    group: 线程组,目前还没有实现,库引用中提示必须是 None;
    target: 要执行的方法;
    name: 进程名;
    args/kwargs: 要传入方法的参数。

    实例方法:

    is_alive():返回进程是否在运行。
    join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的 timeout(可选参数)。
    start():进程准备就绪,等待 CPU 调度。
    run():strat() 调用 run 方法,如果实例进程时未制定传入 target,start 执行默认 run() 方法。
    terminate():不管任务是否完成,立即停止工作进程。

    属性:

    authkey
    daemon:和线程的 setDeamon 功能一样(将父进程设置为守护进程,当父进程结束时,子进程也结束)。
    exitcode(进程在运行时为 None、如果为 –N,表示被信号 N 结束)。
    name:进程名字。
    pid:进程号。

    示例1:

    import multiprocessing
    
    
    def worker(num):
        """thread worker function"""
        print('Worker:', num)
    
    
    if __name__ == '__main__':
        jobs = []
        for i in range(5):
            p = multiprocessing.Process(target=worker, args=(i,))
            jobs.append(p)
            p.start()
            
    # 输出
    # Worker: 1
    # Worker: 0
    # Worker: 2
    # Worker: 3
    # Worker: 4
    #注意,测试时还是按照顺序执行,并没有出现序列无序情况
    

    确定当前进程

    每个Process实例都有一个名称,其默认值可以在创建进程时更改。命名进程对于跟踪它们非常有用,尤其是在同时运行多种类型进程的应用程序中

    import multiprocessing
    import time
    
    
    def worker():
        name = multiprocessing.current_process().name
        print(name, 'Starting')
        time.sleep(2)
        print(name, 'Exiting')
    
    
    def my_service():
        name = multiprocessing.current_process().name
        print(name, 'Starting')
        time.sleep(3)
        print(name, 'Exiting')
    
    
    if __name__ == '__main__':
        service = multiprocessing.Process(
            name='my_service',
            target=my_service,
        )
        worker_1 = multiprocessing.Process(
            name='worker 1',
            target=worker,
        )
        worker_2 = multiprocessing.Process(  # default name
            target=worker,
        )
    
        worker_1.start()
        worker_2.start()
        service.start()
        
    # output
    # worker 1 Starting
    # worker 1 Exiting
    # Process-3 Starting
    # Process-3 Exiting
    # my_service Starting
    # my_service Exiting
    
    
    

    守护进程

    默认情况下,在所有子进程退出之前,主程序不会退出。有些时候,启动后台进程运行而不阻止主程序退出是有用的,例如为监视工具生成“心跳”的任务。

    要将进程标记为守护程序很简单,只要将daemon属性设置为 True 就可以了。

    import multiprocessing
    import time
    import sys
    
    
    def daemon():
        p = multiprocessing.current_process()
        print('Starting:', p.name, p.pid)
        sys.stdout.flush()
        time.sleep(2)
        print('Exiting :', p.name, p.pid)
        sys.stdout.flush()
    
    
    def non_daemon():
        p = multiprocessing.current_process()
        print('Starting:', p.name, p.pid)
        sys.stdout.flush()
        print('Exiting :', p.name, p.pid)
        sys.stdout.flush()
    
    
    if __name__ == '__main__':
        d = multiprocessing.Process(
            name='daemon',
            target=daemon,
        )
        d.daemon = True
    
        n = multiprocessing.Process(
            name='non-daemon',
            target=non_daemon,
        )
        n.daemon = False
    
        d.start()
        time.sleep(1)
        n.start()
        
    # output
    # Starting: daemon 41838
    # Starting: non-daemon 41841
    # Exiting : non-daemon 41841
    
    

    输出不包括来自守护进程的“退出”消息,因为所有非守护进程(包括主程序)在守护进程从两秒休眠状态唤醒之前退出。
    守护进程在主程序退出之前自动终止,这避免了孤立进程的运行。这可以通过查找程序运行时打印的进程 ID 值来验证,然后使用 ps 命令检查该进程。

    总体来讲,设置子进程为守护状态时,如果主进程退出(即发起子进程的主进程)退出,会把该进程发起的子进程杀死(如果该子进程还在运行).子进程未设置该状态,在子进程退出前,主进程不会退出

    等待进程

    要等到进程完成其工作并退出,请使用 join()方法

    import multiprocessing
    import time
    import sys
    
    
    def daemon():
        name = multiprocessing.current_process().name
        print('Starting:', name)
        time.sleep(2)
        print('Exiting :', name)
    
    
    def non_daemon():
        name = multiprocessing.current_process().name
        print('Starting:', name)
        print('Exiting :', name)
    
    
    if __name__ == '__main__':
        d = multiprocessing.Process(
            name='daemon',
            target=daemon,
        )
        d.daemon = True
    
        n = multiprocessing.Process(
            name='non-daemon',
            target=non_daemon,
        )
        n.daemon = False
    
        d.start()
        time.sleep(1)
        n.start()
    
        d.join()
        n.join()
        
    # output
    # Starting: non-daemon
    # Exiting : non-daemon
    # Starting: daemon
    # Exiting : daemon
    
    

    来源:https://juejin.im/post/5c07b27af265da611b58234c

    https://www.cnblogs.com/chenhuabin/p/10074895.html

    python 进程操作

    ​ 创建进程有两种方式,分别是通过定义函数的方式和通过定义类的方式。两种方式创建进程都必须通过实例化Process类。

      Process类参数如下:

      1) group:这一参数值始终为None,尚未启用,是为以后Python版本准备的

      2) target:表示调用对象,即子进程要执行的任务

      3) args:表示调用对象的位置参数元组,即target的位置参数,必须是元组,如args=(0,1,[1,2,3])

      4) kwargs:表示调用对象的字典参数,kwargs={'name':'egon','age':18}

      5) name:为子进程的名称

    通过定义函数的方式创建进程

    from multiprocessing import Process
    import time
    def func(n):
        print("子进程开始运行:{}……".format(n))
        time.sleep(1)
        print("子进程结束运行:{}……".format(n))
    
    
    if __name__ == '__main__':  # 创建进程执行一定要这一行代码
        print("主进程开始运行……")
        p = Process(target=func, args=(10,))  # 注册
        p.start()  # 启动一个子进程
        time.sleep(1)
        p.join()
        print("主进程结束运行……")
        
    
    ##执行结果
    主进程开始运行……
    子进程开始运行:10……
    子进程结束运行:10……
    主进程结束运行……
    
    
    ##去掉p.join()
    from multiprocessing import Process
    import time
    def func(n):
        print("子进程开始运行:{}……".format(n))
        time.sleep(1)
        print("子进程结束运行:{}……".format(n))
    
    
    if __name__ == '__main__':  # 创建进程执行一定要这一行代码
        print("主进程开始运行……")
        p = Process(target=func, args=(10,))  # 注册
        p.start()  # 启动一个子进程
        time.sleep(1)
        print("主进程结束运行……")
        
    #执行结果
    主进程开始运行……
    子进程开始运行:10……
    主进程结束运行……
    子进程结束运行:10……
    
        
    ##结论:
    p.join() 会迫使主进程等待子进程执行完毕后才继续等待,即子进程执行完毕后主进程才继续执行
    
    
    

    通过定义类的方式创建进程

    from multiprocessing import Process
    import os,time
    
    class MyProcess(Process):
    
        def __init__(self , arg1 , arg2):
    
            super().__init__()
    
            self.arg1 = arg1
    
            self.arg2 = arg2
    
        def run(self):
    
            print('子进程:' , self.pid , self.arg1 , self.arg2)
            print("沉睡中...")
            time.sleep(3)
    
    
    if  __name__ == "__main__" :
         p1 = MyProcess('我是arg1' , '我是arg2')
         p1.start()
         p2 = MyProcess('我是arg1' , '我是arg2')
         p2.start()
    
         print('主进程:', os.getpid())
         print("主进程结束")
        
        
        
        ##一般先执行主进程中的操作,然后才执行子进程,但是如果主进程阻塞,子进程就会线执行(不添加p.join())
    

    Process****的常用属性和方法

    Process类常用属性如下:

      1)daemon:默认值为False,如果设为True,则设为守护进程。

      2)name:进程的名称

      3)pid:进程的pid

      Process类常用方法如下:

      1)start():启动进程,并调用该子进程中的p.run() ;

      2 )run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法;

      3 )terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁;

      4)is_alive():如果p仍然运行,返回True;

      5)join([timeout]):主线程等待子进程终止

    守护进程:daemon

    若要将一个进程设置为守护进程,在进程start之前,将其daemon属性设置为True即可。但什么是守护进程呢?我们通过如下代码来说明,我们要通过代码实现如下效果:主进程创建p1、p2进程,之后立马沉睡5秒,p1进程设置为守护进程,进程p1每隔1秒打印一条语句,进程p2打印一条语句后立马沉睡10秒。代码如下:

    import time
    
    import os
    
    from multiprocessing import Process
    
    def func():
    
        i = 1
    
        while True:
    
            time.sleep(1)
    
            print('{}--子进程p1正在执行,pid:{}'.format(i , os.getpid()))
    
            i+=1
    
    def func2():
    
        print('子进程p2开始执行,pid:{}'.format(os.getpid()))
    
        time.sleep(10)
    
        print('子进程p2结束执行,pid:{}'.format(os.getpid()))
    
    if __name__=='__main__':
    
        print('主进程代码开始运行,pid:{}'.format(os.getpid()))
    
        p = Process(target=func)
    
        p.daemon = True # 设置为守护进程
    
        p.start()
    
        p2 = Process(target=func2)
    
        p2.start()
    
        time.sleep(5)
    
    print('主进程代码运行完了,pid:{}'.format(os.getpid()))
    
    
    
    ##
    运行结果:
    
      主进程代码开始运行,pid:11608
    
      子进程p2开始执行,pid:7260
    
      1--子进程p1正在执行,pid:1060
    
      2--子进程p1正在执行,pid:1060
    
      3--子进程p1正在执行,pid:1060
    
      4--子进程p1正在执行,pid:1060
    
      主进程代码运行完了,pid:11608
    
      子进程p2结束执行,pid:7260
    
      从运行结果字面上似乎看不出什么,因为区别在于输出时间上。在主进程运行的那5秒时间(输出“主进程代码运行完了,pid:11608”之前),p1进程确实可以每隔1秒输出一条语句,但是主进程结束那5秒后,p1不在输出,且在任务管理器中也可以查看到,p1进程也已经死亡,主进程代码虽然运行完了,但依然存活,这时候p2进程依然还在沉睡,10秒后,p2进程打印“子进程p2结束执行,pid:7260”,然后主进程和p2进程一起死亡。
    
      可以得出结果,守护进程依附于主进程代码,只要主进程代码运行完了,那么无论守护进程代码是否运行完,守护进程都会结束。另外,守护进程不能创建自己的子进程。
    
    

    进程终结于存活检查:terminate()与is_alive()

    terminate()与is_alive()都是由进程实例调用,分别用来终结一个进程、检查一个进程是否依然存活:

    import time
    
    import os
    
    from multiprocessing import Process
    
     
    
    def func():
    
        i = 1
    
        while True:
    
            time.sleep(1)
    
            print('{}--子进程p1正在执行,pid:{}'.format(i , os.getpid()))
    
            i+=1
    
    
    if __name__=='__main__':
    
        p = Process(target=func)
    
        p.start()
    
        time.sleep(3)
    
        p.terminate() # 终结进程p
    
        print(p.is_alive()) # 检查p是否依然存活
    
        time.sleep(1)
    
        print(p.is_alive())
        
    ##输出结果:
    
      主进程代码开始运行,pid:13164
    
      1--子进程p1正在执行,pid:8896
    
      2--子进程p1正在执行,pid:8896
    
      True
    
      False
    
      主进程代码运行完了,pid:13164
    
      为什么结束之后第一次调用is_alive()方法输出的是True呢?因为terminate()方法终结一个进程时操作系统需要一定的响应时间,所以可能会有延迟    
    

    join()方法

    join方法功能是阻塞当前所在进程(例如下面的主进程),等到被join的进程(下面的进程p1)结束之后,回到当前进程继续执行。

    from multiprocessing import Process
    
    import time
    
    def func1 ():
    
        print("进程1开始运行……")
    
        for i in range(3):
    
            time.sleep(1)
    
            print("进程1运行了{}秒……".format(i+1))
    
        print("进程1结束运行……")
    
     
    
    def func2 ():
    
        print("进程2开始运行……")
    
        for i in range(6):
    
            time.sleep(1)
    
            print("进程2运行了{}秒……".format(i+1))
    
        print("进程2结束运行……")
    
     
    
    if __name__ == '__main__':
    
        print("主进程开始运行……")
    
        p1 = Process(target=func1)
    
        p2 = Process(target=func2)
    
        p1.start()
    
        p2.start()
    
        time.sleep(1)
    
        p1.join()
    
        # p2.join()
    
        print("主进程结束运行……" )
    
  • 相关阅读:
    golang_并发安全: slice和map并发不安全及解决方法
    什么情况下需要用到互斥锁sync.Mutex?
    使用Charles进行HTTPS抓包
    centos6 yum 源失效 404,终于解决了
    GOMAXPROCS你设置对了吗?
    容器资源可见性问题与 GOMAXPROCS 配置
    gflags 使用方式
    分布式训练问题
    NCCL常用环境变量
    git 常用命令
  • 原文地址:https://www.cnblogs.com/g2thend/p/12526485.html
Copyright © 2011-2022 走看看