zoukankan      html  css  js  c++  java
  • 协程

    协程

    1.回顾与思考

    • 回顾

      • 串行:多个任务执行时,一个任务从开始执行,遇到IO阻塞,原地等待,等待IO阻塞结束后,继续执行,一个任务结束后再执行下一个
      • 并行:多核,多线程或进程同时执行,4个cpu,同时执行四个人物
      • 并发:多个任务,多个cpu在多个任务之间来回切换(遇到IO阻塞或者执行时间过长),看起来像是同时执行
      • 并发的本质:
        • 遇到IO阻塞或执行时间过长切换cpu
        • 保持原来的状态
    • 当任务量不大时,是否有一种方法,可以实现一个线程的并发

      • 协程本质上就是一个线程,线程任务的切换是由操作系统控制的,遇到IO阻塞自动切换,现在用协程的目的就是减少操作系统切换的开销(开关线程,创建寄存器,堆栈等,在他们之间来回切换等),在自己的程序里面来控制任务的切换
    • yeild:类似协程

      • #1 yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级
        #2 send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换
        import time
        def func():
            for i in range(11):
                yield
                print('这是我第%s次打印了' % i)
                time.sleep(1)
        
        def func2():
            g = func()
            for k in range(10):
                print('哈哈,我第%s次打印了' % k)
                next(g)
        func2()
        # yield虽然可以实现两个任务来回切换,并且能够保存原来的状态,而且还是一个线程
        # 但是它只能遇到yield才能切换,遇到IO还是阻塞
        ----------------------------------------------------------
        # 计算密集型串行与协程的效率对比
        import time
        def task():
            res = 1
            for i in range(1, 100000):
                res += i
        def task2():
            res = 1
            for i in range(1, 100000):
                res -= i
        
        start_time = time.time()
        task()
        task2()
        print('串行消耗的时间:', time.time() - start_time)
        # 串行消耗的时间: 0.019963502883911133
        
        import time
        def task():
            res = 1
            for i in range(1, 100000):
                res += i
                yield res
        
        def task2():
            g = task()
            res = 1
            for i in range(1, 100000):
                res -= i
                next(g)
        start_time = time.time()
        task2()
        print('协程消耗的时间:', time.time() - start_time)
        # 协程消耗的时间: 0.03234148025512695
        
    • 在自己的程序中(即用户级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到IO阻塞时就切换到另一个任务去计算,这样就保证了该线程能够最大限度的处于就绪状态,即随时都可以被cpu执行的状态,相当于我们在用户级别将自己的IO操作虽大限度的隐藏起来,从而可以迷惑操作系统,让其看到:该线程好像一直在计算,IO比较少,从而更多的将cpu的执行权限分配给我们的线程

    • 协程的本质就是在单线程下,由用户自己控制一个任务遇到IO阻塞就切换到另一个任务去执行,从此来提高效率,为了实现它,我们需要寻找一种可以同时满足下列条件的解决方案:

      • 可以控制多个任务之间的的切换,切换之前将任务的状态保存下来,以便重新运行,可以基于暂停的位置继续执行

      • 可以检测IO操作,在遇到IO操作的情况下才发生切换

    2.协程接介绍

    • 协程:是单线程下的并发,由称为微线程,纤程,英文名Coroutine
    • 一句话说明什么是协程:协会是一种用户级别的轻量级线程,即协程是由用户程序自己调度的
    • python的线程属于内核级别的,即由操作系统控制调度的
    • 对比操作系统线程的切换,用户在单线程内控制协程的切换,
      • 优点:
        • 协程切换的开销更小,属于程序级别的切换, 操作系统完全感知不到,因而更加轻量级
        • 单线程内就可以实现并发的效果,最大限度的利用cpu
        • 修改共享数据不需要加锁
      • 缺点:
        • 协程的本质是单线程下,,无法利用多核,可以试试一个程序开启多个线程,每个进程内开启多个线程,每个线程内开启协程
        • 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
      • 特点:
        • 必须在只有一个线程里实现并发
        • 修改数据不需要加锁
        • 用户程序里自己保存多个控制的上下文栈

    3.Greenletmokuai

    • 如果我们在单线程内有20个任务,要想实现多个任务之间的切换,使用yeild生成器的方式过于麻烦(需要先得到初始化一次的生成器,然后再调用send,非常麻烦),而使用greenle模块可以非常简单的实现这20个任务直接的切换

      • # 真正的协程模块就是使用greenlet完成的切换
        from greenlet import greenlet
        
        def eat(name):
            print('%s eat 1' % name)  #2
            g2.switch('猪八戒')        #3
            print('%s eat 2' % name)  #6
            g2.switch()               #7
        def play(name):
            print('%s play 1' % name) #4
            g1.switch()               #5
            print('%s play 2' % name) #8
        
        g1 = greenlet(eat)
        g2 = greenlet(play)
        
        g1.switch('孙悟空') # 可以在第一次switch时传入参数,以后都不需要 
        
    • 任然是没有解决遇到IO自动切换来提升效率的问题

    • 我们想要实现的是:

      • 单线程里的20个任务的代码通常会既有计算操作,又有阻塞操作,完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2,如此才能提高效率,这就用到了Gevent模块

    4.Gevent模块

    • Gevent是一个第三方库,可以轻松通过Fenent实现并发同步编程,在Gevent中用到的主要模式是Greenlet,它是以C扩展模块形式接入python的轻量级协程,Greenlet全部运行在主程序操作系统进程的内部,但他们被协作式的调度

    • 用法:

      • 1. g = gevent.spawn(func,*args,**kwargs)创建一个协程对象g,spawn括号内第一个参数是函数名,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数的,spawn是异步提交任务
        2. g.join() 等待g1结束
        3. gevent.joinall([g1])   可以合并多个协程对象的join
        4. g.value  拿到函数的返回值
        
    • 模拟IO阻塞

      • import gevent
        
        def eat(name):
            print(name, '吃1')
            gevent.sleep(2)
            print(name, '吃2')
        
        def play(name):
            print(name, '玩1')
            gevent.sleep(1)
            print(name, '玩2')
        
        g1 = gevent.spawn(eat, '猪八戒')
        g2 = gevent.spawn(play, '孙悟空')
        g1.join()
        g2.join()
        # 或gevent.joinall([g1, g2])
        print('主线程')
        
        # 此时的gevent只能识别gevent.sleep()
        
    • 完整版的Gevent

      • # 想要让gevent识别所有阻塞,必须在最开头加入:
        from gevent import monkey
        monkey.patch_all()
        -------------------------------------------------------
        from gevent import monkey
        monkey.patch_all()
        # 必须写在最上面,这句话后面的所有阻塞全部能够识别了
        import gevent
        import time
        def eat(name):
            print(name, '吃1')
            time.sleep(2)
            print(name, '吃2')
        
        def play(name):
            print(name, '玩1')
            time.sleep(1)
            print(name, '玩2')
        
        g1 = gevent.spawn(eat, '猪八戒')
        g2 = gevent.spawn(play, '孙悟空')
        
        gevent.joinall([g1, g2])
        print('主线程')
        
        # 可以用threading.current_thread().getName()来查看每个g1和g2,查看的结果为DummyThread-n,即假线程,虚拟线程,其实都在一个线程里面;进程线程的任务切换是由操作系统自行切换的,自己不能控制;协程是通过自己的程序(代码)来进行切换的,自己能够控制,只有遇到协程模块能够识别的IO操作的时候,程序才会进行任务切换,实现并发效果,如果所有程序都没有IO操作,那么就基本属于串行执行了
        
    • Gevent异步与同步对比

      from gevent import monkey
      monkey.patch_all()
      import gevent
      import time
      def task(id):
          # 此处放一些任务
          time.sleep(0.5)
          print(f'Task {id} done')
      
      def synchronous():   # 同步
          for i in range(10):
              task(i)
      
      def asynchronous():  # 异步
          g_l = [gevent.spawn(task, i) for i in range(10)]
          gevent.joinall(g_l)
      
      if __name__ == '__main__':
          print('同步:')
          synchronous()
          print('异步:')
          asynchronous()
      # 存在阻塞时,异步效率远高于同步
      
    • Gevent应用举例一

      from gevent import monkey
      monkey.patch_all()
      import gevent
      import requests
      import time
      
      def get_page(url):
          print('分析:', url)
          response = requests.get(url)
          if response.status_code == 200:
              print(url, '分析结果:', len(response.text))
      
      start_time = time.time()
      gevent.joinall([
          gevent.spawn(get_page, 'https://www.python.org/'),
          gevent.spawn(get_page, 'https://www.yahoo.com/'),
          gevent.spawn(get_page, 'https://github.com/')
      ])
      print('用时:', time.time() - start_time)
      # 用时: 1.9783523082733154
      -------------------------------------------------------
      start_time = time.time()
      get_page('https://www.python.org/')
      get_page('https://www.yahoo.com/')
      get_page('https://github.com/')
      print('用时:', time.time() - start_time)
      # 用时: 5.849104881286621
      
    • Gevent应用举例二

      # 服务端
      from gevent import monkey
      monkey.patch_all()
      import socket
      import gevent
      # 如果不想用money.patch_all()打补丁,可以用gevent自带的socket
      # from gevent import socket
      # s = socket.socket()
      
      def server(server_ip, port):
          s = socket.socket()
          s.bind((server_ip, port))
          s.listen(5)
          while 1:
              conn, addr = s.accept()
              gevent.spawn(talk, conn, addr)
      
      def talk(conn, addr):
          try:
              while 1:
                  f_c = conn.recv(1024).decode('utf-8')
                  print(f'来自{addr}的消息:{f_c}')
                  conn.send(f_c.upper().encode('utf-8'))
          except Exception:
              print(f'断开与{addr}的连接')
          finally:
              conn.close()
      if __name__ == '__main__':
          server('127.0.0.1', 2019)
      --------------------------------------------------------
      # 客户端
      import socket
      c = socket.socket()
      c.connect(('127.0.0.1', 2019))
      
      while 1:
          t_s = input('>>>:').strip()
          if not t_s:
              continue
          c.send(t_s.encode('utf-8'))
          f_s = c.recv(1024).decode('utf-8')
          print('来自服务器的回复:', f_s)
      --------------------------------------------------------
      # 多线程并发多个客户端
      from threading import Thread
      from threading import current_thread
      import socket
      
      def client(server_ip, port):
          c = socket.socket()
          # 套接字对象一定要加到函数内,即局部名称空间内,放在函数外则被所有线程共享,
          # 则大家公用一个套接字对象,那么客户端端口永远一样了
          c.connect((server_ip, port))
      
          count = 0
          while 1:
              c.send(f'{current_thread().name}说:{count}'.encode('utf-8'))
              f_s = c.recv(1024).decode('utf-8')
              print(f_s)
              count += 1
      if __name__ == '__main__':
          for i in range(250):
              t = Thread(target=client, args=('127.0.0.1', 2019))
              t.start()
      
  • 相关阅读:
    VMware的安装
    草根创业专题开篇
    转:分布式和集中式版本控制工具svn,git,mercurial
    sql db to sqlite
    简单办公自动化系统开发与思考1
    sql ef datacontext muti thread problem
    谈云计算,服务器运算的惊天骗局
    ios5.1二货,手贱,解决方案
    阿曹的创业点子1人人快递
    创业点子wifi anywhere
  • 原文地址:https://www.cnblogs.com/W-Y-C/p/11266005.html
Copyright © 2011-2022 走看看