zoukankan      html  css  js  c++  java
  • 深入tornado中的协程

    tornado使用了单进程(当然也可以多进程) + 协程 + I/O多路复用的机制,解决了C10K中因为过多的线程(进程)的上下文切换 而导致的cpu资源的浪费。

    tornado中的I/O多路复用前面已经讲过了。本文不做详细解释。

    来看一下tornado中的协程模块:tornado.gen:

    tornado.gen是根据生成器(generator)实现的,用来更加简单的实现异步。

    先来说一下tornado.gen.coroutine的实现思路:

      我们知道generator中的yield语句可以使函数暂停执行,而send()方法则可以恢复函数的执行。

      tornado将那些异步操作放置到yield语句后,当这些异步操作完成后,tornado会将结果send()至generator中恢复函数执行。

    在tornado的官方文档中有这么一句话:

    Most asynchronous functions in Tornado return a Future; yielding this object returns its result.

    就是说:

      在tornado中大多数的异步操作返回一个Future对象

      yield Future对象 会返回该异步操作的结果,这句话的意思就是说 假如 ret = yield some_future_obj 当some_future_obj所对应的异步操作完成后会自动的将该异步操作的结果赋值给 ret

    那么,Future对象到底是什么?

    一  Future对象

    先来说说Future对象:

    Future对象可以概括为: 一个异步操作的占位符,当然这个占位符有些特殊,它特殊在:

      1 这个占位符是一个对象

      2 这个对象包含了很多属性,包括_result 以及 _callbacks,分别用来存储异步操作的结果以及回调函数

      3 这个对象包含了很多方法,比如添加回调函数,设置异步操作结果等。

      4 当这个对象对应的异步操作完成后,该对象会被set_done,然后遍历并运行_callbacks中的回调函数

    来看一下Future的简化版

    class Future(object):
        '''
            Future对象主要保存一个回调函数列表_callbacks与一个执行结果_result,当我们set_result时,就会执行_callbacks中的函数
            如果set_result或者set_done,就会遍历_callbacks列表并执行callback(self)函数
        '''
        def __init__(self):
            self._result = None    # 执行的结果
            self._callbacks = []    # 用来保存该future对象的回调函数
    
        def result(self, timeout=None):
            # 如果操作成功,返回结果。如果失败则抛出异常
            self._clear_tb_log()
            if self._result is not None:
                return self._result
            if self._exc_info is not None:
                raise_exc_info(self._exc_info)
            self._check_done()
            return self._result
    
        def add_done_callback(self, fn):
            if self._done:
                fn(self)
            else:
                self._callbacks.append(fn)
    
        def set_result(self, result):
            self._result = result
            self._set_done()
    
        def _set_done(self):
            # 执行结束(成功)后的操作。
            self._done = True
            for cb in self._callbacks:
                try:
                    cb(self)
                except Exception:
                    app_log.exception('Exception in callback %r for %r', cb, self)
            self._callbacks = None

    完整源码:

    class Future(object):
        '''
            Future对象主要保存一个回调函数列表_callbacks与一个执行结果_result,当我们set_result时,就会执行_callbacks中的函数
        '''
        def __init__(self):
            self._done = False  # 是否执行完成
            self._result = None    # 执行的结果
            self._exc_info = None    # 执行的异常信息
    
            self._log_traceback = False   # Used for Python >= 3.4
            self._tb_logger = None        # Used for Python <= 3.3
    
            self._callbacks = []    # 用来保存该future对象的回调函数
    
        # Implement the Python 3.5 Awaitable protocol if possible
        # (we can't use return and yield together until py33).
        if sys.version_info >= (3, 3):
            exec(textwrap.dedent("""
            def __await__(self):
                return (yield self)
            """))
        else:
            # Py2-compatible version for use with cython.
            def __await__(self):
                result = yield self
                # StopIteration doesn't take args before py33,
                # but Cython recognizes the args tuple.
                e = StopIteration()
                e.args = (result,)
                raise e
    
        def cancel(self):
            """Cancel the operation, if possible. 如果可能的话取消操作
            tornado对象不支持取消操作,所以总是返回False
            """
            return False
    
        def cancelled(self):
            # 同上
            return False
    
        def running(self):
            """Returns True if this operation is currently running."""
            return not self._done
    
        def done(self):
            """Returns True if the future has finished running."""
            return self._done
    
        def _clear_tb_log(self):
            self._log_traceback = False
            if self._tb_logger is not None:
                self._tb_logger.clear()
                self._tb_logger = None
    
        def result(self, timeout=None):
            """If the operation succeeded, return its result.  If it failed,
            re-raise its exception. 如果操作成功,返回结果。如果失败则抛出异常
    
            This method takes a ``timeout`` argument for compatibility with
            `concurrent.futures.Future` but it is an error to call it
            before the `Future` is done, so the ``timeout`` is never used.
            """
            self._clear_tb_log()
            if self._result is not None:
                return self._result
            if self._exc_info is not None:
                raise_exc_info(self._exc_info)
            self._check_done()
            return self._result
    
        def exception(self, timeout=None):
            """If the operation raised an exception, return the `Exception`
            object.  Otherwise returns None.
    
            This method takes a ``timeout`` argument for compatibility with
            `concurrent.futures.Future` but it is an error to call it
            before the `Future` is done, so the ``timeout`` is never used.
            """
            self._clear_tb_log()
            if self._exc_info is not None:
                return self._exc_info[1]
            else:
                self._check_done()
                return None
    
        def add_done_callback(self, fn):
            """Attaches the given callback to the `Future`. 将callback附加到
    
            It will be invoked with the `Future` as its argument when the Future
            has finished running and its result is available.  In Tornado
            consider using `.IOLoop.add_future` instead of calling
            `add_done_callback` directly.
            """
            if self._done:
                fn(self)
            else:
                self._callbacks.append(fn)
    
        def set_result(self, result):
            """Sets the result of a ``Future``. 将 result 设置为该future对象的结果
    
            It is undefined to call any of the ``set`` methods more than once
            on the same object.
            """
            self._result = result
            self._set_done()
    
        def set_exception(self, exception):
            """Sets the exception of a ``Future.``"""
            self.set_exc_info(
                (exception.__class__,
                 exception,
                 getattr(exception, '__traceback__', None)))
    
        def exc_info(self):
            """Returns a tuple in the same format as `sys.exc_info` or None.
    
            .. versionadded:: 4.0
            """
            self._clear_tb_log()
            return self._exc_info
    
        def set_exc_info(self, exc_info):
            """Sets the exception information of a ``Future.``
    
            Preserves tracebacks on Python 2.
    
            .. versionadded:: 4.0
            """
            self._exc_info = exc_info
            self._log_traceback = True
            if not _GC_CYCLE_FINALIZERS:
                self._tb_logger = _TracebackLogger(exc_info)
    
            try:
                self._set_done()
            finally:
                # Activate the logger after all callbacks have had a
                # chance to call result() or exception().
                if self._log_traceback and self._tb_logger is not None:
                    self._tb_logger.activate()
            self._exc_info = exc_info
    
        def _check_done(self):
            if not self._done:
                raise Exception("DummyFuture does not support blocking for results")
    
        def _set_done(self):
            # 执行结束(成功)后的操作。
            self._done = True
            for cb in self._callbacks:
                try:
                    cb(self)
                except Exception:
                    app_log.exception('Exception in callback %r for %r', cb, self)
            self._callbacks = None
    
        # On Python 3.3 or older, objects with a destructor part of a reference
        # cycle are never destroyed. It's no longer the case on Python 3.4 thanks to
        # the PEP 442.
        if _GC_CYCLE_FINALIZERS:
            def __del__(self):
                if not self._log_traceback:
                    # set_exception() was not called, or result() or exception()
                    # has consumed the exception
                    return
    
                tb = traceback.format_exception(*self._exc_info)
    
                app_log.error('Future %r exception was never retrieved: %s',
                              self, ''.join(tb).rstrip())
    Future源码

    二  gen.coroutine装饰器

    tornado中的协程是通过tornado.gen中的coroutine装饰器实现的:

    def coroutine(func, replace_callback=True):
        return _make_coroutine_wrapper(func, replace_callback=True)
    _make_coroutine_wrapper :
    def _make_coroutine_wrapper(func, replace_callback):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            '''
                大体过程:
                future = TracebackFuture()  
                result = func(*args, **kwargs)
                if isinstance(result, GeneratorType):
                    yielded = next(result)
                    Runner(result, future, yielded)
                return future
            '''
            future = TracebackFuture()                   # TracebackFuture = Future
    
            if replace_callback and 'callback' in kwargs:
                callback = kwargs.pop('callback')
                IOLoop.current().add_future(future, lambda future: callback(future.result()))
    
            try:
                result = func(*args, **kwargs)           # 执行func,若func中包含yield,则返回一个generator对象
            except (Return, StopIteration) as e:
                result = _value_from_stopiteration(e)
            except Exception:
                future.set_exc_info(sys.exc_info())
                return future
            else:
                if isinstance(result, GeneratorType):      # 判断其是否为generator对象
                    try:
                        orig_stack_contexts = stack_context._state.contexts
                        yielded = next(result)            # 第一次执行
                        if stack_context._state.contexts is not orig_stack_contexts:
                            yielded = TracebackFuture()
                            yielded.set_exception(
                                stack_context.StackContextInconsistentError(
                                    'stack_context inconsistency (probably caused '
                                    'by yield within a "with StackContext" block)'))
                    except (StopIteration, Return) as e:
                        future.set_result(_value_from_stopiteration(e))
                    except Exception:
                        future.set_exc_info(sys.exc_info())
                    else:
                        Runner(result, future, yielded)  # Runner(result, future, yield)
                    try:
                        return future            
                    finally:
                        future = None
            future.set_result(result)
            return future
        return wrapper

    先来看一下大体过程:

      1  首先生成一个Future对象

      2  运行该被装饰函数并将结果赋值给result。 在这里因为tornado的'异步'实现是基于generator的,所以一般情况下 result是一个generator对象

      3  yielded = next(result)  执行到被装饰函数的第一次yield,将结果赋值给yielded。一般情况下,yielded很大情况下是一个Future对象。

      4  Runner(result, future, yielded)

      5  return future

    除了第4步以外其他都很好理解,所以来了解一下第四步Runner()干了些啥:

    三  Runner()类

    1 为什么要有Runner()?或者说Runner()的作用是什么?

    Runner()可以自动的将异步操作的结果send()至生成器中止的地方

    tornado的协程或者说异步是基于generator实现的,generator较为常用的有两个方法:send() next() ,关于这两个方法的流程分析在这

    很多情况下会有generator的嵌套。比如说经常会yield 一个generator。当A生成器yield B生成器时,分两步:

      1 我们首先中止A的执行转而执行B

      2 当B执行完成后,我们需要将B的结果send()至A中止的地方,继续执行A

    Runner()主要就是来做这些的,也就是控制生成器的执行与中止,并在合适的情况下使用send()方法同时传入B生成器的结果唤醒A生成器。

    来看一个简单例子:

    def run():
        print('start running')
        yield 2     # 跑步用时2小时
    
    def eat():
        print('start eating')
        yield 1     # 吃饭用时1小时
    
    def time():
        run_time = yield run()
        eat_time = yield eat()
        print(run_time+eat_time)
    
    def Runner(gen):
        r = next(gen)
        return r
    
    t = time()
    try:
        action = t.send(Runner(next(t)))
        t.send(Runner(action))
    except StopIteration:
        pass
    View Code

    上例中的Runner()仅仅完成了第一步,我们还需要手动的执行第二步,而tornado的gen的Runner()则做了全套奥!

    2 剖析Runner()

    在Runner()中主要有三个方法__init__  handle_yield  run:

    class Runner(object):
        def __init__(self, gen, result_future, first_yielded):
            self.gen = gen                        # 一个generator对象
            self.result_future = result_future    # 一个Future对象
            self.future = _null_future            # 一个刚初始化的Future对象  _null_future = Future(); _null_future.set_result(None)
            self.yield_point = None
            self.pending_callbacks = None
            self.results = None
            self.running = False
            self.finished = False
            self.had_exception = False
            self.io_loop = IOLoop.current()
            self.stack_context_deactivate = None
            if self.handle_yield(first_yielded):
                self.run()
        
        ………… 部分方法省略
        def run(self):
            """Starts or resumes the generator, running until it reaches a
            yield point that is not ready.
            """
            if self.running or self.finished:
                return
            try:
                self.running = True
                while True:
                    future = self.future
                    if not future.done():
                        return
                    self.future = None
                    try:
                        orig_stack_contexts = stack_context._state.contexts
                        exc_info = None
    
                        try:
                            value = future.result()
                        except Exception:
                            self.had_exception = True
                            exc_info = sys.exc_info()
    
                        if exc_info is not None:
                            yielded = self.gen.throw(*exc_info)
                            exc_info = None
                        else:
                            yielded = self.gen.send(value)
    
                        if stack_context._state.contexts is not orig_stack_contexts:
                            self.gen.throw(
                                stack_context.StackContextInconsistentError(
                                    'stack_context inconsistency (probably caused '
                                    'by yield within a "with StackContext" block)'))
                    except (StopIteration, Return) as e:
                        self.finished = True
                        self.future = _null_future
                        if self.pending_callbacks and not self.had_exception:
                            # If we ran cleanly without waiting on all callbacks
                            # raise an error (really more of a warning).  If we
                            # had an exception then some callbacks may have been
                            # orphaned, so skip the check in that case.
                            raise LeakedCallbackError(
                                "finished without waiting for callbacks %r" %
                                self.pending_callbacks)
                        self.result_future.set_result(_value_from_stopiteration(e))
                        self.result_future = None
                        self._deactivate_stack_context()
                        return
                    except Exception:
                        self.finished = True
                        self.future = _null_future
                        self.result_future.set_exc_info(sys.exc_info())
                        self.result_future = None
                        self._deactivate_stack_context()
                        return
                    if not self.handle_yield(yielded):
                        return
            finally:
                self.running = False
    
        def handle_yield(self, yielded):
            if _contains_yieldpoint(yielded):    # 检查其中是否包含YieldPoint
                yielded = multi(yielded)
    
            if isinstance(yielded, YieldPoint):        # Base class for objects that may be yielded from the generator
                self.future = TracebackFuture()        # 一个刚刚初始化的Future对象
    
                def start_yield_point():
                    try:
                        yielded.start(self)
                        if yielded.is_ready():
                            self.future.set_result(yielded.get_result())
                        else:
                            self.yield_point = yielded
                    except Exception:
                        self.future = TracebackFuture()
                        self.future.set_exc_info(sys.exc_info())
    
                if self.stack_context_deactivate is None:
                    with stack_context.ExceptionStackContext(self.handle_exception) as deactivate:
                        self.stack_context_deactivate = deactivate
                        
                        def cb():
                            start_yield_point()
                            self.run()
                        self.io_loop.add_callback(cb)
                        return False
                else:
                    start_yield_point()
            else:
                try:
                    self.future = convert_yielded(yielded)
                except BadYieldError:
                    self.future = TracebackFuture()
                    self.future.set_exc_info(sys.exc_info())
    
            if not self.future.done() or self.future is moment:  # moment = Future()
                self.io_loop.add_future(self.future, lambda f: self.run()) # 为该future添加callback
                return False
            return True
    Runner()

    2.1 __init__方法

    __init__ 里面执行了一些初始化的操作,最主要是最后两句:

    if self.handle_yield(first_yielded): # 运行
        self.run()

    2.2 handle_yield方法

    handle_yield(self, yielded) 函数,这个函数顾名思义,就是用来处理yield返回的对象的。

    首先我们假设yielded是一个Future对象(因为这是最常用的情况),这样的话代码就缩减了很多

    def handle_yield(self, yielded):
            self.future = convert_yielded(yielded)                         # 如果yielded是Future对象则原样返回
            if not self.future.done() or self.future is moment:            # moment是tornado初始化时就建立的一个Future对象,且被set_result(None)
                self.io_loop.add_future(self.future, lambda f: self.run()) # 为该future添加callback
                return False
            return True

    也就是干了三步:

      首先解析出self.future  

      然后判断self.future对象是否已经被done(完成),如果没有的话为其添加回调函数,这个回调函数会执行self.run()

      返回self.future对象是否被done

    总体来说,handle_yield返回yielded对象是否被set_done,如果没有则为yielded对象添加回调函数,这个回调函数执行self.run()

    还有一个有趣的地方,就是上面代码的第四行:  self.io_loop.add_future(self.future, lambda f: self.run()) 

    def add_future(self, future, callback):
        # 为future添加一个回调函数,这个回调函数的作用是:将参数callback添加至self._callbacks中
        # 大家思考一个问题: 如果某个Future对象被set_done,那么他的回调函数应该在什么时候执行? 
        # 是立即执行亦或者是将回调函数添加到IOLoop实例的_callbacks中进行统一执行? 
        # 虽然前者更简单,但导致回调函数的执行过于混乱,我们应该让所有满足执行条件的回调函数统一执行。显然后者更合理
        # 而add_future()的作用就是这样
        future.add_done_callback(lambda future: self.add_callback(callback, future))
            
    def add_callback(self, callback, *args, **kwargs):
        # 将callback添加至_callbacks列表中
        self._callbacks.append(functools.partial(callback, *args, **kwargs))

    2.3 run方法

    再来看self.run()方法。这个方法实际上就是一个循环,不停的执行generator的send()方法,发送的值就是yielded的result。

    我们可以将run()方法简化一下:

        def run(self):
            """Starts or resumes the generator, running until it reaches a
            yield point that is not ready. 循环向generator中传递值,直到某个yield返回的yielded还没有被done
            """
            try:
                self.running = True 
                while True:
                    future = self.future  
                    if not future.done():
                        return
                    self.future = None      # 清空self.future
                    value = future.result()   # 获取future对象的结果
                    try:    
                        yielded = self.gen.send(value)  # send该结果,并将self.gen返回的值赋值给yielded(一般情况下这也是个future对象)
                    except (StopIteration, Return) as e:
                        self.finished = True
                        self.future = _null_future
                        self.result_future.set_result(_value_from_stopiteration(e))
                        self.result_future = None
                        self._deactivate_stack_context()
                        return
                    if not self.handle_yield(yielded):  # 运行self.handler_yield(yielded),如果yielded对象没有被done,则直接返回;否则继续循环
                        return
            finally:
                self.running = False

    总结:

      1 每一个Future对应一个异步操作

      2 该Future对象可以添加回调函数,当该异步操作完成后,需要对该Future对象设置set_done或者set_result,然后执行其所有的回调函数

      3 凡是使用了coroutine装饰器的generator函数都会返回一个Future对象,同时会不断为该generator,该generator每一次运行send()或者next()的返回结果yielded以及future对象运行Runner()

      4 Runner()会对generator不断进行send()或者next()操作。具体步骤是:上一个next()或者send()操作返回的yielded(一般是一个Future对象)被set_done后,将该yielded对象的结果send()至generator中,不断循环该操作,直到产生StopIteration或者Return异常(这表示该generator执行结束),这时会为该generator对应的Future对象set_result。

        我们可以看到tornado的协程是基于generator的,generator可以通过yield关键字暂停执行,也可以通过next()或者send()恢复执行,同时send()可以向generator中传递值。

        而将协程连接起来的纽带则是Future对象,每一个Future对象都对应着一个异步操作,我们可以为该对象添加许多回调函数,当异步操作完成后通过对Future对象进行set_done或者set_result就可以执行相关的回调函数。

        提供动力的则是Runner(),他不停的将generator所yield的每一个future对象的结果send()至generator,当generator运行结束,他会进行最后的包装工作,对该generator所对应的Future对象执行set_result操作。

    参考:

      http://blog.csdn.net/wyx819/article/details/45420017

      http://www.cnblogs.com/apexchu/p/4226784.html

  • 相关阅读:
    tensorboard以时间命名每一个文件夹
    图像分割loss集合
    博客园使用markdown
    Processed foods make us fatter easily
    python 有4个数字1234,能组成多少个互不相同且无重复的三位数数字。
    python 实现计算器功能 输入字符串,输出相应结果
    解决idea关闭Run窗口时点了Disconnect导致项目一直在跑的问题
    idea导入SpringBoot项目,没有启动按钮,没有maven
    Bean with name 'xxxService' has been injected into other beans [xxxServiceA,xxxServiceB] in its raw version as part of a circular reference, but has eventually been wrapped
    工厂模式
  • 原文地址:https://www.cnblogs.com/MnCu8261/p/6560502.html
Copyright © 2011-2022 走看看