zoukankan      html  css  js  c++  java
  • [转]Tornado get/post请求异步处理框架分析

    转自: http://m.blog.csdn.net/blog/joeyon/41956027

    首先说下环境,python2.7,Tornado3.0

    然后本文讨论的不是Tornado底层如何用epoll处理socket通信,而是如何在应用层异步处理get/post请求。下面是正文:

    同时在get或者post方法处理上应用@tornado.web.asynchronous和@tornado.gen.engine装饰器,可以非常方便的和有callback参数的异步方法配合实现非阻塞请求处理。
     
    这里不想说官方的那个例子,因为官方的例子给的本来就是用异步的http客户端来拉取数据然后自己转到回调函数执行,讨论http客户端的代码不在本文范围内。于是我用一个简单的函数来演示,且这样更能说明问题。
     
    class RegisterHandler(basehandler.BaseHandler):
        @tornado.web.asynchronous
        @tornado.gen.engine
        def get(self, *arg, **args):
            response = yield tornado.gen.Task(self.method_while)
            print "response",response
            self.finish()
       
        def method_while(self, *arg, **args):
            callback(1)
     
    这是我写的一个请求处理的Handler,get请求上加了这两个装饰器,第一个装饰器标明这个get函数是不会自动断掉输出流的,需要显式的调用finish方法。这个装饰器需要和@tornado.gen.engine一块用。
     
    重点到了,来看@tornado.gen.engine的源码
    def engine(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            runner = None
     
            def handle_exception(typ, value, tb):
                if runner is not None:
                    return runner.handle_exception(typ, value, tb)
                return False
            with ExceptionStackContext(handle_exception) as deactivate:
                gen = func(*args, **kwargs)
                if isinstance(gen, types.GeneratorType):
                    runner = Runner(gen, deactivate)
                    runner.run()
                    return
                assert gen is None, gen
                deactivate()
        return wrapper
    (我把原来有的注释删掉了)
     
    我们直接看with语句的内部就可以了:
    gen = func(*args, **kwargs)
    if isinstance(gen, types.GeneratorType):
    runner = Runner(gen, deactivate)
    runner.run()
    return

    gen就是我们的get方法,gen=fun(*args,**kwargs)相当于gen=get(*args,**kwargs),get方法执行结果返回给gen?no~no!get方法里有yield,意味着get(*args,**kwargs)返回的是一个生成器类型GeneratorType的数据,是不会马上执行的,需要调用他的next或者send方法让其执行到下一个yield方法处。关于生成器以及它的send和next的具体用法不在本文的讨论范围内,请查阅官方文档。
     
    可以看到它new了一个Runner,gen传了进去,然后执行run方法,我们跟进Runner类的run方法的代码:
    def run(self ):
            if self.running or self.finished:#判断是否是在运行中或者已经结束,如果是立即返回
                return
            try:
                self.running = True #让状态为运行中
                while True:#进入循环
                    if self.exc_info is None:#上次循环没有异常
                        try:
                            if not self.yield_point.is_ready():#判断key是否可用
                                return
                            next = self.yield_point.get_result()#获取yield表达式的结果给next
                        except Exception:
                            self.exc_info = sys.exc_info()
                    try:
                        if self.exc_info is not None:
                            self.had_exception = True
                            exc_info = self.exc_info
                            self.exc_info = None
                            yielded = self.gen.throw(*exc_info)
                        else:
                            yielded = self.gen.send(next)#如果上次没异常,get函数继续执行,这里把Task对象返回给yielded,相当于要迭代使用这个Task(最开头标红的代码段),每次执行到yield都要把task重新赋给yielded
                    except StopIteration:#如果没有可以执行的了,返回
                        self.finished = True
                        if self.pending_callbacks and not self.had_exception:
                            raise LeakedCallbackError(
                                "finished without waiting for callbacks %r" %
                                self.pending_callbacks)
                        self.deactivate_stack_context()
                        self.deactivate_stack_context = None
                        return
                    except Exception:
                        self.finished = True
                        raise
                    if isinstance(yielded, list):
                        yielded = Multi(yielded)
                    if isinstance(yielded, YieldPoint):#如果yielded为Task类型(Task继承于YieldPoint)
                        self.yield_point = yielded
                        try:
                            self.yield_point.start(self)#执行task的start方法
                        except Exception:
                            self.exc_info = sys.exc_info()
                    else:
                        self.exc_info = (BadYieldError("yielded unknown object %r" % yielded),)
            finally:
                self.running = False
     
    具体步骤可以详见上述代码中的注释,这里我们就继续看task的start方法:
    class Task(YieldPoint):
    (省略其他代码)
        def start(self, runner):
            self.runner = runner
            self.key = object()
            runner.register_callback( self.key)
            self.kwargs["callback"] = runner.result_callback(self.key)
            self.func(*self.args, ** self.kwargs)
    (省略其他代码)
     
     
    这里的func方法就是最上面的method_while,传了一个callback参数进去:runner.result_callback( self.key),然后执行该方法。为了讨论这个callback参数的作用,我们假设这个method_while方法里,没有用到这个参数,即没有执行callback:
    start方法在method_while执行完后就返回了,继续执行Runner的run方法里的那个循环的第二遍,它会直接在这一行
    if not self.yield_point.is_ready():#判断key是否可用
         return
    直接返回,原因就是在results 里这个key对应的值为None。
     
    这里涉及到了两个名词,results 和key,result是在哪里的呢?是Runner的一个字段,从下面的Runner的构造函数里可以看到results是个字典 :
    class Runner(object):
        def __init__(self, gen, deactivate_stack_context):
            (省略其他代码)
                   self.pending_callbacks = set()
                   self.results = {}
           (省略其他代码)
     
    那么key是怎么设置到results这个字典里的呢?是在Task的start方法里,runner.register_callback( self.key),具体代码可以再看上面的Task类。这个register_callback是有点绕的,它将key设置到pending_callbacks里面,这个pending_callbacks是一个set类型,即不可重复的集合,有这个集合的作用是为了每次都要从这里面判断key是否存在,key存在说明有个Task要执行。在上面的run方法中的判断key是否可用的时候,is_ready每次都会先判断key是否在这个pending_callbacks,没有直接报异常,有的话才会去results去取结果。我们可以看到results一直是空啊,什么时候设置的值呢??就是刚刚我们在method_while里没有执行的callback方法,即runner.result_callback( self.key):
    def result_callback(self , key):
            def inner(*args, **kwargs):
                if kwargs or len(args) > 1:
                    result = Arguments(args, kwargs)
                elif args:
                    result = args[ 0]
                else:
                    result = None
                self.set_result(key, result)
            return inner
     
    可以看出来,他把callback实际调用时候的参数列表又写进了results里了,返回给yield表达式的值response。所以我们发现,一定要在自定义的方法中执行参数中的callback,把你想返回的数据写进callback的参数里,yield表达式的值就会是这个参数了,然后run方法继续执行到下一个yield直到get方法完成。
     
    比如最上面那个例子,打印结果为:
    response 1
     
    最后来说这个异步方案有什么用呢?
    一说到异步,新手肯定会以为是get/post方法阻塞了也没关系,一个get请求过来,get处理函数如果阻塞了,是必然导致整个服务器阻塞的!不要以为这个是相对web前端请求的并行异步解决方案,单线程的Tornado毕竟是单线程,方法阻塞,必然导致服务器阻塞,毋庸置疑的。
     
    这个异步是相对与get请求来说的,get方法在没有加装饰器@tornado.web.asynchronous的情况下,get方法结束后,请求会自动断开,但是加上这个装饰器后,get请求可以直接return且保持连接,直到显式的调用finish方法才会关闭输出流。这样就萌生了异步(异步:把事情交给除了自己之外的人来做,自己不管)解决方案,get方法直接return,交给别的函数来做,做完了再回来。加上@tornado.gen.engine只是把上面这个思想实现了下,将代码浓缩到最小而已,这并不表示异步处理的函数或者这个get里有死循环的话也能同时让服务器处理别的请求,你查询数据库花费10000s,那么你的服务器就肯定要死10000s的,这种问题的解决方法有四个:crouchDB(有http接口,异步httpclient调用),优化当前数据库,异步httpclient调webservice,开线程。推荐第二个第三个,其次第四个,最后第一个(原因不解释,貌似crouchDB口碑不好,我也不爱用,mongo党飘过)。
     
    就算是异步httpclient,就像下面这样
    def handle_request(response):
        if response.error:
            print "Error:", response.error
        else:
            print response.body
    
    http_client = AsyncHTTPClient()
    http_client.fetch("http://www.google.com/", handle_request)
    
    也只是fetch过后不管socket如何拉数据而已,回调也是要在本线程执行的!在别处如果有耗时操作或者死循环的,它拉过来的数据是永远进不到handle_request函数中的。
  • 相关阅读:
    LeetCode Weekly Contest 266
    DMS(dede) 如何设置二级域名
    PHP 主流产品
    SQL 学习历程
    Javascript 动态时间
    浏览器对象模型
    Javascript 或运算的判断小问题
    CRISPDM数据挖掘标准化流程简析[一] project understanding部分(guide to Intelligent data analysis学习笔记)
    美国大选数据挖掘相关论文笔记(A 61millionperson experiment in social influence and political mobilization)
    数据挖掘和统计学的区别(guide to Intelligent data analysis学习笔记)
  • 原文地址:https://www.cnblogs.com/ymy124/p/4921052.html
Copyright © 2011-2022 走看看