zoukankan      html  css  js  c++  java
  • 〖Python〗-- Tornado异步非阻塞

    【Tornado异步非阻塞】

    异步非阻塞

    阻塞式:(适用于所有框架,Django,Flask,Tornado,Bottle)
      一个请求到来未处理完成,后续一直等待
      解决方案:多线程,多进程

    异步非阻塞(存在IO请求): Tornado(单进程+单线程)
      使用异步非阻塞,需要遵循Tornado框架内部规则,gen

      多个连接请求,连接给服务端,如果是有异步非阻塞的话,服务端会接收所有的请求交由后台处理,等待其他链接的同时,原先连接不断开,直至返回后台处理完成的结果!
      外部请求,连接服务端 或在select中创建Future对象,然后服务端再把请求交给业务处理平台,此时select监听的列表中又会生成一个socket对象,当业务平台对请求处理完成之后就会把信息返回到服务端的select监听列表中,同时对这个Future对象赋值,用于标记服务端是否要给客户端返回请求信息。

      执行流程,本质上都是返回一个future对象,如果对这个对象被set_result了就返回值,否则就是夯住,一直保持连接,不终止请求。

    1、基本使用

    装饰器 + Future 从而实现Tornado的异步非阻塞

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    class AsyncHandler(tornado.web.RequestHandler):
      
        @gen.coroutine
        def get(self):
            future = Future()
            future.add_done_callback(self.doing)
            yield future
            # 或
            # tornado.ioloop.IOLoop.current().add_future(future,self.doing)
            # yield future
      
        def doing(self,*args, **kwargs):
            self.write('async')
            self.finish()

      当发送GET请求时,由于方法被@gen.coroutine装饰且yield 一个 Future对象,那么Tornado会等待,等待用户向future对象中放置数据或者发送信号,如果获取到数据或信号之后,就开始执行doing方法。

      异步非阻塞体现在当在Tornaod等待用户向future对象中放置数据时,还可以处理其他请求。

      注意:在等待用户向future对象中放置数据或信号时,此连接是不断开的。

    2、同步阻塞和异步非阻塞对比

    class SyncHandler(tornado.web.RequestHandler):
    
        def get(self):
            self.doing()
            self.write('sync')
    
        def doing(self):
            time.sleep(10)
    同步阻塞
    class AsyncHandler(tornado.web.RequestHandler):
        @gen.coroutine
        def get(self):
            future = Future()
            tornado.ioloop.IOLoop.current().add_timeout(time.time() + 5, self.doing)
            yield future
    
    
        def doing(self, *args, **kwargs):
            self.write('async')
            self.finish()
    异步非阻塞

    3、httpclient类库

      Tornado提供了httpclient类库用于发送Http请求,其配合Tornado的异步非阻塞使用。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
      
      
    import tornado.web
    from tornado import gen
    from tornado import httpclient
      
    # 方式一:
    class AsyncHandler(tornado.web.RequestHandler):
        @gen.coroutine
        def get(self*args, **kwargs):
            print('进入')
            http = httpclient.AsyncHTTPClient()
            data = yield http.fetch("http://www.google.com")
            print('完事',data)
            self.finish('6666')
      
    # 方式二:
    # class AsyncHandler(tornado.web.RequestHandler):
    #     @gen.coroutine
    #     def get(self):
    #         print('进入')
    #         http = httpclient.AsyncHTTPClient()
    #         yield http.fetch("http://www.google.com", self.done)
    #
    #     def done(self, response):
    #         print('完事')
    #         self.finish('666')
      
      
      
    application = tornado.web.Application([
        (r"/async", AsyncHandler),
    ])
      
    if __name__ == "__main__":
        application.listen(8888)
        tornado.ioloop.IOLoop.instance().start()
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    """
    需要先安装支持异步操作Mysql的类库: 
        Tornado-MySQL: https://github.com/PyMySQL/Tornado-MySQL#installation
        
        pip3 install Tornado-MySQL
    
    """
    
    import tornado.web
    from tornado import gen
    
    import tornado_mysql
    from tornado_mysql import pools
    
    POOL = pools.Pool(
        dict(host='127.0.0.1', port=3306, user='root', passwd='123', db='cmdb'),
        max_idle_connections=1,
        max_recycle_sec=3)
    
    
    @gen.coroutine
    def get_user_by_conn_pool(user):
        cur = yield POOL.execute("SELECT SLEEP(%s)", (user,))
        row = cur.fetchone()
        raise gen.Return(row)
    
    
    @gen.coroutine
    def get_user(user):
        conn = yield tornado_mysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='cmdb',
                                           charset='utf8')
        cur = conn.cursor()
        # yield cur.execute("SELECT name,email FROM web_models_userprofile where name=%s", (user,))
        yield cur.execute("select sleep(10)")
        row = cur.fetchone()
        cur.close()
        conn.close()
        raise gen.Return(row)
    
    
    class LoginHandler(tornado.web.RequestHandler):
        def get(self, *args, **kwargs):
            self.render('login.html')
    
        @gen.coroutine
        def post(self, *args, **kwargs):
            user = self.get_argument('user')
            data = yield gen.Task(get_user, user)
            if data:
                print(data)
                self.redirect('http://www.oldboyedu.com')
            else:
                self.render('login.html')
    
    
    application = tornado.web.Application([
        (r"/login", LoginHandler),
    ])
    
    if __name__ == "__main__":
        application.listen(8888)
        tornado.ioloop.IOLoop.instance().start()
    基于异步非阻塞和Tornado-MySQL实现用户登录示例

    异步非阻塞渐进学习代码+笔记注释

    import tornado.ioloop
    import tornado.web
    from tornado.web import RequestHandler
    from tornado.httpserver import HTTPServer
    
    # 单线程操作,请求来排队等待,顺序执行
    #人为干预模拟IO设置 sleep10秒
    class IndexHandler(RequestHandler):
        def get(self):
            print('开始')
            import time
            time.sleep(10)
            self.write("Hello, world")
            print('结束')
    
    application = tornado.web.Application([
        (r"/index", IndexHandler),
    ])
    
    
    if __name__ == "__main__":
        # 单进程单线程
        application.listen(8888)
        tornado.ioloop.IOLoop.instance().start()
    
        # 利用多进程 实现
        # server = HTTPServer(application)
        # server.bind(8888)
        # server.start(4)  # Forks multiple sub-processes
        # tornado.ioloop.IOLoop.current().start()
    s1.py
    import tornado.ioloop
    import tornado.web
    from tornado.web import RequestHandler
    from tornado import gen # 执行异步IO 导入gen模块
    from tornado.concurrent import Future # 执行异步IO导入 Future模块,引用Future对象
    import time
    
    # 单线程 实现异步非阻塞操作!所有的连接请求不等待直接执行
    class IndexHandler(RequestHandler):
        @gen.coroutine # 异步IO 固定写法,在请求上以装饰器的形式添加
        def get(self):
            print('开始')
            future = Future() #创建 Future() 对象
            tornado.ioloop.IOLoop.current().add_timeout(time.time() + 10, self.doing) #给当前的客户端添加超时时间,固定写法
            yield future # yield 返回 Future() 对象 IO操作的固定写法
    
        # 操作完成之后,需要执行的回调函数,一般是用于给请求返回消息
        def doing(self, *args, **kwargs):
            self.write('async') # 返回消息
            self.finish() #结束连接
    
    
    application = tornado.web.Application([
        (r"/index", IndexHandler),
    ])
    
    
    if __name__ == "__main__":
        # 单线程
        application.listen(8888)
        tornado.ioloop.IOLoop.instance().start()
    
        # 多进程
        # server = HTTPServer(application)
        # server.bind(8888)
        # server.start(4)  # Forks multiple sub-processes
        # tornado.ioloop.IOLoop.current().start()
    s2.py
    import tornado.ioloop
    import tornado.web
    from tornado.web import RequestHandler
    from tornado import gen
    from tornado.concurrent import Future
    import time
    from tornado import httpclient #针对HTTP请求进行异步非阻塞处理的模块
    
    # 针对API接口 HTTP请求 实现异步非阻塞
    class IndexHandler(RequestHandler):
        @gen.coroutine
        def get(self):
            print('收到订单')
            http = httpclient.AsyncHTTPClient() #创建 执行异步非阻塞 客户端
            yield http.fetch("http://www.github.com", self.done) # 固定写法 请求对某个API接口(url地址)传递消息,处理完毕执行回调函数
    
        #请求处理完毕,执行的回调函数。
        def done(self, response):
            self.write('订单成功') # 给请求返回的信息
            self.finish() # 断开连接
    
    application = tornado.web.Application([
        (r"/index", IndexHandler),
    ])
    
    
    if __name__ == "__main__":
        application.listen(8888)
        tornado.ioloop.IOLoop.instance().start()
    
        # server = HTTPServer(application)
        # server.bind(8888)
        # server.start(4)  # Forks multiple sub-processes
        # tornado.ioloop.IOLoop.current().start()
    s3.py
    import tornado.ioloop
    import tornado.web
    from tornado.web import RequestHandler
    from tornado import gen
    from tornado.concurrent import Future
    import time
    from tornado import httpclient
    
    fu = None
    class IndexHandler(RequestHandler):
        @gen.coroutine
        def get(self):
            global fu
            print('疯狂的追求')
            fu = Future() # 创建Future对象,建立连接,如果没有人改变状态,请求就会永久存在连接不断开,除非Future() 对象被赋值或是反生改变
            # fu.set_result("") # 给Future 对象赋值,fu发生变化,返回请求。
            fu.add_done_callback(self.done)# 给fu添加要执行的回调函数
            yield fu
    
        def done(self, response):
            self.write('终于等到你')
            self.finish()
    
    class TestHandler(RequestHandler):
        def get(self):
            fu.set_result(666) # 给future对象赋值,用以改变连接状态,返回消息 (注意:返回的内容就是 result的值)
            self.write('我只能帮你到这里了')
    
    application = tornado.web.Application([
        (r"/index", IndexHandler),
        (r"/test", TestHandler),
    ])
    
    
    if __name__ == "__main__":
        application.listen(8888)
        tornado.ioloop.IOLoop.instance().start()
    s4.py
    import tornado.ioloop
    import tornado.web
    from tornado.web import RequestHandler
    from tornado import gen
    from tornado.concurrent import Future
    import time
    from tornado import httpclient
    from threading import Thread
    
    def waiting(futher):
        #线程要执行处理的函数
        import time
        time.sleep(10)
        futher.set_result(666)
    
    
    class IndexHandler(RequestHandler):
        @gen.coroutine
        def get(self):
            global fu
            print('疯狂的追求')
            fu = Future()
            fu.add_done_callback(self.done)
    
            thread = Thread(target=waiting,args=(fu,)) # 开一个线程 自动给设置值,以自动给请求返回处理的消息
            thread.start()
    
            yield fu
    
        def done(self, response):
            self.write('终于等到你')
            self.finish()
    
    
    application = tornado.web.Application([
        (r"/index", IndexHandler),
    ])
    
    
    if __name__ == "__main__":
        application.listen(8888)
        tornado.ioloop.IOLoop.instance().start()
    s5.py
    import tornado.web
    import tornado.ioloop
    from tornado import gen
    import tornado_mysql
    
    @gen.coroutine # 注意需要写上装饰器
    def get_user(user):
        # 异步非阻塞,Task操作的函数,连接数据库,注意语法结构
        conn = yield tornado_mysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='tornadoTest',
                                           charset='utf8')
        cur = conn.cursor()
        # yield cur.execute("SELECT name,email FROM web_models_userprofile where name=%s", (user,))
        yield cur.execute("select sleep(10)")
        row = cur.fetchone()
        cur.close()
        conn.close()
        raise gen.Return(row) # 注意task函数的返回值
    
    
    class LoginHandler(tornado.web.RequestHandler):
        def get(self, *args, **kwargs):
            self.render('login.html')
    
        @gen.coroutine
        def post(self, *args, **kwargs):
            user = self.get_argument('user')
            data = yield gen.Task(get_user, user) # 执行Task函数,内部还是返回future对象。Task函数上第一个参数是要执行的函数,第二个是参数
            if data:
                print(data)
                self.redirect('http://www.baidu.com')
            else:
                self.render('login.html')
    
        #原始方案,请求来了,连接数据库,等待操作完成,关闭连接!
        # def post(self, *args, **kwargs):
        #     user = self.get_argument('user')
        #     # 连接数据库: IO耗时
        #     # 查询语句: IO耗时
        #     # 获取结果
        #     data = {'id':1,'user':'alex'}
        #     if data:
        #         print(data)
        #         self.redirect('http://www.baidu.com')
        #     else:
        #         self.render('login.html')
    
    
    application = tornado.web.Application([
        (r"/login", LoginHandler),
    ])
    
    if __name__ == "__main__":
        application.listen(8888)
        tornado.ioloop.IOLoop.instance().start()
    s6.py
  • 相关阅读:
    发布(Windows)
    Parallel并行编程
    query通用开源框架
    深入了解三种针对文件(JSON、XML与INI)的配置源
    GitLab CI
    雅思创始人Keith Taylor谈英语学习
    查看内存使用情况
    Reverse String
    分布式消息系统jafka快速起步(转)
    深入浅出 消息队列 ActiveMQ(转)
  • 原文地址:https://www.cnblogs.com/SHENGXIN/p/7774999.html
Copyright © 2011-2022 走看看