zoukankan      html  css  js  c++  java
  • 事件循环和协程

    前言

    第一次接触异步编程这个概念是在 Python 里面,去年的时候就因为不清楚 Python 中异步编程的实现原理找了很多资料研究,但最后也没有搞得很清楚。

    后来又因为重心逐渐转移到了 Java 上,就暂时放弃了对 Python 中异步原理的探究,直到前段时间,被 JavaScript 的异步坑了一次后, 让我想起了 Python 里面的异步……

    然后我就在网上看到了一篇文章,看了半天后在文章的评论中发现了自己两年前的评论……

    当时我那个心情啊,太曲折了,于是乎决定找个时间在研究一下 Python 中的异步,这便是这篇博客的由来。

    • 注 1:虽然这篇博客源自探究 Python 异步编程实现原理的过程,但是,并不会包含太多 Python 中异步编程的实现原理,因为我还没有搞明白 QAQ
    • 注 2:博客中最后的实现代码存在问题,但一时间找不到解决办法,只好作为一种思路发出来,等待后续研究解决
    • 注 3:哇咔咔咔,通过研究 JavaScript 的异步实现解决了代码的问题 (≧▽≦)/

    Python 中异步编程模型的特殊性

    我先后接触过了 Python、JavaScript1 和 Java 中的异步编程,其中,Python 的是最为特殊的一个,因为它是 单线程异步阻塞 的。

    通常来说,异步编程模型下最小的执行单位是一个个任务(往往就是一个函数),而为了能够在一个任务执行完成后执行另外的操作,又会引入回调任务(函数)。 不同的任务和回调任务的执行,往往又是通过事件循环和任务队列来完成。

    假如将这些东西拆分开来,像 JavaScript 那样,放在不同的线程下面处理,是很容易理解的,因为整体结构足够清晰,但是 Python 不行,因为 GIL 的原因,如果 Python 还是使用多线程的方式来实现异步编程的话, 并不能带来多少性能上的提升,因此,Python 异步中的事件循环、任务队列和任务的执行都在一个线程里面。

    而且,Python 中异步的实现是基于协程的,这不仅使得 Python 中的异步是阻塞的,而且,最小执行单位不再是单个任务,而是单个任务的一部分。

    这就让 Python 中异步的实现变得复杂起来,本来 Python 的源码就不好读,好家伙,现在更不好读了,同时又因为异步编程往往都和异步 I/O 挂钩,刷的一下,源码中一堆和异步 I/O 相关的代码……

    这让我明白了,想快速搞明白 Python 异步是咋回事是不可能的,毕竟,我要做的是通过阅读源码倒推作者的思路,这很难!!!

    因此,我换了一个思路,我先自己用协程实现一个简单的事件循环,在慢慢去读 Python 的源码,总可以了吧!

    这就是为啥这篇博客说是在探究 Python 异步编程的实现原理,但是标题连 Python 这个单词都没有的原因。

    协程的基本认识

    Python 中的协程是通过生成器来实现的,但是,基本上所有博客将协程的时候都会说的一句话,协程不等于生成器,它们只是长得像:

    def grep(pattern):
        """
        >>> g = grep("python")
        >>> g.send(None)
        Looking for python
        >>> g.send("Yeah, but no, but yeah, but no")
        >>> g.send("A series of tubes")
        >>> g.send("python generators rock!")
        python generators rock!
        """
        print("Looking for %s" % pattern)
        while True:
            line = yield
            if pattern in line:
                print(line)
    

    上面这个协程不断接收来自 send 方法的输入,在经过判断后进行输出,假如把它当做生成器使用的话,那么你只能得到无数的 None 值。

    本质上,在低版本的 Python 中生成器和协程没有区别,就看你怎么用,关键就在于协程 消费 值,而生成器 生成 值,当然了,高版本的 Python 对协程提供了更多的支持, 使得它们不再一样,但是,这篇博客里面,所有的协程都通过生成器实现。

    因此,我们需要关注后面会用到的几个特性:

    1. yield 的左值会接收来自 send 方法的输入,但是协程在第一次运行时还没到达 yield 语句处,因此无法传递参数,只能通过 None 值来调用协程:

      def coroutine():
          while True:
              val = yield
              print(val)
      

      执行输出:

      In [9]: coro = coroutine()
      
      In [10]: coro.send(None)
      
      In [11]: coro.send(1)
      1
      
      In [12]: coro = coroutine()
      
      In [13]: coro.send(1)
      ---------------------------------------------------------------------------
      TypeError                                 Traceback (most recent call last)
      <ipython-input-13-e272bd1527da> in <module>()
      ----> 1 coro.send(1)
      
      TypeError: can't send non-None value to a just-started generator
      
    2. 可以通过 yield from 语句递归调用协程,效果如下:

      def coroutine():
          for i in range(3):
              val = yield
              print('coroutine %s' % val)
      
      def invoker():
          yield from coroutine()
      

      执行输出(就是会报异常):

      In [29]: coro = invoker()
      
      In [30]: coro.send(None)
      
      In [31]: coro.send(1)
      coroutine 1
      
      In [32]: coro.send(2)
      coroutine 2
      
      In [33]: coro.send(3)
      coroutine 3
      ---------------------------------------------------------------------------
      StopIteration                             Traceback (most recent call last)
      <ipython-input-33-8e657389bc11> in <module>()
      ----> 1 coro.send(3)
      
      StopIteration:
      
    3. 协程可以有返回值,保存在 StopIteration 异常中,作为 yield from 的左值时可以直接接收:

      def coroutine():
          val = yield
          return 'coroutine %s' % val
      

      执行输出:

      In [42]: coro = coroutine()
      
      In [43]: coro.send(None)
      
      In [44]: coro.send(1)
      ---------------------------------------------------------------------------
      StopIteration                             Traceback (most recent call last)
      <ipython-input-44-e272bd1527da> in <module>()
      ----> 1 coro.send(1)
      
      StopIteration: coroutine 1
      

    简单事件循环的实现

    本来想将 Future & TaskEventLoop 分成两节的,结果 TaskEventLoop 耦合在了一起,只好合在一起了,下面是代码:

    class EventLoop:
        def __init__(self):
            self._ready = []
    
        def call_soon(self, task):
            self._ready.append(task)
    
        def run_forever(self, coro):
            root = Task(coro, self)
            while self._ready:
                task = self._ready.pop(0)
                task.step(Future())
            return root.result
    
    class Future:
        def __init__(self):
            # 通过 result 来保存协程的返回值
            self.result = None
            # 通过 _callbacks 来保存回调函数
            self._callbacks = []
    
        def add_done_callback(self, fn):
            self._callbacks.append(fn)
    
        def set_result(self, result):
            # try suppression bug
            self.result = self.result or result
            # 执行完成后将自身作为参数传递给回调函数
            for callback in self._callbacks:
                callback(self)
    
    class Task(Future):
        # 协程类型
        coroutine = type((i for i in range(0)))
    
        def __init__(self, coro, loop):
            super().__init__()
            self.coro = coro
            self.loop = loop
            # 将自身加入任务队列
            self.loop.call_soon(self)
    
        def step(self, future):
            try:
                result = self.coro.send(future.result)
            except StopIteration as exc:
                # 触发 StopIteration 异常时说明协程已经执行结束
                self.set_result(exc.value)
            else:
                # 协程返回协程,将其转换为 Task 后将 self.step 注册为期回调函数等待唤醒
                if type(result) == self.coroutine:
                    result = Task(result, self.loop)
                    result.add_done_callback(self.step)
                # 协程返回任务,将 self.step 注册为回调函数等待唤醒
                elif isinstance(result, Task):
                    # there is a bug
                    result.add_done_callback(self.step)
                    self.loop.call_soon(result)
                # 协程返回其他东西,不受理,直接将 self 再次放入任务队列
                else:
                    self.loop.call_soon(self)
    

    一开始实现的时候是想用一个外部的事件循环来操作,不需要 Task 持有事件循环,但是实现过程中发现那样存在一点问题,便学着 Python 中的方式将事件循环传递给 Task 操作, 但这里的实现是依然存在问题。

    在只存在协程和同序返回 Task 的情况下测试没有问题,但是当存在异序返回 Task 的情况下问题就出现了,下面的测试代码便是异序返回,我通过在 set_result 中判断 result 的方式暂时抑制了该异常, 但是,这是治标不治本的方式。如果有大佬知道方案,请务必告诉我 QAQ

    测试代码:

    _loop = EventLoop()
    
    def main():
        ta = Task(say_hello(), _loop)
        tb = Task(say_world(), _loop)
    
        b = yield tb
        a = yield ta
    
        return a + b
    
    def say_world():
        print('world')
        yield
        return 'world'
    
    def say_hello():
        print('hello')
        yield from say_other()
        return 'hello '
    
    
    def say_other():
        print('other')
        yield
    
    print(_loop.run_forever(main()))
    

    输出:

    hello
    other
    world
    hello world
    

    简单事件循环的实现 · 修

    发现在 Python 这边碰壁后,我就想到了去 JavaScript 这边取取经,然后,我就在阮老师的 Generator 函数的异步应用 教程里面找到了自己代码的问题,并成功解决 (≧▽≦)/

    这是新的代码:

    class EventLoop:
        def __init__(self):
            self._ready = []
    
        def call_soon(self, task):
            self._ready.append(task)
    
        def run_forever(self, coro):
            root = Task(coro, self)
            while self._ready:
                task = self._ready.pop(0)
                task.step(Future())
            return root.result
    
    
    class Future:
        def __init__(self):
            # 自身是否执行完成
            self.done = False
            # 通过 result 来保存协程的返回值
            self.result = None
            # 通过 _callbacks 来保存回调函数
            self._callbacks = []
    
        def add_done_callback(self, callback):
            if not self.done:
                self._callbacks.append(callback)
            else:
                callback(self)
    
        def set_result(self, result):
            self.result = result
            self.done = True
            # 执行完成后将自身作为参数传递给回调函数
            for callback in self._callbacks:
                callback(self)
    
    class Task(Future):
        # 协程类型
        coroutine = type((i for i in range(0)))
    
        def __init__(self, coro, loop):
            super().__init__()
            self.coro = coro
            self.loop = loop
            # 将自身加入任务队列
            self.loop.call_soon(self)
    
        def step(self, future):
            try:
                result = self.coro.send(future.result)
            except StopIteration as exc:
                # 触发 StopIteration 异常时说明协程已经执行结束
                self.set_result(exc.value)
            else:
                # 协程返回协程,将其转换为 Task
                if type(result) == self.coroutine:
                    result = Task(result, self.loop)
                # 将 self.step 注册为 task 回调函数等待唤醒
                if isinstance(result, Task):
                    result.add_done_callback(self.step)
                # 协程返回其他东西,不受理,直接将 self 再次放入任务队列
                else:
                    self.loop.call_soon(self)
    

    主要改动为:

     class Future:
         def __init__(self):
    +        # 自身是否执行完成
    +        self.done = False
             # 通过 result 来保存协程的返回值
             self.result = None
             # 通过 _callbacks 来保存回调函数
             self._callbacks = []
    
    -    def add_done_callback(self, fn):
    -        self._callbacks.append(fn)
    +    def add_done_callback(self, callback):
    +        if not self.done:
    +            self._callbacks.append(callback)
    +        else:
    +            callback(self)
    
         def set_result(self, result):
    -        # try suppression bug
    -        self.result = self.result or result
    +        self.result = result
    +        self.done = True
             # 执行完成后将自身作为参数传递给回调函数
             for callback in self._callbacks:
                 callback(self)
    @@ -47,15 +53,12 @@
                 # 触发 StopIteration 异常时说明协程已经执行结束
                 self.set_result(exc.value)
             else:
    -            # 协程返回协程,将其转换为 Task 后将 self.step 注册为期回调函数等待唤醒
    +            # 协程返回协程,将其转换为 Task
                 if type(result) == self.coroutine:
                     result = Task(result, self.loop)
    +            # 将 self.step 注册为 task 回调函数等待唤醒
    +            if isinstance(result, Task):
                     result.add_done_callback(self.step)
    -            # 协程返回任务,将 self.step 注册为回调函数等待唤醒
    -            elif isinstance(result, Task):
    -                # there is a bug
    -                result.add_done_callback(self.step)
    -                self.loop.call_soon(result)
                 # 协程返回其他东西,不受理,直接将 self 再次放入任务队列
                 else:
                     self.loop.call_soon(self)
    

    问题原因为:

    1. 创建 Task 时任务会被立即加入事件循环,在任务执行时发现返回的是一个 Task 那么就会将唤醒自身的回调注册到该任务的回调函数中
    2. 如果这时任务已经执行完成了,那么,按照我之前的逻辑,这次回调函数的注册是无效的
    3. 结果就是,我发现回调函数没有执行,就只好在注册了回调函数后仍然将自身加入事件循环,导致重复调用

    现在好了,加个判断,加入注册回调函数时自身已经执行完成了,就立即执行回调函数,这样就不用担心出现问题了。

    优化:

    1. 现在的执行模式的话可能导致递归调用,更好的方式就是改造一下,在注册回调函数时发现自身已经执行完成,那么就把回调函数封装为 Task 注册到事件循环
    2. 现在的事件循环其实就只是个简单的同步协程执行器,而异步操作关注的更多的往往是 I/O 操作,因此可以考虑通过 Selector 的方式注册异步 I/O 回调函数, 这个到时候可以去研究一下 Python 里面的实现

    结语

    折腾了一圈后结果还是只能得到一份存在问题的代码,和去年的时候差不多,但比去年好的是,多少多了一点思路。

    但是,还是差得好远啊……

    参考链接

    Footnotes

    1 ES6 中 async/await 的原理还没有怎么了解过,因此这里的异步只包括 ajax 这类异步操作

  • 相关阅读:
    Elasticsearch倒排索引结构
    Redis(二)冰叔带你了解Redis-哨兵模式和高可用集群解析
    jenkins构建配置
    jenkins配置email
    jenkins配置maven
    jenkins配置java
    jenkins配置git
    centos安装jenkins
    centos安装tomcat
    centos安装jdk
  • 原文地址:https://www.cnblogs.com/rgbit/p/12210530.html
Copyright © 2011-2022 走看看