zoukankan      html  css  js  c++  java
  • python摸爬滚打之day032 管道 数据共享 进程池

    1、进程池

      当有成千上万个任务需要被执行的时候,有了进程池我们就不必去创建大量的进程. 首先,创建进程需要消耗时间,销毁进程(空间,变量,文件信息等等的内容)也需要消耗时间, 第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,维护一个很大的进程列表的同时,调度的时候,还需要进行频繁切换并且记录每个进程的执行节点, 这样反而会影响程序的效率

      创建一个有固定数量的进程池, 执行任务的时候就拿池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务, 可以减少创建进程的开支. 这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果. 

      使用进程池来实现并发效果, 减少创建进程的开支, 提高效率.

    map()方法: 异步调用进程, map自带join()的功能.

     1 import  time
     2 from multiprocessing import Pool,Process
     3 
     4 
     5 def func1(i):
     6     numb = 0
     7     for j in range(5):
     8         numb += i
     9 
    10 
    11 if __name__ == '__main__':
    12 
    13     p_lst = []
    14     s_time = time.time()
    15     for i in range(500):
    16         p = Process(target=func1,args=(i,))
    17         p.start()
    18         p_lst.append(p)
    19     [pp.join() for pp in p_lst]
    20     e_time = time.time()
    21     dis_time = e_time - s_time
    22     print("非进程池",dis_time)    # 9.458896160125732 处理时间
    23 
    24 # ---------------------------------------------------------------------------------
    25 
    26     pool = Pool(4)
    27     ps_time = time.time()
    28     pool.map(func1, range(100))      # 把可迭代对象的每一个元素都作为参数扔给func1
    29     pe_time = time.time()
    30     dis_time = pe_time - ps_time
    31     print("进程池",dis_time)       # 0.07204794883728027 处理时间
    进程池map方法

    apply()方法: 提供一个同步串行的方法.

     1 import  time
     2 from multiprocessing import Pool,Process
     3 
     4 
     5 def func1(i):
     6     numb = 0
     7     for j in range(5):
     8         numb += i
     9     time.sleep(0.5)
    10     return numb
    11 
    12 
    13 
    14 
    15 if __name__ == '__main__':
    16 
    17     pool = Pool(4)
    18 
    19     for i in range(10):
    20         print(i)
    21         ret = pool.apply(func1, args=(i,))     # apply提供的是一个同步串行的执行方法. 
    22                                                # 进程1 执行完任务后, ret 获取到数据, 第二个进程才开始执行任务.
    23         
    24         print(ret)
    进程池apply同步调用

    apply_async()方法: 进程池异步调用方法.

      注意: 在使用进程池异步调用时, 主进程结束时, 所有的子进程也跟着一起结束了(后台全部关闭), 所以主程序必须得先 join() , 等待子进程结束.

      使用apply_async()异步调用时, 主程序必须使用 join() 方法, 等待进程池内的任务都处理完, 才能用get()获取结果.

      使用map()异步调用时, 不用写 join() 方法, map()会自动 join().

     1 import  time
     2 from multiprocessing import Pool,Process
     3 
     4 
     5 def func1(i):
     6     numb = 0
     7     for j in range(5):
     8         numb += i
     9     time.sleep(1)
    10     return numb
    11 
    12 
    13 
    14 if __name__ == '__main__':
    15 
    16     pool = Pool(4)
    17     ret_lst = []
    18     for i in range(10):         # 相当于发布10个任务, 4个进程都过来拿任务
    19         print(i)
    20         res = pool.apply_async(func1,args=(i,))         # 各进程都是异步状态
    21         ret_lst.append(res)         # 这一步, 是把所有的res的执行对象都先放进列表里(包括那些没有结果的对象,
    22                                     # 即使后面6个任务都没有执行,但是都是先把执行对象放进列表里)
    23 
    24     pool.close()        # 不是关闭进程池, 而是结束进程池接受任务, 确保没有任务再传过来
    25     pool.join()         # 感知进程池中的任务已经结束, 只有进程池结束接收任务, 才能感知进程池中的任务结束, 所以必须加 close().
    26 
    27     for res in ret_lst:
    28         print(res.get())      # 前4个有结果, 用get()方法获取到结果后, 一直阻塞在后六个处,直到结果传进来执行对象中
    apply_async异步调用

     2、回调函数

      回调函数在主进程中被执行的, 子进程执行完相应的代码后, 返回主进程去执行回调函数, 它帮我们省略了主进程自身调用函数的这一步骤.

     1 from multiprocessing import Process,Pool
     2 
     3 
     4 def func1(n):
     5     return n * n
     6 
     7 def call_back_func(ret):        # 这里的ret 传的是func1的结果.
     8     with open("回调内容","w") as f:
     9         f.write(str(ret))
    10 
    11 
    12 
    13 
    14 if __name__ == '__main__':
    15 
    16     pool = Pool(4)
    17     ret = pool.apply_async(func1,args=(25,),callback=call_back_func)
    18     # callback后面跟回调函数, 即把func1的结果传进callback回调函数中去执行,因为调用者拿不到
    19     # 回调函数的返回值, 所以只能将返回值写进文件或者数据库里.
    20 
    21     print(ret.get())
    进程池回调函数

    3、进程间的通信  

      多进程共同去处理共享数据的时候,就和我们多进程同时去操作一个文件中的数据是一样的,不加锁就会出现错误的结果,进程不安全的,所以也需要加锁

      数据共享----Manager模块

        给Manager对象里面传入你要共享的数据, 然后操作数据时一样要上锁、解锁.

     1 from multiprocessing import Process,Lock,Manager
     2 
     3 def func1(dic,loc):
     4     with loc:                   # with loc 做了两件事: loc.acquire() 和 loc.release(), 自动上锁和解锁
     5         dic["numb"] -= 1
     6 
     7 
     8 
     9 if __name__ == '__main__':
    10     m = Manager()
    11     loc = Lock()
    12     dic = m.dict({"numb": 100})
    13     p_lst = []
    14     for i in range(100):
    15         p = Process(target=func1, args=(dic,loc))
    16         p.start()
    17         p_lst.append(p)
    18     [pp.join() for pp in p_lst]
    19     print(">>>>>>",dic["numb"])
    Manager数据共享

      

  • 相关阅读:
    C++复制构造函数,类型转换构造函数,析构函数,引用,指针常量和常量指针
    POJ1611(The Suspects)--简单并查集
    最小生成树-Kruskal算法
    POJ1861(Network)-Kruskal
    POJ1979(Red and Black)--FloodFill
    [转]全网最!详!细!tarjan算法讲解
    POJ1573(Robot Motion)--简单模拟+简单dfs
    最小生成树-Prim算法
    POJ3368(Frequent values)--线段树
    POJ3255(Roadblocks)--次短路径
  • 原文地址:https://www.cnblogs.com/bk9527/p/10039895.html
Copyright © 2011-2022 走看看