【Tornado异步非阻塞】
异步非阻塞
阻塞式:(适用于所有框架,Django,Flask,Tornado,Bottle)
一个请求到来未处理完成,后续一直等待
解决方案:多线程,多进程
异步非阻塞(存在IO请求): Tornado(单进程+单线程)
使用异步非阻塞,需要遵循Tornado框架内部规则,gen
多个连接请求,连接给服务端,如果是有异步非阻塞的话,服务端会接收所有的请求交由后台处理,等待其他链接的同时,原先连接不断开,直至返回后台处理完成的结果!
外部请求,连接服务端 或在select中创建Future对象,然后服务端再把请求交给业务处理平台,此时select监听的列表中又会生成一个socket对象,当业务平台对请求处理完成之后就会把信息返回到服务端的select监听列表中,同时对这个Future对象赋值,用于标记服务端是否要给客户端返回请求信息。
执行流程,本质上都是返回一个future对象,如果对这个对象被set_result了就返回值,否则就是夯住,一直保持连接,不终止请求。
1、基本使用
装饰器 + Future 从而实现Tornado的异步非阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
class AsyncHandler(tornado.web.RequestHandler): @gen .coroutine def get( self ): future = Future() future.add_done_callback( self .doing) yield future # 或 # tornado.ioloop.IOLoop.current().add_future(future,self.doing) # yield future def doing( self , * args, * * kwargs): self .write( 'async' ) self .finish() |
当发送GET请求时,由于方法被@gen.coroutine装饰且yield 一个 Future对象,那么Tornado会等待,等待用户向future对象中放置数据或者发送信号,如果获取到数据或信号之后,就开始执行doing方法。
异步非阻塞体现在当在Tornaod等待用户向future对象中放置数据时,还可以处理其他请求。
注意:在等待用户向future对象中放置数据或信号时,此连接是不断开的。
2、同步阻塞和异步非阻塞对比
class SyncHandler(tornado.web.RequestHandler): def get(self): self.doing() self.write('sync') def doing(self): time.sleep(10)
class AsyncHandler(tornado.web.RequestHandler): @gen.coroutine def get(self): future = Future() tornado.ioloop.IOLoop.current().add_timeout(time.time() + 5, self.doing) yield future def doing(self, *args, **kwargs): self.write('async') self.finish()
3、httpclient类库
Tornado提供了httpclient类库用于发送Http请求,其配合Tornado的异步非阻塞使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
#!/usr/bin/env python # -*- coding:utf-8 -*- import tornado.web from tornado import gen from tornado import httpclient # 方式一: class AsyncHandler(tornado.web.RequestHandler): @gen .coroutine def get( self , * args, * * kwargs): print ( '进入' ) http = httpclient.AsyncHTTPClient() data = yield http.fetch( "http://www.google.com" ) print ( '完事' ,data) self .finish( '6666' ) # 方式二: # class AsyncHandler(tornado.web.RequestHandler): # @gen.coroutine # def get(self): # print('进入') # http = httpclient.AsyncHTTPClient() # yield http.fetch("http://www.google.com", self.done) # # def done(self, response): # print('完事') # self.finish('666') application = tornado.web.Application([ (r "/async" , AsyncHandler), ]) if __name__ = = "__main__" : application.listen( 8888 ) tornado.ioloop.IOLoop.instance().start() |
#!/usr/bin/env python # -*- coding:utf-8 -*- """ 需要先安装支持异步操作Mysql的类库: Tornado-MySQL: https://github.com/PyMySQL/Tornado-MySQL#installation pip3 install Tornado-MySQL """ import tornado.web from tornado import gen import tornado_mysql from tornado_mysql import pools POOL = pools.Pool( dict(host='127.0.0.1', port=3306, user='root', passwd='123', db='cmdb'), max_idle_connections=1, max_recycle_sec=3) @gen.coroutine def get_user_by_conn_pool(user): cur = yield POOL.execute("SELECT SLEEP(%s)", (user,)) row = cur.fetchone() raise gen.Return(row) @gen.coroutine def get_user(user): conn = yield tornado_mysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='cmdb', charset='utf8') cur = conn.cursor() # yield cur.execute("SELECT name,email FROM web_models_userprofile where name=%s", (user,)) yield cur.execute("select sleep(10)") row = cur.fetchone() cur.close() conn.close() raise gen.Return(row) class LoginHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): self.render('login.html') @gen.coroutine def post(self, *args, **kwargs): user = self.get_argument('user') data = yield gen.Task(get_user, user) if data: print(data) self.redirect('http://www.oldboyedu.com') else: self.render('login.html') application = tornado.web.Application([ (r"/login", LoginHandler), ]) if __name__ == "__main__": application.listen(8888) tornado.ioloop.IOLoop.instance().start()
异步非阻塞渐进学习代码+笔记注释
import tornado.ioloop import tornado.web from tornado.web import RequestHandler from tornado.httpserver import HTTPServer # 单线程操作,请求来排队等待,顺序执行 #人为干预模拟IO设置 sleep10秒 class IndexHandler(RequestHandler): def get(self): print('开始') import time time.sleep(10) self.write("Hello, world") print('结束') application = tornado.web.Application([ (r"/index", IndexHandler), ]) if __name__ == "__main__": # 单进程单线程 application.listen(8888) tornado.ioloop.IOLoop.instance().start() # 利用多进程 实现 # server = HTTPServer(application) # server.bind(8888) # server.start(4) # Forks multiple sub-processes # tornado.ioloop.IOLoop.current().start()
import tornado.ioloop import tornado.web from tornado.web import RequestHandler from tornado import gen # 执行异步IO 导入gen模块 from tornado.concurrent import Future # 执行异步IO导入 Future模块,引用Future对象 import time # 单线程 实现异步非阻塞操作!所有的连接请求不等待直接执行 class IndexHandler(RequestHandler): @gen.coroutine # 异步IO 固定写法,在请求上以装饰器的形式添加 def get(self): print('开始') future = Future() #创建 Future() 对象 tornado.ioloop.IOLoop.current().add_timeout(time.time() + 10, self.doing) #给当前的客户端添加超时时间,固定写法 yield future # yield 返回 Future() 对象 IO操作的固定写法 # 操作完成之后,需要执行的回调函数,一般是用于给请求返回消息 def doing(self, *args, **kwargs): self.write('async') # 返回消息 self.finish() #结束连接 application = tornado.web.Application([ (r"/index", IndexHandler), ]) if __name__ == "__main__": # 单线程 application.listen(8888) tornado.ioloop.IOLoop.instance().start() # 多进程 # server = HTTPServer(application) # server.bind(8888) # server.start(4) # Forks multiple sub-processes # tornado.ioloop.IOLoop.current().start()
import tornado.ioloop import tornado.web from tornado.web import RequestHandler from tornado import gen from tornado.concurrent import Future import time from tornado import httpclient #针对HTTP请求进行异步非阻塞处理的模块 # 针对API接口 HTTP请求 实现异步非阻塞 class IndexHandler(RequestHandler): @gen.coroutine def get(self): print('收到订单') http = httpclient.AsyncHTTPClient() #创建 执行异步非阻塞 客户端 yield http.fetch("http://www.github.com", self.done) # 固定写法 请求对某个API接口(url地址)传递消息,处理完毕执行回调函数 #请求处理完毕,执行的回调函数。 def done(self, response): self.write('订单成功') # 给请求返回的信息 self.finish() # 断开连接 application = tornado.web.Application([ (r"/index", IndexHandler), ]) if __name__ == "__main__": application.listen(8888) tornado.ioloop.IOLoop.instance().start() # server = HTTPServer(application) # server.bind(8888) # server.start(4) # Forks multiple sub-processes # tornado.ioloop.IOLoop.current().start()
import tornado.ioloop import tornado.web from tornado.web import RequestHandler from tornado import gen from tornado.concurrent import Future import time from tornado import httpclient fu = None class IndexHandler(RequestHandler): @gen.coroutine def get(self): global fu print('疯狂的追求') fu = Future() # 创建Future对象,建立连接,如果没有人改变状态,请求就会永久存在连接不断开,除非Future() 对象被赋值或是反生改变 # fu.set_result("") # 给Future 对象赋值,fu发生变化,返回请求。 fu.add_done_callback(self.done)# 给fu添加要执行的回调函数 yield fu def done(self, response): self.write('终于等到你') self.finish() class TestHandler(RequestHandler): def get(self): fu.set_result(666) # 给future对象赋值,用以改变连接状态,返回消息 (注意:返回的内容就是 result的值) self.write('我只能帮你到这里了') application = tornado.web.Application([ (r"/index", IndexHandler), (r"/test", TestHandler), ]) if __name__ == "__main__": application.listen(8888) tornado.ioloop.IOLoop.instance().start()
import tornado.ioloop import tornado.web from tornado.web import RequestHandler from tornado import gen from tornado.concurrent import Future import time from tornado import httpclient from threading import Thread def waiting(futher): #线程要执行处理的函数 import time time.sleep(10) futher.set_result(666) class IndexHandler(RequestHandler): @gen.coroutine def get(self): global fu print('疯狂的追求') fu = Future() fu.add_done_callback(self.done) thread = Thread(target=waiting,args=(fu,)) # 开一个线程 自动给设置值,以自动给请求返回处理的消息 thread.start() yield fu def done(self, response): self.write('终于等到你') self.finish() application = tornado.web.Application([ (r"/index", IndexHandler), ]) if __name__ == "__main__": application.listen(8888) tornado.ioloop.IOLoop.instance().start()
import tornado.web import tornado.ioloop from tornado import gen import tornado_mysql @gen.coroutine # 注意需要写上装饰器 def get_user(user): # 异步非阻塞,Task操作的函数,连接数据库,注意语法结构 conn = yield tornado_mysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='tornadoTest', charset='utf8') cur = conn.cursor() # yield cur.execute("SELECT name,email FROM web_models_userprofile where name=%s", (user,)) yield cur.execute("select sleep(10)") row = cur.fetchone() cur.close() conn.close() raise gen.Return(row) # 注意task函数的返回值 class LoginHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): self.render('login.html') @gen.coroutine def post(self, *args, **kwargs): user = self.get_argument('user') data = yield gen.Task(get_user, user) # 执行Task函数,内部还是返回future对象。Task函数上第一个参数是要执行的函数,第二个是参数 if data: print(data) self.redirect('http://www.baidu.com') else: self.render('login.html') #原始方案,请求来了,连接数据库,等待操作完成,关闭连接! # def post(self, *args, **kwargs): # user = self.get_argument('user') # # 连接数据库: IO耗时 # # 查询语句: IO耗时 # # 获取结果 # data = {'id':1,'user':'alex'} # if data: # print(data) # self.redirect('http://www.baidu.com') # else: # self.render('login.html') application = tornado.web.Application([ (r"/login", LoginHandler), ]) if __name__ == "__main__": application.listen(8888) tornado.ioloop.IOLoop.instance().start()