zoukankan      html  css  js  c++  java
  • 进程池、线程池、异步调用(取返回值)

    进程池vs线程池

    为什么要用“池”:
    池子使用来限制并发的任务数目,限制我们的计算机在一个自己可承受的范围内去并发地执行任务

    池子内什么时候装进程:并发的任务属于计算密集型
    池子内什么时候装线程:并发的任务属于IO密集型
    concurrent:并发的,一致的,同时发生的  Executor执行者 

    '''
    #1、阻塞与非阻塞指的是程序的两种运行状态
    阻塞:遇到IO就发生阻塞,程序一旦遇到阻塞操作就会停在原地,并且立刻释放CPU资源
    非阻塞(就绪态或运行态):没有遇到IO操作,或者通过某种手段让程序即便是遇到IO操作也不会停在原地,执行其他操作,力求尽可能多的占有CPU


    #2、同步与异步指的是提交任务的两种方式:
    同步调用:提交完任务后,就在原地等待,直到任务运行完毕后,拿到任务的返回值,才继续执行下一行代码
    异步调用:提交完任务后,不在原地等待,直接执行下一行代码,等到任务有返回值后自动触发后调函数

    '''
     
    #进程池
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import time,os,random
    
    def task(x):
        print('%s 接客' %os.getpid())
        time.sleep(random.randint(2,5))
        return x**2
    
    
    if __name__ == '__main__':
        # p=ProcessPoolExecutor(3) # 不指定默认开启的进程数是cpu的核数
        # print(os.cpu_count())查看CPU的核数
        p=ProcessPoolExecutor(max_workers=2) # 默认开启的进程数是cpu的核数,也就是干活的人数
    
        #简写成for循环模拟6个任务执行
        for i in range(6):
            p.submit(task,i)  #submit只是负责往池子里面丢任务,刚开始2个人同时接客
    
    
    #1.submit往里面丢任务干活的人就是你指定的ProcessPoolExecutor(2)后面的人数
    #2.刚一造个进程池,立马给你预备2个人干活,无论你丢多少任务,干活的永远是那2个人(谁先完了腾出手再接下一个)
    #线程池
    def task(x):
        print('%s is running'%x)
        time.sleep(random.randint(2, 5))
        # return x**2
    
    if __name__ == '__main__':
        # p=ProcessPoolExecutor(max_workers=2)
        p=ThreadPoolExecutor(20)   #默认不写开启的是CPU核数*5
    
        for i in range(25):
            p.submit(task,i)

    result功能(可以通过每次线程或者进程对象.result(),拿到返回值)

    #线程池(result功能)
    def task(x):
        print('%s is running'%x)
        time.sleep(random.randint(2, 5))
        return x**2
    
    if __name__ == '__main__':
        # p=ProcessPoolExecutor(max_workers=2)
        p=ThreadPoolExecutor(3)   #默认不写开启的是CPU核数*5
    
        for i in range(6):
            obj=p.submit(task,i)
            print(obj.result())     #重点:每个线程打印 is running后,obj.result()都回去等着拿return x**2的返回值,所以输出结果会是串行的输出
    
        print('')
    
    '''结果:因为每次
    1 is running
    1
    2 is running
    4
    3 is running
    9
    4 is running
    16
    5 is running
    25
    主
    
    
    '''

     线程池、进程池下异步调用分析:

    版本1:

    #线程池下分析异步调用(取返回值),分析版本1
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import time,os,random
    #线程池为例:
    def task(x):
        print('%s is running'%x)
        time.sleep(random.randint(2, 5))
        return x**2
    
    if __name__ == '__main__':
        异步调用:submit只负责提交任务,不会等待拿结果不会拿return
        p=ThreadPoolExecutor(4)
        obj_l=[]
        for i in range(10):
            obj=p.submit(task,i)   #每个返回obj对象里面,其实都封装了一个result方法
            # print(obj.result()) #可以通过result拿到每次return的返回值
            obj_l.append(obj)
            # print(obj_l)
        # p.close()
        # p.join() #
        p.shutdown(wait=True)  #关闭进程池入口 等着取一个数目就减1
    
        #实现异步调用拿结果:如何取结果:
        print(obj_l[0].result())  #第一个任务的结果 即return 0**2
        print(obj_l[1].result())  #第二个:1**2
        print('')
    
        同步调用:每次都拿到返回结果
        p=ThreadPoolExecutor(4)
        for i in range(10):
            obj=p.submit(task,i)
            print(obj.result())   #每次等待返回值后再执行下一个
        print('')

    版本2:

    #线程池下分析异步调用(取返回值),分析版本2
    def task(x):
        print('%s is running' % x)
        time.sleep(10)
        return x ** 2
    
    def parse(res):
        print('.......')
    
    if __name__ == '__main__':
        # 异步调用:
        p=ThreadPoolExecutor(4)
        obj_l=[]
        for i in range(10):
            obj=p.submit(task,i)
            obj_l.append(obj)
    
        #实现目的:为了保证进程池里面任务全部运行完,后再统一拿返回值
        p.shutdown(wait=True)  #1.关闭进程池入口(确保统计时进程池不会再有数据进入),等着取一个数目就减1
        # print(obj_l)  #第一个任务的结果 即return 0**2
        for future in obj_l:
            # print(future.result())
            parse(future.result())  #4个任务一次10s + 4个任务(每个1s)=共14s
        print('')

    最终版本(引用回调函数)

    #线程池、进程池异步调用取返回值最终版(引用回调函数add_done_callback())
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    import time, os, random
    import os
    
    def task(x):
        print('%s is running' %x)
        # time.sleep(1)
        return x ** 2
        # parse(x**2)  #产生结果后立马被处理
    
    def parse(future):
        # time.sleep(1)
        res=future.result()
        print('%s 处理了 %s'%(os.getpid(),res))  #统一是主进程的一个Pid
    if __name__ == '__main__':
        poo1=ProcessPoolExecutor(4)  #进程为9s左右
        # poo1=ThreadPoolExecutor(4)   #线程为6s左右,线程效率更高
        start=time.time()
        for i in range(1,5):
            future=poo1.submit(task,i)
            #回调函数就是有结果之后立马执行结果,回调是主进程去处理运行结果,所以parse函数os.getpid()pid都是主进程一个PID
            #提交完之后给对象绑定了回调函数,parse会在future有返回值是立即触发,并且将future当做参数传给parse
            future.add_done_callback(parse) #绑定回调函数
        poo1.shutdown(wait=True) #关闭进程池入口,确保进程池里面不会再有数据进入
        stop=time.time()
    
        print('',(stop-start))
    
    '''
    总结:根据以上执行
        回调函数:
            1.在线程池环境下,处理回调函数是线程池里面的运行的,是线程池里面的线程去执行的
            (还是I/O密集型)
            
            2.在进程池环境下,处理回调函数是主进程去处理的
            
            
    '''

    线程池与进程池里面没有join()这个方法,

  • 相关阅读:
    【iOS基础控件 6 】 汽车品牌展示 Model嵌套/KVC/TableView索引 <UITableView>
    RAC
    Magical Data Modelling Framework for JSON
    Xcode插件管理工具Alcatraz
    iOS8 Size Classes的理解与使用
    Storyboard、xib中的UIScrollView使用autolayout,使其能够滚动
    iOS事件响应链
    git一些常用的操作(转载)
    iOS本地数据存储(转载)
    使用react-native做一个简单的应用-04界面主框架
  • 原文地址:https://www.cnblogs.com/yangzhizong/p/9321369.html
Copyright © 2011-2022 走看看