zoukankan      html  css  js  c++  java
  • 管道,事件,信号量,进程池

    管道:
    Conn1,conn2 = Pipe()
    Conn1.recv()
    Conn1.send()
    数据接收一次就没有了
    from multiprocessing import Process,Pipe
    def f1(conn):
        from_zhu = conn.recv()
    
        print('我是子进程')
        print('来自子进程的消息:', from_zhu)
    
    if __name__ == '__main__':
        conn1,conn2 = Pipe()
        p1 = Process(target=f1,args=(conn1,))
        p1.start()
        conn1.send('小宝贝,你在哪')
    
        print('我是主进程')
    事件:
    E = Event() #初识状态是false
    E.wait() 当事件对象e的状态为false的时候,在wait的地方会阻塞程序,当对象状态为true的时候,直接在这个wait地方继续往下执行
    E.set() 将事件对象的状态改为true,
    E.is_set() 查看状态
    E.clear() 将事件对象的状态改为false

    from multiprocessing import Process,Event
    
    e = Event()
    print('e的状态是:', e.is_set())
    print('进程运行到这里了')
    
    e.set()
    print('e的状态是:', e.is_set())
    
    e.clear()
    
    e.wait()
    
    print('进程过了wait')
    
    信号量:
    S = semphore(4),内部维护了一个计数器,acquire-1,release+1,为0的时候,其他的进程都要在acquire之前等待
    S.acquire()
    需要锁住的代码
    S.release()
    import time
    import random
    from multiprocessing import Process,Semaphore
    
    def f1(i,s):
        # s.acquire()
        with s:
            print('%s男嘉宾到了' % i)
            time.sleep(random.randint(1,3))
        # s.release()
    
    
    if __name__ == '__main__':
        s = Semaphore(4)  # 计数器4,acquire一次减一,为0,其他人等待,release加1
        for i in range(10):
            p = Process(target=f1, args=(i,s))
            p.start()
    

      

    进程池(*****)
    进程的创建和销毁是很有消耗的,影响代码执行效率

    进程池:
    Map:异步提交任务,并且传参需要可迭代类型的数据,自带close和join功能
    Res = Apply(f1,args=(i,)) #同步执行任务,必须等任务执行结束才能给进程池提交下一个任务,可以直接拿到返回结果res

    Res_obj = Apply_async(f1,args=(i,)) #异步提交任务,可以直接拿到结果对象,从结果对象里面拿结果,
    要用get方法,get方法会阻塞程序,没有拿到结果会一直等待

    Close : 锁住进程池,防止有其他的新的任务在提交给进程池
    Join : 等待着进程池将自己里面的任务都执行完

    回调函数:
    Apply_async(f1,args=(i,),callback=function) #将前面f1这个任务的返回结果作为参数传给callback指定的那个function函数

    # 对比多进程和进程池的效率
    import time
    from multiprocessing import Process, Pool
    
    def f1(n):
        for i in range(5):
            n = n+i
    
    if __name__ == '__main__':
        # 统计进程池执行100个任务的时间
        s_time = time.time()
        pool = Pool(4)  # 里面这个参数是指进程池中有多少个进程用的,4表示4个进程,如果不传参数,默认开启的进程数一般是cpu的个数
        # pool.map(f1, [1, 2])    # 参数数据必须是可迭代的
        pool.map(f1, range(100))    # 参数数据必须是可迭代的,异步提交任务,自带join功能
        e_time = time.time()
        dif_time = e_time - s_time
    
        # 统计100个进程,来执行100个任务的执行时间
        p_s_t = time.time()  # 多进程起始时间
        p_list = []
        for i in range(100):
            p = Process(target=f1, args=(i,))
            p.start()
            p_list.append(p)
    
        [pp.join() for pp in p_list]
        p_e_t = time.time()
        p_dif_t = p_e_t - p_s_t
        print('进程池的时间:', dif_time)
        print('多进程的执行时间', p_dif_t)
    

      

  • 相关阅读:
    tile38 复制配置
    The Guardian’s Migration from MongoDB to PostgreSQL on Amazon RDS
    tile38 一款开源的geo 数据库
    sqler sql 转rest api 的docker 镜像构建(续)使用源码编译
    sqler sql 转rest api javascript 试用
    sqler sql 转rest api redis 接口使用
    sqler sql 转rest api 的docker image
    sqler sql 转rest api 的工具试用
    apache geode 试用
    benthos v1 的一些新功能
  • 原文地址:https://www.cnblogs.com/YangWenYu-6/p/10267455.html
Copyright © 2011-2022 走看看