zoukankan      html  css  js  c++  java
  • python 管道 事件(Event) 信号量 进程池(map/同步/异步)回调函数

    ####################总结########################

    管道:是进程间通信的第二种方式,但是不推荐使用,因为管道会导致数据不安全的情况出现

    事件:当我运行主进程的时候 需要子执行某个进程后 需要的返回值时 可以使用

    信号量:互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。

       内部维护了一个计数器,acquire-1,release+1,为0的时候,其他的进程都要在acquire之前等待

    进程池: 

      进程的创建和销毁是很有消耗的,影响代码执行效率

      进程池:定义一个池子,池中进程的数量是固定的,那么同一时间有固定数量的进程在运行。

         这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果

    管道

    ###管道
    from multiprocessing import Process,Pipe
    
    def f1(conn):
        zhu=conn.recv()
        print('我是子进程')
        print('来自主进程消息',zhu)
    if __name__ == '__main__':
        conn1,conn2=Pipe()
        p1=Process(target=f1,args=(conn2,))
        p1.start()
        conn1.send('我是conn1')
        print('我是主进程')
    #######################
    我是主进程
    我是子进程
    来自主进程消息 我是conn1

    事件

    from multiprocessing import Process,Event
    e=Event()
    print('e的状态是',e.is_set())
    print('进程运行到这里了')
    e.set()#将e的状态改为True
    print('e的状态是',e.is_set())
    
    e.clear()#将e的状态改为False
    e.wait()#e这个事件对象如果值为False,就在我加wait的地方等待
    print('进程过了wait')
    ##################
    e的状态是 False
    进程运行到这里了
    e的状态是 True

    信号量

    import time
    import random
    from multiprocessing import Process,Semaphore
    def f1(i,s):
        s.acquire() #此时进来的人获取房间
        print('%s男嘉宾到了'%i)
        time.sleep(random.randint(1,3))
        s.release() #出房间,此时下个人可以进入
    if __name__ == '__main__':
        s = Semaphore(4)  #计数器4,acquire一次减一,为0 ,其他人等待,release加1,
        for i in range(10):
            p = Process(target=f1,args=(i,s))
            p.start()
    ###############
    6男嘉宾到了
    4男嘉宾到了
    7男嘉宾到了
    3男嘉宾到了
    #前面的人出去了,出去一个就往后上加 
    2男嘉宾到了
    1男嘉宾到了
    0男嘉宾到了
    5男嘉宾到了
    9男嘉宾到了
    8男嘉宾到了

    进程池 map

    import time
    from multiprocessing import Pool,Process
    #计算下开多进程和进程池的执行效率
    def func(n):
        for i in range(5):
            n = n + i
            # print(n)
    if __name__ == '__main__':
        #进程池模式时间统计
        pool_s_time = time.time()
        pool = Pool(4)    #创建含有4个进程的进程池   ****一般约定俗成的是进程池中的进程数量为CPU的数量,工作中要看具体情况来考量
        pool.map(func,range(100))    #参数数据必须是可迭代的,异步提交任务,自带join功能
                                     #注意,map目前只限于接收一个可迭代的数据类型参数,
        #多进程模式
        pool_end_time = time.time()
        pool_dif_time = pool_end_time - pool_s_time
    #############进程时间统计#####################################
        # 统计100个进程,来执行100个任务的执行时间
        p_s_time = time.time()
        p_lst= [] #因为这个是异步的,我子进程执行他自己的,如果直接在for循环join会每个进程等待
        #所以要放到进程直接 内存中 然后在join
        for i in range(100):
            p = Process(target= func,args = (i,))
            p.start()
            p_lst.append(p)
        [p.join() for p in p_lst]
        p_e_time = time.time()
        p_dif_time = p_e_time - p_s_time
    
        print("进程池的执行时间",pool_dif_time)   #0.176239013671875      进程池 的效率高
        print("创建进程的执行时间",p_dif_time)    #7.825609922409058

    进程池同步 把进程池 变串行执行的方式

    import time
    from multiprocessing import Process,Pool
    def f1(n):
        time.sleep(1)
        # print(n)
        return n*n
    if __name__ == '__main__':
        pool = Pool(4)
        for i in range(10):
            print('xxxxxx')
            res = pool.apply(f1,args=(i,))
            print(res)
    ############
    xxxxxx
    0
    xxxxxx
    1
    xxxxxx
    4
    xxxxxx
    9
    xxxxxx
    16

    进程池 异步执行

    import time
    from multiprocessing import Pool
    
    def f1(i):
        time.sleep(1)
        # print(i)
        return i
    if __name__ == '__main__':
        p=Pool(4)
        res_lst=[]#循环
        for i in range(10):
            # print('xxxx')
            res=p.apply_async(f1,args=(i,))
            # print(res)#10个空结果对象
            res_lst.append(res)#把结果值打印大内存中
        #主进程运行结果,进程池里面的任务全部停止,不会等待进程池里面的任务
        # p.close()#锁住进程池,意思就是不让其他的程序再往这个进程池里面提交任务了
        # p.join()#它的作用是等到进程池的任务全部执行完,然后结束主进程
        #默认会 4个一打印  有了这2个参数后就会 一次性获取出来
        for r in res_lst:
            print(r.get())
        print('等待所有任务执行完')
    #########################################

    D:python_work_s18venvScriptspython.exe D:/python_work_s18/day31/1111.py
    0
    1
    2
    3

    --
    4
    5
    6
    7

    ---
    8
    9
    等待所有任务执行完

    回调函数:

     Apply_async(f1,args=(i,),callback=function)  #将前面f1这个任务的返回结果作为参数传给callback指定的那个function函数

    import os
    from multiprocessing import Pool,Process
    def f1(n):
        print('进程池里面的进程id',os.getpid())
        print('>>>>',n)
        return n*n
    def call_back_func(asdf):
        print('>>>>>>>',os.getpid())
        print('回调函数中的结果:', asdf)
    if __name__ == '__main__':
        pool=Pool(4)
        res = pool.apply_async(f1,args=(5,),callback=call_back_func)
        pool.close()
        pool.join()
        print('主进程id',os.getpid())
    ################################
    进程池里面的进程id 7872
    >>>> 5
    >>>>>>> 8640
    回调函数中的结果: 25
    主进程id 8640
    不怕大牛比自己牛,就怕大牛比自己更努力
  • 相关阅读:
    Oracle客户端安装与配置
    Word VBA(批量复制Excel表格和Word表格到Word中)
    批量追加数据库(GDB,MDB,Shp)
    批量横向打印Excel
    重命名要素类
    删除GIS数据属性值空格(GDB,MDB,Shp)
    删除GIS数据库空层(GDB,MDB,Shape)
    批量裁剪GIS数据(包含GDB,MDB,Shp)
    批量裁剪GDB
    3DS文件导出MultiPatch
  • 原文地址:https://www.cnblogs.com/zaizai1573/p/10253218.html
Copyright © 2011-2022 走看看