zoukankan      html  css  js  c++  java
  • day 32 管道,信号量,进程池,线程的创建

    1.管道(了解)
    Pipe(): 在进程之间建立一条通道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道.
    from multiprocessing import Process,Pipe
    conn1,conn2 = Pipe()  结构
    主要方法:
    conn1.recv():接受conn2.send(obj)发送的对象.如果没有消息可接受, recv方法会一直阻塞.如果连接的另一端已经关闭,那么recv方法会跑输EOEError.
    conn1.send(obj):通过连接发送对象.obj是与序列化兼容的任意对象
    from multiprocessing import Process,Pipe
     
     
    def f1(conn):
     
        from_zhujincheng = conn.recv()
     
        print('我是子进程')
        print('来自主进程的消息:',from_zhujincheng)
     
     
    if __name__ == '__main__':
        conn1,conn2 = Pipe()  #创建一个管道对象,全双工,返回管道的两端,但是一端发送的消息,只能另外一端接收,自己这一端是不能接收的
     
        #可以将一端或者两端发送给其他的进程,那么多个进程之间就可以通过这一个管道进行通信了
        p1 = Process(target=f1,args=(conn2,))
        p1.start()
     
        conn1.send('小宝贝,你在哪')
     
        print('我是主进程')
    2.信号量  Semaphore
    互斥锁同时只允许一个线程更改熟路,而信号量时同事允许一定数量的线程更改数据
    s=Semaphore() 内部维护了一个计数器,acquire -1,release +1,为0的时候,其他的进程都要在acquire之前等待
    s.acquire()
    需要锁住的代码
    s.release()
    import time
    import random
    from multiprocessing import Process,Semaphore
     
    def f1(i,s):
        s.acquire()
        print('%s男嘉宾到了' %i)
        time.sleep(random.randint(1,3))
        s.release()
     
    if __name__ == '__main__':
        s =Semaphore(4)
     
        for i in range(10):
            p = Process(target=f1,args=(i,s))
     
            p.start()
    3.事件 Event
    e = Event()  初识状态是false
    E.wait() 当事件对象e的状态为false的时候,在wait的地方会阻塞程序,当对象状态为true的时候,直接在这个wait地方继续往下执行
    E.set()  将事件对象的状态更改为true
    E.is_set() 查看状态
    E.clear() 将事件对象的状态更改为false
    from multiprocessing import Process,Event
     
     
    e = Event() # 创建事件对象,这个对象初识状态为False
    print('e的状态是:',e.is_set())
     
    print('进程运行到这里')
    e.set() #将e的状态更改为True
    print('e的状态是:',e.is_set())
     
    e.clear() #将e的状态更改为False
    e.wait()  #e这个事件对象如果值为False,就在我加wait的地方等待
    print('进程过了wait')
     基于事件的进程通信:
    import time
    from multiprocessing import  Process,Event
     
    def f1(e):
        time.sleep(2)
        n = 100
        print('子进程计算结果为',n)
        e.set()
     
    if __name__ == '__main__':
        e = Event()
        p = Process(target=f1,args=(e,))
        p.start()
     
        print('主进程等待....')
        e.wait()
        print('结果已经写入文件,可以拿到值')
    4.进程池(重点) Pool 池子
    对比:多进程和多进程池的效率对比:进程的创建和销毁时很消耗的,影响代码执行效率
    pool.map() 参数数据必须是可迭代的,异步提交任务,自带close和join功能
    import time
    from multiprocessing import Process,Pool
     
     
    # def f1(n):
    #     time.sleep(1)
    #     print(n)
     
    #对比多进程和进程池的效率
    def f1(n):
        for i in range(5):
            n = n + i
    if __name__ == '__main__':
        #统计进程池执行100个任务的时间
        s_time = time.time()
        pool = Pool(4)  #里面这个参数是指定进程池中有多少个进程用的,4表示4个进程,如果不传参数,默认开启的进程数一般是cpu的个数
        # pool.map(f1,[1,2])  #参数数据必须是可迭代的
        pool.map(f1,range(100))  #参数数据必须是可迭代的,异步提交任务,自带join功能
        e_time = time.time()
        dif_time = e_time - s_time
     
        #统计100个进程,来执行100个任务的执行时间
        p_s_t = time.time() #多进程起始时间
        p_list = []
        for i in range(100):
            p = Process(target=f1,args=(i,))
            p.start()
            p_list.append(p)
            # p.join()
        [pp.join() for pp in p_list]
        p_e_t = time.time()
        p_dif_t = p_e_t - p_s_t
        print('进程池的时间:',dif_time)
        print('多进程的执行时间:',p_dif_t)
        # 结果:
        # 进程池的时间: 0.40102291107177734
        # 多进程的执行时间: 9.247529029846191
    进程池的同步和异步的方法:
    pool.apply()  同步方法,进程池的同步方法,将任务变成串行,必须等任务执行结束才能给进程池提交下一个任务,可以直接拿到返回结果res
    pool.apply_async()  异步方法,,可以直接拿到结果对象,从结果对象里面拿到结果,要用get方法,get方法会阻塞程序,没有拿到结果会一直等待.
    pool.close()  锁住进程池,不在让其他的持续往程序里面人新任务,确保没有新的任务交给进程池里面的进程
    join: 等待着进程池将自己里面的任务都执行完
    进程池的同步方法:
    import time
    from multiprocessing import Process,Pool
     
     
    def f1(n):
        time.sleep(1)
        # print(n)
        return n*n
     
    if __name__ == '__main__':
     
        pool = Pool(4)
     
        for i in range(10):
            print('xxxxxx')
            res = pool.apply(f1,args=(i,))
            print(res)
    进程池的异步方法:
     
    import time
    from multiprocessing import Process,Pool
     
     
    def f1(n):
        time.sleep(0.5)
        # print(n)
        return n*n
     
    if __name__ == '__main__':
     
        pool = Pool(4)
     
        res_list = []
        for i in range(10):
            print('xxxx')
            #异步给进程池提交任务
            res = pool.apply_async(f1,args=(i,))
            res_list.append(res)
     
        # print('等待所有任务执行完')
        # pool.close()  #锁住进程池,意思就是不让其他的程序再往这个进程池里面提交任务了
        # pool.join()
     
        #打印结果,如果异步提交之后的结果对象
        for i in res_list:
            print(i.get())
    进程池的同步和异步的综合:
    import time
    from multiprocessing import Process,Pool
     
    def f1(n):
        time.sleep(0.5)
        # print(n)
        return  n*n
     
    if __name__ == '__main__':
        pool =Pool(4)
        # pool.apply(f1,args=(2,))  #同步方法
        res_list = []
        for i in range(10):
            # print('任务%s' %i)
            #进程池的同步方法,将任务变成串行
            # res = pool.apply(f1,args=(i,))
            # print(res)
            #进程池的异步方法
            res =pool.apply_async(f1,args=(i,))
            print(res)
            # as_result =res.get()  #join的效果
            # print(as_result)
            res_list.append(res)
     
        pool.close() # 锁住进程池,不在让其他的程序网里面仍新的任务了,确保没有新的任务交给进程池里面的进程
        pool.join()
     
        for r in res_list:
            print(r.get())
     
        time.sleep(2)
        #主进程运行结束,进程池里面的任务全部停止,不会等待进程池里面的任务
        print('主进程直接结束')
        p = Process(target=f1,)
    5.进程池的回调函数
     Apply_async(f1,args=(i,),callback=function)  #将前面f1这个任务的返回结果作为参数传给callback指定的那个function函数
    import os
    from multiprocessing import Pool,Process
     
    def f1(n):
        print('进程池里面的进程id',os.getpid())
        print('>>>>',n)
        return n*n
     
    def call_back_func(asdf):
        print('>>>>>>>>>>>>>',os.getpid())
        print('回调函数中的结果:',asdf)
        # print('回调函数中的结果:',s.get())
     
    if __name__ == '__main__':
        pool = Pool(4)
        res = pool.apply_async(f1,args=(5,),callback=call_back_func)
        pool.close()
        pool.join()
        # print(res.get())
        print('主进程的进程id',os.getpid())
     
    1.管道(了解)
    Pipe(): 在进程之间建立一条通道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道.
    from multiprocessing import Process,Pipe
    conn1,conn2 = Pipe()  结构
    主要方法:
    conn1.recv():接受conn2.send(obj)发送的对象.如果没有消息可接受, recv方法会一直阻塞.如果连接的另一端已经关闭,那么recv方法会跑输EOEError.
    conn1.send(obj):通过连接发送对象.obj是与序列化兼容的任意对象
    from multiprocessing import Process,Pipe
     
     
    def f1(conn):
     
        from_zhujincheng = conn.recv()
     
        print('我是子进程')
        print('来自主进程的消息:',from_zhujincheng)
     
     
    if __name__ == '__main__':
        conn1,conn2 = Pipe()  #创建一个管道对象,全双工,返回管道的两端,但是一端发送的消息,只能另外一端接收,自己这一端是不能接收的
     
        #可以将一端或者两端发送给其他的进程,那么多个进程之间就可以通过这一个管道进行通信了
        p1 = Process(target=f1,args=(conn2,))
        p1.start()
     
        conn1.send('小宝贝,你在哪')
     
        print('我是主进程')
    2.信号量  Semaphore
    互斥锁同时只允许一个线程更改熟路,而信号量时同事允许一定数量的线程更改数据
    s=Semaphore() 内部维护了一个计数器,acquire -1,release +1,为0的时候,其他的进程都要在acquire之前等待
    s.acquire()
    需要锁住的代码
    s.release()
    import time
    import random
    from multiprocessing import Process,Semaphore
     
    def f1(i,s):
        s.acquire()
        print('%s男嘉宾到了' %i)
        time.sleep(random.randint(1,3))
        s.release()
     
    if __name__ == '__main__':
        s =Semaphore(4)
     
        for i in range(10):
            p = Process(target=f1,args=(i,s))
     
            p.start()
    3.事件 Event
    e = Event()  初识状态是false
    E.wait() 当事件对象e的状态为false的时候,在wait的地方会阻塞程序,当对象状态为true的时候,直接在这个wait地方继续往下执行
    E.set()  将事件对象的状态更改为true
    E.is_set() 查看状态
    E.clear() 将事件对象的状态更改为false
    from multiprocessing import Process,Event
     
     
    e = Event() # 创建事件对象,这个对象初识状态为False
    print('e的状态是:',e.is_set())
     
    print('进程运行到这里')
    e.set() #将e的状态更改为True
    print('e的状态是:',e.is_set())
     
    e.clear() #将e的状态更改为False
    e.wait()  #e这个事件对象如果值为False,就在我加wait的地方等待
    print('进程过了wait')
     基于事件的进程通信:
    import time
    from multiprocessing import  Process,Event
     
    def f1(e):
        time.sleep(2)
        n = 100
        print('子进程计算结果为',n)
        e.set()
     
    if __name__ == '__main__':
        e = Event()
        p = Process(target=f1,args=(e,))
        p.start()
     
        print('主进程等待....')
        e.wait()
        print('结果已经写入文件,可以拿到值')
    4.进程池(重点) Pool 池子
    对比:多进程和多进程池的效率对比:进程的创建和销毁时很消耗的,影响代码执行效率
    pool.map() 参数数据必须是可迭代的,异步提交任务,自带close和join功能
    import time
    from multiprocessing import Process,Pool
     
     
    # def f1(n):
    #     time.sleep(1)
    #     print(n)
     
    #对比多进程和进程池的效率
    def f1(n):
        for i in range(5):
            n = n + i
    if __name__ == '__main__':
        #统计进程池执行100个任务的时间
        s_time = time.time()
        pool = Pool(4)  #里面这个参数是指定进程池中有多少个进程用的,4表示4个进程,如果不传参数,默认开启的进程数一般是cpu的个数
        # pool.map(f1,[1,2])  #参数数据必须是可迭代的
        pool.map(f1,range(100))  #参数数据必须是可迭代的,异步提交任务,自带join功能
        e_time = time.time()
        dif_time = e_time - s_time
     
        #统计100个进程,来执行100个任务的执行时间
        p_s_t = time.time() #多进程起始时间
        p_list = []
        for i in range(100):
            p = Process(target=f1,args=(i,))
            p.start()
            p_list.append(p)
            # p.join()
        [pp.join() for pp in p_list]
        p_e_t = time.time()
        p_dif_t = p_e_t - p_s_t
        print('进程池的时间:',dif_time)
        print('多进程的执行时间:',p_dif_t)
        # 结果:
        # 进程池的时间: 0.40102291107177734
        # 多进程的执行时间: 9.247529029846191
    进程池的同步和异步的方法:
    pool.apply()  同步方法,进程池的同步方法,将任务变成串行,必须等任务执行结束才能给进程池提交下一个任务,可以直接拿到返回结果res
    pool.apply_async()  异步方法,,可以直接拿到结果对象,从结果对象里面拿到结果,要用get方法,get方法会阻塞程序,没有拿到结果会一直等待.
    pool.close()  锁住进程池,不在让其他的持续往程序里面人新任务,确保没有新的任务交给进程池里面的进程
    join: 等待着进程池将自己里面的任务都执行完
    进程池的同步方法:
    import time
    from multiprocessing import Process,Pool
     
     
    def f1(n):
        time.sleep(1)
        # print(n)
        return n*n
     
    if __name__ == '__main__':
     
        pool = Pool(4)
     
        for i in range(10):
            print('xxxxxx')
            res = pool.apply(f1,args=(i,))
            print(res)
    进程池的异步方法:
     
    import time
    from multiprocessing import Process,Pool
     
     
    def f1(n):
        time.sleep(0.5)
        # print(n)
        return n*n
     
    if __name__ == '__main__':
     
        pool = Pool(4)
     
        res_list = []
        for i in range(10):
            print('xxxx')
            #异步给进程池提交任务
            res = pool.apply_async(f1,args=(i,))
            res_list.append(res)
     
        # print('等待所有任务执行完')
        # pool.close()  #锁住进程池,意思就是不让其他的程序再往这个进程池里面提交任务了
        # pool.join()
     
        #打印结果,如果异步提交之后的结果对象
        for i in res_list:
            print(i.get())
    进程池的同步和异步的综合:
    import time
    from multiprocessing import Process,Pool
     
    def f1(n):
        time.sleep(0.5)
        # print(n)
        return  n*n
     
    if __name__ == '__main__':
        pool =Pool(4)
        # pool.apply(f1,args=(2,))  #同步方法
        res_list = []
        for i in range(10):
            # print('任务%s' %i)
            #进程池的同步方法,将任务变成串行
            # res = pool.apply(f1,args=(i,))
            # print(res)
            #进程池的异步方法
            res =pool.apply_async(f1,args=(i,))
            print(res)
            # as_result =res.get()  #join的效果
            # print(as_result)
            res_list.append(res)
     
        pool.close() # 锁住进程池,不在让其他的程序网里面仍新的任务了,确保没有新的任务交给进程池里面的进程
        pool.join()
     
        for r in res_list:
            print(r.get())
     
        time.sleep(2)
        #主进程运行结束,进程池里面的任务全部停止,不会等待进程池里面的任务
        print('主进程直接结束')
        p = Process(target=f1,)
    5.进程池的回调函数
     Apply_async(f1,args=(i,),callback=function)  #将前面f1这个任务的返回结果作为参数传给callback指定的那个function函数
    import os
    from multiprocessing import Pool,Process
     
    def f1(n):
        print('进程池里面的进程id',os.getpid())
        print('>>>>',n)
        return n*n
     
    def call_back_func(asdf):
        print('>>>>>>>>>>>>>',os.getpid())
        print('回调函数中的结果:',asdf)
        # print('回调函数中的结果:',s.get())
     
    if __name__ == '__main__':
        pool = Pool(4)
        res = pool.apply_async(f1,args=(5,),callback=call_back_func)
        pool.close()
        pool.join()
        # print(res.get())
        print('主进程的进程id',os.getpid())
     
  • 相关阅读:
    css03层次选择器
    css02基本选择器
    Distance Between Points
    CloseHandle(IntPtr handle)抛异常
    关于win7上内存占用较大的说明
    C# WPF 显示图片和视频显示 EmuguCv、AForge.Net测试(续)
    五种开源协议的比较(BSD_Apache_GPL_LGPL_MIT)
    C# WPF 显示图片和视频显示 EmuguCv、AForge.Net测试
    Opencv不用每次创建项目配置vs2010 vc++目录 库目录等项
    矩阵运算
  • 原文地址:https://www.cnblogs.com/yanghongtao/p/10268682.html
Copyright © 2011-2022 走看看