zoukankan      html  css  js  c++  java
  • 管道,数据共享,进程池

    内容梗概:
    1.管道
    2.数据共享
    3.进程池
    4.回调函数


    1.管道
    进程间通信(IPC)方式二:管道(不推荐使用,了解即可),会导致数据不安全的情况出现
    实例:
    import time
    from multiprocessing import Process,Pipe
    def func(conn1,conn2):

    conn1.close()
    conn2.close()

    # msg = conn2.recv()
    msg = conn2.recv()
    print(">>",msg)

    if __name__ == '__main__':
    conn1,conn2 = Pipe()
    p1 = Process(target=func,args=(conn1,conn2,))
    p1.start()
    conn1.close()
    conn2.close()
    conn1.send("嘿嘿")
    conn1.send("嘿嘿")
    conn1.close()
    p1.join()


    EOFError:接收端两端都关闭的的时候
    OSError:发送端关闭后,仍用该发送端发送


    2.数据共享
    进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
    虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此
    import time
    from multiprocessing import Process,Manager,Lock
    def func(dic,lock):
    with lock: #因为数据共享,必须加锁,不认有可能会争抢数据,导致出错
    dic["num"] -= 1
    if __name__ == '__main__':
    m = Manager()
    lock = Lock()
    dic = m.dict({"num":100})
    lis = []
    for i in range(100):
    p = Process(target=func,args=(dic,lock))
    p.start()
    lis.append(p)
    for el in lis:
    el.join()
    print(dic["num"])




    3.进程池
    进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。
    如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行

    进程池实例:(创建多个进程和进程池运算的区别)
    import time
    from multiprocessing import Process,Manager,Lock,Pool
    def func(i):
    num = 0
    for j in range(5):
    num += i
    if __name__ == '__main__':
    pool = Pool(4)

    #创建多个进程运算
    lis = []
    start_time = time.time()
    for i in range(250):
    p = Process(target=func,args=(i,))
    p.start()
    lis.append(p)
    for el in lis:
    el.join()
    end_time = time.time()
    dif_time = abs(start_time - end_time)
    print(dif_time)


    #进程池运算
    s_time = time.time()
    pool.map(func,range(250)) #此处map所使用的方法,与内置函数用法一致
    e_time = time.time()
    d_time = abs(s_time - e_time)
    print(d_time)
    结果:13.073268413543701
    0.0017964839935302734
    运算速度差距较大,因为创建进程是很浪费资源的
    注意:1.,map是异步执行的,并且自带close和join
      2.一般约定俗成的是进程池中的进程数量为CPU的数量,工作中要看具体情况来考量。


    进程池中常用方法
    1.同步方法
    import time
    from multiprocessing import Process,Pool

    def func(i):
    num = 0
    for j in range(5):
    num += i
    time.sleep(0.5)
    return num

    if __name__ == '__main__':
    pool = Pool(4)
    lis = []
    for i in range(10):
    res = pool.apply(func,args=(i,)) #
    print(res)

    2.异步方法
    import time
    from multiprocessing import Process,Pool

    def func(i):
    num = 0
    for j in range(5):
    num += i
    time.sleep(0.5)
    print('>>>>>', num)
    # return num
    if __name__ == '__main__':
    lis = []
    pool = Pool(4)
    for i in range(10):
    res = pool.apply_async(func,args=(i,)) ## 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行,并且可以执行不同的任务,传送任意的参数了
    # lis.append(res)
    pool.close() # 不是关闭进程池,只是锁定
    pool.join()
    for el in lis:
    print(el.get())
    需要注意的是,进程池中的三个进程不会同时开启或者同时结束
    而是执行完一个就释放一个进程,这个进程就去接收新的任务。

    4.回调函数
    回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数,这是进程池特有的,普通进程没有这个机制,但是我们也可以通过进程通信来拿到返回值,进程池的这个回调也是进程通信的机制完成的。
    我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果



    import time,os
    from multiprocessing import Pool,Process
    def func(n):
    print("子进程pid",os.getpid())
    return n*2,"约吗?"
    def call_back_func(x):
    print("call_back pid",os.getpid())
    print(x) #回调函数在写的时候注意一点,回调函数的形参执行有一个,如果你的执行函数有多个返回值,那么也可以被回调函数的这一个形参接收,接收的是一个元祖,包含着你执行函数的所有返回值。
    if __name__ == '__main__':
    pool = Pool(4)
    pool.apply_async(func,args=(2,),callback=call_back_func) #回调函数只能接受值,不能赋值
    print('主进程pid', os.getpid())
    pool.close()
    pool.join()


    from threading import Thread
    def func(n):
    print(">>>")
    print(n)
    if __name__ == '__main__':
    t = Thread(target=func,args=(5,))
    t.start()
    t.join()
    print("呵呵")

    import time
    from threading import Thread
    class mythread(Thread):
    def __init__(self,num):
    super().__init__()
    self.num = num
    def run(self):
    print(">>>heiehi")
    if __name__ == '__main__':
    t = mythread(58)
    t.start()
    time.sleep(0.5)
    print("<<>>")


    import threading,time
    from threading import Thread,current_thread
    def func(n):
    time.sleep(2)
    print('我是子线程,名字是', current_thread().getName())
    print('我是子线程,id是', current_thread().ident)


    if __name__ == '__main__':
    t = Thread(target=func,args=(5,))
    t.start()
    print('我是主线程,id是', current_thread().ident)
    print(threading.enumerate())
    print(threading.active_count())
  • 相关阅读:
    spring事务管理器设计思想(一)
    ThreaLocal内存泄露的问题
    denyhost防止SSH暴力破解
    qt下调用win32api 修改分辨率
    Windows下pip安装包报错:Microsoft Visual C++ 9.0 is required Unable to find vcvarsall.bat
    Centos 6.5 下安装socket5代理
    Mac 下安装mitmproxy
    Mac OS 下安装wget
    判断客户端是否使用代理服务器及其匿名级别
    Mac OS 下安装rar unrar命令
  • 原文地址:https://www.cnblogs.com/Mixtea/p/10045583.html
Copyright © 2011-2022 走看看