zoukankan      html  css  js  c++  java
  • python 池 协程

    # 有多少个任务就开多少个进程或者线程
    # 什么是池
    # 要在程序开始的时候,还没提交任务先创建几个线程或者进程
    # 放在一个池子里,这就是池
    # 为什么要用池?
    # 如果先开好进程/线程,那么有任务之后就可以直接使用这个池中的数据了
    # 并且开好的线程或者进程会一直存在在池中,可以被多个任务反复利用
    # 这样极大的减少了开启关闭调度线程/进程的时间开销
    # 池中的线程/进程个数控制了操作系统需要调度的任务个数,控制池中的单位
    # 有利于提高操作系统的效率,减轻操作系统的负担
    # 发展过程
    # threading模块 没有提供池
    # multiprocessing模块 仿照threading写的 Pool
    # concurrent.futures模块 线程池,进程池都能够用相似的方式开启使用

    # 线程池
    # import time
    # import random
    # from threading import current_thread
    # from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    # def func(a,b):
    # print(current_thread().ident,'start',a,b)
    # time.sleep(random.randint(1,4))
    # print(current_thread().ident,'end')
    #
    # if __name__ == '__main__':
    # tp = ThreadPoolExecutor(4)
    # for i in range(20):
    # tp.submit(func,i,b=i+1)

    # 实例化 创建池
    # 向池中提交任务,submit 传参数(按照位置传,按照关键字传)

    # 进程池
    # import os
    # import time,random
    # from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    # def func(a,b):
    # print(os.getpid(),'start',a,b)
    # time.sleep(random.randint(1,4))
    # print(os.getpid(),'end')
    #
    # if __name__ == '__main__':
    # tp = ProcessPoolExecutor(4)
    # for i in range(20):
    # tp.submit(func,i,b=i+1)

    # 获取任务结果
    # import os
    # import time,random
    # from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    # def func(a,b):
    # print(os.getpid(),'start',a,b)
    # time.sleep(random.randint(1,4))
    # print(os.getpid(),'end')
    # return a*b
    #
    # if __name__ == '__main__':
    # tp = ProcessPoolExecutor(4)
    # futrue_l = {}
    # for i in range(20): # 异步非阻塞的
    # ret = tp.submit(func,i,b=i+1)
    # futrue_l[i] = ret
    # # print(ret.result()) # Future未来对象
    # for key in futrue_l: # 同步阻塞的
    # print(key,futrue_l[key].result())

    # map 只适合传递简单的参数,并且必须是一个可迭代的类型作为参数
    # import os
    # import time,random
    # from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    # def func(a):
    # print(os.getpid(),'start',a[0],a[1])
    # time.sleep(random.randint(1,4))
    # print(os.getpid(),'end')
    # return a[0]*a[1]
    #
    # if __name__ == '__main__':
    # tp = ProcessPoolExecutor(4)
    # ret = tp.map(func,((i,i+1) for i in range(20)))
    # for key in ret: # 同步阻塞的
    # print(key)

    # 回调函数 : 效率最高的
    import time,random
    from threading import current_thread
    from concurrent.futures import ThreadPoolExecutor

    def func(a,b):
    print(current_thread().ident,'start',a,b)
    time.sleep(random.randint(1,4))
    print(current_thread().ident,'end',a)
    return (a,a*b)

    def print_func(ret): # 异步阻塞
    print(ret.result())

    if __name__ == '__main__':
    tp = ThreadPoolExecutor(4)
    futrue_l = {}
    for i in range(20): # 异步非阻塞的
    ret = tp.submit(func,i,b=i+1)
    ret.add_done_callback(print_func) # ret这个任务会在执行完毕的瞬间立即触发print_func函数,并且把任务的返回值对象传递到print_func做参数
    # 异步阻塞 回调函数 给ret对象绑定一个回调函数,等待ret对应的任务有了结果之后立即调用print_func这个函数
    # 就可以对结果立即进行处理,而不用按照顺序接收结果处理结果

    # import time
    # import random
    # import queue
    # from threading import Thread
    #
    # def func(q,i):
    # print('start',i)
    # time.sleep(random.randint(1,5))
    # print('end',i)
    # q.put(i*(i+1))
    # # return i*i+1
    # def print_func(q):
    # print(q.get())
    #
    # q = queue.Queue()
    # for i in range(20):
    # Thread(target=func,args=(q,i)).start()
    #
    # for i in range(20):
    # Thread(target=print_func, args=(q,)).start()




    from concurrent.futures import ThreadPoolExecutor
    import requests
    import os
    
    def get_page(url):    # 访问网页,获取网页源代码   线程池中的线程来操作
        print('<进程%s> get %s' %(os.getpid(),url))
        respone=requests.get(url)
        if respone.status_code == 200:
            return {'url':url,'text':respone.text}
    
    def parse_page(res):   # 获取到字典结果之后,计算网页源码的长度,把https://www.baidu.com : 1929749729写到文件里   线程任务执行完毕之后绑定回调函数
        res=res.result()
        print('<进程%s> parse %s' %(os.getpid(),res['url']))
        parse_res='url:<%s> size:[%s]
    ' %(res['url'],len(res['text']))
        with open('db.txt','a') as f:
            f.write(parse_res)
    
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
        ]
        # 获得一个线程池对象 = 开启线程池
        tp = ThreadPoolExecutor(4)
        # 循环urls列表
        for url in urls:
            # 得到一个futrue对象 = 把每一个url提交一个get_page任务
            ret = tp.submit(get_page,url)
            # 给futrue对象绑定一个parse_page回调函数
            ret.add_done_callback(parse_page)   # 谁先回来谁就先写结果进文件
    
    # 不用回调函数:
        # 按照顺序获取网页 百度 python openstack git sina
        # 也只能按照顺序写
    # 用上了回调函数
        # 按照顺序获取网页 百度 python openstack git sina
        # 哪一个网页先返回结果,就先执行那个网页对应的parserpage(回调函数)
    
    
    # 会起池会提交任务
    # 会获取返回值会用回调函数
    
    # 1.所有的例题 会默
    # 2.进程池(高计算的场景,没有io(没有文件操作没有数据库操作没有网络操作没有input)) : >cpu_count*1  <cpu_count*2
    #   线程池(一般根据io的比例定制) : cpu_count*5
    # 5*20 = 100并发
    

      协程

    # 进程
    # 线程
    # 正常的开发语言 多线程可以利用多核
    # cpython解释器下的多个线程不能利用多核 : 规避了所有io操作的单线程
    # 协程
    # 是操作系统不可见的
    # 协程本质就是一条线程 多个任务在一条线程上来回切换
    # 利用协程这个概念实现的内容 : 来规避IO操作,就达到了我们将一条线程中的io操作降到最低的目的

    # import time
    # def func1():
    # print('start')
    # time.sleep(1)
    # print('end')
    #
    # def func2():
    # print('start')
    # time.sleep(1)
    # print('end')

    # 切换 并 规避io 的两个模块
    # gevent = 利用了 greenlet 底层模块完成的切换 + 自动规避io的功能
    # asyncio = 利用了 yield 底层语法完成的切换 + 自动规避io的功能
    # tornado 异步的web框架
    # yield from - 更好的实现协程
    # send - 更好的实现协程
    # asyncio模块 基于python原生的协程的概念正式的被成立
    # 特殊的在python中提供协程功能的关键字 : aysnc await

    # 进程 数据隔离 数据不安全 操作系统级别 开销非常大 能利用多核
    # 线程 数据共享 数据不安全 操作系统级别 开销小 不能利用多核 一些和文件操作相关的io只有操作系统能感知到
    # 协程 数据共享 数据安全 用户级别 更小 不能利用多核 协程的所有的切换都基于用户,只有在用户级别能够感知到的io才会用协程模块做切换来规避(socket,请求网页的)

    # 用户级别的协程还有什么好处:
    # 减轻了操作系统的负担
    # 一条线程如果开了多个协程,那么给操作系统的印象是线程很忙,这样能多争取一些时间片时间来被CPU执行,程序的效率就提高了

    # a = 1
    # def func():
    # global a
    # # 切换
    # a += 1
    # # 切换
    #
    # import dis
    # dis.dis(func)
    # 对于操作系统 : python代码--> 编译 --> 字节码 --> 解释 --> 二进制010101010010101010
    # 二进制 反编译过来的 --> LOAD_GLOBAL

    # 4cpu
    # 进程 :5个进程
    # 线程 :20个
    # 协程 :500个
    # 5*20*500 = 50000

    # 协程的原理
    import time
    def sleep(n):
        print('start sleep')
        yield time.time() + n
        print('end sleep')
    
    def func(n):
    
        print(123)
        yield from sleep(n)   # 睡1s
        print(456)
    
    def run_until_complete(g1,g2):
        ret1 = next(g1)
        ret2 = next(g2)
        time_dic = {ret1: g1, ret2: g2}
        while time_dic:
            min_time = min(time_dic)
            time.sleep(min_time - time.time())
            try:
                next(time_dic[min_time])
            except StopIteration:
                pass
            del time_dic[min_time]
    
    
    n = 1
    g1 = func(1)
    g2 = func(1.1)
    run_until_complete(g1,g2)








  • 相关阅读:
    JS获取今天的日期
    领域模型vs数据模型,应该怎么用?
    如何让技术想法更容易被理解?
    如何做好技术 Team Leader
    回归分析中常见的“门槛模型”!
    有了数据湖,距离数据仓库消失还有几年?
    数据治理 VS 公司治理、IT治理、数仓治理
    Sentence-seven basic patterns 英语句子结构
    VM的Linux CentOS系统的VMTools的手动安装
    linux下IPTABLES配置详解
  • 原文地址:https://www.cnblogs.com/shaohuagu/p/12287495.html
Copyright © 2011-2022 走看看