zoukankan      html  css  js  c++  java
  • 7.进程池与线程池

    进程池与线程池 - 回调函数 - 协程

    1.进程池与线程池

    # 进程池与线程池
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import os,time,random
    
    # 获取CPU处理器的逻辑核心数
    print(os.cpu_count()) # 8  代表8核处理器
    
    # 1. 进程池 ProcessPoolExecutor
    """
    多个进程已提前在进程池中开辟了,可以并发,也可以并行
    """
    def func(i):
        time.sleep(random.random())
        print(i, "执行任务~,当前进程号{}".format(os.getpid()))
        return i
    
    if __name__ == "__main__":
        # 创建进程池对象
        """ProcessPoolExecutor(参数),默认参数是电脑硬件CPU核心数"""
        p = ProcessPoolExecutor() # 进程数量 = 参数
        # 异步提交任务
        """
        语法 :
            submit(函数,参数1,参数2,....)
        含义:
            进程池里有8个进程提前开辟好了,不要再一个一个开辟进程了,执行多个任务
        注意点:
            如果一个进程多时间内可以完成更多任务
            进程池就不会用更多进程辅助完成,可以节省系统资源
        """
        lst = []
        for i in range(10):
            obj = p.submit(func,i) #创建提交任务对象
            # obj.result() #获取函数返回值 不要加这里,会把异步变同步
            lst.append(obj)
    
        for i in lst:
            print(i.result(),"=1=") 
    
        # shutdown 等待进程池所有任务执行完毕后,在放行
        """
        result 和 shutdown 用一个就可以同步进程池和主进程
        """
        p.shutdown()
        print("进程池结束了~~")
    
    # 2. 线程池 ThreadPoolExecutor
    from threading import currentThread as ct
    def func(i):
        time.sleep(0.2)
        print(i,"线程任务执行中..,当前线程号{}".format(ct().ident))
        return ct().ident
        
    if __name__ == "__main__":
        # 创建线程池对象
        t = ThreadPoolExecutor() #线程数量 = 参数 * 5
        # 异步提交任务
        """
        语法 :
            submit(函数,参数1,参数2,....)
        """
        lst = []
        setvar = set()
        for i in range(100 ):
            obj = t.submit(func,i)
            lst.append(obj)
        for i in lst:
            setvar.add(i.result()) #获取线程号返回值,集合自动去重
        t.shutdown() # 等待线程池所有任务执行完毕后,在放行
        print(len(setvar)) #40 个线程 
    
    # 3. 线程池 map 处理数据,返回迭代器
    from threading import currentThread as ct
    
    def func(i):
        time.sleep(random.random())
        print(i,"当前线程号{}".format(ct().ident))
        return i
    
    if __name__ == "__main__":
        t = ThreadPoolExecutor()
        it = t.map(func,range(100))
        for i in it:
            print(i)
        t.shutdown()
        print("线程池执行结束~~")
    
    """
    结论:
        无论是进程池还剩线程池,都是由固定的进程数或线程数完成的
        系统不会额外创建更多的进程或线程来完成任务
    """
    

    2.回调函数

    # 进程池与线程池的回调函数
    """
    回调函数 : 回头调用一下函数
    
    add_done_callback : 完成回收函数的调用
    add_done_callback作用:
        可以在获取result返回值时,还是异步并发程序;
    结论: 
        进程池的回调函数由主进程完成
        线程池的回调函数由对应子线程完成
    """
    from concurrent.futures import ProcessPoolExecutor 
    from concurrent.futures import ThreadPoolExecutor 
    from threading import currentThread as ct
    import os,time,random
    
    # 1.进程池任务
    def func(i):
        time.sleep(random.random())
        print(i,"当前进程号{}".format(os.getpid()))
        return i
        
    def callback(obj):
        print("<===回调函数进程号{}===>".format(os.getpid()))
        print(obj.result())
    
    if __name__ == "__main__":
        p = ProcessPoolExecutor()
        for i in range(20):
            obj = p.submit(func,i)
            # 异步获取当前进程返回值
            obj.add_done_callback(callback)
        p.shutdown()
        print("主进程执行结束~,进程号{}".format(os.getpid()))
    
    # 2.线程池任务
    def func(i):
        time.sleep(random.random())
        print(i,"当前线程号{}".format(ct().ident))
        return i
    
    def callback(obj):
        print("-====回调函数线程号{}====".format(ct().ident))
        print(obj.result())
        
    if __name__ == "__main__":
        t = ThreadPoolExecutor()
        for i in range(100):
            obj = t.submit(func,i)
            obj.add_done_callback(callback)
        t.shutdown()
        print("主线程号{}".format(ct().ident))
    

    3.协程

    # 协程 : 记住终极版本4就行
    """
    进程是资源分配的最小单位
    线程是程序调度的最小单位
    协程是线程实现的最小单位
    总结:
        在进程一定的情况下,开辟多个线程
        在线程一定的情况下,开辟多个协程
        提高更大的并发,充分利用CPU
    """
    
    # 利用协程改写生产者消费者模型
    # 1.迭代器
    def producer():
        for i in range(100):
            yield  i
    def consumer(gen):
        for i in range(10):
            print(next(gen))
    
    gen = producer() #初始化生成器对象
    consumer(gen)
    
    # 2.greenlet 协程早期版本
    """
    switch() 与到阻塞可以来回切换任务,但是需要手动完成
    """
    from greenlet import greenlet
    import time
    
    def eat():
        print("吃吃~~")
        g2.switch()
        time.sleep(2)
        print("喝喝喝~")
        
    def play():
        print("玩玩玩~")
        time.sleep(2)
        print("乐乐乐~")
        g1.switch()
        
    g1 = greenlet(eat)
    g2 = greenlet(play)
    g1.switch()
    
    # 3. gevent 版本 (遇到阻塞自动识别,但是不能识别所有阻塞)
    import gevent
    def eat():
        print("吃吃~~")
        # time.sleep(2) time不能识别
        gevent.sleep(2)
        print("喝喝喝~")
        
    def play():
        print("玩玩玩~")
        gevent.sleep(2)
        print("乐乐乐~")
    g1 = gevent.spawn(eat) #创建协程对象g1
    g2 = gevent.spawn(play) #创建协程对象g2
    g1.join() 
    g2.join()
    print("主线程结束~~")
    
    # 4.协程的终极形态(猴子补丁)  记住这个就行
    from gevent import monkey 
    monkey.patch_all() #猴子补丁
    """
    上面两行可以写在一行,用分号隔开
    猴子补丁 : 可以把接下来引入的所有模块中的阻塞
                    自动识别出来,自动遇到阻塞就切换任务
    1. spawn(函数,参数1,参数2...) 创建协程
    2. join 直到某个协程任务执行完毕之后,放行
    3. value 获取协程任务中的返回值
    4. joinall 等待所有协程任务执行完毕之后,放行(参数是一个列表)
    """
    import time,gevent
    def eat():
        print("吃吃~~")
        time.sleep(2)
        print("喝喝喝~")
        return "吃完了~"
        
    def play():
        print("玩玩玩~")
        time.sleep(2)
        print("乐乐乐~")
        return "玩完了~"
    
    g1 = gevent.spawn(eat) # 创建协程对象g1
    g2 = gevent.spawn(play) # 创建协程对象g2
    gevent.joinall([g1,g2]) # 等待所有协程任务执行完毕之后,放行
    print(g1.value)
    print(g2.value)
    print("主线程结束~~")
    

    4.协程案例

    # 协程案例 : 爬虫
    # 基本语法:
    import requests
    response = requests.get("http://www.baidu.com") # 返回响应对象
    print(response) #<Response [200]>
    print(response.status_code) #200  获取状态码 
    res = response.apparent_encoding
    print(res) # utf-8 获取网页中字符编码
    response.encoding = res # 设置编码集,防止乱码
    print(response.text) # 获取网页内容
    
    # 爬虫
    from gevent import monkey ; monkey.patch_all()
    import time,gevent,requests
    
    url_lst =[
    	"http://www.baidu.com",
    	"http://www.taobao.com/",
    	"http://www.jd.com/",
    	"http://www.4399.com/",
    	"http://www.7k7k.com/",
    	"http://www.baidu.com",
    	"http://www.taobao.com/",
    	"http://www.jd.com/",
    	"http://www.4399.com/",
    	"http://www.7k7k.com/",
    ]
    
    # 1.正常爬取
    def get_url(url):
        response = requests.get(url)
        if response.status_code == 200:
            print(response.text)
    """"""
    startime = time.time()
    for i in url_lst:
        get_url(i)
    endtime = time.time()
    print("所用时间是{}".format(endtime - startime)) #所用时间是3.013963222503662
    
    # 2.异步多协程爬取
    lst = []
    startime = time.time()
    for i in url_lst:
        g = gevent.spawn(get_url,i)
        lst.append(g)
    gevent.joinall(lst)
    endtime = time.time()
    print("所用时间是{}".format(endtime - startime)) #所用时间是2.8483333587646484
    
  • 相关阅读:
    ORACLE-016:ora-01720 授权选项对于&#39;xxxx&#39;不存在
    leetcode笔记:Sort Colors
    指针常量与常量指针
    Tiling POJ 2506 【大数】
    杭电5137How Many Maos Does the Guanxi Worth
    ognl.OgnlException: target is null for setProperty(null,&quot;XXXX&quot;...)
    VM虚拟机全屏显示
    http://www.blogjava.net/crespochen/archive/2011/04/22/348819.html
    springMVC配置静态资源访问的<mvc:resources>标签的使用
    eclipse package explorer视图中怎么让default package不显示?
  • 原文地址:https://www.cnblogs.com/jia-shu/p/14233080.html
Copyright © 2011-2022 走看看