zoukankan      html  css  js  c++  java
  • 深入tornado中的协程

    tornado使用了单进程(当然也可以多进程) + 协程 + I/O多路复用的机制,解决了C10K中因为过多的线程(进程)的上下文切换 而导致的cpu资源的浪费。

    tornado中的I/O多路复用前面已经讲过了。本文不做详细解释。

    来看一下tornado中的协程模块:tornado.gen:

    tornado.gen是根据生成器(generator)实现的,用来更加简单的实现异步。

    先来说一下tornado.gen.coroutine的实现思路:

      我们知道generator中的yield语句可以使函数暂停执行,而send()方法则可以恢复函数的执行。

      tornado将那些异步操作放置到yield语句后,当这些异步操作完成后,tornado会将结果send()至generator中恢复函数执行。

    在tornado的官方文档中有这么一句话:

    Most asynchronous functions in Tornado return a Future; yielding this object returns its result.

    就是说:在tornado中大多数的异步操作返回一个Future对象,yield Future对象 会返回该异步操作的结果。

    那么,Future对象到底是什么?

    一  Future对象

    先来说说Future对象:

    Future对象可以概括为: 一个异步操作的占位符,当然这个占位符有些特殊,它特殊在:

      1 这个占位符是一个对象

      2 这个对象包含了很多属性,包括_result 以及 _callbacks,分别用来存储异步操作的结果以及回调函数

      3 这个对象包含了很多方法,比如添加回调函数,设置异步操作结果等。

      4 当这个对象对应的异步操作完成后,该对象会被set_done,然后遍历并运行_callbacks中的回调函数

    来看一下Future的简化版

    复制代码
    class Future(object):
        '''
            Future对象主要保存一个回调函数列表_callbacks与一个执行结果_result,当我们set_result时,就会执行_callbacks中的函数
            如果set_result或者set_done,就会遍历_callbacks列表并执行callback(self)函数
        '''
        def __init__(self):
            self._result = None    # 执行的结果
            self._callbacks = []    # 用来保存该future对象的回调函数
    
        def result(self, timeout=None):
            # 如果操作成功,返回结果。如果失败则抛出异常
            self._clear_tb_log()
            if self._result is not None:
                return self._result
            if self._exc_info is not None:
                raise_exc_info(self._exc_info)
            self._check_done()
            return self._result
    
        def add_done_callback(self, fn):
            if self._done:
                fn(self)
            else:
                self._callbacks.append(fn)
    
        def set_result(self, result):
            self._result = result
            self._set_done()
    
        def _set_done(self):
            # 执行结束(成功)后的操作。
            self._done = True
            for cb in self._callbacks:
                try:
                    cb(self)
                except Exception:
                    app_log.exception('Exception in callback %r for %r', cb, self)
            self._callbacks = None
    复制代码

    完整源码:

     Future源码

    二  gen.coroutine装饰器

    tornado中的协程是通过tornado.gen中的coroutine装饰器实现的:

    def coroutine(func, replace_callback=True):
        return _make_coroutine_wrapper(func, replace_callback=True)
    _make_coroutine_wrapper :
    复制代码
    def _make_coroutine_wrapper(func, replace_callback):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            '''
                大体过程:
                future = TracebackFuture()  
                result = func(*args, **kwargs)
                if isinstance(result, GeneratorType):
                    yielded = next(result)
                    Runner(result, future, yielded)
                return future
            '''
            future = TracebackFuture()                   # TracebackFuture = Future
    
            if replace_callback and 'callback' in kwargs:
                callback = kwargs.pop('callback')
                IOLoop.current().add_future(future, lambda future: callback(future.result()))
    
            try:
                result = func(*args, **kwargs)           # 执行func,若func中包含yield,则返回一个generator对象
            except (Return, StopIteration) as e:
                result = _value_from_stopiteration(e)
            except Exception:
                future.set_exc_info(sys.exc_info())
                return future
            else:
                if isinstance(result, GeneratorType):      # 判断其是否为generator对象
                    try:
                        orig_stack_contexts = stack_context._state.contexts
                        yielded = next(result)            # 第一次执行
                        if stack_context._state.contexts is not orig_stack_contexts:
                            yielded = TracebackFuture()
                            yielded.set_exception(
                                stack_context.StackContextInconsistentError(
                                    'stack_context inconsistency (probably caused '
                                    'by yield within a "with StackContext" block)'))
                    except (StopIteration, Return) as e:
                        future.set_result(_value_from_stopiteration(e))
                    except Exception:
                        future.set_exc_info(sys.exc_info())
                    else:
                        Runner(result, future, yielded)  # Runner(result, future, yield)
                    try:
                        return future            
                    finally:
                        future = None
            future.set_result(result)
            return future
        return wrapper
    复制代码

    先来看一下大体过程:

      1  首先生成一个Future对象

      2  运行该被装饰函数并将结果赋值给result。 在这里因为tornado的'异步'实现是基于generator的,所以一般情况下 result是一个generator对象

      3  yielded = next(result)  执行到被装饰函数的第一次yield,将结果赋值给yielded。一般情况下,yielded很大情况下是一个Future对象。

      4  Runner(result, future, yielded)

      5  return future

    除了第4步以外其他都很好理解,所以来了解一下第四步Runner()干了些啥:

    三  Runner()类

    1 为什么要有Runner()?或者说Runner()的作用是什么?

    Runner()可以自动的将异步操作的结果send()至生成器中止的地方

    tornado的协程或者说异步是基于generator实现的,generator较为常用的有两个方法:send() next() ,关于这两个方法的流程分析在这

    很多情况下会有generator的嵌套。比如说经常会yield 一个generator。当A生成器yield B生成器时,分两步:

      1 我们首先中止A的执行转而执行B

      2 当B执行完成后,我们需要将B的结果send()至A中止的地方,继续执行A

    Runner()主要就是来做这些的,也就是控制生成器的执行与中止,并在合适的情况下使用send()方法同时传入B生成器的结果唤醒A生成器。

    来看一个简单例子:

     View Code

    上例中的Runner()仅仅完成了第一步,我们还需要手动的执行第二步,而tornado的gen的Runner()则做了全套奥!

    2 剖析Runner()

    在Runner()中主要有三个方法__init__  handle_yield  run:

     Runner()

    2.1 __init__方法

    __init__ 里面执行了一些初始化的操作,最主要是最后两句:

    if self.handle_yield(first_yielded): # 运行
        self.run()

    2.2 handle_yield方法

    handle_yield(self, yielded) 函数,这个函数顾名思义,就是用来处理yield返回的对象的。

    首先我们假设yielded是一个Future对象(因为这是最常用的情况),这样的话代码就缩减了很多

    def handle_yield(self, yielded):
            self.future = convert_yielded(yielded)                         # 如果yielded是Future对象则原样返回
            if not self.future.done() or self.future is moment:            # moment是tornado初始化时就建立的一个Future对象,且被set_result(None)
                self.io_loop.add_future(self.future, lambda f: self.run()) # 为该future添加callback
                return False
            return True

    也就是干了三步:

      首先解析出self.future  

      然后判断self.future对象是否已经被done(完成),如果没有的话为其添加回调函数,这个回调函数会执行self.run()

      返回self.future对象是否被done

    总体来说,handle_yield返回yielded对象是否被set_done,如果没有则为yielded对象添加回调函数,这个回调函数执行self.run()

    还有一个有趣的地方,就是上面代码的第四行:  self.io_loop.add_future(self.future, lambda f: self.run()) 

    复制代码
    def add_future(self, future, callback):
        # 为future添加一个回调函数,这个回调函数的作用是:将参数callback添加至self._callbacks中
        # 大家思考一个问题: 如果某个Future对象被set_done,那么他的回调函数应该在什么时候执行? 
        # 是立即执行亦或者是将回调函数添加到IOLoop实例的_callbacks中进行统一执行? 
        # 虽然前者更简单,但导致回调函数的执行过于混乱,我们应该让所有满足执行条件的回调函数统一执行。显然后者更合理
        # 而add_future()的作用就是这样
        future.add_done_callback(lambda future: self.add_callback(callback, future))
            
    def add_callback(self, callback, *args, **kwargs):
        # 将callback添加至_callbacks列表中
        self._callbacks.append(functools.partial(callback, *args, **kwargs))
    复制代码

    2.3 run方法

    再来看self.run()方法。这个方法实际上就是一个循环,不停的执行generator的send()方法,发送的值就是yielded的result。

    我们可以将run()方法简化一下:

    复制代码
        def run(self):
            """Starts or resumes the generator, running until it reaches a
            yield point that is not ready. 循环向generator中传递值,直到某个yield返回的yielded还没有被done
            """
            try:
                self.running = True 
                while True:
                    future = self.future  
                    if not future.done():
                        return
                    self.future = None      # 清空self.future
                    value = future.result()   # 获取future对象的结果
                    try:    
                        yielded = self.gen.send(value)  # send该结果,并将self.gen返回的值赋值给yielded(一般情况下这也是个future对象)
                    except (StopIteration, Return) as e:
                        self.finished = True
                        self.future = _null_future
                        self.result_future.set_result(_value_from_stopiteration(e))
                        self.result_future = None
                        self._deactivate_stack_context()
                        return
                    if not self.handle_yield(yielded):  # 运行self.handler_yield(yielded),如果yielded对象没有被done,则直接返回;否则继续循环
                        return
            finally:
                self.running = False
    复制代码

    总结:

      1 每一个Future对应一个异步操作

      2 该Future对象可以添加回调函数,当该异步操作完成后,需要对该Future对象设置set_done或者set_result,然后执行其所有的回调函数

      3 凡是使用了coroutine装饰器的generator函数都会返回一个Future对象,同时会不断为该generator,该generator每一次运行send()或者next()的返回结果yielded以及future对象运行Runner()

      4 Runner()会对generator不断进行send()或者next()操作。具体步骤是:上一个next()或者send()操作返回的yielded(一般是一个Future对象)被set_done后,将该yielded对象的结果send()至generator中,不断循环该操作,直到产生StopIteration或者Return异常(这表示该generator执行结束),这时会为该generator对应的Future对象set_result。

        我们可以看到tornado的协程是基于generator的,generator可以通过yield关键字暂停执行,也可以通过next()或者send()恢复执行,同时send()可以向generator中传递值。

        而将协程连接起来的纽带则是Future对象,每一个Future对象都对应着一个异步操作,我们可以为该对象添加许多回调函数,当异步操作完成后通过对Future对象进行set_done或者set_result就可以执行相关的回调函数。

        提供动力的则是Runner(),他不停的将generator所yield的每一个future对象的结果send()至generator,当generator运行结束,他会进行最后的包装工作,对该generator所对应的Future对象执行set_result操作。

    参考:

      http://blog.csdn.net/wyx819/article/details/45420017

      http://www.cnblogs.com/apexchu/p/4226784.html

  • 相关阅读:
    网络安全分析
    java实现 洛谷 P1464 Function
    java实现 洛谷 P1464 Function
    java实现 洛谷 P1014 Cantor表
    java实现 洛谷 P1014 Cantor表
    java实现 洛谷 P1014 Cantor表
    java实现 洛谷 P1014 Cantor表
    java实现 洛谷 P1014 Cantor表
    java实现 洛谷 P1540 机器
    java实现 洛谷 P1540 机器
  • 原文地址:https://www.cnblogs.com/yezuhui/p/6863790.html
Copyright © 2011-2022 走看看