zoukankan      html  css  js  c++  java
  • Tornado源码分析之事件循环

    hello world

    #!/usr/bin/env python
    import tornado.httpserver
    import tornado.ioloop
    import tornado.options
    import tornado.web
    from tornado.options import define, options
    define("port", default=8888, help="run on the given port", type=int)
    class MainHandler(tornado.web.RequestHandler):
        def get(self):
            self.write("Hello, world")
    def main():
        tornado.options.parse_command_line()
        application = tornado.web.Application([
            (r"/", MainHandler),
        ])
        http_server = tornado.httpserver.HTTPServer(application)
        http_server.listen(options.port)
        tornado.ioloop.IOLoop.instance().start()
    if __name__ == "__main__":
        main()
    

    tornado提供了高效的异步机制,我们先不管Application实例化过程,以及http_server创建socket、bind、listen的过程,直接调IOLoop.instance().start进行源码分析。

    IOLoop.instance()

        @classmethod
        def instance(cls):
            """Returns a global IOLoop instance.
            Most single-threaded applications have a single, global IOLoop.
            Use this method instead of passing around IOLoop instances
            throughout your code.
            A common pattern for classes that depend on IOLoops is to use
            a default argument to enable programs with multiple IOLoops
            but not require the argument for simpler applications:
                class MyClass(object):
                    def __init__(self, io_loop=None):
                        self.io_loop = io_loop or IOLoop.instance()
            """
            if not hasattr(cls, "_instance"):
                cls._instance = cls()
            return cls._instance
    

    显然是一个单例模式,注意tornado中的注释,大多数单线程只能有一个ioloop。

    start()

    这个函数将开始事件循环。

    def start():
         """
         Starts the I/O loop.
         The loop will run until one of the I/O handlers calls stop(), which
          will make the loop stop after the current event iteration completes.
          """
            # 判断是否设置了,如果是,将直接退出。
            if self._stopped:
                self._stopped = False
                return
            self._running = True
            while True:
                # Never use an infinite timeout here - it can stall epoll
                # 设置轮询时间
                poll_timeout = 0.2
                # Prevent IO event starvation by delaying new callbacks
                # to the next iteration of the event loop.
                callbacks = self._callbacks
                self._callbacks = []
                # 及时调用回调函数
                for callback in callbacks:
                    self._run_callback(callback)
                if self._callbacks:
                    poll_timeout = 0.0
                # 如果设置了超时时间
                if self._timeouts:
                    # 获取当前时间
                    now = time.time()
                    while self._timeouts and self._timeouts[0].deadline <= now:
                        timeout = self._timeouts.pop(0)
                        self._run_callback(timeout.callback)
                    if self._timeouts:
                        milliseconds = self._timeouts[0].deadline - now
                        poll_timeout = min(milliseconds, poll_timeout)
                # 再一次检查事件循环是否在运行
                if not self._running:
                    break
                # 目前不清楚作用
                if self._blocking_signal_threshold is not None:
                    # clear alarm so it doesn't fire while poll is waiting for
                    # events.
                    signal.setitimer(signal.ITIMER_REAL, 0, 0)
    
                try:
                    # 开始等待事件发生
                    # _impl初始化和poll源代码见下面
                    event_pairs = self._impl.poll(poll_timeout)
                except Exception, e:
                    # Depending on python version and IOLoop implementation,
                    # different exception types may be thrown and there are
                    # two ways EINTR might be signaled:
                    # * e.errno == errno.EINTR
                    # * e.args is like (errno.EINTR, 'Interrupted system call')
                    if (getattr(e, 'errno', None) == errno.EINTR or
                        (isinstance(getattr(e, 'args', None), tuple) and
                         len(e.args) == 2 and e.args[0] == errno.EINTR)):
                        continue
                    else:
                        raise
                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL,
                                     self._blocking_signal_threshold, 0)
                # Pop one fd at a time from the set of pending fds and run
                # its handler. Since that handler may perform actions on
                # other file descriptors, there may be reentrant calls to
                # this IOLoop that update self._events
                self._events.update(event_pairs)
                while self._events:
                    fd, events = self._events.popitem()
                    try:
                        # 见下面的分析
                        self._handlers[fd](fd, events)
                    except (KeyboardInterrupt, SystemExit):
                        raise
                    except (OSError, IOError), e:
                        if e.args[0] == errno.EPIPE:
                            # Happens when the client closes the connection
                            pass
                        else:
                            logging.error("Exception in I/O handler for fd %d",
                                          fd, exc_info=True)
                    except:
                        logging.error("Exception in I/O handler for fd %d",
                                      fd, exc_info=True)
            # reset the stopped flag so another start/stop pair can be issued
            self._stopped = False
            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL, 0, 0)
    

    我们看看输入localhost:8888,event_pairs的值:

    可以看出,event_pairs是一个元组列表,其中第一个成员4为accept套接字值,1表示为事件类型。我们看看事件类型为:

     _EPOLLIN = 0x001
     _EPOLLPRI = 0x002
     _EPOLLOUT = 0x004  
     _EPOLLERR = 0x008
     _EPOLLHUP = 0x010
     _EPOLLRDHUP = 0x2000
     _EPOLLONESHOT = (1 << 30)
     _EPOLLET = (1 << 31)
     NONE = 0
     READ = _EPOLLIN
     WRITE = _EPOLLOUT
     ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP
    

    可见上述,是文件描述符4,可读事件发生。

    self._impl

    我们跟踪self._impl初始化过程,可以看到事件循环核心epoll是如何被使用的。在IOLoop实例化开始:

    class IOLoop(object):
        def __init__(self, impl = None):
            self._impl = impl or _poll
    
    # Choose a poll implementation. Use epoll if it is available, fall back to
    # select() for non-Linux platforms
    # hasattr(object, attrname)表示某个对象中是否包含属性
    if hasattr(select, "epoll"):
        # Python 2.6+ on Linux
        # 在linux上使用的是select.epoll
        _poll = select.epoll
    elif hasattr(select, "kqueue"):
        # Python 2.6+ on BSD or Mac
        _poll = _KQueue
    else:
        try:
            # Linux systems with our C module installed
            import epoll
            _poll = _EPoll
        except:
            # All other systems
            import sys
            if "linux" in sys.platform:
                logging.warning("epoll module not found; using select()")
            _poll = _Select
    

    从上面的代码中,可以看到,_poll是对于多个平台下epoll、_kQueue的抽象。看一下select.epoll下的返回结果:其返回对象是一个边沿触发的polling对象,当然也可以用作水平触发。

    返回的select.epoll对象的方法:

    • epoll.close() 关闭epoll fd文件描述符
    • epoll.fileno() 返回epoll fd文件描述符只
    • epoll.register(fd, eventmask) 注册fd某个事件
    • epoll.poll([timeout = -1, maxevents = -1]) wait for events. timeout in seconds

    self._handlers[fd](fd, events)

    显然,self._handlers[fd]是返回一个回调函数,用来处理fd上的事件events,这里测试的fd为4,事件EPOLLIN。我们来跟踪一下self._handlers变化过程。看看在IOLoop初始化的过程。

    def __init__(self):
        self._handles = {}
        ....
        if os.name != 'nt':
            r, w = os.pipe()
            self._set_nonblocking(r)
            self._set_nonblocking(w)
            .....
            self._waker_reader = os.fdopen(r, "rb", 0)
            self._waker_writer = os.fdopen(w, "wb", 0)    
        # 显然这是对读管道文件描述符事件处理函数
        self.add_handler(r, self._read_waker, self.READ)
    

    add_handler(self, fd, handler, events)

    def add_handler(self, fd, handler, events):
        self._handlers[fd] = stack_context.wrap(handler)
        self._impl.register(fd, events| self.ERROR)
    

    可见add_handler干了两件事情:

    • 回调函数设置,当然不仅仅是简单的将handler赋值,而是使用了stack_context.wrap包裹了该函数,具体实现,见下面。
    • epoll对象添加该事件,就是在代码的第二行。所以self._handlers[fd](fd, args)实际上就是设置的回调函数。那么用stack_context.wrap()来包裹究竟是为了什么了?

    update_handler(self, fd, events)

     def update_handler(self, fd, events):
            """Changes the events we listen for fd."""
            self._impl.modify(fd, events | self.ERROR)
    

    该函数用来修改fd感兴趣的事件

  • 相关阅读:
    常见问题汇总
    python的正则表达式
    Python 程序读取外部文件、网页的编码与JSON格式的转化
    第三方仓库PyPI
    文件名称一定不要设置为某些模块名称,会引起冲突!
    迟来的博客
    FENLIQI
    fenye
    Notif
    phpv6_css
  • 原文地址:https://www.cnblogs.com/bofengqiye/p/7353056.html
Copyright © 2011-2022 走看看