zoukankan      html  css  js  c++  java
  • 线程池,进程池

    什么是线程池,进程池

    线程池,进程池本质上就是一个存储进程或线程的列表,如果是IO密集型任务使用线程池,如果是计算密集任务则使用进程池

    为什么需要线程池,进程池

    在很多情况下需要控制进程或线程的数量在一个合理的范围,例如TCP程序中,一个客户端对应一个线程,虽然线程的开销小,但肯定不能无限的开,否则系统资源迟早被耗尽,解决的办法就是控制线程的数量。

    线程/进程池不仅帮我们控制线程/进程的数量,还帮我们完成了线程/进程的创建,销毁,以及任务的分配

    进程池的使用:

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import time,os
    
    # 创建进程池,指定最大进程数为3,此时不会创建进程,不指定数量时,默认为CPU和核数
    pool = ProcessPoolExecutor(3)
    
    def task():
        time.sleep(1)
        print(os.getpid(),"working..")
    
    if __name__ == '__main__':
        for i in range(10):
            pool.submit(task) # 提交任务时立即创建进程
    
        # 任务执行完成后也不会立即销毁进程
        time.sleep(2)
    
        for i in range(10):
            pool.submit(task) #再有新任务是 直接使用之前已经创建好的进程来执行

    线程池的使用:

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from threading import current_thread,active_count
    import time,os
    
    # 创建进程池,指定最大线程数为3,此时不会创建线程,不指定数量时,默认为CPU和核数*5
    pool = ThreadPoolExecutor(3)
    print(active_count()) # 只有一个主线
    
    def task():
        time.sleep(1)
        print(current_thread().name,"working..")
    
    if __name__ == '__main__':
        for i in range(10):
            pool.submit(task) # 第一次提交任务时立即创建线程
    
        # 任务执行完成后也不会立即销毁
        time.sleep(2)
    
        for i in range(10):
            pool.submit(task) #再有新任务时 直接使用之前已经创建好的线程来执行

    同步异步

    阻塞非阻塞指的是程序的运行状态

    阻塞:当程序执行过程中遇到了IO操作,在执行IO操作时,程序无法继续执行其他代码,称为阻塞!

    非阻塞:程序在正常运行没有遇到IO操作,或者通过某种方式使程序即时遇到了也不会停在原地,还可以执行其他操作,以提高CPU的占用率

    同步指调用:发起任务后必须在原地等待任务执行完成,才能继续执行

    异步指调用:发起任务后必须不用等待任务执行,可以立即开启执行其他操作

    同步会有等待的效果但是这和阻塞是完全不同的,阻塞时程序会被剥夺CPU执行权,而同步调用则不会

    程序中的异步调用并获取结果方式1:

    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    import time
    
    pool = ThreadPoolExecutor(3)
    def task(i):
        time.sleep(0.01)
        print(current_thread().name,"working..")
        return i ** i
    
    if __name__ == '__main__':
        objs = []
        for i in range(3):
            res_obj = pool.submit(task,i) # 异步方式提交任务# 会返回一个对象用于表示任务结果
            objs.append(res_obj)
    
    # 该函数默认是阻塞的 会等待池子中所有任务执行结束后执行
    pool.shutdown(wait=True)
    
    # 从结果对象中取出执行结果
    for res_obj in objs:
        print(res_obj.result())
    print("over")

    程序中的异步调用并获取结果方式2:

    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    import time
    
    pool = ThreadPoolExecutor(3)
    def task(i):
        time.sleep(0.01)
        print(current_thread().name,"working..")
        return i ** i
    
    if __name__ == '__main__':
        objs = []
        for i in range(3):
            res_obj = pool.submit(task,i) # 会返回一个对象用于表示任务结果
            print(res_obj.result()) #result是同步的一旦调用就必须等待 任务执行完成拿到结果
    print("over")

    异步回调

    异步回调指的是:在发起一个异步任务的同时指定一个函数,在异步任务完成时会自动的调用这个函数

    为什么需要异步回调

    之前在使用线程池或进程池提交任务时,如果想要处理任务的执行结果则必须调用result函数或是shutdown函数,而它们都是是阻塞的,会等到任务执行完毕后才能继续执行,这样一来在这个等待过程中就无法执行其他任务,降低了效率,所以需要一种方案,即保证解析结果的线程不用等待,又能保证数据能够及时被解析,该方案就是异步回调

    异步回调的使用

    在编写爬虫程序时,通常都是两个步骤:

    1.从服务器下载一个网页文件

    2.读取并且解析文件内容,提取有用的数据

    按照以上流程可以编写一个简单的爬虫程序

    要请求网页数据则需要使用到第三方的请求库requests可以通过pip或是pycharm来安装,在pycharm中点击settings->解释器->点击+号->搜索requests->安装

    import requests,re,os,random,time
    from concurrent.futures import ProcessPoolExecutor
    
    def get_data(url):
        print("%s 正在请求%s" % (os.getpid(),url))
        time.sleep(random.randint(1,2))
        response = requests.get(url)
        print(os.getpid(),"请求成功 数据长度",len(response.content))
        #parser(response) # 3.直接调用解析方法  哪个进程请求完成就那个进程解析数据  强行使两个操作耦合到一起了
        return response
    
    def parser(obj):
        data = obj.result()
        htm = data.content.decode("utf-8")
        ls = re.findall("href=.*?com",htm)
        print(os.getpid(),"解析成功",len(ls),"个链接")
    
    if __name__ == '__main__':
        pool = ProcessPoolExecutor(3)
        urls = ["https://www.baidu.com",
                "https://www.sina.com",
                "https://www.python.org",
                "https://www.tmall.com",
                "https://www.mysql.com",
                "https://www.apple.com.cn"]
        # objs = []
        for url in urls:
            # res = pool.submit(get_data,url).result() # 1.同步的方式获取结果 将导致所有请求任务不能并发
            # parser(res)
    
            obj = pool.submit(get_data,url) # 
            obj.add_done_callback(parser) # 4.使用异步回调,保证了数据可以被及时处理,并且请求和解析解开了耦合
            # objs.append(obj)
            
        # pool.shutdown() # 2.等待所有任务执行结束在统一的解析
        # for obj in objs:
        #     res = obj.result()
        #     parser(res)
        # 1.请求任务可以并发 但是结果不能被及时解析 必须等所有请求完成才能解析
        # 2.解析任务变成了串行,

    总结:异步回调使用方法就是在提交任务后得到一个Futures对象,调用对象的add_done_callback来指定一个回调函数,

    如果把任务比喻为烧水,没有回调时就只能守着水壶等待水开,有了回调相当于换了一个会响的水壶,烧水期间可用作其他的事情,等待水开了水壶会自动发出声音,这时候再回来处理。水壶自动发出声音就是回调。

    注意:

    1. 使用进程池时,回调函数都是主进程中执行执行

    2. 使用线程池时,回调函数的执行线程是不确定的,哪个线程空闲就交给哪个线程

    3. 回调函数默认接收一个参数就是这个任务对象自己,再通过对象的result函数来获取任务的处理结果

  • 相关阅读:
    SpringCloud面试题
    网工必知:(1)Cisco 路由器PPPOE拨号配置与NAT简单上网配置
    【网工的福利来了!】用Excel表做的“子网划分&路由聚合计算器”
    肝了,一文让你看懂《Docker极简入门指南》
    ImportError: libopenblas.so.0: cannot open shared object file
    Linux软件包管理工具 Snap 常用命令
    squashfs文件系统
    回环设备
    Mac homebrew报错Error: homebrew-core is a shallow clone.
    公钥和私钥
  • 原文地址:https://www.cnblogs.com/duGD/p/10982167.html
Copyright © 2011-2022 走看看