zoukankan      html  css  js  c++  java
  • python -- 多进程

    python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。

    Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。

    1.Process

    创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。
    方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。

    属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。

      (1)创建多进程

     1 #!/usr/bin/env python
     2 # -*- coding: utf-8 -*-
     3 
     4 from multiprocessing import Process
     5 import time
     6 
     7 
     8 def f(name):
     9     time.sleep(1)
    10     print("hello ", name)
    11 
    12 if __name__ == '__main__':
    13     p1 = Process(target=f, args=['Bob1', ])     #target是创建的进程要执行的函数,args是传递给该函数的参数
    14     p2 = Process(target=f, args=['Bob2', ])   #这里进程p*的父进程是当前脚本,脚本的父进程是执行环境(eg:pycharm)
    15     p1.start()
    16     p2.start()
    17     p1.join()
    18     p2.join()
    19     time.sleep(1)
    20     print("----main process over----")

      执行结果

    hello  Bob2
    hello  Bob1
    ----main process over----

    2、LOCK

      当多个进程需要访问共享资源的时候,LOCK可以用来避免访问的冲突 

     1 #!/usr/bin/env python
     2 # -*- coding: utf-8 -*-
     3 
     4 '''
     5 进程同步是说的进程一起执行
     6 '''
     7 from multiprocessing import Process,Lock
     8 
     9 def f(lock,i):
    10     lock.acquire()
    11     try:
    12         print('hello world',i)
    13     finally:
    14         lock.release()
    15 
    16 if __name__ == '__main__':
    17     lock = Lock()
    18 
    19     for num in range(10):
    20         Process(target=f,args=(lock,num)).start()

      执行结果

    hello world 0
    hello world 3
    hello world 2
    hello world 6
    hello world 5
    hello world 1
    hello world 4
    hello world 7
    hello world 9
    hello world 8

    3、Queue、Pipe、Manager

      分别利用Queue、Pipe、Manager实现进程间通信

     (1)Queue

      Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

      get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。
     

      (2) Pipe

      Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。

       send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。 
     

      (3) Manager

      Manager()支持:list、dict、Namespace、Lock、Rlock、Semaphore、 BoundedSemaphore、Condition、Event、Barrier、Queue、Value、Array 其中常用的就list、dict。 注意:其中的list、dict都要通过Manager() 实现。
     
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    '''
    进程之间是不能直接通信的,需要第三方作为通信媒介
    '''
    
    '''以下是Queue实现数据传递代码'''
    from multiprocessing import  Process,Queue
    
    def f1(q):
        q.put([22,'root','hello'])
    
    def f2(q):
        q.put([23,'admin','hello'])
    
    
    '''以下是Pipe实现数据传递代码'''
    from multiprocessing import  Process,Pipe
    
    def f3(conn):
        conn.send([23,'admin1','hello'])
    
    def f4(conn):
        conn.send([24, 'root', 'hello'])
    
    
    '''以下是Manager实现数据共享代码'''
    from multiprocessing import Process,Manager
    
    def f(d,l,n):
        d[n] = n
        d['name'] = 'root'
        l.append(n)
        print(l)
    
    #以下是主程序
    if __name__ == '__main__':
        print(
            '''33[42;1m 利用Queue实现进程间的通信 33[0m '''
        )
        '''
        这里的进程间通信是在同一父进程下不同子进程间的通信,因为队列q是在父进程实例化并
        传递到每个子进程中。(当然可以在一个第三方文件里面创建一个队列,其余文件来调用是可以的)
        '''
        q = Queue()    #生成一个第三方队列,用来存放子进程put的值
                    # 这里的Queue是在queue.Queue的基础上,封装了能多进程访问
        pq1 = Process(target=f1,args=(q,))
        pq2 = Process(target=f2,args=(q,))  #要把队列对象q当参数传递给各个子进程
        pq1.start()
        pq2.start()
        print('from parent 1:',q.get())
        print('from parent 2:',q.get())
    
        print(
            '''
    33[43;1m 利用Pipe实现进程间的通信 33[0m '''
        )
        parent_conn,child_conn = Pipe()
        pp1 = Process(target=f3,args=(child_conn,))
        pp2 = Process(target=f4,args=(child_conn,))
        pp1.start()
        pp2.start()
        print('parent data from pp1:', parent_conn.recv())
        print('parent data from pp2:', parent_conn.recv())
        parent_conn.close() #这个在子进程或父进程关闭都可以
    
        print(
            '''
    33[43;1m 利用Manager实现进程间的数据共享 33[0m '''
        )
        '''
        这里Manager()支持:list、dict、Namespace、Lock、Rlock、Semaphore、
         BoundedSemaphore、Condition、Event、Barrier、Queue、Value、Array
         其中常用的就list、dict。
         注意:其中的list、dict都要通过Manager() 实现
        '''
        #manager = Manager()  #注意这样后面需要关闭,用with...as...语句更方便
        with Manager() as manager:
            d = manager.dict()
            # l = manager.list()  #这样生成空列表
            l = manager.list(range(5))
    
            process_list = []
            for i in range(10):
                p = Process(target=f,args=(d,l,i))
                p.start()
                process_list.append(p)
    
            for res in process_list:
                res.join()
    
            print("执行结束l:",l)
            print("执行结束d:",d)

    4、POOL

      在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
    Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

    函数解释:

    • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)
    • close()    关闭pool,使其不在接受新的任务。
    • terminate()    结束工作进程,不在处理未完成的任务。
    • join()    主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
     1 #!/usr/bin/env python
     2 # -*- coding: utf-8 -*-
     3 
     4 '''
     5 进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有
     6 可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
     7 
     8 进程池方法:
     9     apply: 同步
    10     apply_async: 异步
    11 '''
    12 from multiprocessing import Process,Pool,freeze_support
    13 import time
    14 
    15 def Foo(i):
    16     time.sleep(1)
    17     return i,i+100
    18 
    19 def Bar(arg):
    20     # print('arg:',arg)
    21     data.append(arg)
    22     print('--> exec done [%s]: %s' % (arg[0],arg[1]))
    23 
    24 if __name__ == '__main__':
    25     freeze_support()   #要导入,在win下必须有它,不然会报错。linux下不用
    26 
    27     pool = Pool(3)  #生成一个有5个进程序列的进程池实例
    28     data = []
    29     for i in range(10):
    30         # func为进程每次执行的函数,args是参数,callback是执行完func后执行的函数,
    31         # 并且callback会自动接收func的返回值作为函数参数。Foo是子进程执行的函数,callback是在
    32         # 父进程执行的函数,所以这里说明是子进程把执行结果返回给父进程里面的函数
    33         pool.apply_async(func=Foo,args=(i,),callback=Bar)
    34         #pool.apply(func=Foo,args=(i,))   #进程同步,没啥用
    35 
    36     print("end")
    37     pool.close()
    38     pool.join()  #这里表示进程池中进程执行完毕后再关闭,若注释,那么程序直接关闭
    39         #注意:这里是先close,再join
    40     print('全局变量data:',data)

      执行结果

    end
    --> exec done [0]: 100
    --> exec done [1]: 101
    --> exec done [2]: 102
    --> exec done [3]: 103
    --> exec done [4]: 104
    --> exec done [5]: 105
    --> exec done [7]: 107
    --> exec done [6]: 106
    --> exec done [8]: 108
    --> exec done [9]: 109
    全局变量data: [(0, 100), (1, 101), (2, 102), (3, 103), (4, 104), (5, 105), (7, 107), (6, 106), (8, 108), (9, 109)]

    5、Semaphore

      Semaphore用来控制对共享资源的访问数量,例如池的最大连接数。

     1 #!/usr/bin/env python
     2 # -*- coding:utf8 -*-
     3 
     4 import multiprocessing
     5 import time
     6 
     7 
     8 def run(s, i):
     9     s.acquire()
    10     print(multiprocessing.current_process().name + "acquire");
    11     time.sleep(i)
    12     print(multiprocessing.current_process().name + "release
    ");
    13     s.release()
    14 
    15 
    16 if __name__ == "__main__":
    17     s = multiprocessing.Semaphore(2)  #最多允许3个进程同时运行
    18 
    19     process_list = []
    20     for i in range(3):
    21         p = multiprocessing.Process(target=run, args=(s, i * 2))
    22         p.start()
    23         process_list.append(p)
    24 
    25     for process in process_list:
    26         process.join()
    27 
    28     print('33[31;1m 全部结束 33[0m')

      执行结果

    Process-1acquire
    Process-1release
    
    Process-3acquire
    Process-2acquire
    Process-2release
    
    Process-3release
    
     全部结束 

    6、Event

      用Event实现进程同步

     1 #!/usr/bin/env python
     2 # -*- coding:utf8 -*-
     3 
     4 import multiprocessing
     5 import time
     6 
     7 
     8 def wait_for_event(e):
     9     print("wait_for_event: starting")
    10     e.wait()
    11     print("wairt_for_event: e.is_set()->" + str(e.is_set()))
    12 
    13 
    14 def wait_for_event_timeout(e, t):
    15     print("wait_for_event_timeout:starting")
    16     e.wait(t)
    17     print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))
    18 
    19 
    20 if __name__ == "__main__":
    21     e = multiprocessing.Event()
    22     w1 = multiprocessing.Process(name="block",
    23                                  target=wait_for_event,
    24                                  args=(e,))
    25 
    26     w2 = multiprocessing.Process(name="non-block",
    27                                  target=wait_for_event_timeout,
    28                                  args=(e, 2))
    29     w1.start()
    30     w2.start()
    31 
    32     time.sleep(3)
    33 
    34     e.set()
    35     print("main: event is set")

      执行结果

    wait_for_event: starting
    wait_for_event_timeout:starting
    wait_for_event_timeout:e.is_set->False
    wairt_for_event: e.is_set()->True
    main: event is set
  • 相关阅读:
    自动登录跳板机->开发机
    关于写代码的一下规范
    vscode 配置 GOPATH
    thinkphp6.0 nginx 配置
    vue-cli 3.x 构建项目,webpack没有了?
    Laravel6.0 使用 Jwt-auth 实现多用户接口认证
    怎么在 localhost 下访问多个 Laravel 项目,通过一个IP访问多个项目(不仅仅是改变端口哦)
    laravel 5.8 实现消息推送
    vs code 设置 保存自动格式化vue代码
    项目开发规范(编码规范、命名规范、安全规范、前端优化、源码提交规范、代码维护规范、产品发布规范)
  • 原文地址:https://www.cnblogs.com/xtsec/p/6973382.html
Copyright © 2011-2022 走看看