  • A deep dive into Tornado's IOLoop

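    This article analyzes the Tornado source code at version 4.4.2.

    The IOLoop is the core of Tornado and its lowest layer.

    The IOLoop is a wrapper around I/O multiplexing. It is implemented as a singleton, and the singleton is stored in IOLoop._instance.

    The IOLoop implements the Reactor pattern: every I/O event to be handled is registered with one central I/O multiplexer, and the main thread/process blocks on that multiplexer. As soon as an I/O event arrives or becomes ready (a file descriptor or socket becomes readable or writable), the multiplexer returns and dispatches the event to the handler that was registered for it.

    In addition, the IOLoop is used to run callbacks and to process timed tasks in one central place.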
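    The Reactor pattern described above can be sketched in a few lines. This is only an illustration built on the standard library's selectors module, not Tornado's code; MiniReactor and demo are hypothetical names, and a socket pair stands in for a real client connection.

```python
import selectors
import socket


class MiniReactor:
    """Toy reactor (hypothetical, not Tornado): one multiplexer, many handlers."""

    def __init__(self):
        # DefaultSelector picks the best multiplexer available (epoll on Linux).
        self._selector = selectors.DefaultSelector()

    def add_handler(self, sock, handler):
        # Register interest in readability and remember the handler as user data.
        self._selector.register(sock, selectors.EVENT_READ, handler)

    def remove_handler(self, sock):
        self._selector.unregister(sock)

    def run_once(self, timeout=1.0):
        # Block on the multiplexer, then dispatch every ready event to its handler.
        ready = self._selector.select(timeout)
        for key, events in ready:
            key.data(key.fileobj, events)
        return len(ready)

    def close(self):
        self._selector.close()


def demo():
    reader, writer = socket.socketpair()
    received = []
    reactor = MiniReactor()
    reactor.add_handler(reader, lambda sock, events: received.append(sock.recv(1024)))
    writer.send(b"ping")      # makes `reader` readable
    reactor.run_once()        # the multiplexer returns and the handler runs
    reactor.remove_handler(reader)
    reactor.close()
    reader.close()
    writer.close()
    return received
```

    The real IOLoop keeps looping instead of running a single iteration, and it mixes callbacks and timeouts into the same loop, as the rest of the article shows.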

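    I  Prerequisites:

      1 First, we need to understand the Reactor pattern.

      2 Second, we need to understand I/O multiplexing. This article assumes a Linux system, so that means epoll and Python's select module.

      3 The IOLoop class is a subclass of Configurable, and Configurable is a factory class (explained in a separate article).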

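    II  Creating an IOLoop instance

    Look at IOLoop: its parent class is Configurable, which means IOLoop is a direct configurable subclass (its configurable_base() returns IOLoop itself).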

    class IOLoop(Configurable):
        ......

    This is where the Configurable class comes in:

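        def __new__(cls, *args, **kwargs):
            '''
            Resolve the impl class:
                1 if cls is a direct configurable subclass, impl is that subclass's 'implementation class'
                2 if cls is a subordinate configurable subclass, impl is cls itself
            Then create an instance of impl,
            run its initialize method with the merged arguments,
            and return that instance.
            '''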
            base = cls.configurable_base()   
            init_kwargs = {}
            if cls is base:
                impl = cls.configured_class()
                if base.__impl_kwargs:
                    init_kwargs.update(base.__impl_kwargs)
            else:
                impl = cls
            init_kwargs.update(kwargs)
            instance = super(Configurable, cls).__new__(impl)
            instance.initialize(*args, **init_kwargs)
            return instance
    The __new__ method of Configurable
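    To make the factory behaviour concrete, here is a stripped-down sketch of the same idea. It is not Tornado's implementation: MiniConfigurable, Loop, FastLoop and SlowLoop are hypothetical names, and the configure()/__impl_kwargs machinery is left out on purpose.

```python
class MiniConfigurable(object):
    """Simplified stand-in for tornado.util.Configurable (hypothetical)."""

    def __new__(cls, *args, **kwargs):
        base = cls.configurable_base()
        if cls is base:
            # Direct configurable subclass: ask it which class to build.
            impl = cls.configurable_default()
        else:
            # Subordinate subclass: build exactly the class that was asked for.
            impl = cls
        instance = super(MiniConfigurable, cls).__new__(impl)
        instance.initialize(*args, **kwargs)
        return instance

    def initialize(self, *args, **kwargs):
        pass


class Loop(MiniConfigurable):
    @classmethod
    def configurable_base(cls):
        return Loop

    @classmethod
    def configurable_default(cls):
        return FastLoop

    def initialize(self, name="loop"):
        self.name = name


class FastLoop(Loop):
    pass


class SlowLoop(Loop):
    pass
```

    Calling Loop(name="main") in this sketch returns a FastLoop instance, while SlowLoop() returns a SlowLoop. That is the same split between the two cases as in the docstring above.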

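    1 First, resolve the 'implementation class' of this direct configurable subclass. With nothing configured, configured_class() calls the class's configurable_default method, and the class it returns is assigned to impl: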

        @classmethod
        def configurable_default(cls):
            if hasattr(select, "epoll"):     # True here, because we assume a Linux system with epoll support
                from tornado.platform.epoll import EPollIOLoop
                return EPollIOLoop 
            if hasattr(select, "kqueue"):
                # Python 2.6+ on BSD or Mac
                from tornado.platform.kqueue import KQueueIOLoop
                return KQueueIOLoop
            from tornado.platform.select import SelectIOLoop
            return SelectIOLoop
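    The same fallback order can be tried out without Tornado. The sketch below only reports which backend would be chosen; pick_backend is a hypothetical helper, and the module to inspect is a parameter so that the other branches can be exercised on any platform.

```python
import select
import types


def pick_backend(select_module=select):
    """Return the name of the multiplexer that would be chosen (hypothetical helper)."""
    if hasattr(select_module, "epoll"):
        return "epoll"      # Linux
    if hasattr(select_module, "kqueue"):
        return "kqueue"     # BSD / macOS
    return "select"         # portable fallback


# Fake "select modules" used to exercise each branch regardless of the host OS.
linux_like = types.SimpleNamespace(epoll=object, select=object)
bsd_like = types.SimpleNamespace(kqueue=object, select=object)
bare = types.SimpleNamespace(select=object)
```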

    2 So impl is the EPollIOLoop class. An instance of it is then created and its initialize method is run.

    class EPollIOLoop(PollIOLoop):  # The class is only these few lines; the real work is done in its parent, PollIOLoop.
        def initialize(self, **kwargs):
            super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs) # Calls PollIOLoop.initialize and passes in select.epoll()

      Let's see what PollIOLoop.initialize(self, impl=select.epoll()) does, where self is the new EPollIOLoop instance:

    class PollIOLoop(IOLoop):  # a subordinate configurable subclass
    
        def initialize(self, impl, time_func=None, **kwargs):
            super(PollIOLoop, self).initialize(**kwargs)                # call IOLoop.initialize
            self._impl = impl                               # self._impl = select.epoll()
            if hasattr(self._impl, 'fileno'):               # set the close-on-exec flag on the poller's file descriptor
                set_close_exec(self._impl.fileno())
            self.time_func = time_func or time.time
            self._handlers = {}                             # key: the integer fd (fileno()); value: (file object, handler function)
            self._events = {}                               # events returned by epoll_obj.poll(), i.e. what happened on which fd: {fd1: events1, fd2: events2, ...}
            self._callbacks = []
            self._callback_lock = threading.Lock()          # lock protecting _callbacks across threads
            self._timeouts = []                             # stores the timed tasks (kept as a heap)
            self._cancellations = 0
            self._running = False
            self._stopped = False
            self._closing = False
            self._thread_ident = None                       # identifier of the thread running the loop; set in start()
            self._blocking_signal_threshold = None
            self._timeout_counter = itertools.count()
    
            # Create a pipe that we send bogus data to when we want to wake
            # the I/O loop when it is idle
            self._waker = Waker()
            self.add_handler(self._waker.fileno(),
                             lambda fd, events: self._waker.consume(),
                             self.READ)
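    The waker registered at the end of initialize is what lets other code interrupt a blocking poll. The sketch below illustrates the idea only. As far as I know Tornado's POSIX Waker uses a pipe, but this version uses a socket pair so that it also runs where select() does not accept pipes. MiniWaker and poll_readable are hypothetical names.

```python
import select
import socket


class MiniWaker:
    """Toy waker (hypothetical): writing a byte makes the read end readable."""

    def __init__(self):
        self.reader, self.writer = socket.socketpair()
        self.reader.setblocking(False)
        self.writer.setblocking(False)

    def fileno(self):
        # The loop registers this fd for READ events.
        return self.reader.fileno()

    def wake(self):
        try:
            self.writer.send(b"x")
        except (BlockingIOError, InterruptedError):
            pass  # buffer full: a wake-up is already pending

    def consume(self):
        # Drain everything that was written so the fd stops being readable.
        try:
            while self.reader.recv(1024):
                pass
        except (BlockingIOError, InterruptedError):
            pass

    def close(self):
        self.reader.close()
        self.writer.close()


def poll_readable(waker, timeout):
    # Stand-in for the loop's poll() call: is the waker fd readable?
    readable, _, _ = select.select([waker.fileno()], [], [], timeout)
    return bool(readable)
```

    In the real loop, the handler registered for the waker's fd simply calls consume(), so the only effect of wake() is that poll() returns early.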

      PollIOLoop.initialize first calls IOLoop.initialize(self, **kwargs):

        def initialize(self, make_current=None):
            if make_current is None:
                if IOLoop.current(instance=False) is None:
                    self.make_current()
            elif make_current:
                if IOLoop.current(instance=False) is not None:
                    raise RuntimeError("current IOLoop already exists")
                self.make_current()
        @staticmethod
        def current(instance=True):
            current = getattr(IOLoop._current, "instance", None)
            if current is None and instance:
                return IOLoop.instance()
            return current

        def make_current(self):
            IOLoop._current.instance = self

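        As we can see, IOLoop.initialize() is mainly about thread support: IOLoop._current is a thread-local object, and the new instance is made the current IOLoop of its thread if that thread does not have one yet.

    3 Return the instance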
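        The per-thread "current" slot can be illustrated with threading.local alone. This is a sketch, not Tornado's class: ThreadLocalLoop and current_in_other_thread are hypothetical names, and the fallback to the global singleton is omitted.

```python
import threading


class ThreadLocalLoop(object):
    """Toy loop (hypothetical) that only models the per-thread 'current' slot."""

    _current = threading.local()   # each thread sees its own attributes

    @staticmethod
    def current():
        return getattr(ThreadLocalLoop._current, "instance", None)

    def make_current(self):
        ThreadLocalLoop._current.instance = self


def current_in_other_thread():
    # Ask a freshly started thread what it sees as the current loop.
    seen = []
    worker = threading.Thread(target=lambda: seen.append(ThreadLocalLoop.current()))
    worker.start()
    worker.join()
    return seen[0]
```

        A loop made current in the main thread is not visible as current from the worker thread, which is why each thread can run its own loop.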

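    III  Dissecting PollIOLoop

    1 Attributes and methods for I/O events and their handlers

      self._handlers stores the mapping between an fd and its handler: the key is the integer file descriptor (fileno()), and the value is the tuple (file object, handler function)

      self._events stores the events returned by epoll_obj.poll(), i.e. what happened on which fd: {fd1: events1, fd2: events2, ...}

      add_handler registers a handler

      update_handler changes the events that a registered fd listens for

      remove_handler removes a handler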

        def add_handler(self, fd, handler, events):
            # Register the events with epoll, and store the handler for this file descriptor in self._handlers[fd]
            fd, obj = self.split_fd(fd)   # returns (fd.fileno(), fd) for file-like objects
            self._handlers[fd] = (obj, stack_context.wrap(handler))
            self._impl.register(fd, events | self.ERROR)
    
        def update_handler(self, fd, events):
            fd, obj = self.split_fd(fd)
            self._impl.modify(fd, events | self.ERROR)
    
        def remove_handler(self, fd):
            fd, obj = self.split_fd(fd)
            self._handlers.pop(fd, None)
            self._events.pop(fd, None)
            try:
                self._impl.unregister(fd)
            except Exception:
                gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
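    All three methods start with split_fd, which normalizes whatever the caller passed in. A minimal sketch of that step and of the _handlers bookkeeping follows. I believe it mirrors the behaviour of IOLoop.split_fd, but treat it as an assumption; register is a hypothetical helper.

```python
import socket


def split_fd(fd):
    """Return (integer fd, original object) for ints and file-like objects alike."""
    try:
        return fd.fileno(), fd
    except AttributeError:
        # Plain integers have no fileno(); they are their own file object.
        return fd, fd


def register(handlers, fd, handler):
    # Hypothetical helper: same bookkeeping as add_handler, minus the epoll call.
    num, obj = split_fd(fd)
    handlers[num] = (obj, handler)
    return num
```

    Keying the table by the integer fd matters because poll() reports integers, while the handler is later called with the original object (for example the socket).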

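    2 Attributes and methods for callbacks

      self._callbacks stores the callbacks

      add_callback adds a callback directly

      add_future adds a callback indirectly (Future objects are explained in a separate article)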

        def add_callback(self, callback, *args, **kwargs):
            # Python's GIL makes threads relatively inefficient, and Tornado favors a multi-process + coroutine model, so the thread-related parts of the source are omitted here
            if self._closing:
                return
            self._callbacks.append(functools.partial(stack_context.wrap(callback), *args, **kwargs))
        def add_future(self, future, callback):
            # Attach a wrapped callback to the future; once the future is done, that callback is appended to _callbacks via add_callback
            assert is_future(future)
            callback = stack_context.wrap(callback)
            future.add_done_callback(
                lambda future: self.add_callback(callback, future))
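    The indirection in add_future is easier to see in isolation. The sketch below uses concurrent.futures.Future from the standard library instead of Tornado's Future, and CallbackQueue is a hypothetical class, so it only shows the mechanism: completing the future does not run the callback, it merely queues it for the next loop iteration.

```python
import functools
from concurrent.futures import Future


class CallbackQueue(object):
    """Toy model (hypothetical) of the _callbacks list and its two entry points."""

    def __init__(self):
        self._callbacks = []

    def add_callback(self, callback, *args, **kwargs):
        # Bind the arguments now, run later.
        self._callbacks.append(functools.partial(callback, *args, **kwargs))

    def add_future(self, future, callback):
        # When the future completes, queue callback(future) instead of calling it.
        future.add_done_callback(lambda f: self.add_callback(callback, f))

    def run_pending(self):
        # Same swap as in start(): take the current batch, leave a fresh list behind.
        callbacks, self._callbacks = self._callbacks, []
        for cb in callbacks:
            cb()
        return len(callbacks)
```

    Swapping the list before running the batch means that callbacks added while the batch runs are deferred to the next iteration.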

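    3 Attributes and methods for timed tasks

      self._timeouts stores the timed tasks

      self.add_timeout adds a timed task. As the code below shows, it normalizes the deadline and delegates to self.call_at; self.call_later also goes through self.call_at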

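        def add_timeout(self, deadline, callback, *args, **kwargs):
            """
                ``deadline`` may be a number denoting an absolute time (on the same scale as ``IOLoop.time``, normally ``time.time``), or a datetime.timedelta object denoting a deadline relative to the current time.
                Since Tornado 4.0, `call_later` is a more convenient alternative for the relative case, because it does not require a timedelta object.
    
            """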
            if isinstance(deadline, numbers.Real):
                return self.call_at(deadline, callback, *args, **kwargs)
            elif isinstance(deadline, datetime.timedelta):
                return self.call_at(self.time() + timedelta_to_seconds(deadline),
                                    callback, *args, **kwargs)
            else:
                raise TypeError("Unsupported deadline %r" % deadline)
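    The two accepted deadline forms can be checked with a small standalone function. This is a sketch of the normalization only; to_absolute_deadline is a hypothetical name, and the current time is passed in explicitly so the result is deterministic.

```python
import datetime
import numbers


def to_absolute_deadline(deadline, now):
    """Turn either deadline form into an absolute timestamp (hypothetical helper)."""
    if isinstance(deadline, numbers.Real):
        # A number is already an absolute time on the loop's clock.
        return deadline
    elif isinstance(deadline, datetime.timedelta):
        # A timedelta is relative to the current time.
        return now + deadline.total_seconds()
    raise TypeError("Unsupported deadline %r" % (deadline,))
```

    With now=1000.0, a deadline of 1500.0 stays 1500.0, while timedelta(seconds=5) becomes 1005.0.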

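    4 Starting the I/O multiplexer

      Starting generally means entering a loop. So what does each iteration do?

        1 Run the callbacks

        2 Run the timed tasks whose deadline has passed

        3 When an event occurs on a file descriptor, run the handler registered for that event

      The IOLoop is started with the start method. Here is a simplified version (thread handling and some less important details removed):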

        def start(self):
            try:
                while True:
                    callbacks = self._callbacks
                    self._callbacks = []
                    due_timeouts = []
                    # Move the timeouts whose deadline has passed into due_timeouts (details omitted)
                    for callback in callbacks:          # run the callbacks
                        self._run_callback(callback)
                    for timeout in due_timeouts:        # run the due timeouts
                        if timeout.callback is not None:
                            self._run_callback(timeout.callback)
                    callbacks = callback = due_timeouts = timeout = None    # release references
                    # Choose poll_timeout according to the situation (details omitted)
                    if not self._running:    # when the IOLoop is being stopped, leave the loop after the callbacks have run
                        break
                    try:
                        event_pairs = self._impl.poll(poll_timeout)
                    except Exception as e:
                        if errno_from_exception(e) == errno.EINTR:  # the system call was interrupted by a signal handler; go to the next iteration
                            continue
                        else:
                            raise
                    self._events.update(event_pairs)
                    while self._events:
                        fd, events = self._events.popitem()             # take one fd and its events
                        try:
                            fd_obj, handler_func = self._handlers[fd]   # look up the handler registered for this fd
                            handler_func(fd_obj, events)                # run the handler
                        except (OSError, IOError) as e:
                            if errno_from_exception(e) == errno.EPIPE:  # EPIPE is raised when the client closes the connection
                                pass
                            # other exception handling omitted
                    fd_obj = handler_func = None                        # release references
            finally:
                self._stopped = False    # reset the flag so another start/stop pair can be issued (other cleanup omitted)
        def start(self):
            if self._running:
                raise RuntimeError("IOLoop is already running")
            self._setup_logging()
            if self._stopped:
                self._stopped = False
                return
            old_current = getattr(IOLoop._current, "instance", None)
            IOLoop._current.instance = self
            self._thread_ident = thread.get_ident()     # get the identifier of the current thread
            self._running = True
            old_wakeup_fd = None
            if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
                # Requires Python 2.6+ on a Unix-like system. set_wakeup_fd also exists on Windows, but it crashes the Python process there
                try:
                    old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
                    if old_wakeup_fd != -1:
                        # Already set, restore previous value.  This is a little racy,
                        # but there's no clean get_wakeup_fd and in real use the
                        # IOLoop is just started once at the beginning.
                        signal.set_wakeup_fd(old_wakeup_fd)
                        old_wakeup_fd = None
                except ValueError:
                    # Non-main thread, or the previous value of wakeup_fd
                    # is no longer valid.
                    old_wakeup_fd = None
    
            try:
                while True:
                    # Guard against inconsistent data when several threads add callbacks
                    with self._callback_lock:    
                        callbacks = self._callbacks
                        self._callbacks = []
    
                    due_timeouts = []
                    if self._timeouts:         # move the timeouts whose deadline has passed into due_timeouts
                        now = self.time()
                        while self._timeouts:
                            if self._timeouts[0].callback is None:
                                heapq.heappop(self._timeouts)
                                self._cancellations -= 1
                            elif self._timeouts[0].deadline <= now:
                                due_timeouts.append(heapq.heappop(self._timeouts))
                            else:
                                break
                        if (self._cancellations > 512 and
                                self._cancellations > (len(self._timeouts) >> 1)):
                            self._cancellations = 0
                            self._timeouts = [x for x in self._timeouts
                                              if x.callback is not None]
                            heapq.heapify(self._timeouts)
    
                    for callback in callbacks:        # run the callbacks
                        self._run_callback(callback)
                    for timeout in due_timeouts:    # run the due timeout callbacks
                        if timeout.callback is not None:
                            self._run_callback(timeout.callback)
                    # release references so the objects can be garbage-collected
                    callbacks = callback = due_timeouts = timeout = None
    
                    if self._callbacks: # If a callback or timeout called add_callback while it ran, self._callbacks is non-empty now.
                        # To run those callbacks as soon as possible, set poll_timeout to 0 so that poll() returns without waiting for fd events
                        poll_timeout = 0.0
                    elif self._timeouts:
                        # If there are any timeouts, schedule the first one.
                        # Use self.time() instead of 'now' to account for time
                        # spent running callbacks.
                        poll_timeout = self._timeouts[0].deadline - self.time()
                        poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
                    else:
                        # With no callbacks and no timed tasks pending, use the default value
                        poll_timeout = _POLL_TIMEOUT
    
                    if not self._running:    # when the IOLoop is being stopped, leave the loop after the callbacks have run
                        break
    
                    if self._blocking_signal_threshold is not None:
                        # clear alarm so it doesn't fire while poll is waiting for
                        # events.
                        signal.setitimer(signal.ITIMER_REAL, 0, 0)
    
                    try:
                        event_pairs = self._impl.poll(poll_timeout)
                    except Exception as e:
                        # The system call was interrupted by a signal handler, so go to the next iteration. See http://blog.csdn.net/benkaoya/article/details/17262053 for an explanation of EINTR.
                        if errno_from_exception(e) == errno.EINTR:
                            continue
                        else:
                            raise
    
                    if self._blocking_signal_threshold is not None:
                        signal.setitimer(signal.ITIMER_REAL,
                                         self._blocking_signal_threshold, 0)
    
                    # Pop one fd at a time from the set of pending fds and run its handler.
                    # Because a handler may act on other file descriptors, it may re-enter this IOLoop and modify self._events
                    self._events.update(event_pairs)
                    while self._events:
                        fd, events = self._events.popitem()             # take one fd and its events
                        try:
                            fd_obj, handler_func = self._handlers[fd]   # look up the handler registered for this fd
                            handler_func(fd_obj, events)                # run the handler
                        except (OSError, IOError) as e:
                            if errno_from_exception(e) == errno.EPIPE:
                                # EPIPE is raised when the client closes the connection
                                pass
                            else:
                                self.handle_callback_exception(self._handlers.get(fd))
                        except Exception:
                            self.handle_callback_exception(self._handlers.get(fd))
                    # release references
                    fd_obj = handler_func = None                        
    
            finally:
                # reset the stopped flag so another start/stop pair can be issued
                self._stopped = False 
                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL, 0, 0)
                IOLoop._current.instance = old_current
                if old_wakeup_fd is not None:
                    signal.set_wakeup_fd(old_wakeup_fd)
    The full version of start

    5 Shutting down the I/O multiplexer

        def close(self, all_fds=False):
            with self._callback_lock:
                self._closing = True
            self.remove_handler(self._waker.fileno())
            if all_fds:    # if True, every file descriptor registered on the IOLoop is closed as well
                for fd, handler in self._handlers.values():
                    self.close_fd(fd)
            self._waker.close()
            self._impl.close() 
            self._callbacks = None
            self._timeouts = None

     IV  References

      https://zhu327.github.io/2016/06/14/tornado%E4%BB%A3%E7%A0%81%E9%98%85%E8%AF%BB%E7%AC%94%E8%AE%B0-ioloop/
      https://www.zhihu.com/question/20021164
      http://stackoverflow.com/questions/12179271/meaning-of-classmethod-and-staticmethod-for-beginner/12179752#12179752
      http://blog.csdn.net/benkaoya/article/details/17262053

  • Original article: https://www.cnblogs.com/MnCu8261/p/6730691.html