  • A Deep Dive into Coroutines in Tornado

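    Tornado combines a single process (multiple processes are also possible), coroutines, and I/O multiplexing. This avoids the CPU waste that, in the C10K problem, comes from context switching among too many threads (or processes).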

    I/O multiplexing in Tornado was covered in an earlier article, so it is not explained in detail here.

    Now let's look at Tornado's coroutine module, tornado.gen:

    tornado.gen is built on generators and makes asynchronous code simpler to write.

    First, the idea behind tornado.gen.coroutine:

      A yield statement in a generator pauses the function, and the generator's send() method resumes it.

      Tornado places asynchronous operations after yield statements. When an operation completes, Tornado send()s its result into the generator, which resumes the function.
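
    A minimal plain-Python sketch of these two mechanisms (no Tornado involved): yield pauses the function and hands a value out, and send() resumes it, with the sent value becoming the result of the yield expression. The names fetch_twice and drive and the "request" strings are hypothetical; drive simply plays the part of the event loop by answering each request immediately.

```python
def fetch_twice():
    # Each yield hands a "request" out and pauses the function here.
    first = yield "request-1"
    # Execution resumes only when the caller calls send(); the sent value
    # becomes the value of the yield expression.
    second = yield "request-2"
    return first + second


def drive(gen, responses):
    """Play the role of the event loop: answer each yielded request."""
    request = next(gen)              # run up to the first yield
    try:
        while True:
            # Pretend the operation finished; send its result back in.
            request = gen.send(responses[request])
    except StopIteration as stop:
        return stop.value            # the generator's return value


print(drive(fetch_twice(), {"request-1": 1, "request-2": 2}))  # prints 3
```
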

    The official Tornado documentation puts it this way:

    Most asynchronous functions in Tornado return a Future; yielding this object returns its result.

    That is:

      Most asynchronous operations in Tornado return a Future object.

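      Yielding a Future object gives back the result of the asynchronous operation. For example, in ret = yield some_future_obj, once the asynchronous operation behind some_future_obj completes, its result is automatically assigned to ret.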

    So what exactly is a Future object?

    I  The Future object

    Let's start with the Future object.

    A Future object can be summed up as a placeholder for an asynchronous operation. It is a somewhat special placeholder, in that:

      1 The placeholder is an object.

      2 The object has a number of attributes, including _result and _callbacks, which store the operation's result and its callbacks respectively.

      3 The object has a number of methods, for example for adding callbacks and for setting the operation's result.

      4 When the associated asynchronous operation completes, the object is marked done (set_done) and the callbacks in _callbacks are iterated over and run.

    Here is a simplified version of Future:

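    import logging

    app_log = logging.getLogger("tornado.application")


    class Future(object):
        '''
            A Future mainly holds a list of callbacks (_callbacks) and a result (_result).
            When set_result is called, the future is marked done and every function
            in _callbacks is called as callback(self).
        '''
        def __init__(self):
            self._done = False      # whether the operation has finished
            self._result = None     # the result of the operation
            self._exc_info = None   # exception info, if the operation failed
            self._callbacks = []    # callbacks registered on this future
    
        def result(self, timeout=None):
            # Return the result if the operation succeeded; re-raise its exception if it failed
            if self._exc_info is not None:
                raise self._exc_info[1].with_traceback(self._exc_info[2])
            self._check_done()
            return self._result
    
        def _check_done(self):
            if not self._done:
                raise Exception("DummyFuture does not support blocking for results")
    
        def add_done_callback(self, fn):
            if self._done:
                fn(self)                    # already done: run the callback right away
            else:
                self._callbacks.append(fn)  # not done yet: keep it for later
    
        def set_result(self, result):
            self._result = result
            self._set_done()
    
        def _set_done(self):
            # Called on completion: mark done, then run every registered callback
            self._done = True
            for cb in self._callbacks:
                try:
                    cb(self)
                except Exception:
                    app_log.exception('Exception in callback %r for %r', cb, self)
            self._callbacks = None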
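
    To see the callback behavior concretely, here is a self-contained toy class (ToyFuture, a hypothetical pared-down stand-in written for illustration, not Tornado's code). A callback added before completion waits in _callbacks and runs when set_result is called; one added afterwards runs immediately.

```python
class ToyFuture:
    """Pared-down illustration of the Future idea (not Tornado's class)."""

    def __init__(self):
        self._done = False
        self._result = None
        self._callbacks = []            # callbacks waiting for completion

    def done(self):
        return self._done

    def result(self):
        if not self._done:
            raise RuntimeError("result() called before the future is done")
        return self._result

    def add_done_callback(self, fn):
        if self._done:
            fn(self)                    # already finished: run right away
        else:
            self._callbacks.append(fn)  # not finished: queue it

    def set_result(self, result):
        self._result = result
        self._done = True
        callbacks, self._callbacks = self._callbacks, []
        for cb in callbacks:            # run every queued callback
            cb(self)


log = []
f = ToyFuture()
f.add_done_callback(lambda fut: log.append(("early", fut.result())))
assert log == []                        # nothing runs before completion
f.set_result(42)
f.add_done_callback(lambda fut: log.append(("late", fut.result())))
print(log)  # [('early', 42), ('late', 42)]
```
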

    The full source (from tornado.concurrent):

    class Future(object):
        '''
            A Future mainly holds a list of callbacks (_callbacks) and a result (_result); when set_result is called, the functions in _callbacks are run.
        '''
        def __init__(self):
            self._done = False  # whether the operation has finished
            self._result = None    # the result of the operation
            self._exc_info = None    # exception info, if the operation failed
    
            self._log_traceback = False   # Used for Python >= 3.4
            self._tb_logger = None        # Used for Python <= 3.3
    
            self._callbacks = []    # callbacks registered on this future
    
        # Implement the Python 3.5 Awaitable protocol if possible
        # (we can't use return and yield together until py33).
        if sys.version_info >= (3, 3):
            exec(textwrap.dedent("""
            def __await__(self):
                return (yield self)
            """))
        else:
            # Py2-compatible version for use with cython.
            def __await__(self):
                result = yield self
                # StopIteration doesn't take args before py33,
                # but Cython recognizes the args tuple.
                e = StopIteration()
                e.args = (result,)
                raise e
    
        def cancel(self):
            """Cancel the operation, if possible.
            Tornado Futures do not support cancellation, so this always returns False.
            """
            return False
    
        def cancelled(self):
            # Same as above: cancellation is not supported
            return False
    
        def running(self):
            """Returns True if this operation is currently running."""
            return not self._done
    
        def done(self):
            """Returns True if the future has finished running."""
            return self._done
    
        def _clear_tb_log(self):
            self._log_traceback = False
            if self._tb_logger is not None:
                self._tb_logger.clear()
                self._tb_logger = None
    
        def result(self, timeout=None):
            """If the operation succeeded, return its result.  If it failed,
            re-raise its exception.
    
            This method takes a ``timeout`` argument for compatibility with
            `concurrent.futures.Future` but it is an error to call it
            before the `Future` is done, so the ``timeout`` is never used.
            """
            self._clear_tb_log()
            if self._result is not None:
                return self._result
            if self._exc_info is not None:
                raise_exc_info(self._exc_info)
            self._check_done()
            return self._result
    
        def exception(self, timeout=None):
            """If the operation raised an exception, return the `Exception`
            object.  Otherwise returns None.
    
            This method takes a ``timeout`` argument for compatibility with
            `concurrent.futures.Future` but it is an error to call it
            before the `Future` is done, so the ``timeout`` is never used.
            """
            self._clear_tb_log()
            if self._exc_info is not None:
                return self._exc_info[1]
            else:
                self._check_done()
                return None
    
        def add_done_callback(self, fn):
            """Attaches the given callback to the `Future`.
    
            It will be invoked with the `Future` as its argument when the Future
            has finished running and its result is available.  In Tornado
            consider using `.IOLoop.add_future` instead of calling
            `add_done_callback` directly.
            """
            if self._done:
                fn(self)
            else:
                self._callbacks.append(fn)
    
        def set_result(self, result):
            """Sets the result of a ``Future``.
    
            It is undefined to call any of the ``set`` methods more than once
            on the same object.
            """
            self._result = result
            self._set_done()
    
        def set_exception(self, exception):
            """Sets the exception of a ``Future.``"""
            self.set_exc_info(
                (exception.__class__,
                 exception,
                 getattr(exception, '__traceback__', None)))
    
        def exc_info(self):
            """Returns a tuple in the same format as `sys.exc_info` or None.
    
            .. versionadded:: 4.0
            """
            self._clear_tb_log()
            return self._exc_info
    
        def set_exc_info(self, exc_info):
            """Sets the exception information of a ``Future.``
    
            Preserves tracebacks on Python 2.
    
            .. versionadded:: 4.0
            """
            self._exc_info = exc_info
            self._log_traceback = True
            if not _GC_CYCLE_FINALIZERS:
                self._tb_logger = _TracebackLogger(exc_info)
    
            try:
                self._set_done()
            finally:
                # Activate the logger after all callbacks have had a
                # chance to call result() or exception().
                if self._log_traceback and self._tb_logger is not None:
                    self._tb_logger.activate()
            self._exc_info = exc_info
    
        def _check_done(self):
            if not self._done:
                raise Exception("DummyFuture does not support blocking for results")
    
        def _set_done(self):
            # Called on completion (success or failure): mark done and run the callbacks.
            self._done = True
            for cb in self._callbacks:
                try:
                    cb(self)
                except Exception:
                    app_log.exception('Exception in callback %r for %r', cb, self)
            self._callbacks = None
    
        # On Python 3.3 or older, objects with a destructor part of a reference
        # cycle are never destroyed. It's no longer the case on Python 3.4 thanks to
        # the PEP 442.
        if _GC_CYCLE_FINALIZERS:
            def __del__(self):
                if not self._log_traceback:
                    # set_exception() was not called, or result() or exception()
                    # has consumed the exception
                    return
    
                tb = traceback.format_exception(*self._exc_info)
    
                app_log.error('Future %r exception was never retrieved: %s',
                              self, ''.join(tb).rstrip())

    II  The gen.coroutine decorator

    Coroutines in Tornado are implemented by the coroutine decorator in tornado.gen:

    def coroutine(func, replace_callback=True):
        return _make_coroutine_wrapper(func, replace_callback=True)

    And _make_coroutine_wrapper:

    def _make_coroutine_wrapper(func, replace_callback):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            '''
                Outline:
                future = TracebackFuture()  
                result = func(*args, **kwargs)
                if isinstance(result, GeneratorType):
                    yielded = next(result)
                    Runner(result, future, yielded)
                return future
            '''
            future = TracebackFuture()                   # TracebackFuture = Future
    
            if replace_callback and 'callback' in kwargs:
                callback = kwargs.pop('callback')
                IOLoop.current().add_future(future, lambda future: callback(future.result()))
    
            try:
                result = func(*args, **kwargs)           # call func; if func contains yield, this returns a generator object
            except (Return, StopIteration) as e:
                result = _value_from_stopiteration(e)
            except Exception:
                future.set_exc_info(sys.exc_info())
                return future
            else:
                if isinstance(result, GeneratorType):      # is it a generator object?
                    try:
                        orig_stack_contexts = stack_context._state.contexts
                        yielded = next(result)            # run up to the first yield
                        if stack_context._state.contexts is not orig_stack_contexts:
                            yielded = TracebackFuture()
                            yielded.set_exception(
                                stack_context.StackContextInconsistentError(
                                    'stack_context inconsistency (probably caused '
                                    'by yield within a "with StackContext" block)'))
                    except (StopIteration, Return) as e:
                        future.set_result(_value_from_stopiteration(e))
                    except Exception:
                        future.set_exc_info(sys.exc_info())
                    else:
                        Runner(result, future, yielded)  # let Runner drive the rest of the generator
                    try:
                        return future            
                    finally:
                        future = None
            future.set_result(result)
            return future
        return wrapper

    First, the overall process:

      1  Create a Future object.

      2  Run the decorated function and assign its return value to result. Because Tornado's 'asynchronous' model is generator-based, result is normally a generator object.

      3  yielded = next(result) runs the decorated function up to its first yield and assigns the yielded value to yielded. In most cases yielded is a Future object.

      4  Runner(result, future, yielded)

      5  return future
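
    The five steps can be modeled in a few lines of plain Python. This is a simplified sketch, not Tornado's implementation: SimpleFuture, SimpleRunner, simple_coroutine and ready are hypothetical names, and SimpleRunner assumes every yielded Future is already done, so it can resume the generator synchronously. Tornado's real Runner also waits on pending Futures through the IOLoop and handles exceptions, callbacks and stack contexts.

```python
import functools
import types


class SimpleFuture:
    def __init__(self):
        self.done, self.value = False, None

    def set_result(self, value):
        self.done, self.value = True, value


class SimpleRunner:
    """Stand-in Runner: assumes every yielded future is already done."""

    def __init__(self, gen, result_future, first_yielded):
        yielded = first_yielded
        try:
            while True:
                yielded = gen.send(yielded.value)   # resume with the result
        except StopIteration as stop:               # generator finished
            result_future.set_result(stop.value)


def simple_coroutine(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = SimpleFuture()                     # step 1
        result = func(*args, **kwargs)              # step 2
        if isinstance(result, types.GeneratorType):
            try:
                yielded = next(result)              # step 3
            except StopIteration as stop:
                future.set_result(stop.value)
            else:
                SimpleRunner(result, future, yielded)   # step 4
            return future                           # step 5
        future.set_result(result)   # not a generator: wrap the plain return value
        return future
    return wrapper


def ready(value):
    """A SimpleFuture that is already done (hypothetical helper)."""
    f = SimpleFuture()
    f.set_result(value)
    return f


@simple_coroutine
def add():
    a = yield ready(1)
    b = yield ready(2)
    return a + b


print(add().value)   # 3
```
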

    Everything except step 4 is easy to follow, so let's look at what Runner() does in step 4:

    III  The Runner() class

    1 Why is Runner() needed? In other words, what does it do?

    Runner() automatically send()s the result of each asynchronous operation back to the point where the generator paused.

    Tornado's coroutines (that is, its asynchrony) are built on generators. The two generator methods used most often are send() and next(); an analysis of how they work is here.

    Generators are often nested; for example, it is common to yield a generator. When generator A yields generator B, there are two steps:

      1 First, suspend A and run B instead.

      2 When B finishes, send() B's result to the point where A paused, and continue running A.

    That is essentially Runner()'s job: it controls when generators run and pause, and at the right moment calls send() with B's result to wake A up.

    Here is a simple example:

    def run():
        print('start running')
        yield 2     # running takes 2 hours
    
    def eat():
        print('start eating')
        yield 1     # eating takes 1 hour
    
    def time():
        run_time = yield run()
        eat_time = yield eat()
        print(run_time+eat_time)
    
    def Runner(gen):
        r = next(gen)
        return r
    
    t = time()
    try:
        action = t.send(Runner(next(t)))
        t.send(Runner(action))
    except StopIteration:
        pass

    The Runner() above only takes care of step 1; step 2 still has to be done by hand. Tornado's gen.Runner(), by contrast, does the whole job.
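
    For comparison, here is a sketch of a driver that performs both steps on its own: whenever the outer generator yields another generator, it first runs that inner generator to completion (step 1) and then send()s its return value back into the outer one (step 2). drive is a hypothetical helper; unlike Tornado's Runner it is synchronous and recursive, and it treats any non-generator yielded value as an already-finished result. run() and eat() are slightly modified so that they return their duration, and time() is renamed total_time() and returns the sum instead of printing it.

```python
import types


def drive(gen):
    """Run gen to completion, handling nested generators (both steps).

    - If gen yields another generator, drive that one first (step 1) and
      send its return value back into gen (step 2).
    - Any other yielded value is treated as an already-finished result and
      is sent straight back.
    Returns whatever gen returns.
    """
    try:
        yielded = next(gen)
        while True:
            if isinstance(yielded, types.GeneratorType):
                value = drive(yielded)        # step 1: run the inner generator
            else:
                value = yielded               # plain value: already "done"
            yielded = gen.send(value)         # step 2: resume the outer one
    except StopIteration as stop:
        return stop.value


def run():
    print("start running")
    hours = yield 2      # running takes 2 hours
    return hours


def eat():
    print("start eating")
    hours = yield 1      # eating takes 1 hour
    return hours


def total_time():
    run_time = yield run()
    eat_time = yield eat()
    return run_time + eat_time


print(drive(total_time()))   # prints "start running", "start eating", then 3
```
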

    2 Dissecting Runner()

    Runner() has three main methods, __init__, handle_yield and run:

    class Runner(object):
        def __init__(self, gen, result_future, first_yielded):
            self.gen = gen                        # a generator object
            self.result_future = result_future    # a Future object
            self.future = _null_future            # a Future that is already done: _null_future = Future(); _null_future.set_result(None)
            self.yield_point = None
            self.pending_callbacks = None
            self.results = None
            self.running = False
            self.finished = False
            self.had_exception = False
            self.io_loop = IOLoop.current()
            self.stack_context_deactivate = None
            if self.handle_yield(first_yielded):
                self.run()
        
        # ... some methods omitted ...
        def run(self):
            """Starts or resumes the generator, running until it reaches a
            yield point that is not ready.
            """
            if self.running or self.finished:
                return
            try:
                self.running = True
                while True:
                    future = self.future
                    if not future.done():
                        return
                    self.future = None
                    try:
                        orig_stack_contexts = stack_context._state.contexts
                        exc_info = None
    
                        try:
                            value = future.result()
                        except Exception:
                            self.had_exception = True
                            exc_info = sys.exc_info()
    
                        if exc_info is not None:
                            yielded = self.gen.throw(*exc_info)
                            exc_info = None
                        else:
                            yielded = self.gen.send(value)
    
                        if stack_context._state.contexts is not orig_stack_contexts:
                            self.gen.throw(
                                stack_context.StackContextInconsistentError(
                                    'stack_context inconsistency (probably caused '
                                    'by yield within a "with StackContext" block)'))
                    except (StopIteration, Return) as e:
                        self.finished = True
                        self.future = _null_future
                        if self.pending_callbacks and not self.had_exception:
                            # If we ran cleanly without waiting on all callbacks
                            # raise an error (really more of a warning).  If we
                            # had an exception then some callbacks may have been
                            # orphaned, so skip the check in that case.
                            raise LeakedCallbackError(
                                "finished without waiting for callbacks %r" %
                                self.pending_callbacks)
                        self.result_future.set_result(_value_from_stopiteration(e))
                        self.result_future = None
                        self._deactivate_stack_context()
                        return
                    except Exception:
                        self.finished = True
                        self.future = _null_future
                        self.result_future.set_exc_info(sys.exc_info())
                        self.result_future = None
                        self._deactivate_stack_context()
                        return
                    if not self.handle_yield(yielded):
                        return
            finally:
                self.running = False
    
        def handle_yield(self, yielded):
            if _contains_yieldpoint(yielded):    # does it contain any YieldPoint?
                yielded = multi(yielded)
    
            if isinstance(yielded, YieldPoint):        # Base class for objects that may be yielded from the generator
                self.future = TracebackFuture()        # a fresh Future that is not done yet
    
                def start_yield_point():
                    try:
                        yielded.start(self)
                        if yielded.is_ready():
                            self.future.set_result(yielded.get_result())
                        else:
                            self.yield_point = yielded
                    except Exception:
                        self.future = TracebackFuture()
                        self.future.set_exc_info(sys.exc_info())
    
                if self.stack_context_deactivate is None:
                    with stack_context.ExceptionStackContext(self.handle_exception) as deactivate:
                        self.stack_context_deactivate = deactivate
                        
                        def cb():
                            start_yield_point()
                            self.run()
                        self.io_loop.add_callback(cb)
                        return False
                else:
                    start_yield_point()
            else:
                try:
                    self.future = convert_yielded(yielded)
                except BadYieldError:
                    self.future = TracebackFuture()
                    self.future.set_exc_info(sys.exc_info())
    
            if not self.future.done() or self.future is moment:  # moment = Future()
                self.io_loop.add_future(self.future, lambda f: self.run()) # register a callback on this future
                return False
            return True

    2.1 The __init__ method

    __init__ does some initialization; the most important part is the last two lines:

    if self.handle_yield(first_yielded): # if the first yielded value is already done, start running right away
        self.run()

    2.2 The handle_yield method

    As its name suggests, handle_yield(self, yielded) handles the object produced by a yield.

    Assume first that yielded is a Future object (the most common case); the code then becomes much shorter:

    def handle_yield(self, yielded):
        self.future = convert_yielded(yielded)                         # a Future passed in is returned unchanged
        if not self.future.done() or self.future is moment:            # moment: a Future created when tornado.gen is loaded, already set_result(None)
            self.io_loop.add_future(self.future, lambda f: self.run()) # register a callback on this future
            return False
        return True

    In short, it does three things:

      First, it resolves self.future.

      Then it checks whether self.future is already done; if not, it registers a callback on it that calls self.run().

      Finally, it returns whether self.future is done.

    Overall, handle_yield returns whether the yielded object is done, and if it is not, attaches a callback to it that runs self.run().

    Another interesting point is the add_future call in the code above: self.io_loop.add_future(self.future, lambda f: self.run())

    def add_future(self, future, callback):
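        # Add a callback to future; when future completes, that callback appends `callback` to self._callbacks.
        # A question to think about: once a Future is done, when should its callbacks run?
        # Immediately, or by adding them to the IOLoop instance's _callbacks and running them together?
        # The former is simpler but makes the order of callback execution chaotic; all callbacks that are
        # ready to run should be run together, so the latter is more reasonable. That is what add_future() does.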
        future.add_done_callback(lambda future: self.add_callback(callback, future))
            
    def add_callback(self, callback, *args, **kwargs):
        # append callback (with its arguments bound) to the _callbacks list
        self._callbacks.append(functools.partial(callback, *args, **kwargs))
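
    The idea of queuing ready callbacks on the loop and running them together, instead of inline inside set_result, can be sketched with a toy loop. MiniFuture, MiniLoop and run_once are hypothetical names for this illustration; Tornado's real IOLoop also polls for I/O events, handles timeouts and more.

```python
import functools


class MiniFuture:
    def __init__(self):
        self._done, self._result, self._callbacks = False, None, []

    def result(self):
        return self._result

    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result, self._done = result, True
        for cb in self._callbacks:
            cb(self)


class MiniLoop:
    def __init__(self):
        self._callbacks = []

    def add_callback(self, callback, *args, **kwargs):
        # Do not run now; queue it for the next loop iteration.
        self._callbacks.append(functools.partial(callback, *args, **kwargs))

    def add_future(self, future, callback):
        # When the future completes, schedule callback(future) on the loop.
        future.add_done_callback(lambda f: self.add_callback(callback, f))

    def run_once(self):
        # Run everything that became ready, in the order it was queued.
        callbacks, self._callbacks = self._callbacks, []
        for cb in callbacks:
            cb()


events = []
loop = MiniLoop()
fut = MiniFuture()
loop.add_future(fut, lambda f: events.append(f.result()))
fut.set_result("done")
assert events == []          # set_result only queued the callback
loop.run_once()
print(events)                # ['done']
```

    Note that set_result only queues the callback; it actually runs when the loop reaches it in run_once.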

    2.3 The run method

    Now look at self.run(). It is essentially a loop that keeps calling the generator's send() method, each time sending the result of yielded.

    We can simplify run() as follows:

    def run(self):
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready. Keeps sending values into the generator
        until some yielded object is not done yet.
        """
        try:
            self.running = True
            while True:
                future = self.future
                if not future.done():
                    return
                self.future = None         # clear self.future
                value = future.result()    # get the future's result
                try:
                    yielded = self.gen.send(value)  # send the result in; the generator's next yielded value (usually another Future) becomes yielded
                except (StopIteration, Return) as e:
                    self.finished = True
                    self.future = _null_future
                    self.result_future.set_result(_value_from_stopiteration(e))
                    self.result_future = None
                    self._deactivate_stack_context()
                    return
                if not self.handle_yield(yielded):  # run self.handle_yield(yielded): return if yielded is not done yet, otherwise keep looping
                    return
        finally:
            self.running = False

    Summary:

      1 Each Future corresponds to one asynchronous operation.

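      2 Callbacks can be added to the Future. When the asynchronous operation completes, the Future must be given its result with set_result (which marks it done via _set_done), after which all of its callbacks run.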

      3 Every generator function decorated with coroutine returns a Future object. The decorator also starts a Runner() with the generator, the value yielded by its first next() call (yielded), and that Future.

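      4 Runner() keeps driving the generator with send(). Specifically: once the value yielded by the previous next() or send() (usually a Future) is done, Runner send()s that value's result into the generator, and repeats until StopIteration or Return is raised (meaning the generator has finished). At that point it calls set_result on the Future that corresponds to the generator.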

        As we can see, Tornado's coroutines are built on generators: a generator can pause at the yield keyword and resume via next() or send(), and send() can also pass a value into the generator.

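        The link that connects coroutines is the Future object. Each Future corresponds to an asynchronous operation, and any number of callbacks can be attached to it; when the operation completes, setting the Future's result (set_result, which marks it done) runs those callbacks.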

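        The driving force is Runner(): it keeps send()-ing the result of each Future the generator yields back into the generator, and when the generator finishes, it does the final wrap-up by calling set_result on the Future that corresponds to that generator.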
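
    The whole mechanism fits in one self-contained toy model: a Future with callbacks, a loop that runs queued callbacks and fires timers on a virtual clock, a Runner that send()s each finished Future's result back into the generator, and a coroutine decorator. Every name here (Future, Loop, Runner, coroutine, sleep_then, fetch, main) is a local stand-in for illustration, not an import from Tornado; error handling is omitted and decorated functions are assumed to be generator functions.

```python
import functools
import heapq
import itertools

class Future:
    def __init__(self):
        self._done, self._result, self._callbacks = False, None, []
    def done(self):
        return self._done
    def result(self):
        return self._result
    def add_done_callback(self, fn):
        if self._done:
            fn(self)                      # already done: run immediately
        else:
            self._callbacks.append(fn)    # otherwise wait for set_result
    def set_result(self, result):
        self._result, self._done = result, True
        for cb in self._callbacks:
            cb(self)

class Loop:
    """Toy event loop with a virtual clock: timers fire without real waiting."""
    def __init__(self):
        self.now, self._ready, self._timers = 0, [], []
        self._seq = itertools.count()     # tie-breaker for equal deadlines
    def add_callback(self, fn, *args):
        self._ready.append(functools.partial(fn, *args))
    def add_future(self, future, callback):
        future.add_done_callback(lambda f: self.add_callback(callback, f))
    def call_later(self, delay, fn):
        heapq.heappush(self._timers, (self.now + delay, next(self._seq), fn))
    def run_until_complete(self, future):
        while not future.done():
            if not self._ready:           # nothing ready: jump to the next timer
                self.now, _, fn = heapq.heappop(self._timers)
                fn()
            ready, self._ready = self._ready, []
            for cb in ready:              # run everything that became ready
                cb()
        return future.result()

loop = Loop()

class Runner:
    def __init__(self, gen, result_future, first_yielded):
        self.gen, self.result_future = gen, result_future
        if self.handle_yield(first_yielded):
            self.run()
    def handle_yield(self, yielded):
        self.future = yielded
        if not self.future.done():        # not ready: resume via the loop later
            loop.add_future(self.future, lambda f: self.run())
            return False
        return True
    def run(self):
        while self.future.done():
            try:                          # send the finished result back in
                yielded = self.gen.send(self.future.result())
            except StopIteration as stop:  # generator returned: resolve its Future
                self.result_future.set_result(stop.value)
                return
            if not self.handle_yield(yielded):
                return

def coroutine(func):                      # assumes func is a generator function
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future, gen = Future(), func(*args, **kwargs)
        try:
            yielded = next(gen)           # run up to the first yield
        except StopIteration as stop:
            future.set_result(stop.value)
        else:
            Runner(gen, future, yielded)
        return future
    return wrapper

def sleep_then(delay, value):             # hypothetical async operation
    f = Future()
    loop.call_later(delay, lambda: f.set_result(value))
    return f

@coroutine
def fetch(name, delay):
    return (yield sleep_then(delay, name.upper()))

@coroutine
def main():
    a = yield fetch("a", 2)               # a coroutine's Future can be yielded too
    b = yield fetch("b", 1)
    return a + b

print(loop.run_until_complete(main()), "at t =", loop.now)   # AB at t = 3
```

    Here fetch("a", 2) is itself a coroutine, so main yields fetch's Future; main's Runner resumes only after fetch's Runner has finished and set that Future's result through the loop.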

    References:

      http://blog.csdn.net/wyx819/article/details/45420017

      http://www.cnblogs.com/apexchu/p/4226784.html

  • Original article: https://www.cnblogs.com/MnCu8261/p/6560502.html