Asynchronous Programming
Preliminaries
lambda
Lambda functions can be used wherever function objects are required. They are syntactically restricted to a single expression. Semantically, they are just syntactic sugar for a normal function definition. The expression `lambda arguments: expression` yields a function object. The unnamed object behaves like a function object defined with:

    def name(arguments):
        return expression
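A trivial illustration (my own, not from the Python docs or Tornado): the lambda and the named function below behave identically.

```python
# A lambda and an equivalent def; both are ordinary function objects.
add = lambda x, y: x + y

def add_named(x, y):
    return x + y

assert add(1, 2) == add_named(1, 2) == 3
```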
Let's look at an example from Tornado:
    def add_done_callback(self, fn): pass

    future.add_done_callback(
        lambda future: self.add_callback(callback, future))
And where `lambda future: self.add_callback(callback, future)` actually gets called:
    for cb in self._callbacks:
        try:
            cb(self)
Decorators
Future
Processing flow
- A callback is attached to a future via `add_done_callback(self, fn)`.
  If the Future is already done, `fn` is executed immediately; otherwise `fn` is appended to `self._callbacks` (a list) for later.
- `_set_done(self)`
  - iterates over `self._callbacks` and invokes each callback.
- `set_result(self, result)` sets the result on the Future and then calls `_set_done` (a sketch of the whole flow follows below).
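A minimal sketch of this flow using a made-up `MiniFuture` class (my own toy code, not Tornado's implementation):

```python
# Toy illustration of the add_done_callback / set_result / _set_done flow.
class MiniFuture:
    def __init__(self):
        self._done = False
        self._result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        if self._done:
            fn(self)                    # already done: run immediately
        else:
            self._callbacks.append(fn)  # otherwise remember it for _set_done

    def set_result(self, result):
        self._result = result
        self._set_done()

    def _set_done(self):
        self._done = True
        for cb in self._callbacks:
            cb(self)                    # each stored callback receives the future itself
        self._callbacks = None

f = MiniFuture()
f.add_done_callback(lambda fut: print("result is", fut._result))
f.set_result(42)                        # prints "result is 42"
```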
IOLoop.add_callback vs. Future.add_done_callback
Both append to a `self._callbacks` attribute; the difference is that the IOLoop's is a deque (appends are atomic) while the Future's is a plain list.
Let's look at `Future.add_done_callback` first.
When does this method get called?
IOLoop.add_future (the key piece)
    def add_future(self, future, callback):
        """Schedules a callback on the ``IOLoop`` when the given
        `.Future` is finished.

        The callback is invoked with one argument, the `.Future`.
        """
        assert is_future(future)
        callback = stack_context.wrap(callback)
        future.add_done_callback(
            lambda future: self.add_callback(callback, future))
Calling `IOLoop.add_future` therefore means: when the future finishes, the callback is appended to the IOLoop's `_callbacks`, and the IOLoop will then run it.
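A small usage sketch of this wiring, assuming the Tornado 4.x-style API used throughout these notes (`on_done` and `fut` are my own names):

```python
from tornado.concurrent import Future
from tornado.ioloop import IOLoop

def on_done(fut):
    print("future finished with:", fut.result())
    IOLoop.current().stop()

loop = IOLoop.current()
fut = Future()
loop.add_future(fut, on_done)           # on_done is scheduled on the loop when fut resolves
loop.add_callback(fut.set_result, 42)   # resolve the future from inside the loop
loop.start()                            # prints the result, then stops
```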
Order of callback execution
future callbacks
    self._callbacks = []

Adding:

    self._callbacks.append(fn)

Running:

    for cb in self._callbacks:
        try:
            cb(self)

So the order is first in, first out.
IOLoop callbacks
    self._callbacks = collections.deque()

Adding:

    self._callbacks.append(functools.partial(
        stack_context.wrap(callback), *args, **kwargs))

Running:

    for i in range(ncallbacks):
        self._run_callback(self._callbacks.popleft())

So the order is first in, first out here as well.
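A trivial check of the FIFO claim for both containers (plain Python, no Tornado; all names here are mine):

```python
import collections

order = []

callbacks = collections.deque()              # what the IOLoop uses
callbacks.append(lambda: order.append("first"))
callbacks.append(lambda: order.append("second"))
for _ in range(len(callbacks)):
    callbacks.popleft()()                    # IOLoop style: popleft, then call

lst = []                                     # what the Future uses
lst.append(lambda: order.append("third"))
lst.append(lambda: order.append("fourth"))
for cb in lst:                               # Future style: iterate in place
    cb()

assert order == ["first", "second", "third", "fourth"]
```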
def add_done_callback(self, fn): """Attaches the given callback to the `Future`. It will be invoked with the `Future` as its argument when the Future has finished running and its result is available. In Tornado consider using `.IOLoop.add_future` instead of calling `add_done_callback` directly. """ if self._done: fn(self) else: self._callbacks.append(fn)
If the Future has finished and its result is available, the callback is invoked right away; otherwise it is appended to the `self._callbacks` list.
`self._callbacks` is iterated, and each callback invoked, inside `_set_done`.
Next, `IOLoop.add_callback`:
    def add_callback(self, callback, *args, **kwargs):
        if self._closing:
            return
        # Blindly insert into self._callbacks. This is safe even
        # from signal handlers because deque.append is atomic.
        self._callbacks.append(functools.partial(
            stack_context.wrap(callback), *args, **kwargs))
        if thread.get_ident() != self._thread_ident:
            # This will write one byte but Waker.consume() reads many
            # at once, so it's ok to write even when not strictly
            # necessary.
            self._waker.wake()
        else:
            # If we're on the IOLoop's thread, we don't need to wake anyone.
            pass
The partially-applied function is appended to `self._callbacks` (here `self` is the IOLoop object, whereas the `self._callbacks` above belongs to the Future object).
I don't fully understand this part yet, so I'll just describe what `functools.partial` does.
Return a new partial object which when called will behave like func called with the positional arguments args and keyword arguments keywords. If more arguments are supplied to the call, they are appended to args. If additional keyword arguments are supplied, they extend and override keywords. Roughly equivalent to:
    def partial(func, *args, **keywords):
        def newfunc(*fargs, **fkeywords):
            newkeywords = keywords.copy()
            newkeywords.update(fkeywords)
            return func(*(args + fargs), **newkeywords)
        newfunc.func = func
        newfunc.args = args
        newfunc.keywords = keywords
        return newfunc
So calling the resulting `newfunc()` later is equivalent to calling `callback(*args, **kwargs)`; `newfunc` itself needs no arguments.
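A quick example of `functools.partial` (the `greet` function is a made-up illustration):

```python
import functools

def greet(greeting, name, punctuation="!"):
    return greeting + ", " + name + punctuation

# Freeze the first positional argument and one keyword argument.
hello = functools.partial(greet, "Hello", punctuation=".")
assert hello("Tornado") == "Hello, Tornado."   # same as greet("Hello", "Tornado", punctuation=".")
```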
The callbacks in `self._callbacks` are run after `IOLoop.start()`; an excerpt:
    while True:
        # Prevent IO event starvation by delaying new callbacks
        # to the next iteration of the event loop.
        ncallbacks = len(self._callbacks)
        # ......
        for i in range(ncallbacks):
            self._run_callback(self._callbacks.popleft())
This means that once the IOLoop is running, it keeps running any pending callbacks on each iteration; a Future, by contrast, runs all of its `self._callbacks` only after its result has been set.
The callback being passed around
After all this, what exactly is the callback that gets passed around? We never pass one ourselves when using @gen.coroutine.
I looked for the places that call `IOLoop.add_future`.
- tornado.gen.Runner#handle_yield
    if not self.future.done() or self.future is moment:
        def inner(f):
            # Break a reference cycle to speed GC.
            f = None  # noqa
            self.run()
        self.io_loop.add_future(
            self.future, inner)
        return False
    return True
When the future completes, the reference to it is set to None to speed up garbage collection, and then the generator is resumed via `self.run()`. This is the caller I found used most often.
- tornado.gen._make_coroutine_wrapper
That is, the inner function of @gen.coroutine.
    if replace_callback and 'callback' in kwargs:
        callback = kwargs.pop('callback')
        IOLoop.current().add_future(
            future, lambda future: callback(future.result()))
When the future completes, the `callback` that was passed in handles the future's result.
Where does the `callback` in kwargs come from?
The source of `tornado.gen.coroutine` shows that the decorator wraps two layers:
    def coroutine(func, replace_callback=True):
        return _make_coroutine_wrapper(func, replace_callback=True)

    def _make_coroutine_wrapper(func, replace_callback):
        @functools.wraps(wrapped)
        def wrapper(*args, **kwargs):
            pass
        return wrapper
It turns out an ordinary request does not carry a 'callback', i.e. Tornado does not add one by itself.
Then I tried adding `callback='callback'` to a handler:

    @gen.coroutine
    def get(self, callback='callback'):
        pass

and found it had no effect.
- tornado.ioloop.IOLoop#run_sync
    def run_sync(self, func, timeout=None):
        # ...
        self.add_future(future_cell[0], lambda future: self.stop())
When the future completes, the current IOLoop is stopped.
gen.coroutine
Inside @gen.coroutine, `yield a_future` returns the result of `a_future`, so the example above can be written like this:
    from tornado import gen
    from tornado.httpclient import AsyncHTTPClient

    @gen.coroutine
    def fetch_coroutine(url):
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch(url)
        raise gen.Return(response.body)
How @gen.coroutine works
A function containing `yield` is a generator. All generators are asynchronous; when called they return a generator object instead of running to completion. The `@gen.coroutine` decorator communicates with the generator via the `yield` expressions, and with the coroutine's caller by returning a `Future`. Here is a simplified version of the coroutine decorator's inner loop:
    # Simplified inner loop of tornado.gen.Runner
    def run(self):
        # send(x) makes the current yield return x.
        # It returns when the next yield is reached
        future = self.gen.send(self.next)

        def callback(f):
            self.next = f.result()
            self.run()
        future.add_done_callback(callback)
The decorator receives a `Future` from the generator, waits (without blocking) for that `Future` to complete, then "unwraps" the `Future` and sends the result back into the generator as the result of the `yield` expression. Most asynchronous code never touches the `Future` class directly except to immediately pass the `Future` returned by an asynchronous function to a `yield` expression.
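To make the send/add_done_callback hand-off concrete, here is a toy trampoline of my own (using `concurrent.futures.Future` from the standard library rather than Tornado's Runner; `drive` and `coro` are made-up names):

```python
from concurrent.futures import Future

def drive(gen):
    """Drive a generator that yields futures, in the style of Runner.run."""
    def step(value):
        try:
            fut = gen.send(value)           # resume the generator; get the next yielded future
        except StopIteration:
            return                          # generator finished
        fut.add_done_callback(lambda f: step(f.result()))
    step(None)                              # prime the generator

def coro():
    fut = Future()
    fut.set_result("hello")                 # pretend the async work already finished
    result = yield fut                      # drive() sends "hello" back in here
    print("got:", result)

drive(coro())                               # prints "got: hello"
```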
The simplified version is still not entirely clear. For example, does @gen.coroutine interact with every yield in the decorated function? Here is the more detailed source:
    def _make_coroutine_wrapper(func, replace_callback):
        """The inner workings of ``@gen.coroutine`` and ``@gen.engine``.

        The two decorators differ in their treatment of the ``callback``
        argument, so we cannot simply implement ``@engine`` in terms of
        ``@coroutine``.
        """
        # On Python 3.5, set the coroutine flag on our generator, to allow it
        # to be used with 'await'.
        wrapped = func
        if hasattr(types, 'coroutine'):
            func = types.coroutine(func)

        @functools.wraps(wrapped)
        def wrapper(*args, **kwargs):
            future = TracebackFuture()

            if replace_callback and 'callback' in kwargs:
                callback = kwargs.pop('callback')
                IOLoop.current().add_future(
                    future, lambda future: callback(future.result()))

            try:
                result = func(*args, **kwargs)
            except (Return, StopIteration) as e:
                result = _value_from_stopiteration(e)
            except Exception:
                future.set_exc_info(sys.exc_info())
                return future
            else:
                if isinstance(result, GeneratorType):
                    # Inline the first iteration of Runner.run. This lets us
                    # avoid the cost of creating a Runner when the coroutine
                    # never actually yields, which in turn allows us to
                    # use "optional" coroutines in critical path code without
                    # performance penalty for the synchronous case.
                    try:
                        orig_stack_contexts = stack_context._state.contexts
                        yielded = next(result)
                        if stack_context._state.contexts is not orig_stack_contexts:
                            yielded = TracebackFuture()
                            yielded.set_exception(
                                stack_context.StackContextInconsistentError(
                                    'stack_context inconsistency (probably caused '
                                    'by yield within a "with StackContext" block)'))
                    except (StopIteration, Return) as e:
                        future.set_result(_value_from_stopiteration(e))
                    except Exception:
                        future.set_exc_info(sys.exc_info())
                    else:
                        _futures_to_runners[future] = Runner(result, future, yielded)
                    yielded = None
                    try:
                        return future
                    finally:
                        # Subtle memory optimization: if next() raised an exception,
                        # the future's exc_info contains a traceback which
                        # includes this stack frame. This creates a cycle,
                        # which will be collected at the next full GC but has
                        # been shown to greatly increase memory usage of
                        # benchmarks (relative to the refcount-based scheme
                        # used in the absence of cycles). We can avoid the
                        # cycle by clearing the local variable after we return it.
                        future = None
            future.set_result(result)
            return future

        wrapper.__wrapped__ = wrapped
        wrapper.__tornado_coroutine__ = True
        return wrapper
    if replace_callback and 'callback' in kwargs:

If a `callback` was passed, the future is registered with the IOLoop via `IOLoop.add_future(future, callback)`. Then:

    try:
        result = func(*args, **kwargs)

What are the requirements on `func` here? That it must not block? Is `result` a future?

The result of the decorated function is then put into `future`. If the result is a generator, `next()` is called on it once and the outcome goes into `future`:

    _futures_to_runners[future] = Runner(result, future, yielded)

where

    _futures_to_runners = weakref.WeakKeyDictionary()
IOLoop
Tornado is built on top of epoll.
How does a socket get added to the IOLoop's watch list?
Why does yield future return the future's result?
What yield does in Python
The result is sent into the current yield expression via `generator.send`; see the notes on Python generator/yield for details.
How exactly this is implemented, I don't know.
I found that `yield future` returns the future's result only inside @gen.coroutine; otherwise it raises:
BadYieldError: yielded unknown object <generator object f at 0x10faaef00>
Nested futures
    @gen.coroutine
    def divide(x, y):
        return x / y

    def bad_call():
        # This should raise a ZeroDivisionError, but it won't because
        # the coroutine is called incorrectly.
        divide(1, 0)

    @gen.coroutine
    def good_call():
        # yield will unwrap the Future returned by divide() and raise
        # the exception.
        yield divide(1, 0)
In the example above, good_call itself also returns a future.
Most asynchronous functions in Tornado return a `Future`; yielding this object returns its result.
Why do it this way? @gen.coroutine puts the result into a future and yield takes it back out of the future; don't the two cancel each other out?
Also, I found that a function that already returns a future can still error out when wrapped with @gen.coroutine and yield, e.g. Motor's count.
When should we use @gen.coroutine with yield, and when @gen.coroutine with return?
raise gen.Return
    @gen.coroutine
    def divide(x, y):
        return x / y

    >>> divide(1, 1).result()
    1
Using raise gen.Return:
    @gen.coroutine
    def divide2(x, y):
        raise gen.Return(x / y)

    >>> divide2(1, 1).result()
    1
Found the reason: before Python 3.3, `yield` and a `return` with a value cannot appear in the same function.
    class Return(Exception):
        """Special exception to return a value from a `coroutine`.

        If this exception is raised, its value argument is used as the
        result of the coroutine::

            @gen.coroutine
            def fetch_json(url):
                response = yield AsyncHTTPClient().fetch(url)
                raise gen.Return(json_decode(response.body))

        In Python 3.3, this exception is no longer necessary: the ``return``
        statement can be used directly to return a value (previously
        ``yield`` and ``return`` with a value could not be combined in the
        same function).
        """
Code can still run after return

    try:
        return future
    finally:
        future = None
Doesn't `return` hand control back to the caller?
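It does, but the `finally` clause runs first: the return value is evaluated and saved, then `finally` executes, and only then does control leave the function. A small standalone demonstration (my own example, not Tornado code):

```python
def f():
    value = {"future": "result"}
    try:
        return value      # the return value is evaluated and saved here...
    finally:
        value = None      # ...then finally runs; rebinding the local does not
                          # change the already-saved return value

print(f())                # prints {'future': 'result'}
```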
Callbacks
Interaction with callbacks
To interact with asynchronous code that uses callbacks instead of Future, wrap the call in a Task. This will add the callback argument for you and return a Future which you can yield:
    @gen.coroutine
    def call_task():
        # Note that there are no parens on some_function.
        # This will be translated by Task into
        #   some_function(other_args, callback=callback)
        yield gen.Task(some_function, other_args)
What does the call above mean?
What gen.coroutine does
Any generator that yields objects from this module must be wrapped in either this decorator or `engine`. Functions with this decorator return a `Future`.
When the decorated function completes, its result is stored in the Future; `yield future` then gives you that result.
Why do get and post need this decorator?
The last step of get/post is usually self.write().
The following two functions have the same effect:
    @gen.coroutine
    def f():
        yield function_that_return_future()

    def f():
        return function_that_return_future()
Both ultimately return a Future. Which form should we choose?
The @gen.coroutine version is better: when reading the code, you know right away that the function returns a future.
In Tornado they are normally used with `IOLoop.add_future` or by yielding them in a `gen.coroutine`.
gen.coroutine vs. asynchronous
gen.coroutine returns a future, and yield is used to wait for it to complete;
with asynchronous, a callback is needed to handle the result once it is done.
Blocking
To minimize the cost of concurrent connections, Tornado uses a single-threaded event loop. This means that all application code should aim to be asynchronous and non-blocking because only one operation can be active at a time. (Source: http://www.tornadoweb.org/en/stable/guide/async.html#asynchronous-and-non-blocking-i-o)
The above says application code should be asynchronous and non-blocking, so why is there also a way to call blocking functions, and when is it used?
The simplest way to call a blocking function from a coroutine is to use a `ThreadPoolExecutor`, which returns `Futures` that are compatible with coroutines. (Source: http://www.tornadoweb.org/en/stable/guide/coroutines.html#calling-blocking-functions)
    from concurrent.futures import ThreadPoolExecutor

    thread_pool = ThreadPoolExecutor(4)

    @gen.coroutine
    def call_blocking():
        yield thread_pool.submit(blocking_func, args)
According to "Python Tornado - Confused how to convert a blocking function into a non-blocking function":
If the blocking function is CPU-bound (as your for/xrange example is), then threads (or processes) are the only way to make it non-blocking. Creating a thread per incoming request is expensive, but making a small ThreadPoolExecutor to handle all CPU-bound operations is not.
To make a function non-blocking without using threads, the function must be event-driven: it must be waiting on some external event (such as network I/O) so that it can be awoken when that event occurs.
Both approaches above return a future. Does anything that interacts with a coroutine have to return a future? Why?
As discussed above, a future is an instance of Future, and in the callback run after the function finishes, the function's result is set as an attribute on the future. With gen.coroutine and yield, yield gives back the function's result rather than the future. Since we can get the result out of the future anyway, why not just return the result directly?
Ways to handle blocking code
1. Find a coroutine-friendly equivalent. For `time.sleep`, use `tornado.gen.sleep` instead.
2. Find a callback-based equivalent. The Tornado wiki can be useful for finding suitable libraries.
    class CoroutineTimeoutHandler(RequestHandler):
        @gen.coroutine
        def get(self):
            io_loop = IOLoop.current()
            for i in range(5):
                print(i)
                yield gen.Task(io_loop.add_timeout, io_loop.time() + 1)
IOLoop.add_timeout(deadline, callback, *args, **kwargs)
runs `callback` at the point in time given by `deadline`.
Adapts a callback-based asynchronous function for use in coroutines. Takes a function (and optional additional arguments) and runs it with those arguments plus a `callback` keyword argument. The argument passed to the callback is returned as the result of the yield expression.
Where is the `callback` in `yield gen.Task(io_loop.add_timeout, io_loop.time() + 1)`?
The callback is supplied by gen.Task itself rather than by us; gen.Task uses it to resolve the Future that we yield.
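A hedged sketch of that call in context, assuming the Tornado 4.x-style API; the expansion described in the comments is approximate, not Tornado's exact code, and `sleep_one_second` is a made-up name:

```python
from tornado import gen
from tornado.ioloop import IOLoop

@gen.coroutine
def sleep_one_second():
    io_loop = IOLoop.current()
    # gen.Task supplies the ``callback`` keyword itself; the yield below is
    # roughly equivalent to creating a Future, passing a callback that calls
    # future.set_result(None) to add_timeout, and then yielding that Future.
    yield gen.Task(io_loop.add_timeout, io_loop.time() + 1)

IOLoop.current().run_sync(sleep_one_second)   # blocks for about one second
```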
3. Run the blocking code on another thread.
Asynchronous MongoDB
Use Motor to make MongoDB access asynchronous.
Should each request create its own MotorClient, or should many requests share one?
Create only a single MongoDB client:
It is a common mistake to create a new client object for every request; this comes at a dire performance cost. Create the client when your application starts and reuse that one client for the lifetime of the process, as shown in these examples.
Source: https://motor.readthedocs.io/en/stable/tutorial-tornado.html#tornado-application-startup-sequence
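A sketch following that advice; `my_database` and the `db` settings key are just example names:

```python
import motor.motor_tornado
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    @property
    def db(self):
        return self.settings["db"]      # the shared database handle

# Create the client once at startup and reuse it for the whole process.
client = motor.motor_tornado.MotorClient("mongodb://localhost:27017")
application = tornado.web.Application(
    [(r"/", MainHandler)],
    db=client.my_database,              # example database name
)
```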
Closing the connection
Why is it that with tornado.web.asynchronous the connection is not closed automatically after write(), while with @tornado.gen.coroutine it is?
Logging
Exception handling
Return every exception as JSON by overriding write_error.
If a handler raises an exception, Tornado will call `RequestHandler.write_error` to generate an error page. `tornado.web.HTTPError` can be used to generate a specified status code; all other exceptions return a 500 status.
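A sketch of a base handler that does this (my own example; `JSONErrorHandler` and the JSON field names are arbitrary):

```python
import traceback
import tornado.web

class JSONErrorHandler(tornado.web.RequestHandler):
    def write_error(self, status_code, **kwargs):
        payload = {"code": status_code, "message": self._reason}
        if "exc_info" in kwargs and self.settings.get("serve_traceback"):
            # include the traceback only when debug/serve_traceback is enabled
            payload["traceback"] = "".join(
                traceback.format_exception(*kwargs["exc_info"]))
        self.finish(payload)            # write()/finish() serialize dicts as JSON
```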
Running and deployment
Why does Django need WSGI while Tornado does not?
Deployment options:
run a single process;
or run as many processes as there are CPU cores, each listening on a different port, with Nginx load-balancing in front.
Running from the command line or a config file
tornado.options holds a global options object.
1. Define the options you need, e.g. host for the address to listen on and port for the port number; once defined they live on the global options object in tornado.options.
2. Call parse_command_line, which fills those options with the values passed on the command line.
3. Reference the options (a sketch follows this list).
Reference them when constructing the Application, for example

    tornado.web.Application(urls, default_host=options.host)

or reference them inside handlers, since the options object is global.
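A typical setup as a sketch; `host` and `port` are options we define ourselves, and `PingHandler` is a made-up handler:

```python
import tornado.web
from tornado.ioloop import IOLoop
from tornado.options import define, options, parse_command_line

define("host", default="127.0.0.1", help="address to bind")
define("port", default=8888, type=int, help="port to listen on")

class PingHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("pong")

if __name__ == "__main__":
    parse_command_line()                         # e.g. python app.py --port=9000
    app = tornado.web.Application([(r"/ping", PingHandler)])
    app.listen(options.port, address=options.host)
    IOLoop.current().start()
```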
Tornado server configuration
Typically:

    application = tornado.web.Application(urls, default_host=options.host)
    application.listen(options.port, xheaders=True, max_body_size=MAX_BODY_SIZE)

application.listen is roughly equivalent to:
    server = HTTPServer(app)
    server.listen(port)
    IOLoop.current().start()

so the extra keyword arguments accepted by listen() are the same as HTTPServer's.
xheaders
If `xheaders` is `True`, we support the `X-Real-Ip`/`X-Forwarded-For` and `X-Scheme`/`X-Forwarded-Proto` headers, which override the remote IP and URI scheme/protocol for all requests.
This ensures that request.remote_ip is the user's real IP rather than the proxy's.
ssl_options
HTTPServer also supports the HTTP1ConnectionParameters arguments (a sketch follows the list below):
- no_keep_alive (bool) – If true, always close the connection after one request.
- chunk_size (int) – how much data to read into memory at once
- max_header_size (int) – maximum amount of data for HTTP headers
- header_timeout (float) – how long to wait for all headers (seconds)
- max_body_size (int) – maximum amount of data for body
- body_timeout (float) – how long to wait while reading body (seconds)
- decompress (bool) – if true, decode incoming Content-Encoding: gzip
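A hedged sketch of constructing HTTPServer directly with a few of these parameters, as of the Tornado 4.x API; the size and timeout values are arbitrary examples and `MainHandler` is made up:

```python
import tornado.web
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("ok")

application = tornado.web.Application([(r"/", MainHandler)])

# These keyword arguments are forwarded to HTTP1ConnectionParameters.
server = HTTPServer(application,
                    xheaders=True,
                    max_body_size=10 * 1024 * 1024,   # 10 MB, arbitrary
                    body_timeout=60.0)
server.listen(8888)
IOLoop.current().start()
```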
IOLoop
IOLoop.current() and IOLoop.instance()
IOLoop.instance()
Most applications have a single, global `IOLoop` running on the main thread. Use this method to get this instance from another thread. In most other cases, it is better to use `current()` to get the current thread's `IOLoop`.
IOLoop.current(instance=True)
Returns the current thread's IOLoop.
If an `IOLoop` is currently running or has been marked as current by `make_current`, returns that instance. If there is no current `IOLoop`, returns `IOLoop.instance()` (i.e. the main thread's `IOLoop`, creating one if necessary) if `instance` is true.
Summary
IOLoop.instance() is a global singleton: if you have multiple threads that should all share one IOLoop, use IOLoop.instance().
In general you should use `IOLoop.current` as the default when constructing an asynchronous object, and use `IOLoop.instance` when you mean to communicate to the main thread from a different one.
The docs mention that in a few rare cases you may need multiple IOLoops, e.g. one per thread. How would that be done? (See the sketch after the quotes below.)
The vast majority of Tornado apps should have only one IOLoop, running in the main thread. You can run multiple HTTPServers (or other servers) on the same IOLoop.
It is possible to create multiple IOLoops and give each one its own thread, but this is rarely useful, because the GIL ensures that only one thread is running at a time. If you do use multiple IOLoops you must be careful to ensure that the different threads only communicate with each other through thread-safe methods (i.e. IOLoop.add_callback).
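A rough sketch of that setup, assuming the Tornado 4.x-style API; `worker_loops` and the sleep-based wait are my own simplifications, and add_callback is the only thread-safe way into another thread's loop:

```python
import threading
import time
from tornado.ioloop import IOLoop

worker_loops = {}

def worker():
    loop = IOLoop()                  # this thread's own IOLoop
    loop.make_current()
    worker_loops["worker"] = loop
    loop.start()

threading.Thread(target=worker, daemon=True).start()
time.sleep(0.1)                      # crude wait until the worker loop exists

# Schedule work on the worker thread's loop from the main thread.
worker_loops["worker"].add_callback(
    lambda: print("running on the worker thread's IOLoop"))
```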
Config file
Your `main()` method can parse the command line or parse a config file with either:

    tornado.options.parse_command_line()
    # or
    tornado.options.parse_config_file("/etc/server.conf")

Command line formats are what you would expect (`--myoption=myvalue`). Config files are just Python files. Global names become options, e.g.:

    myoption = "myvalue"
    myotheroption = "myothervalue"
The path of the config file can itself be read from a custom command-line option.
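A sketch of that pattern; `config` is an option name I made up, and the file may only set options that have already been define()d:

```python
from tornado.options import define, options, parse_command_line, parse_config_file

define("config", default=None, help="path to a Python config file")
define("port", default=8888, type=int, help="port to listen on")

def main():
    parse_command_line()                    # may pass --config=/etc/server.conf
    if options.config:
        # values from the file override the defaults defined above
        parse_config_file(options.config)
```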
Security and authentication
Using an IP whitelist
Set xheaders=True in application.listen() so that request.remote_ip is the user's real IP rather than the proxy's.
I chose to implement the whitelist by overriding RequestHandler.prepare.
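A minimal sketch of that approach; `WHITELIST` and `WhitelistedHandler` are made-up, application-specific names:

```python
import tornado.web

WHITELIST = {"127.0.0.1", "10.0.0.8"}       # example set of allowed client IPs

class WhitelistedHandler(tornado.web.RequestHandler):
    def prepare(self):
        # With xheaders=True, remote_ip reflects X-Real-Ip / X-Forwarded-For.
        if self.request.remote_ip not in WHITELIST:
            raise tornado.web.HTTPError(403)
```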