zoukankan      html  css  js  c++  java
  • 管道,数据共享,进程池

    内容梗概:
    1.管道
    2.数据共享
    3.进程池
    4.回调函数


    1.管道
    进程间通信(IPC)方式二:管道(不推荐使用,了解即可),会导致数据不安全的情况出现
    实例:
    import time
    from multiprocessing import Process,Pipe
    def func(conn1,conn2):

    conn1.close()
    conn2.close()

    # msg = conn2.recv()
    msg = conn2.recv()
    print(">>",msg)

    if __name__ == '__main__':
    conn1,conn2 = Pipe()
    p1 = Process(target=func,args=(conn1,conn2,))
    p1.start()
    conn1.close()
    conn2.close()
    conn1.send("嘿嘿")
    conn1.send("嘿嘿")
    conn1.close()
    p1.join()


    EOFError:接收端两端都关闭的的时候
    OSError:发送端关闭后,仍用该发送端发送


    2.数据共享
    进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
    虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此
    import time
    from multiprocessing import Process,Manager,Lock
    def func(dic,lock):
    with lock: #因为数据共享,必须加锁,不认有可能会争抢数据,导致出错
    dic["num"] -= 1
    if __name__ == '__main__':
    m = Manager()
    lock = Lock()
    dic = m.dict({"num":100})
    lis = []
    for i in range(100):
    p = Process(target=func,args=(dic,lock))
    p.start()
    lis.append(p)
    for el in lis:
    el.join()
    print(dic["num"])




    3.进程池
    进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。
    如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行

    进程池实例:(创建多个进程和进程池运算的区别)
    import time
    from multiprocessing import Process,Manager,Lock,Pool
    def func(i):
    num = 0
    for j in range(5):
    num += i
    if __name__ == '__main__':
    pool = Pool(4)

    #创建多个进程运算
    lis = []
    start_time = time.time()
    for i in range(250):
    p = Process(target=func,args=(i,))
    p.start()
    lis.append(p)
    for el in lis:
    el.join()
    end_time = time.time()
    dif_time = abs(start_time - end_time)
    print(dif_time)


    #进程池运算
    s_time = time.time()
    pool.map(func,range(250)) #此处map所使用的方法,与内置函数用法一致
    e_time = time.time()
    d_time = abs(s_time - e_time)
    print(d_time)
    结果:13.073268413543701
    0.0017964839935302734
    运算速度差距较大,因为创建进程是很浪费资源的
    注意:1.,map是异步执行的,并且自带close和join
      2.一般约定俗成的是进程池中的进程数量为CPU的数量,工作中要看具体情况来考量。


    进程池中常用方法
    1.同步方法
    import time
    from multiprocessing import Process,Pool

    def func(i):
    num = 0
    for j in range(5):
    num += i
    time.sleep(0.5)
    return num

    if __name__ == '__main__':
    pool = Pool(4)
    lis = []
    for i in range(10):
    res = pool.apply(func,args=(i,)) #
    print(res)

    2.异步方法
    import time
    from multiprocessing import Process,Pool

    def func(i):
    num = 0
    for j in range(5):
    num += i
    time.sleep(0.5)
    print('>>>>>', num)
    # return num
    if __name__ == '__main__':
    lis = []
    pool = Pool(4)
    for i in range(10):
    res = pool.apply_async(func,args=(i,)) ## 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行,并且可以执行不同的任务,传送任意的参数了
    # lis.append(res)
    pool.close() # 不是关闭进程池,只是锁定
    pool.join()
    for el in lis:
    print(el.get())
    需要注意的是,进程池中的三个进程不会同时开启或者同时结束
    而是执行完一个就释放一个进程,这个进程就去接收新的任务。

    4.回调函数
    回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数,这是进程池特有的,普通进程没有这个机制,但是我们也可以通过进程通信来拿到返回值,进程池的这个回调也是进程通信的机制完成的。
    我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果



    import time,os
    from multiprocessing import Pool,Process
    def func(n):
    print("子进程pid",os.getpid())
    return n*2,"约吗?"
    def call_back_func(x):
    print("call_back pid",os.getpid())
    print(x) #回调函数在写的时候注意一点,回调函数的形参执行有一个,如果你的执行函数有多个返回值,那么也可以被回调函数的这一个形参接收,接收的是一个元祖,包含着你执行函数的所有返回值。
    if __name__ == '__main__':
    pool = Pool(4)
    pool.apply_async(func,args=(2,),callback=call_back_func) #回调函数只能接受值,不能赋值
    print('主进程pid', os.getpid())
    pool.close()
    pool.join()


    from threading import Thread
    def func(n):
    print(">>>")
    print(n)
    if __name__ == '__main__':
    t = Thread(target=func,args=(5,))
    t.start()
    t.join()
    print("呵呵")

    import time
    from threading import Thread
    class mythread(Thread):
    def __init__(self,num):
    super().__init__()
    self.num = num
    def run(self):
    print(">>>heiehi")
    if __name__ == '__main__':
    t = mythread(58)
    t.start()
    time.sleep(0.5)
    print("<<>>")


    import threading,time
    from threading import Thread,current_thread
    def func(n):
    time.sleep(2)
    print('我是子线程,名字是', current_thread().getName())
    print('我是子线程,id是', current_thread().ident)


    if __name__ == '__main__':
    t = Thread(target=func,args=(5,))
    t.start()
    print('我是主线程,id是', current_thread().ident)
    print(threading.enumerate())
    print(threading.active_count())
  • 相关阅读:
    JavaScript操作符instanceof揭秘
    Linux打开txt文件乱码的解决方法
    Working copy locked run svn cleanup not work
    poj 2299 UltraQuickSort 归并排序求解逆序对
    poj 2312 Battle City 优先队列+bfs 或 记忆化广搜
    poj2352 stars 树状数组
    poj 2286 The Rotation Game 迭代加深
    hdu 1800 Flying to the Mars
    poj 3038 Children of the Candy Corn bfs dfs
    hdu 1983 Kaitou Kid The Phantom Thief (2) DFS + BFS
  • 原文地址:https://www.cnblogs.com/Mixtea/p/10045583.html
Copyright © 2011-2022 走看看