zoukankan      html  css  js  c++  java
  • 协程

     对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另外一个任务去计算,

    这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,

    让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给我们的线程。

    协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。

    协程

    协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

    优点:

    1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
    2. 单线程内就可以实现并发的效果,最大限度地利用cpu

    缺点:

    1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
    2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
    

      

    协程特点:

      1. 必须在只有一个单线程里实现并发

      2. 修改共享数据不需加锁

      3. 用户程序里自己保存多个控制流的上下文栈

      4. 一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))

        协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。为了实现它,我们需要找寻一种可以同时满足以下条件的解决方案:

    #1. 可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行。
    #2. 作为1的补充:可以检测io操作,在遇到io操作的情况下才发生切换
    

    协程实例:

    # 生产者消费者模型
    
    # def consumer():
    #     g = producer()
    #     for num in g:
    #         print(num)
    #
    #
    # def producer():
    #     for i in range(100):
    #         yield i   # 保存当前程序的状态
    #
    # consumer()
    生产者消费者模型
    更好的利用协程
    一个线程的执行明确的切分开
    两个任务, 帮助你记住哪个任务执行到哪个位置上了, 并且实现安全的切换
    一个任务不得不陷入阻塞了, 在这个任务阻塞的过程中切换到另一个任务继续执行
    你的程序只要还有任务需要执行, 你的当前线程永远不会阻塞
    利用协程在多个任务陷入阻塞的时候进行切换,
        来保证一个线程在处理多个任务的时候总是忙碌的,这样能够更加充分的利用CPU,来抢占更多的时间片
    无论是进程、还是线程都是由操作系统来切换的,开启过多的线程会给操作系统的调度带来更大的压力
    
    如果我们是使用协程,协程在程序之间的切换操作系统感知不到。无论开启多少个协程对操作系统的调度不会由任何压力。
    
    协程的本质就是一条线程,所有完全不会产生数据安全问题。
    

     

      

    Greenlet模块

    greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator(生成器)

     

    # import time
    # from greenlet import greenlet
    # def eat():
    #     print('eating 1')
    #     g2.switch()  # 切换到play
    #     time.sleep(1)
    #     print('eating 2')
    #
    # def play():
    #     print('playing 1')
    #     time.sleep(1)
    #     print('playing 2')
    #     g1.switch()  # 切换回eat
    #
    # g1 = greenlet(eat)
    # g2 = greenlet(play)
    # g1.switch()
    

    上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞,而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了

    from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前

    或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头

    from gevent import monkey
    monkey.patch_all()  # 把下面导入的模块打包进来
    import time
    import gevent
    
    
    def eat():
        print('eating 1')
        # gevent.sleep(1)
        time.sleep(1)
        print('eating 2')
    def play():
        print('playing 1')
        # gevent.sleep(1)
        time.sleep(1)
        print('playing 2')
    
    g1 = gevent.spawn(eat)  # 自动的检测阻塞事件,遇见阻塞了就会进行切换,有些阻塞它不认识。。
    g2 = gevent.spawn(play)
    g1.join()  # 阻塞直到g1结束
    g2.join()
    View Code

    Gevent模块

    gevent 是一个第三方库,通过greenlet实现协程,核心就是在遇到IO操作,会自动切换状态

    from gevent import spawn,joinall,monkey;monkey.patch_all()
    
    import time
    def task(pid):
        """
        Some non-deterministic task
        """
        time.sleep(0.5)
        print('Task %s done' % pid)
    
    
    def synchronous():  # 同步
        for i in range(10):
            task(i)
    
    def asynchronous(): # 异步
        g_l=[spawn(task,i) for i in range(10)]
        joinall(g_l)
        print('DONE')
        
    if __name__ == '__main__':
        print('Synchronous:')
        synchronous()
        print('Asynchronous:')
        asynchronous()
    #  上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。
    #  初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,
    #  后者阻塞当前流程,并执行所有给定的greenlet任务。执行流程只会在 所有greenlet执行完后才会继续向下走。
    

      

    from gevent import monkey; monkey.patch_all()
    # 导入其他模块 带有一些阻塞事件
    # spawn(函数名) 产生了一个协程任务,在遇到IO操作的时候帮助我们在多任务之间自动切换
    # join()       阻塞 直到某个任务被执行完毕
    # join_all()
    # value 属性   获取返回值
    

      

    # from gevent import monkey
    # monkey.patch_all()  # 把下面导入的模块打包进来
    # import time
    # import gevent
    #
    #
    # def eat():
    #     print('eating 1')
    #     # gevent.sleep(1)
    #     time.sleep(1)
    #     print('eating 2')
    #     return 'eat finished'
    #
    # def play():
    #     print('playing 1')
    #     # gevent.sleep(1)
    #     time.sleep(1)
    #     print('playing 2')
    #     return 'play finished'
    
    # g1 = gevent.spawn(eat)  # 自动的检测阻塞事件,遇见阻塞了就会进行切换,有些阻塞它不认识。。
    # g2 = gevent.spawn(play)
    # gevent.joinall([g1, g2])
    # print(g1.value)
    # print(g2.value)
    

      

    # 爬虫
    from gevent import monkey; monkey.patch_all()
    import time
    import requests
    url_lst = [
        'https://www.baidu.com',
        'https://www.zhihu.com/topics',
        'https://www.sogou.com',
        'https://www.sohu.com',
        'https://www.sina.com',
        'https://www.jd.com',
        'https://www.luffycity.com/home',
        'https://www.douban.com',
    ]
    def get_url(url):
        response = requests.get(url)
        if response.status_code == 200:  # 状态码  200正常结果
            print(url, len(response.text))
    
    start = time.time()
    for url in url_lst:
        get_url(url)
    print(time.time()-start)
    爬虫实例

    通过gevent实现单线程下的socket并发

    注意 :from gevent import monkey;monkey.patch_all()一定要放到导入socket模块之前,否则gevent无法识别socket的阻塞

    from gevent import monkey;monkey.patch_all()
    import socket
    import gevent
    from threading import current_thread
    
    def talk(conn):
        print('-->',current_thread())
        while True:
            conn.send(b'hello')
            # print(conn.recv(1024))
    
    sk = socket.socket()
    sk.bind(('127.0.0.1', 9000))
    sk.listen()
    
    while True:
        conn, addr = sk.accept()
        gevent.spawn(talk, conn)
    
    
    # 主线程 accept 基本上都是阻塞
    # 任务一 talk
    # 任务二 talk
    # 任务三 talk
    server
    import socket
    from threading import Thread
    
    
    def client():
        sk = socket.socket()
        sk.connect(('127.0.0.1', 9000))
    
        while True:
            print(sk.recv(1024))
            sk.send(b'byebye')
    
    
    for i in range(10):
        Thread(target=client,).start()
    client

     

  • 相关阅读:
    Spring Boot 配置文件和命令行配置
    Spring Boot RestApi 测试教程 Mock 的使用
    Spring Boot 集成 Swagger2 教程
    Spring Boot Web 开发@Controller @RestController 使用教程
    SpringBoot RESTful API 架构风格实践
    Spring Boot 使用 Aop 实现日志全局拦截
    Spring Boot 全局异常处理
    Spring Boot Thymeleaf 模板引擎的使用
    Spring Boot Log 日志使用教程
    IDEA 下的 github 创建提交与修改
  • 原文地址:https://www.cnblogs.com/eaoo/p/9708540.html
Copyright © 2011-2022 走看看