zoukankan      html  css  js  c++  java
  • python -- 多进程

    python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。

    Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。

    1.Process

    创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。
    方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。

    属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。

      (1)创建多进程

     1 #!/usr/bin/env python
     2 # -*- coding: utf-8 -*-
     3 
     4 from multiprocessing import Process
     5 import time
     6 
     7 
     8 def f(name):
     9     time.sleep(1)
    10     print("hello ", name)
    11 
    12 if __name__ == '__main__':
    13     p1 = Process(target=f, args=['Bob1', ])     #target是创建的进程要执行的函数,args是传递给该函数的参数
    14     p2 = Process(target=f, args=['Bob2', ])   #这里进程p*的父进程是当前脚本,脚本的父进程是执行环境(eg:pycharm)
    15     p1.start()
    16     p2.start()
    17     p1.join()
    18     p2.join()
    19     time.sleep(1)
    20     print("----main process over----")

      执行结果

    hello  Bob2
    hello  Bob1
    ----main process over----

    2、LOCK

      当多个进程需要访问共享资源的时候,LOCK可以用来避免访问的冲突 

     1 #!/usr/bin/env python
     2 # -*- coding: utf-8 -*-
     3 
     4 '''
     5 进程同步是说的进程一起执行
     6 '''
     7 from multiprocessing import Process,Lock
     8 
     9 def f(lock,i):
    10     lock.acquire()
    11     try:
    12         print('hello world',i)
    13     finally:
    14         lock.release()
    15 
    16 if __name__ == '__main__':
    17     lock = Lock()
    18 
    19     for num in range(10):
    20         Process(target=f,args=(lock,num)).start()

      执行结果

    hello world 0
    hello world 3
    hello world 2
    hello world 6
    hello world 5
    hello world 1
    hello world 4
    hello world 7
    hello world 9
    hello world 8

    3、Queue、Pipe、Manager

      分别利用Queue、Pipe、Manager实现进程间通信

     (1)Queue

      Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

      get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。
     

      (2) Pipe

      Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。

       send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。 
     

      (3) Manager

      Manager()支持:list、dict、Namespace、Lock、Rlock、Semaphore、 BoundedSemaphore、Condition、Event、Barrier、Queue、Value、Array 其中常用的就list、dict。 注意:其中的list、dict都要通过Manager() 实现。
     
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    '''
    进程之间是不能直接通信的,需要第三方作为通信媒介
    '''
    
    '''以下是Queue实现数据传递代码'''
    from multiprocessing import  Process,Queue
    
    def f1(q):
        q.put([22,'root','hello'])
    
    def f2(q):
        q.put([23,'admin','hello'])
    
    
    '''以下是Pipe实现数据传递代码'''
    from multiprocessing import  Process,Pipe
    
    def f3(conn):
        conn.send([23,'admin1','hello'])
    
    def f4(conn):
        conn.send([24, 'root', 'hello'])
    
    
    '''以下是Manager实现数据共享代码'''
    from multiprocessing import Process,Manager
    
    def f(d,l,n):
        d[n] = n
        d['name'] = 'root'
        l.append(n)
        print(l)
    
    #以下是主程序
    if __name__ == '__main__':
        print(
            '''33[42;1m 利用Queue实现进程间的通信 33[0m '''
        )
        '''
        这里的进程间通信是在同一父进程下不同子进程间的通信,因为队列q是在父进程实例化并
        传递到每个子进程中。(当然可以在一个第三方文件里面创建一个队列,其余文件来调用是可以的)
        '''
        q = Queue()    #生成一个第三方队列,用来存放子进程put的值
                    # 这里的Queue是在queue.Queue的基础上,封装了能多进程访问
        pq1 = Process(target=f1,args=(q,))
        pq2 = Process(target=f2,args=(q,))  #要把队列对象q当参数传递给各个子进程
        pq1.start()
        pq2.start()
        print('from parent 1:',q.get())
        print('from parent 2:',q.get())
    
        print(
            '''
    33[43;1m 利用Pipe实现进程间的通信 33[0m '''
        )
        parent_conn,child_conn = Pipe()
        pp1 = Process(target=f3,args=(child_conn,))
        pp2 = Process(target=f4,args=(child_conn,))
        pp1.start()
        pp2.start()
        print('parent data from pp1:', parent_conn.recv())
        print('parent data from pp2:', parent_conn.recv())
        parent_conn.close() #这个在子进程或父进程关闭都可以
    
        print(
            '''
    33[43;1m 利用Manager实现进程间的数据共享 33[0m '''
        )
        '''
        这里Manager()支持:list、dict、Namespace、Lock、Rlock、Semaphore、
         BoundedSemaphore、Condition、Event、Barrier、Queue、Value、Array
         其中常用的就list、dict。
         注意:其中的list、dict都要通过Manager() 实现
        '''
        #manager = Manager()  #注意这样后面需要关闭,用with...as...语句更方便
        with Manager() as manager:
            d = manager.dict()
            # l = manager.list()  #这样生成空列表
            l = manager.list(range(5))
    
            process_list = []
            for i in range(10):
                p = Process(target=f,args=(d,l,i))
                p.start()
                process_list.append(p)
    
            for res in process_list:
                res.join()
    
            print("执行结束l:",l)
            print("执行结束d:",d)

    4、POOL

      在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
    Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

    函数解释:

    • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)
    • close()    关闭pool,使其不在接受新的任务。
    • terminate()    结束工作进程,不在处理未完成的任务。
    • join()    主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
     1 #!/usr/bin/env python
     2 # -*- coding: utf-8 -*-
     3 
     4 '''
     5 进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有
     6 可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
     7 
     8 进程池方法:
     9     apply: 同步
    10     apply_async: 异步
    11 '''
    12 from multiprocessing import Process,Pool,freeze_support
    13 import time
    14 
    15 def Foo(i):
    16     time.sleep(1)
    17     return i,i+100
    18 
    19 def Bar(arg):
    20     # print('arg:',arg)
    21     data.append(arg)
    22     print('--> exec done [%s]: %s' % (arg[0],arg[1]))
    23 
    24 if __name__ == '__main__':
    25     freeze_support()   #要导入,在win下必须有它,不然会报错。linux下不用
    26 
    27     pool = Pool(3)  #生成一个有5个进程序列的进程池实例
    28     data = []
    29     for i in range(10):
    30         # func为进程每次执行的函数,args是参数,callback是执行完func后执行的函数,
    31         # 并且callback会自动接收func的返回值作为函数参数。Foo是子进程执行的函数,callback是在
    32         # 父进程执行的函数,所以这里说明是子进程把执行结果返回给父进程里面的函数
    33         pool.apply_async(func=Foo,args=(i,),callback=Bar)
    34         #pool.apply(func=Foo,args=(i,))   #进程同步,没啥用
    35 
    36     print("end")
    37     pool.close()
    38     pool.join()  #这里表示进程池中进程执行完毕后再关闭,若注释,那么程序直接关闭
    39         #注意:这里是先close,再join
    40     print('全局变量data:',data)

      执行结果

    end
    --> exec done [0]: 100
    --> exec done [1]: 101
    --> exec done [2]: 102
    --> exec done [3]: 103
    --> exec done [4]: 104
    --> exec done [5]: 105
    --> exec done [7]: 107
    --> exec done [6]: 106
    --> exec done [8]: 108
    --> exec done [9]: 109
    全局变量data: [(0, 100), (1, 101), (2, 102), (3, 103), (4, 104), (5, 105), (7, 107), (6, 106), (8, 108), (9, 109)]

    5、Semaphore

      Semaphore用来控制对共享资源的访问数量,例如池的最大连接数。

     1 #!/usr/bin/env python
     2 # -*- coding:utf8 -*-
     3 
     4 import multiprocessing
     5 import time
     6 
     7 
     8 def run(s, i):
     9     s.acquire()
    10     print(multiprocessing.current_process().name + "acquire");
    11     time.sleep(i)
    12     print(multiprocessing.current_process().name + "release
    ");
    13     s.release()
    14 
    15 
    16 if __name__ == "__main__":
    17     s = multiprocessing.Semaphore(2)  #最多允许3个进程同时运行
    18 
    19     process_list = []
    20     for i in range(3):
    21         p = multiprocessing.Process(target=run, args=(s, i * 2))
    22         p.start()
    23         process_list.append(p)
    24 
    25     for process in process_list:
    26         process.join()
    27 
    28     print('33[31;1m 全部结束 33[0m')

      执行结果

    Process-1acquire
    Process-1release
    
    Process-3acquire
    Process-2acquire
    Process-2release
    
    Process-3release
    
     全部结束 

    6、Event

      用Event实现进程同步

     1 #!/usr/bin/env python
     2 # -*- coding:utf8 -*-
     3 
     4 import multiprocessing
     5 import time
     6 
     7 
     8 def wait_for_event(e):
     9     print("wait_for_event: starting")
    10     e.wait()
    11     print("wairt_for_event: e.is_set()->" + str(e.is_set()))
    12 
    13 
    14 def wait_for_event_timeout(e, t):
    15     print("wait_for_event_timeout:starting")
    16     e.wait(t)
    17     print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))
    18 
    19 
    20 if __name__ == "__main__":
    21     e = multiprocessing.Event()
    22     w1 = multiprocessing.Process(name="block",
    23                                  target=wait_for_event,
    24                                  args=(e,))
    25 
    26     w2 = multiprocessing.Process(name="non-block",
    27                                  target=wait_for_event_timeout,
    28                                  args=(e, 2))
    29     w1.start()
    30     w2.start()
    31 
    32     time.sleep(3)
    33 
    34     e.set()
    35     print("main: event is set")

      执行结果

    wait_for_event: starting
    wait_for_event_timeout:starting
    wait_for_event_timeout:e.is_set->False
    wairt_for_event: e.is_set()->True
    main: event is set
  • 相关阅读:
    Android中Context具体解释 ---- 你所不知道的Context
    JDK6、Oracle11g、Weblogic10 For Linux64Bit安装部署说明
    matplotlib 可视化 —— 定制 matplotlib
    matplotlib 可视化 —— 移动坐标轴(中心位置)
    matplotlib 可视化 —— 移动坐标轴(中心位置)
    matplotlib 可视化 —— 定制画布风格 Customizing plots with style sheets(plt.style)
    matplotlib 可视化 —— 定制画布风格 Customizing plots with style sheets(plt.style)
    指数函数的研究
    指数函数的研究
    指数分布的研究
  • 原文地址:https://www.cnblogs.com/xtsec/p/6973382.html
Copyright © 2011-2022 走看看