zoukankan      html  css  js  c++  java
  • 事件循环和协程

    前言

    第一次接触异步编程这个概念是在 Python 里面,去年的时候就因为不清楚 Python 中异步编程的实现原理找了很多资料研究,但最后也没有搞得很清楚。

    后来又因为重心逐渐转移到了 Java 上,就暂时放弃了对 Python 中异步原理的探究,直到前段时间,被 JavaScript 的异步坑了一次后, 让我想起了 Python 里面的异步……

    然后我就在网上看到了一篇文章,看了半天后在文章的评论中发现了自己两年前的评论……

    当时我那个心情啊,太曲折了,于是乎决定找个时间在研究一下 Python 中的异步,这便是这篇博客的由来。

    • 注 1:虽然这篇博客源自探究 Python 异步编程实现原理的过程,但是,并不会包含太多 Python 中异步编程的实现原理,因为我还没有搞明白 QAQ
    • 注 2:博客中最后的实现代码存在问题,但一时间找不到解决办法,只好作为一种思路发出来,等待后续研究解决
    • 注 3:哇咔咔咔,通过研究 JavaScript 的异步实现解决了代码的问题 (≧▽≦)/

    Python 中异步编程模型的特殊性

    我先后接触过了 Python、JavaScript1 和 Java 中的异步编程,其中,Python 的是最为特殊的一个,因为它是 单线程异步阻塞 的。

    通常来说,异步编程模型下最小的执行单位是一个个任务(往往就是一个函数),而为了能够在一个任务执行完成后执行另外的操作,又会引入回调任务(函数)。 不同的任务和回调任务的执行,往往又是通过事件循环和任务队列来完成。

    假如将这些东西拆分开来,像 JavaScript 那样,放在不同的线程下面处理,是很容易理解的,因为整体结构足够清晰,但是 Python 不行,因为 GIL 的原因,如果 Python 还是使用多线程的方式来实现异步编程的话, 并不能带来多少性能上的提升,因此,Python 异步中的事件循环、任务队列和任务的执行都在一个线程里面。

    而且,Python 中异步的实现是基于协程的,这不仅使得 Python 中的异步是阻塞的,而且,最小执行单位不再是单个任务,而是单个任务的一部分。

    这就让 Python 中异步的实现变得复杂起来,本来 Python 的源码就不好读,好家伙,现在更不好读了,同时又因为异步编程往往都和异步 I/O 挂钩,刷的一下,源码中一堆和异步 I/O 相关的代码……

    这让我明白了,想快速搞明白 Python 异步是咋回事是不可能的,毕竟,我要做的是通过阅读源码倒推作者的思路,这很难!!!

    因此,我换了一个思路,我先自己用协程实现一个简单的事件循环,在慢慢去读 Python 的源码,总可以了吧!

    这就是为啥这篇博客说是在探究 Python 异步编程的实现原理,但是标题连 Python 这个单词都没有的原因。

    协程的基本认识

    Python 中的协程是通过生成器来实现的,但是,基本上所有博客将协程的时候都会说的一句话,协程不等于生成器,它们只是长得像:

    def grep(pattern):
        """
        >>> g = grep("python")
        >>> g.send(None)
        Looking for python
        >>> g.send("Yeah, but no, but yeah, but no")
        >>> g.send("A series of tubes")
        >>> g.send("python generators rock!")
        python generators rock!
        """
        print("Looking for %s" % pattern)
        while True:
            line = yield
            if pattern in line:
                print(line)
    

    上面这个协程不断接收来自 send 方法的输入,在经过判断后进行输出,假如把它当做生成器使用的话,那么你只能得到无数的 None 值。

    本质上,在低版本的 Python 中生成器和协程没有区别,就看你怎么用,关键就在于协程 消费 值,而生成器 生成 值,当然了,高版本的 Python 对协程提供了更多的支持, 使得它们不再一样,但是,这篇博客里面,所有的协程都通过生成器实现。

    因此,我们需要关注后面会用到的几个特性:

    1. yield 的左值会接收来自 send 方法的输入,但是协程在第一次运行时还没到达 yield 语句处,因此无法传递参数,只能通过 None 值来调用协程:

      def coroutine():
          while True:
              val = yield
              print(val)
      

      执行输出:

      In [9]: coro = coroutine()
      
      In [10]: coro.send(None)
      
      In [11]: coro.send(1)
      1
      
      In [12]: coro = coroutine()
      
      In [13]: coro.send(1)
      ---------------------------------------------------------------------------
      TypeError                                 Traceback (most recent call last)
      <ipython-input-13-e272bd1527da> in <module>()
      ----> 1 coro.send(1)
      
      TypeError: can't send non-None value to a just-started generator
      
    2. 可以通过 yield from 语句递归调用协程,效果如下:

      def coroutine():
          for i in range(3):
              val = yield
              print('coroutine %s' % val)
      
      def invoker():
          yield from coroutine()
      

      执行输出(就是会报异常):

      In [29]: coro = invoker()
      
      In [30]: coro.send(None)
      
      In [31]: coro.send(1)
      coroutine 1
      
      In [32]: coro.send(2)
      coroutine 2
      
      In [33]: coro.send(3)
      coroutine 3
      ---------------------------------------------------------------------------
      StopIteration                             Traceback (most recent call last)
      <ipython-input-33-8e657389bc11> in <module>()
      ----> 1 coro.send(3)
      
      StopIteration:
      
    3. 协程可以有返回值,保存在 StopIteration 异常中,作为 yield from 的左值时可以直接接收:

      def coroutine():
          val = yield
          return 'coroutine %s' % val
      

      执行输出:

      In [42]: coro = coroutine()
      
      In [43]: coro.send(None)
      
      In [44]: coro.send(1)
      ---------------------------------------------------------------------------
      StopIteration                             Traceback (most recent call last)
      <ipython-input-44-e272bd1527da> in <module>()
      ----> 1 coro.send(1)
      
      StopIteration: coroutine 1
      

    简单事件循环的实现

    本来想将 Future & TaskEventLoop 分成两节的,结果 TaskEventLoop 耦合在了一起,只好合在一起了,下面是代码:

    class EventLoop:
        def __init__(self):
            self._ready = []
    
        def call_soon(self, task):
            self._ready.append(task)
    
        def run_forever(self, coro):
            root = Task(coro, self)
            while self._ready:
                task = self._ready.pop(0)
                task.step(Future())
            return root.result
    
    class Future:
        def __init__(self):
            # 通过 result 来保存协程的返回值
            self.result = None
            # 通过 _callbacks 来保存回调函数
            self._callbacks = []
    
        def add_done_callback(self, fn):
            self._callbacks.append(fn)
    
        def set_result(self, result):
            # try suppression bug
            self.result = self.result or result
            # 执行完成后将自身作为参数传递给回调函数
            for callback in self._callbacks:
                callback(self)
    
    class Task(Future):
        # 协程类型
        coroutine = type((i for i in range(0)))
    
        def __init__(self, coro, loop):
            super().__init__()
            self.coro = coro
            self.loop = loop
            # 将自身加入任务队列
            self.loop.call_soon(self)
    
        def step(self, future):
            try:
                result = self.coro.send(future.result)
            except StopIteration as exc:
                # 触发 StopIteration 异常时说明协程已经执行结束
                self.set_result(exc.value)
            else:
                # 协程返回协程,将其转换为 Task 后将 self.step 注册为期回调函数等待唤醒
                if type(result) == self.coroutine:
                    result = Task(result, self.loop)
                    result.add_done_callback(self.step)
                # 协程返回任务,将 self.step 注册为回调函数等待唤醒
                elif isinstance(result, Task):
                    # there is a bug
                    result.add_done_callback(self.step)
                    self.loop.call_soon(result)
                # 协程返回其他东西,不受理,直接将 self 再次放入任务队列
                else:
                    self.loop.call_soon(self)
    

    一开始实现的时候是想用一个外部的事件循环来操作,不需要 Task 持有事件循环,但是实现过程中发现那样存在一点问题,便学着 Python 中的方式将事件循环传递给 Task 操作, 但这里的实现是依然存在问题。

    在只存在协程和同序返回 Task 的情况下测试没有问题,但是当存在异序返回 Task 的情况下问题就出现了,下面的测试代码便是异序返回,我通过在 set_result 中判断 result 的方式暂时抑制了该异常, 但是,这是治标不治本的方式。如果有大佬知道方案,请务必告诉我 QAQ

    测试代码:

    _loop = EventLoop()
    
    def main():
        ta = Task(say_hello(), _loop)
        tb = Task(say_world(), _loop)
    
        b = yield tb
        a = yield ta
    
        return a + b
    
    def say_world():
        print('world')
        yield
        return 'world'
    
    def say_hello():
        print('hello')
        yield from say_other()
        return 'hello '
    
    
    def say_other():
        print('other')
        yield
    
    print(_loop.run_forever(main()))
    

    输出:

    hello
    other
    world
    hello world
    

    简单事件循环的实现 · 修

    发现在 Python 这边碰壁后,我就想到了去 JavaScript 这边取取经,然后,我就在阮老师的 Generator 函数的异步应用 教程里面找到了自己代码的问题,并成功解决 (≧▽≦)/

    这是新的代码:

    class EventLoop:
        def __init__(self):
            self._ready = []
    
        def call_soon(self, task):
            self._ready.append(task)
    
        def run_forever(self, coro):
            root = Task(coro, self)
            while self._ready:
                task = self._ready.pop(0)
                task.step(Future())
            return root.result
    
    
    class Future:
        def __init__(self):
            # 自身是否执行完成
            self.done = False
            # 通过 result 来保存协程的返回值
            self.result = None
            # 通过 _callbacks 来保存回调函数
            self._callbacks = []
    
        def add_done_callback(self, callback):
            if not self.done:
                self._callbacks.append(callback)
            else:
                callback(self)
    
        def set_result(self, result):
            self.result = result
            self.done = True
            # 执行完成后将自身作为参数传递给回调函数
            for callback in self._callbacks:
                callback(self)
    
    class Task(Future):
        # 协程类型
        coroutine = type((i for i in range(0)))
    
        def __init__(self, coro, loop):
            super().__init__()
            self.coro = coro
            self.loop = loop
            # 将自身加入任务队列
            self.loop.call_soon(self)
    
        def step(self, future):
            try:
                result = self.coro.send(future.result)
            except StopIteration as exc:
                # 触发 StopIteration 异常时说明协程已经执行结束
                self.set_result(exc.value)
            else:
                # 协程返回协程,将其转换为 Task
                if type(result) == self.coroutine:
                    result = Task(result, self.loop)
                # 将 self.step 注册为 task 回调函数等待唤醒
                if isinstance(result, Task):
                    result.add_done_callback(self.step)
                # 协程返回其他东西,不受理,直接将 self 再次放入任务队列
                else:
                    self.loop.call_soon(self)
    

    主要改动为:

     class Future:
         def __init__(self):
    +        # 自身是否执行完成
    +        self.done = False
             # 通过 result 来保存协程的返回值
             self.result = None
             # 通过 _callbacks 来保存回调函数
             self._callbacks = []
    
    -    def add_done_callback(self, fn):
    -        self._callbacks.append(fn)
    +    def add_done_callback(self, callback):
    +        if not self.done:
    +            self._callbacks.append(callback)
    +        else:
    +            callback(self)
    
         def set_result(self, result):
    -        # try suppression bug
    -        self.result = self.result or result
    +        self.result = result
    +        self.done = True
             # 执行完成后将自身作为参数传递给回调函数
             for callback in self._callbacks:
                 callback(self)
    @@ -47,15 +53,12 @@
                 # 触发 StopIteration 异常时说明协程已经执行结束
                 self.set_result(exc.value)
             else:
    -            # 协程返回协程,将其转换为 Task 后将 self.step 注册为期回调函数等待唤醒
    +            # 协程返回协程,将其转换为 Task
                 if type(result) == self.coroutine:
                     result = Task(result, self.loop)
    +            # 将 self.step 注册为 task 回调函数等待唤醒
    +            if isinstance(result, Task):
                     result.add_done_callback(self.step)
    -            # 协程返回任务,将 self.step 注册为回调函数等待唤醒
    -            elif isinstance(result, Task):
    -                # there is a bug
    -                result.add_done_callback(self.step)
    -                self.loop.call_soon(result)
                 # 协程返回其他东西,不受理,直接将 self 再次放入任务队列
                 else:
                     self.loop.call_soon(self)
    

    问题原因为:

    1. 创建 Task 时任务会被立即加入事件循环,在任务执行时发现返回的是一个 Task 那么就会将唤醒自身的回调注册到该任务的回调函数中
    2. 如果这时任务已经执行完成了,那么,按照我之前的逻辑,这次回调函数的注册是无效的
    3. 结果就是,我发现回调函数没有执行,就只好在注册了回调函数后仍然将自身加入事件循环,导致重复调用

    现在好了,加个判断,加入注册回调函数时自身已经执行完成了,就立即执行回调函数,这样就不用担心出现问题了。

    优化:

    1. 现在的执行模式的话可能导致递归调用,更好的方式就是改造一下,在注册回调函数时发现自身已经执行完成,那么就把回调函数封装为 Task 注册到事件循环
    2. 现在的事件循环其实就只是个简单的同步协程执行器,而异步操作关注的更多的往往是 I/O 操作,因此可以考虑通过 Selector 的方式注册异步 I/O 回调函数, 这个到时候可以去研究一下 Python 里面的实现

    结语

    折腾了一圈后结果还是只能得到一份存在问题的代码,和去年的时候差不多,但比去年好的是,多少多了一点思路。

    但是,还是差得好远啊……

    参考链接

    Footnotes

    1 ES6 中 async/await 的原理还没有怎么了解过,因此这里的异步只包括 ajax 这类异步操作

  • 相关阅读:
    LeetCode 769. Max Chunks To Make Sorted
    LeetCode 845. Longest Mountain in Array
    LeetCode 1059. All Paths from Source Lead to Destination
    1129. Shortest Path with Alternating Colors
    LeetCode 785. Is Graph Bipartite?
    LeetCode 802. Find Eventual Safe States
    LeetCode 1043. Partition Array for Maximum Sum
    LeetCode 841. Keys and Rooms
    LeetCode 1061. Lexicographically Smallest Equivalent String
    LeetCode 1102. Path With Maximum Minimum Value
  • 原文地址:https://www.cnblogs.com/rgbit/p/12210530.html
Copyright © 2011-2022 走看看