  • Learning Tornado

    Asynchronous programming

    Preliminaries

    lambda

     Lambda functions can be used wherever function objects are required. They are syntactically restricted to a single expression. Semantically, they are just syntactic sugar for a normal function definition. The expression lambda arguments: expression yields a function object. The unnamed object behaves like a function object defined with:

    def name(arguments):
        return expression
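Here is a quick check of that equivalence, plus the one-argument adapter pattern used in the Tornado snippet that follows. This is a minimal sketch. The add_callback here is a hypothetical stand-in that simply calls the callback; it is not IOLoop.add_callback.

```python
# A lambda and the equivalent def: both produce ordinary function objects.
add_one_lambda = lambda x: x + 1

def add_one_def(x):
    return x + 1

print(add_one_lambda(41), add_one_def(41))   # 42 42
print(add_one_lambda.__name__)               # <lambda>  (the object is anonymous)

# The Tornado pattern: a Future calls each done-callback with one argument
# (the future), so a lambda adapts that slot into a call that also carries
# a captured `callback`.
received = []

def add_callback(callback, *args):   # hypothetical stand-in for IOLoop.add_callback
    callback(*args)

callback = received.append
adapter = lambda future: add_callback(callback, future)
adapter("the-future")                # what Future._set_done effectively does: cb(self)
print(received)                      # ['the-future']
```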
    

     Here is an example from Tornado:

    def add_done_callback(self, fn):
        pass    
    
    future.add_done_callback(
        lambda future: self.add_callback(callback, future))
    

    How lambda future: self.add_callback(callback, future) gets called (in Future._set_done):

    for cb in self._callbacks:
        try:
            cb(self)
        except Exception:
            app_log.exception('Exception in callback %r for %r', cb, self)
    

    Decorators

    Future

    How it works

    • add_done_callback(self, fn) attaches a callback to the future.

    If the Future is already done, fn is called immediately; otherwise fn is stored in self._callbacks (a list).

    • _set_done(self) iterates over self._callbacks and calls each callback.
    • set_result(self, result) stores result on the Future and then calls _set_done.
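The steps above can be sketched as a tiny class. This is a simplified illustration that assumes no exceptions in results, no thread safety and no IOLoop. MiniFuture is a hypothetical name, not Tornado's Future class.

```python
class MiniFuture:
    """Toy Future: only the callback bookkeeping described above."""

    def __init__(self):
        self._done = False
        self._result = None
        self._callbacks = []          # a plain list, as in Tornado's Future

    def done(self):
        return self._done

    def result(self):
        if not self._done:
            raise RuntimeError("result is not ready yet")
        return self._result

    def add_done_callback(self, fn):
        # Already done: run fn immediately; otherwise keep it for later.
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result = result
        self._set_done()

    def _set_done(self):
        self._done = True
        for cb in self._callbacks:    # in the order they were added
            try:
                cb(self)
            except Exception as exc:  # one failing callback must not stop the rest
                print("callback raised:", exc)
        self._callbacks = []


f = MiniFuture()
seen = []
f.add_done_callback(lambda fut: seen.append(("early", fut.result())))
f.set_result(42)
f.add_done_callback(lambda fut: seen.append(("late", fut.result())))
print(seen)   # [('early', 42), ('late', 42)]
```

The "late" callback runs right away because the future is already done. The "early" one waited in the list until set_result.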

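    IOLoop.add_callback vs. Future.add_done_callback

    Both append to a self._callbacks container. The difference is that the IOLoop's is a collections.deque, whose append is atomic, while the Future's is a plain list.

    First, Future.add_done_callback.

    When is this method called?

    IOLoop.add_future (the key piece)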

        def add_future(self, future, callback):
            """Schedules a callback on the ``IOLoop`` when the given
            `.Future` is finished.
    
            The callback is invoked with one argument, the
            `.Future`.
            """
            assert is_future(future)
            callback = stack_context.wrap(callback)
            future.add_done_callback(
                lambda future: self.add_callback(callback, future))
    

     Calling IOLoop.add_future arranges that, when the future finishes, callback (with the future as its argument) is added to the IOLoop's _callbacks, and the IOLoop then runs it.
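Here is a minimal sketch of what add_future achieves. It assumes a toy loop (MiniIOLoop is hypothetical) and uses the standard library's concurrent.futures.Future in place of Tornado's Future. The key point it shows: set_result() does not run the user's callback directly. It only schedules it, and the loop runs it later.

```python
import collections
import functools
from concurrent.futures import Future


class MiniIOLoop:
    """Toy loop: a deque of zero-argument callables, drained FIFO."""

    def __init__(self):
        self._callbacks = collections.deque()

    def add_callback(self, callback, *args, **kwargs):
        # Same shape as IOLoop.add_callback: store a partial with its arguments.
        self._callbacks.append(functools.partial(callback, *args, **kwargs))

    def add_future(self, future, callback):
        # When the future finishes, schedule callback(future) on the loop.
        future.add_done_callback(
            lambda future: self.add_callback(callback, future))

    def run_once(self):
        # Run only the callbacks that were queued before this pass began.
        for _ in range(len(self._callbacks)):
            self._callbacks.popleft()()


loop = MiniIOLoop()
fut = Future()
log = []
loop.add_future(fut, lambda f: log.append(f.result()))
fut.set_result("done")
print(log)        # []  (scheduled, not run yet)
loop.run_once()
print(log)        # ['done']
```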

    Order of callback execution

    future callbacks

    self._callbacks = []
    

     Adding

    self._callbacks.append(fn)
    

     Running

    for cb in self._callbacks:
        try:
            cb(self)
        except Exception:
            app_log.exception('Exception in callback %r for %r', cb, self)
    

    So the order is first-in, first-out (FIFO).

    IOLoop callbacks

    self._callbacks = collections.deque()

    Adding

    self._callbacks.append(functools.partial(
        stack_context.wrap(callback), *args, **kwargs))

    Running

    for i in range(ncallbacks):
        self._run_callback(self._callbacks.popleft())
    

     So the order is FIFO here as well.
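Both orderings can be checked with plain Python. The partial objects stand in for callbacks, and the deque is drained with popleft() the same way as in the excerpt.

```python
import collections
import functools


def record(log, name):
    log.append(name)


# Future-style: list.append, then iterate front to back.
future_log = []
future_callbacks = [functools.partial(record, future_log, n) for n in "abc"]
for cb in future_callbacks:
    cb()

# IOLoop-style: deque.append on the right, popleft() from the left.
loop_log = []
loop_callbacks = collections.deque()
for n in "abc":
    loop_callbacks.append(functools.partial(record, loop_log, n))
for _ in range(len(loop_callbacks)):
    loop_callbacks.popleft()()

print(future_log, loop_log)   # ['a', 'b', 'c'] ['a', 'b', 'c']
```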

    def add_done_callback(self, fn):
        """Attaches the given callback to the `Future`.
    
        It will be invoked with the `Future` as its argument when the Future
        has finished running and its result is available.  In Tornado
        consider using `.IOLoop.add_future` instead of calling
        `add_done_callback` directly.
        """
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)
    

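     If the Future has finished and its result is available, the callback is called immediately; otherwise it is appended to the self._callbacks list.

    _set_done later iterates over self._callbacks and calls each one.

    Next, IOLoop.add_callback.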

    def add_callback(self, callback, *args, **kwargs):
        if self._closing:
            return
        # Blindly insert into self._callbacks. This is safe even
        # from signal handlers because deque.append is atomic.
        self._callbacks.append(functools.partial(
            stack_context.wrap(callback), *args, **kwargs))
        if thread.get_ident() != self._thread_ident:
            # This will write one byte but Waker.consume() reads many
            # at once, so it's ok to write even when not strictly
            # necessary.
            self._waker.wake()
        else:
            # If we're on the IOLoop's thread, we don't need to wake anyone.
            pass
    

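     It appends the callback, wrapped with functools.partial, to self._callbacks. (Here self is the IOLoop object, whereas the self._callbacks above belonged to the Future.)

     I don't fully understand this part, so I'll just explain what functools.partial does.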

    Return a new partial object which when called will behave like func called with the positional arguments args and keyword arguments keywords. If more arguments are supplied to the call, they are appended to args. If additional keyword arguments are supplied, they extend and override keywords. Roughly equivalent to:

    def partial(func, *args, **keywords):
        def newfunc(*fargs, **fkeywords):
            newkeywords = keywords.copy()
            newkeywords.update(fkeywords)
            return func(*(args + fargs), **newkeywords)
        newfunc.func = func
        newfunc.args = args
        newfunc.keywords = keywords
        return newfunc
    

    So calling newfunc() later is equivalent to calling callback(*args, **kwargs); newfunc itself needs no arguments.
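Here is a small check of that behaviour using the standard library's functools.partial. The callback function is illustrative.

```python
import functools


def callback(a, b, *, sep="-"):
    return f"{a}{sep}{b}"


bound = functools.partial(callback, 1, 2, sep="+")
print(bound())                      # 1+2   (no arguments needed any more)
print(bound.func is callback)       # True
print(bound.args, bound.keywords)   # (1, 2) {'sep': '+'}
print(bound(sep="*"))               # 1*2   (call-time keywords override stored ones)
```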

    The IOLoop's self._callbacks are run once IOLoop.start() has been called. An excerpt:

    while True:
        # Prevent IO event starvation by delaying new callbacks
        # to the next iteration of the event loop.
        ncallbacks = len(self._callbacks)
        # ...
        for i in range(ncallbacks):
            self._run_callback(self._callbacks.popleft())

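    In other words, once the IOLoop has started, each iteration runs the callbacks that were pending when that iteration began. The Future, by contrast, runs all of its self._callbacks once, when its result is set.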
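The ncallbacks snapshot in the excerpt is what delays callbacks scheduled during an iteration to the next one. Below is a toy version of that behaviour. It is not Tornado's loop, and it does no I/O polling between passes.

```python
import collections

callbacks = collections.deque()
ran = []


def first():
    ran.append("first")
    callbacks.append(second)   # scheduled *during* a pass


def second():
    ran.append("second")


def one_iteration():
    # Snapshot the length first, as IOLoop.start() does with ncallbacks,
    # so callbacks added while draining wait for the next iteration.
    ncallbacks = len(callbacks)
    for _ in range(ncallbacks):
        callbacks.popleft()()


callbacks.append(first)
one_iteration()
after_first_pass = list(ran)
print(after_first_pass)   # ['first']  (second was deferred)
one_iteration()
print(ran)                # ['first', 'second']
```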

    The callback being passed in

    After all this, what is this callback that gets passed? When we use @gen.coroutine we never pass one.

    I searched for the places that call IOLoop.add_future.

    • tornado.gen.Runner#handle_yield
    if not self.future.done() or self.future is moment:
        def inner(f):
            # Break a reference cycle to speed GC.
            f = None # noqa
            self.run()
        self.io_loop.add_future(
            self.future, inner)
        return False
    return True
    

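     When the future finishes, inner sets f = None to break a reference cycle and speed up garbage collection, then resumes the generator with self.run(). This is the most common caller.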

    • tornado.gen._make_coroutine_wrapper

    That is, the inner function behind @gen.coroutine.

    if replace_callback and 'callback' in kwargs:
        callback = kwargs.pop('callback')
        IOLoop.current().add_future(
            future, lambda future: callback(future.result()))
    

    When the future finishes, the callback that was passed in is called with the future's result.

    Where does the callback in kwargs come from?

    The tornado.gen.coroutine source shows that the decorator is two layers deep:

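    import functools

    def coroutine(func, replace_callback=True):
        # Layer 1: the public decorator just delegates.
        return _make_coroutine_wrapper(func, replace_callback=True)

    def _make_coroutine_wrapper(func, replace_callback):
        # Layer 2: builds the wrapper that actually replaces func.
        wrapped = func

        @functools.wraps(wrapped)
        def wrapper(*args, **kwargs):
            pass  # body elided; the full source is quoted later in this post

        return wrapper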
    

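      Ordinary requests don't carry a 'callback' keyword argument; that is, Tornado does not add one by itself.

    Then I tried adding callback='callback' to a handler:

    @gen.coroutine
    def get(self, callback='callback'):
        pass

    It had no effect. That is expected: the wrapper only checks the kwargs the caller actually passes, and a default value in the function's signature never shows up there.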
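A toy decorator that reproduces only the 'callback' in kwargs check makes this concrete. coroutine_like is hypothetical and not part of Tornado.

```python
import functools

seen = []


def coroutine_like(func):
    """Toy decorator: mimics only the `'callback' in kwargs` check."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        seen.append('callback' in kwargs)
        return func(*args, **kwargs)
    return wrapper


@coroutine_like
def get(self, callback='callback'):
    return callback


get(None)                    # default value only: not in the wrapper's kwargs
get(None, callback=print)    # passed explicitly by the caller: visible
print(seen)                  # [False, True]
```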

    • tornado.ioloop.IOLoop#run_sync
    def run_sync(self, func, timeout=None):
        # ...
        self.add_future(future_cell[0], lambda future: self.stop())
    

    When the future finishes, the current IOLoop is stopped.

      

    gen.coroutine

    Inside @gen.coroutine, yield a_future evaluates to a_future's result. The fetch example from the Tornado guide becomes:

    from tornado import gen
    from tornado.httpclient import AsyncHTTPClient
    
    @gen.coroutine
    def fetch_coroutine(url):
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch(url)
        raise gen.Return(response.body)
    

    How @gen.coroutine works

    A function containing yield is a generator. All generators are asynchronous; when called they return a generator object instead of running to completion. The @gen.coroutine decorator communicates with the generator via the yield expressions, and with the coroutine’s caller by returning a Future.

    Here is a simplified version of the coroutine decorator’s inner loop:

    # Simplified inner loop of tornado.gen.Runner
    def run(self):
        # send(x) makes the current yield return x.
        # It returns when the next yield is reached
        future = self.gen.send(self.next)
        def callback(f):
            self.next = f.result()
            self.run()
        future.add_done_callback(callback)
    

    The decorator receives a Future from the generator, waits (without blocking) for that Future to complete, then “unwraps” the Future and sends the result back into the generator as the result of the yield expression. Most asynchronous code never touches the Future class directly except to immediately pass the Future returned by an asynchronous function to a yield expression.
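The simplified loop above can be turned into a runnable toy. It assumes a deque as the event loop and the standard library's concurrent.futures.Future as the Future type. coroutine, async_add and run_loop are hypothetical names. Tornado's real Runner also handles exceptions, lists of futures and stack contexts.

```python
import collections
from concurrent.futures import Future

loop = collections.deque()          # toy event loop: FIFO of callables


def run_loop():
    while loop:
        loop.popleft()()


def async_add(a, b):
    """Pretend async op: returns a Future resolved on a later loop pass."""
    f = Future()
    loop.append(lambda: f.set_result(a + b))
    return f


def coroutine(gen_func):
    """Toy @coroutine: drives the generator and returns a Future of its result."""
    def wrapper(*args, **kwargs):
        outer = Future()
        gen = gen_func(*args, **kwargs)

        def step(value):
            try:
                yielded = gen.send(value)     # resume until the next yield
            except StopIteration as e:
                outer.set_result(e.value)     # the generator returned
                return
            # Wait (without blocking) for the yielded Future, then resume.
            yielded.add_done_callback(lambda f: step(f.result()))

        step(None)                            # the first send must be None
        return outer
    return wrapper


@coroutine
def compute():
    x = yield async_add(1, 2)       # every yield goes through step()
    y = yield async_add(x, 10)
    return x + y                    # Python 3.3+: return a value from a generator


result = compute()
run_loop()
print(result.result())   # 16
```

This toy answers the question that follows for its own case: the decorator interacts with every yield, because each resolved Future calls step() again.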

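    The simplified version leaves some questions open. For example, does @gen.coroutine interact with every yield in the decorated function? Here is the more detailed source: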

    def _make_coroutine_wrapper(func, replace_callback):
        """The inner workings of ``@gen.coroutine`` and ``@gen.engine``.
    
        The two decorators differ in their treatment of the ``callback``
        argument, so we cannot simply implement ``@engine`` in terms of
        ``@coroutine``.
        """
        # On Python 3.5, set the coroutine flag on our generator, to allow it
        # to be used with 'await'.
        wrapped = func
        if hasattr(types, 'coroutine'):
            func = types.coroutine(func)
    
        @functools.wraps(wrapped)
        def wrapper(*args, **kwargs):
            future = TracebackFuture()
    
            if replace_callback and 'callback' in kwargs:
                callback = kwargs.pop('callback')
                IOLoop.current().add_future(
                    future, lambda future: callback(future.result()))
    
            try:
                result = func(*args, **kwargs)
            except (Return, StopIteration) as e:
                result = _value_from_stopiteration(e)
            except Exception:
                future.set_exc_info(sys.exc_info())
                return future
            else:
                if isinstance(result, GeneratorType):
                    # Inline the first iteration of Runner.run.  This lets us
                    # avoid the cost of creating a Runner when the coroutine
                    # never actually yields, which in turn allows us to
                    # use "optional" coroutines in critical path code without
                    # performance penalty for the synchronous case.
                    try:
                        orig_stack_contexts = stack_context._state.contexts
                        yielded = next(result)
                        if stack_context._state.contexts is not orig_stack_contexts:
                            yielded = TracebackFuture()
                            yielded.set_exception(
                                stack_context.StackContextInconsistentError(
                                    'stack_context inconsistency (probably caused '
                                    'by yield within a "with StackContext" block)'))
                    except (StopIteration, Return) as e:
                        future.set_result(_value_from_stopiteration(e))
                    except Exception:
                        future.set_exc_info(sys.exc_info())
                    else:
                        _futures_to_runners[future] = Runner(result, future, yielded)
                    yielded = None
                    try:
                        return future
                    finally:
                        # Subtle memory optimization: if next() raised an exception,
                        # the future's exc_info contains a traceback which
                        # includes this stack frame.  This creates a cycle,
                        # which will be collected at the next full GC but has
                        # been shown to greatly increase memory usage of
                        # benchmarks (relative to the refcount-based scheme
                        # used in the absence of cycles).  We can avoid the
                        # cycle by clearing the local variable after we return it.
                        future = None
            future.set_result(result)
            return future
    
        wrapper.__wrapped__ = wrapped
        wrapper.__tornado_coroutine__ = True
        return wrapper
    

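    if replace_callback and 'callback' in kwargs:

      the future is registered with the IOLoop through IOLoop.add_future(future, callback), so the callback receives future.result() when the future finishes.

    Then, in every case (the source has no else branch here):

      try:
        result = func(*args, **kwargs)

      Are there requirements on func, e.g. must it not block?

      Is result a future? Per the code, it is either a generator (when func contains yield) or an ordinary return value.

    If result is an ordinary value, it is stored in the future with future.set_result(result).

    If result is a generator, next() is called on it once. If the generator finishes immediately, its return value is stored in the future; otherwise the generator, the future and the first yielded value are handed to a Runner, which drives the rest of the generator: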

    _futures_to_runners[future] = Runner(result, future, yielded)
    

      

    _futures_to_runners = weakref.WeakKeyDictionary()

    IOLoop

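    On Linux, Tornado's IOLoop is built on epoll (kqueue on BSD/macOS, select as a fallback).

    Further reading: an in-depth look at Tornado's low-level IOLoop implementation (深入理解 tornado 之底层 ioloop 实现)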

    How is a socket added to the IOLoop's watch list?
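One hedged answer: Tornado exposes IOLoop.add_handler(fd, handler, events) for this, which registers the file descriptor with the underlying poller. The sketch below shows the same idea with the standard library's selectors module. It is not Tornado's code.

```python
import selectors
import socket

sel = selectors.DefaultSelector()   # e.g. epoll on Linux, kqueue on macOS/BSD
reader, writer = socket.socketpair()
reader.setblocking(False)

received = []


def on_readable(sock):
    received.append(sock.recv(1024))


# "Adding the socket to the watch list": register the fd with the OS poller,
# together with the handler to call when it becomes readable.
sel.register(reader, selectors.EVENT_READ, on_readable)

writer.send(b"ping")
for key, events in sel.select(timeout=1):
    key.data(key.fileobj)           # dispatch to the registered handler

print(received)                     # [b'ping']

sel.unregister(reader)
reader.close()
writer.close()
sel.close()
```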

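    Why does yield future return the future's result?

    What yield does in Python

    generator.send(value) resumes the generator and makes the currently suspended yield expression evaluate to value; see Python generator/yield for details.

    I don't know the exact details of how it is implemented.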
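Here are the send() mechanics on their own, using a plain generator without Tornado.

```python
log = []


def gen():
    received = yield "first"    # pauses here; send() supplies `received`
    log.append(received)
    yield "second"


g = gen()
first = next(g)                    # runs up to the first yield
second = g.send("value sent in")   # resumes; the yield evaluates to the sent value
print(first, log, second)          # first ['value sent in'] second
```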

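     I found that yield future only evaluates to the future's result inside @gen.coroutine. Yielding an object the coroutine runner doesn't recognize, such as the generator returned by calling an undecorated generator function f, raises: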

     BadYieldError: yielded unknown object <generator object f at 0x10faaef00>

    Nested futures

    from tornado import gen

    @gen.coroutine
    def divide(x, y):
        return x / y
    
    def bad_call():
        # This should raise a ZeroDivisionError, but it won't because
        # the coroutine is called incorrectly.
        divide(1, 0)
    
    @gen.coroutine
    def good_call():
        # yield will unwrap the Future returned by divide() and raise
        # the exception.
        yield divide(1, 0)
    

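     In the example above, good_call also returns a future.

    Most asynchronous functions in Tornado return a Future; yielding this object returns its result.

    Why do it this way? @gen.coroutine puts the result into a future and yield takes it back out; don't the two cancel each other out?

    Also, I found that a function that already returns a future can still fail when I add @gen.coroutine and yield, e.g. Motor's count.

    When should one use @gen.coroutine with yield, and when @gen.coroutine with return?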

    raise gen.Return

    @gen.coroutine
    def divide(x, y):
        return x / y
    
    >>> divide(1, 1).result()
    1
    

     Using raise gen.Return:

    @gen.coroutine
    def divide2(x, y):
        raise gen.Return(x/y)
    >>> divide2(1, 1).result()
    1
    

     Found the reason: before Python 3.3, yield and return with a value cannot appear in the same function.

    class Return(Exception):
        """Special exception to return a value from a `coroutine`.
    
        If this exception is raised, its value argument is used as the
        result of the coroutine::
    
            @gen.coroutine
            def fetch_json(url):
                response = yield AsyncHTTPClient().fetch(url)
                raise gen.Return(json_decode(response.body))
    
        In Python 3.3, this exception is no longer necessary: the ``return``
        statement can be used directly to return a value (previously
        ``yield`` and ``return`` with a value could not be combined in the
        same function).
        """
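On Python 3.3+, a generator's return value travels out inside StopIteration.value. Before that, a custom exception had to carry it, which is the role gen.Return plays. The wrapper source quoted earlier catches both with except (Return, StopIteration) and extracts the value with _value_from_stopiteration. Below is a plain-Python sketch of the two paths. This Return class is a toy, not tornado.gen.Return.

```python
# Path 1 (Python 3.3+): `return value` inside a generator.
def new_style():
    data = yield "waiting"
    return data.upper()


g = new_style()
next(g)                      # run to the first yield
try:
    g.send("body")
except StopIteration as e:
    returned = e.value       # the generator's return value


# Path 2 (pre-3.3 style): carry the value out in an exception.
class Return(Exception):
    """Toy stand-in for gen.Return."""
    def __init__(self, value=None):
        super().__init__(value)
        self.value = value


def old_style():
    data = yield "waiting"
    raise Return(data.upper())


g2 = old_style()
next(g2)
try:
    g2.send("body")
except Return as e:
    old_value = e.value

print(returned, old_value)   # BODY BODY
```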
    

      

      

    Code can still run after return

    try:
        return future
    finally:
        future = None
    

     Doesn't return hand control back to the caller?
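Not quite. The return value is evaluated and saved first, and then the finally block runs before control actually leaves the function. Rebinding the local, as Tornado does with future = None, drops the frame's reference without changing what the caller receives. Here is a plain-Python check:

```python
events = []


def make():
    obj = ["payload"]
    try:
        events.append("return evaluated")
        return obj                 # the value is computed and held first
    finally:
        events.append("finally ran")
        obj = None                 # rebinds the local name only


value = make()
print(value, events)   # ['payload'] ['return evaluated', 'finally ran']
```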

    Callbacks

    Interaction with callbacks
    To interact with asynchronous code that uses callbacks instead of Future, wrap the call in a Task. This will add the callback argument for you and return a Future which you can yield:

    @gen.coroutine
    def call_task():
        # Note that there are no parens on some_function.
        # This will be translated by Task into
        # some_function(other_args, callback=callback)
        yield gen.Task(some_function, other_args)
    

     What does the call above mean?
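Here is a toy version of what gen.Task does, assuming a single-argument callback. It calls the function with an extra callback= keyword argument and returns a Future that resolves with whatever that callback receives. task and some_function are hypothetical. Tornado's gen.Task also handles callbacks that receive several arguments.

```python
import collections
from concurrent.futures import Future


def task(func, *args, **kwargs):
    """Toy gen.Task: run func(*args, callback=..., **kwargs) and return a
    Future that is resolved with the single value passed to that callback."""
    future = Future()

    def callback(result=None):
        future.set_result(result)

    func(*args, callback=callback, **kwargs)
    return future


# Hypothetical callback-style API standing in for some_function: it does not
# return its result; it hands it to `callback` later (here, on a toy loop).
pending = collections.deque()


def some_function(x, callback):
    pending.append(lambda: callback(x * 2))


f = task(some_function, 21)
print(f.done())       # False: nothing has run yet
while pending:        # toy event loop
    pending.popleft()()
print(f.result())     # 42
```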

    What gen.coroutine does

    Any generator that yields objects from this module must be wrapped in either this decorator or engine.

    Functions with this decorator return a Future.

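    When the function finishes, its result is stored in the Future; yield future retrieves it.

    Why do get and post need this decorator?

    The last step of get/post is usually self.write().

    The following two functions behave alike: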

    @gen.coroutine
    def f():
        yield function_that_return_future()
    
    def f():
        return function_that_return_future()
    

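     Both end up returning a Future. (One difference: the first Future resolves to None, because the yielded result is discarded, while the second is the inner Future itself and resolves to its result.) Which form should we choose?

    The @gen.coroutine form is better: a reader can tell at a glance that the function returns a future.

     In Tornado they are normally used with IOLoop.add_future or by yielding them in a gen.coroutine.

    gen.coroutine vs. asynchronous

    gen.coroutine returns a future, and the caller uses yield to wait for it to complete;

    with asynchronous, completion has to be handled by a callback function.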

    Blocking

    To minimize the cost of concurrent connections, Tornado uses a single-threaded event loop. This means that all application code should aim to be asynchronous and non-blocking because only one operation can be active at a time. (Source: http://www.tornadoweb.org/en/stable/guide/async.html#asynchronous-and-non-blocking-i-o)

    If application code is supposed to be asynchronous and non-blocking, why is there also a way to call blocking functions, and where is it used?

    The simplest way to call a blocking function from a coroutine is to use a ThreadPoolExecutor, which returns Futures that are compatible with coroutines:

    Source: http://www.tornadoweb.org/en/stable/guide/coroutines.html#calling-blocking-functions

    from concurrent.futures import ThreadPoolExecutor
    from tornado import gen

    thread_pool = ThreadPoolExecutor(4)

    @gen.coroutine
    def call_blocking():
        yield thread_pool.submit(blocking_func, args)
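Outside Tornado, the same ThreadPoolExecutor.submit call can be tried directly. It returns a concurrent.futures.Future at once, while the blocking function runs on a worker thread. These are the Futures the quoted guide calls compatible with coroutines. blocking_func here is an illustrative placeholder.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor


def blocking_func(seconds):
    """Illustrative blocking call (stands in for blocking I/O)."""
    time.sleep(seconds)
    return seconds


thread_pool = ThreadPoolExecutor(4)

start = time.monotonic()
futures = [thread_pool.submit(blocking_func, 0.1) for _ in range(4)]
results = [f.result() for f in futures]   # wait for all four
elapsed = time.monotonic() - start        # roughly 0.1 s rather than 0.4 s: they overlap
thread_pool.shutdown()

print(results)                                        # [0.1, 0.1, 0.1, 0.1]
print(all(isinstance(f, Future) for f in futures))    # True
```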
    

     From the answers to "Python Tornado - Confused how to convert a blocking function into a non-blocking function":

    If the blocking function is CPU-bound (as your for/xrange example is), then threads (or processes) are the only way to make it non-blocking. Creating a thread per incoming request is expensive, but making a small ThreadPoolExecutor to handle all CPU-bound operations is not.

    To make a function non-blocking without using threads, the function must be event-driven: it must be waiting on some external event (such as network I/O) so that it can be awoken when that event occurs.

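      Both of the above return futures. Does everything that interacts with a coroutine have to return a future? Why?

    From the above: future is an instance of Future, and when the function finishes, a callback stores the function's result on it. With gen.coroutine and yield, yield gives back the function's result rather than the future. Since the result can be taken out of the future anyway, why not return the result directly?

    Ways to deal with blocking code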

    1. Find a coroutine-friendly equivalent. 

    For time.sleep, use tornado.gen.sleep instead.

    2. Find a callback-based equivalent. 

    The Tornado wiki can be useful to find suitable libraries.

    from tornado import gen
    from tornado.ioloop import IOLoop
    from tornado.web import RequestHandler

    class CoroutineTimeoutHandler(RequestHandler):
        @gen.coroutine
        def get(self):
            io_loop = IOLoop.current()
            for i in range(5):
                print(i)
                yield gen.Task(io_loop.add_timeout, io_loop.time() + 1)
    

    add_timeout:

    IOLoop.add_timeout(deadline, callback, *args, **kwargs)

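    Runs callback at the time given by deadline: either a number on the same scale as IOLoop.time(), or a datetime.timedelta relative to now.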

    gen.Task

    Adapts a callback-based asynchronous function for use in coroutines.

    Takes a function (and optional additional arguments) and runs it with those arguments plus a callback keyword argument. The argument passed to the callback is returned as the result of the yield expression.

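    Where is the callback in yield gen.Task(io_loop.add_timeout, io_loop.time() + 1)?

    gen.Task adds that callback keyword argument itself; we don't pass it. When add_timeout fires the callback, the Future returned by gen.Task is resolved, and that Future is what the yield waits on.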

    3. Run the blocking code on another thread.

     Asynchronous MongoDB

    Use Motor to access MongoDB asynchronously.

    Should every request create its own MotorClient, or should all requests share one?

    Create just one client and reuse it:

    It is a common mistake to create a new client object for every request; this comes at a dire performance cost. Create the client when your application starts and reuse that one client for the lifetime of the process, as shown in these examples.

    Source: https://motor.readthedocs.io/en/stable/tutorial-tornado.html#tornado-application-startup-sequence

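    Closing connections

    Why, with tornado.web.asynchronous, is the connection not closed automatically after write(), whereas with @tornado.gen.coroutine it is? (With @asynchronous the response is not finished when the method returns, and the handler must call self.finish() itself; a coroutine handler is finished automatically once its Future resolves.)

    Logging

    Tornado with Sentry

    Exception handling

    Return every exception as JSON by overriding write_error.

    If a handler raises an exception, Tornado will call RequestHandler.write_error to generate an error page. tornado.web.HTTPError can be used to generate a specified status code; all other exceptions return a 500 status.

    Running and deployment

    Why does Django need WSGI while Tornado doesn't? (Tornado ships its own non-blocking HTTP server, tornado.httpserver.HTTPServer, so no external WSGI server is required.)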

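    Deployment options:

    • run a single process;

    • run one process per CPU core, each listening on a different port, and load-balance across them with Nginx.

    Running from the command line or a config file

    tornado.options provides a global options object.

    1. Define the options you need with define(), e.g. host for the host to listen on and port for the port number; once defined, they are registered on the global options object;

    2. Call parse_command_line() to copy the values given on the command line into the corresponding options;

    3. Read the values from the options object:

      when creating the Application, for example

    tornado.web.Application(urls, default_host=tornado.options.options.host)

      or inside handlers, since the options object is global.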

    Tornado server configuration

    The usual pattern is:

    application = tornado.web.Application(urls, default_host=options.host)
    application.listen(options.port, xheaders=True, max_body_size=MAX_BODY_SIZE,)
    

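    application.listen(port) is roughly equivalent to the first two lines below; starting the IOLoop is a separate step:

    server = HTTPServer(app)
    server.listen(port)
    IOLoop.current().start()  # not done by listen(); must be called separately

     So listen() accepts the same keyword arguments as HTTPServer.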

    HTTPServer

    xheaders

    If xheaders is True, we support the X-Real-Ip/X-Forwarded-For and X-Scheme/X-Forwarded-Proto headers, which override the remote IP and URI scheme/protocol for all requests. 

    When Tornado runs behind a reverse proxy that sets these headers, this makes request.remote_ip the user's real IP rather than the proxy's.

    ssl_options

    HTTPServer also accepts the HTTP1ConnectionParameters arguments:

    HTTP1ConnectionParameters

    no_keep_alive (bool) – If true, always close the connection after one request.
    chunk_size (int) – how much data to read into memory at once
    max_header_size (int) – maximum amount of data for HTTP headers
    header_timeout (float) – how long to wait for all headers (seconds)
    max_body_size (int) – maximum amount of data for body
    body_timeout (float) – how long to wait while reading body (seconds)
    decompress (bool) – if true, decode incoming Content-Encoding: gzip

    IOLoop

    IOLoop.current() and IOLoop.instance()

    IOLoop.instance()

    Most applications have a single, global IOLoop running on the main thread. Use this method to get this instance from another thread. In most other cases, it is better to use current() to get the current thread’s IOLoop.

    IOLoop.current(instance=True)

    Returns the current thread's IOLoop.

    If an IOLoop is currently running or has been marked as current by make_current, returns that instance. If there is no current IOLoop, returns IOLoop.instance() (i.e. the main thread’s IOLoop, creating one if necessary) if instance is true.

    Summary

    IOLoop.instance() is a global singleton. If you have several threads that should all share one IOLoop, use IOLoop.instance().

    In general you should use IOLoop.current as the default when constructing an asynchronous object, and use IOLoop.instance when you mean to communicate to the main thread from a different one.

    The docs mention that in rare cases you need several IOLoops, e.g. one per thread. How is that done?

    The vast majority of Tornado apps should have only one IOLoop, running in the main thread. You can run multiple HTTPServers (or other servers) on the same IOLoop.

    It is possible to create multiple IOLoops and give each one its own thread, but this is rarely useful, because the GIL ensures that only one thread is running at a time. If you do use multiple IOLoops you must be careful to ensure that the different threads only communicate with each other through thread-safe methods (i.e. IOLoop.add_callback).
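The thread-safety rule can be illustrated with asyncio, which Tornado 5 and later use underneath IOLoop on Python 3. In asyncio, loop.call_soon_threadsafe plays the role of IOLoop.add_callback when another thread needs to hand work to the loop. This is an asyncio analogue, not Tornado code.

```python
import asyncio
import threading


async def main():
    loop = asyncio.get_running_loop()
    done = asyncio.Event()
    received = []

    def deliver(value):
        # Runs on the loop's own thread, so it may touch loop-owned state.
        received.append(value)
        done.set()

    def worker():
        # Runs on another thread: do not call deliver() directly from here;
        # ask the loop to run it, which is the thread-safe way.
        loop.call_soon_threadsafe(deliver, "from worker thread")

    t = threading.Thread(target=worker)
    t.start()
    await done.wait()
    t.join()
    return received


result = asyncio.run(main())
print(result)   # ['from worker thread']
```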

     Config file

    Your main() method can parse the command line or parse a config file with either:

    tornado.options.parse_command_line()
    # or
    tornado.options.parse_config_file("/etc/server.conf")
    

    Command line formats are what you would expect (--myoption=myvalue). Config files are just Python files. Global names become options, e.g.:

    myoption = "myvalue"
    myotheroption = "myothervalue"

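    The config file path is read from a custom command-line option.

    Security and authentication

    IP whitelist

    Set xheaders=True in application.listen() so that request.remote_ip is the user's real IP rather than the proxy's.

    I implement the check by overriding RequestHandler.prepare.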
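Here is a minimal sketch of the check, assuming a hard-coded whitelist. WHITELIST and ip_allowed are hypothetical names. It uses the standard ipaddress module so that both single IPs and CIDR ranges work. The prepare() override is shown only in a comment because it needs Tornado.

```python
import ipaddress

# Hypothetical whitelist; a real app might load it from tornado.options.
WHITELIST = ["127.0.0.1", "10.0.0.0/8"]
_NETWORKS = [ipaddress.ip_network(n) for n in WHITELIST]


def ip_allowed(remote_ip):
    """Return True if remote_ip falls inside any whitelisted network."""
    try:
        addr = ipaddress.ip_address(remote_ip)
    except ValueError:
        return False              # malformed or missing IP: reject
    return any(addr in net for net in _NETWORKS)


# Inside a Tornado handler (not executed here) it could be used as:
#
#     class BaseHandler(tornado.web.RequestHandler):
#         def prepare(self):
#             if not ip_allowed(self.request.remote_ip):
#                 raise tornado.web.HTTPError(403)

print(ip_allowed("10.1.2.3"), ip_allowed("8.8.8.8"))   # True False
```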

  • Original post: https://www.cnblogs.com/jay54520/p/6993434.html