zoukankan      html  css  js  c++  java
  • day32 信号量 事件 管道 进程池

    今日主要内容:

    1.管道(Pipe) 数据接收一次就没有了

    2.事件(Event)

    3.基于事件的进程通信

    4.信号量(Semaphore)

    5. 进程池(重点)

    6.进程池的同步方法和异步方法

    7. 进程池的回调函数,( pool.apply_async(f1,args=(1,),callback=f2) ) ,回调函数是在异步进程中的函数


    1.管道(Pipe) 数据接收一次就没有了

    Pipe() #创建管道,全双工,返回管道的两端,但是一端发送消息,只能是另一端才能接受到,自己这一端是接收不到的

    from multiprocessing import Process,Pipe
    def f1(conn):
    from_zhujincheng=conn.recv()# 此处接收可以接收任何数据,而且不需要带最大的传输的大小
    print('我是子进程')
    print('来自主进程的消息>>>',from_zhujincheng)

    if __name__=='__main__':
    conn1,conn2=Pipe() # 创建管道,全双工,返回管道的两端,但是一端发送的消息,只能另一端接收,自己这一端是不能接受的
    # 可以将一端或者两端发送给其他的进程,那么多个进程之间就可以通过这一个管道进行通信了 但是这样会导致数据的不安全,容易导致程序错乱

    p1=Process(target=f1,args=(conn1,) )
    p1.start()
    conn2.send('你好啊,大兄嘚')
    print('我是主进程')

    2.事件(Event)

    Event() #创建事件对象,这个对象的初始状态为False

    from multiprocess import Process,Event

    e=Event() # 创建事件对象,这个对象的初始状态为False
    print('判断e的状态>>>' , e.is_set()) #判断e当前的状态
    print('程序运行到这里了')
    e.set() # 这里的作用是将对象的状态改为True
    e.clear() # 将e的状态改为False
    e.wait() # 如果程序中e的状态为False,那么系统会阻塞在这里,只有改为True的时候才会继续往下执行
    print('进程走过了wait.')

    3.基于事件的进程通信
    import time
    from multiprocessing import Process,Event

    def func(e):
    time.sleep(1)
    n=100
    print('子进程计算结果为',n)
    e.set() # 将e的值修改为True

    if __name__=='__main__':
    e=Event() # 创建事件对象,初始状态为False
    p=Process(target=func,args=(e,))
    p.start()

    print('主程序在等待中......')
    e.wait() # 等待e的值变为True
    print('结果执行完毕,可以拿到这个值')

    4.信号量(Semaphore)

    S = semphore(4),内部维护了一个计数器,acquire-1,release+1,为0的时候,其他的进程都要在acquire之前等待
    S.acquire()
    需要锁住的代码
    S.release()

    Semaphore(4) # 计数器4,acquire一次减一,如果semaphore的值为0,其他人等待,release加一 . 抢票啊啥的一般用这个,就是多个程序抢占这4个cpu,如果4个都被占用了,那么他们剩余的全部等待,只有当一个人执行完毕的时候,剩下的人才会继续争抢这个cpu.

    import time
    import random
    from multiprocessing import Process,Semaphore
    def func():
    s.acquire() # 与进程锁的用法类似
    print(f'{i}号男嘉宾登场')
    time.sleep(random.randint(1,3)) # 随机
    s.release() #与进程锁的用法类似
    if __name__=='__main__':
    s=Semaphore(3) # 计数器3,每当acquire一次,计数器就减一,当release一次,计数器就加一. 如果计数器为0,那么意味着所有的计数器都被占用,其他的程序都要等待这3个程序,如果一个执行完毕后,剩下的才可以继续争抢这个名额.
    for i in range(10): # 创建了10个进程,
    p=Process(target=func,args=(i,))
    p.start()

    5. 进程池(重点)

    进程的创建和销毁是很有消耗的,影响代码执行效率
    进程池:
    Map:异步提交任务,并且传参需要可迭代类型的数据,自带close和join功能
    Res = Apply(f1,args=(i,)) #同步执行任务,必须等任务执行结束才能给进程池提交下一个任务,可以直接拿到返回结果res

    Res_obj = Apply_async(f1,args=(i,)) #异步提交任务,可以直接拿到结果对象,从结果对象里面拿结果,要用get方法,get方法会阻塞程序,没有拿到结果会一直等待

    Close : 锁住进程池,防止有其他的新的任务在提交给进程池
    Join : 等待着进程池将自己里面的任务都执行完

    回调函数:
    Apply_async(f1,args=(i,),callback=function) #将前面f1这个任务的返回结果作为参数传给callback指定的那个function函数


    进程池的map用法 (Pool)

    # 对比多进程和进程池的效率,统计进程池和多进程执行100个任务的时间
    import time
    from multiprocessing import Process,Pool
    def func(i):
    time.sleep(0.5)
    for a in range(10):
    n=n+i

    if __name__=='__main__':
    # 进程池执行100个任务的时间
    s_time=time.time()# 记录开始的时间
    p=Pool(4) # 里面的参数是指定进程池中有多少个进程用的,4表示4个进程,如果不传参数,那么默认开启的进程数就是你电脑的cpu个数
    p.map(func,range(100)) # map 2个参数第一个是你要执行的函数,第二个必须是可迭代的 ,异步提交任务,自带join功能
    e_time=time.time()# 记录结束的时间
    res_time=e_time-s_time
    print('进程池执行命令所用时间>>>',res_time)

    #多进程执行100个任务的时间
    p_s_t=time.time()
    p_list=[]
    for i in range(100):
    p=Process(target=func,args=(i,))
    p.start()
    p_list.append(p)

    [pp.join() for pp in p_list]
    p_e_t=time.time()

    res_t = p_e_t - p_s_t

    print('多进程执行命令所用的时间>>>',res_t)

    # 结果
    进程池执行命令所用时间:0.40s
    多进程执行命令所用时间:9.24s
    所以说进程池的时间比多进程的时间快了近十倍,所以......你懂我的意思吧,兄嘚

    6.进程池的同步方法和异步方法
    apply() 同步方法,将任务变成了串行
    apply_async()异步方法

    # 同步方法
    import time
    from multiprocessing import Process,Pool
    def func(n):
    time.sleep(1)
    # print(n)
    return n*n

    if __name__=='__main__':
    po=Pool(4)
    for i in range(10):
    print('你好啊,大哥')
    res=po.apply(func,args=(i,)) #进程池的同步方法,将任务变成了串行
    print(res)


    #异步方法
    import time
    from multiprocessing import Process,Pool

    def func():
    pass

    if __name__=='__main__':
    po=Pool(4)
    p_list=[]
    for i in range(10):
    print('你能每次都看到我吗?')
    res=po.apply_async(func,args=(i,)) #异步给进程池提交任务
    p_list.append(res)

    po.close()#锁住进程池,意思就是不让其他的长须再往这个进程池里面提交任务了
    po.join() # 必须等待子程序执行完毕才可以执行主程序

    #打印结果,如果异步提交之后的结果对象
    for i in p_list:
    print(i.get())# 从对象中拿值需要用到get方法,get的效果是join的效果
    # 主程序运行结束,进程池里面的任务全部停止,不会等待进程池里面的任务
    print('主进程直接结束')


    7. 进程池的回调函数,( pool.apply_async(f1,args=(1,),callback=f2) ) ,回调函数是在异步进程中的函数

    所谓的回调函数其实指的是,在主进程中第一个函数算出的值,被回调函数把结果传入到第二个函数中进行计算.

    import os
    from multiprocess import Process,Pool

    def func(n):
    s=n+1
    return s**2

    def func2(x):
    print('回调函数中的结果>>>',x)

    if __name__=='__main__':
    po=Pool(4)
    po.apply_async(func,args=(3,),callback=func2)
    po.close()
    po.join()

    print('主进程的id',os.getpid())

  • 相关阅读:
    程序员要善于在工作中找到偷懒的办法
    关于count(1) 和 count(*)
    前端设计+程序开发那点事
    关于MySQL Connector/C++那点事儿
    windows下编译php5.2.17这是闹哪样?
    easyui使用时出现这个Uncaught TypeError: Cannot read property 'combo' of undefined
    视频文件自动转rtsp流
    Jenkins Pipeline如何动态的并行任务
    Jenkins的Dockerfile中如何批量迁移原Jenkins安装的插件
    Groovy中json的一些操作
  • 原文地址:https://www.cnblogs.com/zty1304368100/p/10262682.html
Copyright © 2011-2022 走看看