1.gen.coroutine的作用
自动执行生成器
2.Future对象
在介绍异步使用之前,先了解一下Future对象的作用。
Future简单可以理解为一个占位符,将来会执行的对象,类似javascript中的promise对象,是实现异步的关键。
class Future(object):
def __init__(self):
self._callback = []
self._result = None
self._done = False
def set_callback(self, cb):
self._callback.append(cb)
def _run_callback(self):
for cb in self._callback:
cb()
def set_result(self, result)
self._done = True
self._result = result
self._run_callback()
def is_ready(self):
return self._done is True
_result:返回结果值
_done:是否完成
_callback:完成后,执行的回调列表
set_result():赋值result,Future对象完成,执行回调。
3.callback实现异步
from tornado.httpclient import AsyncHTTPClient
from tornado.web import RequestHandler
class Test1Handler(RequestHandler):
def get(self, *args, **kwargs):
http_client = AsyncHTTPClient()
http_client.fetch('www.baidu.com', callback = self.on_fetched)
print('done')
def on_fetched(self, response):
print('response')
运行结果:
源码分析:
def fetch(self, request, callback=None, raise_error=True, **kwargs):
if callback is not None:
def handle_future(future):
response = future.result()
self.io_loop.add_callback(callback, response)
future.add_done_callback(handle_future)
def handle_response(response):
if raise_error and response.error:
future.set_exception(response.error)
else:
future.set_result(response)
self.fetch_impl(request, handle_response)
return future
def fetch_impl(self, request, callback):
raise NotImplementedError()
fetch函数返回一个Future类型对象,fetch_impl()执行完毕,返回结果response作为参数,执行回调handle_response
handle_response将response赋值给future。future状态变为已完成,执行future的callback函数handle_future,handle_future将callback加入ioloop执行队列,response作为参数。
由ioloop调度完成callback。
关键点在于,Future占位符控制了什么时候执行回调。
3.gen.coroutine实现异步
from tornado.httpclient import AsyncHTTPClient
from tornado.web import RequestHandler
from tornado import gen
class Test1Handler(RequestHandler):
@gen.coroutine
def get(self, *args, **kwargs):
http_client = AsyncHTTPClient()
response = yield http_client.fetch('http://www.baidu.com')
print('response')
运行结果:
当执行到yield 表达式时,表达式会返回一个Future占位符,然后返回,当表达式执行完毕后,自动继续执行生成器。
关键点在于,gen.coroutine使生成器可以自动执行。
源码分析:
def coroutine(func, replace_callback=True):
return _make_coroutine_wrapper(func, replace_callback=True)
def _make_coroutine_wrapper(func, replace_callback):
try:
yielded = next(result)
except (StopIteration, Return) as e:
future.set_result(getattr(e, 'value', None))
except Exception:
future.set_exc_info(sys.exc_info())
else:
Runner(result, future, yielded)
result:生成器对象
yielded:Future对象,生成器首次执行结果,如果异常StopIteration,表示生成器执行完毕,将结果设置成future的值,返回,装饰器gen.coroutine返回的为Future对象。
Runner:判断yielded是否完成,完成则执行run函数,继续执行生成器;否则,添加run函数到这个Future对象yielded,执行完毕之后,才调用run函数。
class Runner(object):
def __init__(self, gen, result_future, first_yielded):
if self.handle_yield(first_yielded):
gen = result_future = first_yielded = None
self.run()
def handle_yield(self, yielded):
self.future = convert_yielded(yielded)
if not self.future.done() or self.future is moment:
self.io_loop.add_future(
self.future, lambda f: self.run())
return False
return True
def run(self):
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future
if not future.done(): #执行run时generator返回的那个future必须已经有结果,否则就没必要传回到generator中了
return
self.future = None
try:
value = future.result()
yielded = self.gen.send(value)
except (StopIteration, Return) as e:
#generator执行完毕并成功的处理
except Exception:
#generator执行过程中异常的处理
if not self.handle_yield(yielded):
return
finally:
self.running = False
handle_yield:判断Future对象yielded是否完成,未完成,注册run()函数回调到这个Future对象,完成,才调用。
run:将yielded这个Future对象的result,作为参数传递给生成器,继续执行生成器。