zoukankan      html  css  js  c++  java
  • Tornado异步非阻塞及自定义异步非阻塞

    一、Tornado异步非阻塞

    一般的web框架,可以分为两类:
    阻塞式:(Django,Flask,Tornado,Bottle)
    一个请求到来未处理完成,后续一直等待
    解决方案:多线程或多进程

    异步非阻塞(存在IO请求):Tornado (单进程+单线程)

    - 使用
    - @gen.coroutine
    - yield Future对象
    1.简单的异步例子
    import tornado.ioloop
    import tornado.web
    from tornado.web import RequestHandler
    from tornado import gen
    from tornado.concurrent import Future
    import time
    
    class IndexHandler(RequestHandler):
        @gen.coroutine
        def get(self):
            print('开始')
            future = Future()
            #从当前时间开始夯住10S,当时间结束执行 self.doing方法
            tornado.ioloop.IOLoop.current().add_timeout(time.time() + 10, self.doing)
            yield future
    
        def doing(self, *args, **kwargs):
            self.write('async')
            #关闭连接
            self.finish()
    
    
    application = tornado.web.Application([
        (r"/index", IndexHandler),
    ])
    
    if __name__ == "__main__":
        application.listen(8888)
        tornado.ioloop.IOLoop.instance().start()
    

      

    在内部中,Select循环检测Future,当有返回值时,Future内部会作出变化,此时即可释放向自己发送请求的链接

    2.作为中转,接收用户对自己的请求,而自己需要向另一个API发送请求处理,在发送请求的过程中,占用大量时间,这时候就可以使用异步非阻塞的形式。

    比如一个购物网站,用户向自己发来购买订单,自己再向预定的后台处理API发送处理请求,在这段时间内可以接收其它用户发来的订单请求,这将大大提升处理的效率。

    import tornado.ioloop
    import tornado.web
    from tornado.web import RequestHandler
    from tornado import gen
    from tornado.concurrent import Future
    import time
    from tornado import httpclient
    
    class IndexHandler(RequestHandler):
        @gen.coroutine
        def get(self):
            print('收到订单')
            #开启一个非阻塞HTTP客户端对象
            http = httpclient.AsyncHTTPClient()
            # 执行请求,异步返回HTTPResponse。在向另一url请求的过程中,可以继续接收其它对自己的请求
            yield http.fetch("http://www.github.com", self.done,)
    
        def done(self, response):  #异步方法接收HTTPResponse,并可以继续执行其它代码
            print(response)
            self.write('订单成功')
            #关闭此连接
            self.finish()
    
    
    application = tornado.web.Application([
        (r"/index", IndexHandler),
    ])
    
    if __name__ == "__main__":
        application.listen(8888)
        tornado.ioloop.IOLoop.instance().start()
    

      

    fetchrequestcallback = Noneraise_error = True** kwargs [source]

    执行请求,异步返回HTTPResponse

    请求可以是字符串URL或HTTPRequest对象。如果它是一个字符串,我们构造一个HTTPRequest使用任何额外的kwargs:HTTPRequest(request, **kwargs)

    该方法return一个Future结果是一个 HTTPResponse。默认情况下,Future将引发一个HTTPError,如果该请求返回一个非200响应代码(其他错误也可以得到提升,如果服务器无法联系)。相反,如果raise_error设置为False,则无论响应代码如何,都将始终返回响应。

    如果callback给出一个,它将被调用HTTPResponse。在回调界面中,HTTPError不会自动提出。相反,您必须检查响应的error属性或调用其rethrow方法。

    3.模拟异步非阻塞执行的流程:

    import tornado.ioloop
    import tornado.web
    from tornado.web import RequestHandler
    from tornado import gen
    from tornado.concurrent import Future
    import time
    from tornado import httpclient
    
    fu = None
    class IndexHandler(RequestHandler):
        @gen.coroutine
        def get(self):
            #将fu设置为全局变量
            global fu
            print('疯狂的追求')
            fu = Future()
            #添加一个回调,如果完成运行并且其结果可用时,它将作为其参数被调用
            fu.add_done_callback(self.done)
            yield fu
    
        def done(self, response):
            self.write('终于等到你')
            self.finish()
    
    
    class TestHandler(RequestHandler):
        def get(self):
        #手动设置Future结果 fu.set_result(666) self.write('我只能帮你到这里了') application = tornado.web.Application([ (r"/index", IndexHandler), (r"/test", TestHandler), ]) if __name__ == "__main__": application.listen(8888) tornado.ioloop.IOLoop.instance().start()

      

    当return一个Future时,可以添加一个回调函数,而这个回调函数,只有在请求成功接收到结果时才会执行,否则将一直夯住,而Future内部则是通过set_result()方法使其得到一个成功返回的信号从而执行回调。正常情况下set_result()的值是正确的返回结果,这里通过伪造结果达到异步的效果。done()方法中的response即set_result()的值。

    3.1 多线程自动执行:

    import tornado.ioloop
    import tornado.web
    from tornado.web import RequestHandler
    from tornado import gen
    from tornado.concurrent import Future
    import time
    from threading import Thread
    
    def waiting(future):
        import time
        time.sleep(10)
        #10S后执行set_result使回调生效。
        future.set_result(666)
    
    class IndexHandler(RequestHandler):
        @gen.coroutine
        def get(self):
            global fu
            print('疯狂的追求')
            fu = Future()
            fu.add_done_callback(self.done)
            #开启线程执行waiting函数,
            thread = Thread(target=waiting,args=(fu,))
            thread.start()
    
            yield fu
    
        def done(self, response):
            self.write('终于等到你')
            self.finish()
    
    
    application = tornado.web.Application([
        (r"/index", IndexHandler),
    ])
    
    if __name__ == "__main__":
        application.listen(8888)
        tornado.ioloop.IOLoop.instance().start()
    

      

     4.使用Tornado异步非阻塞模拟用户登录

    import tornado.web
    from tornado import gen
    import tornado_mysql
    
    #必须要使用支持的类库,如tornado_mysql ,sqlalchemy以及pymysql都不支持
    @gen.coroutine
    def get_user(user):
        #连接时会有IO耗时
        conn = yield tornado_mysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='cmdb',
                                           charset='utf8')
        cur = conn.cursor()
        #查询时也会有IO耗时
        # yield cur.execute("SELECT name,email FROM web_models_userprofile where name=%s", (user,))
        yield cur.execute("select sleep(10)")
        row = cur.fetchone()
        cur.close()
        conn.close()
        #gen.Task特有的return方式,通过捕捉异常获取数据库查询的值
        raise gen.Return(row)
    
    class LoginHandler(tornado.web.RequestHandler):
        def get(self, *args, **kwargs):
            self.render('login.html')
    
        @gen.coroutine
        def post(self, *args, **kwargs):
            user = self.get_argument('user')
            #yield一个Task任务,封装数据库查询的函数,当查询遇到IO操作时不会阻塞,可以接收其它请求。
            data = yield gen.Task(get_user, user)
            if data:
                print(data)
                self.redirect('http://www.baidu.com')
            else:
                self.render('login.html')
    

      

    二、自定义简易非阻塞框架

    1.socket服务端基本(阻塞式):

    import socket
    
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("127.0.0.1", 9999,))
    sock.listen(5)
    
    while True:
        client, address = sock.accept()
        data = client.recv(8096)
        print(data)
        # /r/n/r/n
        # 请求头中URL
        # 请求体中数据
        # URL去路由关系中匹配 (请求体中数据)
        # response = func()
        import time
        time.sleep(10)
        client.sendall(b'uuuuuuuuuuuuuu')
        client.close()
    

      

    2.应用select模拟非阻塞web框架的基本流程:

    import socket
    import select
    
    def f1(request):
        return "内容1"
    
    def f2(request):
        return "内容2"
    
    #路由匹配系统
    urls = [
        ('/index.html',f1),
        ('/home.html',f2),
    ]
    
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    #设置非阻塞
    sock.setblocking(False)
    sock.bind(("127.0.0.1", 9999,))
    sock.listen(5)
    
    input_list = [sock,]
    
    while True:
        """
        客户端第一次建立连接以及后续的请求发来数据都会对rlist有变动,对rlist进行循环,
        如果等于sock即是建立连接的请求,否则就是客户端后续发来数据,因此进行不同的操作
        """
        rlist,wlist,elist = select.select(input_list,[],[],0.005)
        for sk in rlist:
            # 有新人来连接:sock  进行accept,接收连接,等待客户来连接
            if sk == sock:
                client, address = sock.accept()
                client.setblocking(False)
                input_list.append(client)  #将客户端连接添加到循环列表中
            # 老人发来数据: client  建立连接并接收数据
            else:
                data = sk.recv(8096)
                # /r/n/r/n  (分割请求头跟请求体)
                # 请求头中URL
                # 请求体中数据      (提取URL跟数据)
                # URL去路由关系中匹配 (请求体中数据)
                # response = func()
                #根据数据匹配URL,服务器做相应的视图处理,处理后的数据返回给客户端
                response = f1(data)
    
                sk.sendall(response.encode('utf-8'))
                sk.close()
    

      

    3.简单的web框架基本实现原理

    import re
    import socket
    import select
    import time
    
    class Future(object):
        """
        异步非阻塞模式时封装回调函数以及是否准备就绪
        """
        def __init__(self, callback):
            self.callback = callback
            self._ready = False
            self.value = None
    
        def set_result(self, value=None):
            self.value = value
            self._ready = True
    
        @property
        def ready(self):
            return self._ready
    
    class HttpResponse(object):
        """
        封装响应信息
        """
        def __init__(self, content=''):
            self.content = content
    
            self.headers = {}
            self.cookies = {}
    
        def response(self):
            return bytes(self.content, encoding='utf-8')
    
    class HttpNotFound(HttpResponse):
        """
        404时的错误提示
        """
        def __init__(self):
            super(HttpNotFound, self).__init__('404 Not Found')
    
    class HttpRequest(object):
        """
        用户封装用户请求信息
        """
        def __init__(self, conn):
            """
    
            :param conn: 客户端的连接
            """
            self.conn = conn
    
            self.header_bytes = bytes()
            self.body_bytes = bytes()
    
            self.header_dict = {}
    
            self.method = ""
            self.url = ""
            self.protocol = ""
    
            self.initialize()
            self.initialize_headers()
    
        def initialize(self):
    
            header_flag = False
            while True:
                try:
                    received = self.conn.recv(8096)
                except Exception as e:
                    received = None
                if not received:
                    break
                if header_flag:
                    self.body_bytes += received
                    continue
                temp = received.split(b'
    
    ', 1)
                if len(temp) == 1:
                    self.header_bytes += temp
                else:
                    h, b = temp
                    self.header_bytes += h
                    self.body_bytes += b
                    header_flag = True
    
        @property
        def header_str(self):
            return str(self.header_bytes, encoding='utf-8')
    
        def initialize_headers(self):
            headers = self.header_str.split('
    ')
            first_line = headers[0].split(' ')
            if len(first_line) == 3:
                self.method, self.url, self.protocol = headers[0].split(' ')
                for line in headers:
                    kv = line.split(':')
                    if len(kv) == 2:
                        k, v = kv
                        self.header_dict[k] = v
    
    
    class Snow(object):
        """
        微型Web框架类,
        """
        def __init__(self, routes):
            """
    
            :param routes: url路由匹配
            """
            self.routes = routes
            self.inputs = set()
            self.request = None
            self.async_request_handler = {}
    
        def run(self, host='localhost', port=9999):
            """
            事件循环
            :param host: IP地址
            :param port: 端口
            :return:
            """
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((host, port,))
            sock.setblocking(False)
            sock.listen(128)
            sock.setblocking(0)
            self.inputs.add(sock)
            try:
                while True:
                    readable_list, writeable_list, error_list = select.select(self.inputs, [], self.inputs,0.005)
                    for conn in readable_list:
                        if sock == conn:
                            # 连接
                            client, address = conn.accept()
                            client.setblocking(False)
                            self.inputs.add(client)
                        else:
                            # 发送请求
                            # HttpResponse
                            # 生成器
                            """
                            gen=HttpResponse对象,执行了三件事,
                1.HttpRequest类,将客户端拿到,分割请求头与请求体,将请求头信息处理成header_dict {'method':xxx,'url':xxx}
                2.循环路由,实例化HttpRequest类, .url与其进行匹配,匹配成功将self.fun 赋值称为路由系统填写的func函数。
                3.处理func视图,返回2种类型,HttpResponse对象或者Future对象。如果是HttpResponse()对象,封装了将字符串转换
                成字节的方法,即还是一种阻塞式,即一般的如Django类web框架的处理流程,
                            """
                            gen = self.process(conn)
                            if isinstance(gen,HttpResponse):
                                conn.sendall(gen.response())
                                self.inputs.remove(conn)
                                conn.close()
                            else:
                                yielded = next(gen)
                                self.async_request_handler[conn] = yielded
    
                    for conn,fur in self.async_request_handler.items():
                        if not fur.ready:
                            continue
                        response = fur.callback(fur.value)
                        conn.sendall(response)
                        conn.close()
                        del self.async_request_handler[conn]
            except Exception as e:
                pass
            finally:
                sock.close()
    
        def process(self, conn):
            """
            处理路由系统以及执行函数
            :param conn:
            :return:
            """
            self.request = HttpRequest(conn)
            func = None
            for route in self.routes:
                if re.match(route[0], self.request.url):
                    func = route[1]
                    break
            if not func:
                return HttpNotFound()
            else:
                return func(self.request)
    
    
    def index(request):
        return HttpResponse('OK')
    
    def asynccccc(request):
        fur = Future(callback=done)
        yield fur
    
    def done(arg):
        pass
    
    routes = [
        (r'/index/', index),
        (r'/async/', asynccccc),
    ]
    
    if __name__ == '__main__':
        app = Snow(routes)
        app.run(port=8012)
    

      

    gen对象中,如果返回的是一个Future对象,则内部会将封装的Future对象yield给回调函数,或者说是交给中转站,由中转站向基础平台的API发送IO请求,这时不会阻塞,会在循环中继续yield新的请求,并生成一个字典,{conn:yield Future的对象},而在外层的循环中监听这个对象,一旦对象发送的IO请求处理完成,将执行set_result方法,将ready设置成True,并拿到请求接收的值即赋值给value,再把value传递给回调函数中进行相应处理, 获得一个最后的值,将conn从字典中移除,将处理后的值返回给用户。

    4.最后整理后的简单异步非阻塞web框架

    import re
    import socket
    import select
    import time
    
    
    class HttpResponse(object):
        """
        封装响应信息
        """
        def __init__(self, content=''):
            self.content = content
    
            self.headers = {}
            self.cookies = {}
    
        def response(self):
            return bytes(self.content, encoding='utf-8')
    
    
    class HttpNotFound(HttpResponse):
        """
        404时的错误提示
        """
        def __init__(self):
            super(HttpNotFound, self).__init__('404 Not Found')
    
    
    class HttpRequest(object):
        """
        用户封装用户请求信息
        """
        def __init__(self, conn):
            self.conn = conn
    
            self.header_bytes = bytes()
            self.header_dict = {}
            self.body_bytes = bytes()
    
            self.method = ""
            self.url = ""
            self.protocol = ""
    
            self.initialize()
            self.initialize_headers()
    
        def initialize(self):
    
            header_flag = False
            while True:
                try:
                    received = self.conn.recv(8096)
                except Exception as e:
                    received = None
                if not received:
                    break
                if header_flag:
                    self.body_bytes += received
                    continue
                temp = received.split(b'
    
    ', 1)
                if len(temp) == 1:
                    self.header_bytes += temp
                else:
                    h, b = temp
                    self.header_bytes += h
                    self.body_bytes += b
                    header_flag = True
    
        @property
        def header_str(self):
            return str(self.header_bytes, encoding='utf-8')
    
        def initialize_headers(self):
            headers = self.header_str.split('
    ')
            first_line = headers[0].split(' ')
            if len(first_line) == 3:
                self.method, self.url, self.protocol = headers[0].split(' ')
                for line in headers:
                    kv = line.split(':')
                    if len(kv) == 2:
                        k, v = kv
                        self.header_dict[k] = v
    
    
    class Future(object):
        """
        异步非阻塞模式时封装回调函数以及是否准备就绪
        """
        def __init__(self, callback):
            self.callback = callback
            self._ready = False
            self.value = None
    
        def set_result(self, value=None):
            self.value = value
            self._ready = True
    
        @property
        def ready(self):
            return self._ready
    
    
    class TimeoutFuture(Future):
        """
        异步非阻塞超时
        """
        def __init__(self, timeout):
            super(TimeoutFuture, self).__init__(callback=None)
            self.timeout = timeout
            self.start_time = time.time()
    
        @property
        def ready(self):
            current_time = time.time()
            if current_time > self.start_time + self.timeout:
                self._ready = True
            return self._ready
    
    
    class Snow(object):
        """
        微型Web框架类
        """
        def __init__(self, routes):
            self.routes = routes
            self.inputs = set()
            self.request = None
            self.async_request_handler = {}
    
        def run(self, host='localhost', port=9999):
            """
            事件循环
            :param host:
            :param port:
            :return:
            """
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((host, port,))
            sock.setblocking(False)
            sock.listen(128)
            sock.setblocking(0)
            self.inputs.add(sock)
            try:
                while True:
                    readable_list, writeable_list, error_list = select.select(self.inputs, [], self.inputs,0.005)
                    for conn in readable_list:
                        if sock == conn:
                            client, address = conn.accept()
                            client.setblocking(False)
                            self.inputs.add(client)
                        else:
                            gen = self.process(conn)
                            if isinstance(gen, HttpResponse):
                                conn.sendall(gen.response())
                                self.inputs.remove(conn)
                                conn.close()
                            else:
                                yielded = next(gen)
                                self.async_request_handler[conn] = yielded
                    self.polling_callback()
    
            except Exception as e:
                pass
            finally:
                sock.close()
    
        def polling_callback(self):
            """
            遍历触发异步非阻塞的回调函数
            :return:
            """
            for conn in list(self.async_request_handler.keys()):
                yielded = self.async_request_handler[conn]
                if not yielded.ready:
                    continue
                if yielded.callback:
                    ret = yielded.callback(self.request, yielded)
                    conn.sendall(ret.response())
                self.inputs.remove(conn)
                del self.async_request_handler[conn]
                conn.close()
    
        def process(self, conn):
            """
            处理路由系统以及执行函数
            :param conn:
            :return:
            """
            self.request = HttpRequest(conn)
            func = None
            for route in self.routes:
                if re.match(route[0], self.request.url):
                    func = route[1]
                    break
            if not func:
                return HttpNotFound()
            else:
                return func(self.request)
    

      

    使用:

    基本使用(没有使用异步非阻塞):

    from s4 import Snow
    from s4 import HttpResponse
    
    def index(request):
        return HttpResponse('OK')
    
    routes = [
        (r'/index/', index),
    ]
    
    app = Snow(routes)
    app.run(port=8012) 

    手动的异步非阻塞使用

    from s4 import Snow
    from s4 import HttpResponse
    from s4 import Future
    
    request_list = []
    
    def callback(request, future):
        return HttpResponse(future.value)
    
    def req(request):
        print('请求到来')
        obj = Future(callback=callback)
        request_list.append(obj)
        yield obj
    
    def stop(request):
        obj = request_list[0]
        obj.set_result('done')
        del request_list[0]
        return HttpResponse('stop')
    
    routes = [
        (r'/req/', req),
        (r'/stop/', stop),
    ]
    
    app = Snow(routes)
    app.run(port=8012)
    

      

    由于我们在最外层循环了一个{conn:future}的字典,所以需要手动去对stop视图函数发送请求,由它进行set_result,如果是自动的方法,就是将这个conn添加到select中,由select进行监听并set_result,

    达到程序自动返回的效果。

  • 相关阅读:
    学算法的那些年,吴师兄接触的网站、软件、视频、书籍大揭秘
    阮一峰:CSS Modules 用法教程
    截取url参数
    在dotnet core实现类似crontab的定时任务
    开源一个基于dotnet standard的轻量级的ORM框架-Light.Data
    ABP Vnext使用mysql数据库
    实现ElementUI Dialog宽度响应式变化
    使用Vue Baidu Map对百度地图实现输入框搜索定位
    使用Docker搭建HttpRunnerManager环境
    SpringBoot集成spring aop开发
  • 原文地址:https://www.cnblogs.com/mitsui/p/7543089.html
Copyright © 2011-2022 走看看