zoukankan      html  css  js  c++  java
  • [原]tornado源码分析系列(二)[网络层 IOLoop类]

    引言:上一章起了个头,讲了tornado的源码结构和IOLoop的简单Demo,这一章就IOLoop类的方法来看看IOLoop提供了哪些功能。

    看看IOLoop的类组织结构

    	|---IOLoop
    			---__init__(self, impl=None)
    			---instance(cls)
    			---initialized(cls)
    			---add_handler(self, fd, handler, events)
    			---update_handler(self, fd, events)
    			---remove_handler(self, fd)
    			---set_blocking_signal_threshold(self, seconds, action)
    			---set_blocking_log_threshold(self, seconds)
    			---log_stack(self, signal, frame)
    			---start(self)
    			---stop(self)
    			---running(self)
    			---add_timeout(self, deadline, callback)
    			---remove_timeout(self, timeout)
    			---add_callback(self, callback)
    			---_wake(self)
    			---_run_callback(self, callback)
    			---handle_callback_exception(self, callback)
    			---_read_waker(self, fd, events)
    			---_set_nonblocking(self, fd)
    			---_set_close_exec(self, fd)
    	---|
    

    从上一章的Demo里面可以看到最重要的对外提供的方法有

    0.instance() @classmethod

    1.add_handler(...)

    2.start()

    类似于传统的事件驱动方式,这里的使用方式也很简单

    从IOLoop类中看起:

    先是自己定义了几个EPOLL的宏,就是EPOLL的事件类型

    #epoll 的事件类型,类似于这里的宏定义
    _EPOLLIN = 0x001
    _EPOLLPRI = 0x002
    _EPOLLOUT = 0x004
    _EPOLLERR = 0x008
    _EPOLLHUP = 0x010
    _EPOLLRDHUP = 0x2000
    _EPOLLONESHOT = (1 << 30)
    _EPOLLET = (1 << 31)

    # Our events map exactly to the epoll events
    #将这几个事件类型重定义一番
    NONE = 0
    READ = _EPOLLIN
    WRITE = _EPOLLOUT
    ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP

    常用的就是三种,READ,WRITE,ERROR

    #ioloop的构造函数
        def __init__(self, impl=None):
            #选择异步事件循环监听方式,默认是epoll,后面的_impl都是指的是epoll
            self._impl = impl or _poll()
            #自省,查看 self._impl 中是否有 fileno
            #如果有,就关闭起exec性质
            if hasattr(self._impl, 'fileno'):
                self._set_close_exec(self._impl.fileno())
            # _set_close_exec 是一个类方法,下面有定义
      	# 当 FD_CLOEXEC 设置了以后,exec() 函数执行的时候会自动关闭描述符
    """     def _set_close_exec(self, fd):
                flags = fcntl.fcntl(fd, fcntl.F_GETFD)
                fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)   """
            #handlers 是一个函数集字典
            self._handlers = {}
            self._events = {}
            #回调函数使用的是列表
            self._callbacks = []
            #用来记录链接超时
            self._timeouts = []
            self._running = False
            self._stopped = False
            self._blocking_signal_threshold = None
    
            # Create a pipe that we send bogus data to when we want to wake
            # the I/O loop when it is idle
            #判断是否是 NT 操作系统
            if os.name != 'nt':
                #创建一个管道 ,返回的为读写两端的文件描述符
                r, w = os.pipe()
                #设置为非阻塞
                self._set_nonblocking(r)
                self._set_nonblocking(w)
               
                self._set_close_exec(r)
                self._set_close_exec(w)
                #分别以读方式和写方式打开管道
                self._waker_reader = os.fdopen(r, "rb", 0)
                self._waker_writer = os.fdopen(w, "wb", 0)
            else:
                #如若不是 NT 系统,改用win32 支持的管道类型
                self._waker_reader = self._waker_writer = win32_support.Pipe()
                r = self._waker_writer.reader_fd
            #将 管道的  read端与 函数 _read_waker  关联,事件类型为 READ
            #这里也是IO 多路复用的一种机制,将管道的描述符也添加进多路复用的IO 管理
            self.add_handler(r, self._read_waker, self.READ)
    

    注意最后的几点,将管道描述符的读端也加入事件循环检查,并设置相应的回调函数,这样做的好处是以便事件循环阻塞而没有相应描述符出现,需要在最大timeout时间之前返回,就可以向这个管道发送一个字符,用来终止阻塞在监听阶段的事件循环监听函数。

    看看waker是这样定义的:

        def _wake(self):
            try:
                self._waker_writer.write("x")
            except IOError:
                pass
    

    需要唤醒阻塞中的事件循环监听函数的时候,只需要向管道写入一个字符,就可以提前结束循环

    instance就是简单的返回一个实例:

        def instance(cls):
            """Returns a global IOLoop instance.
    
            Most single-threaded applications have a single, global IOLoop.
            Use this method instead of passing around IOLoop instances
            throughout your code.
    
            A common pattern for classes that depend on IOLoops is to use
            a default argument to enable programs with multiple IOLoops
            but not require the argument for simpler applications:
    
                class MyClass(object):
                    def __init__(self, io_loop=None):
                        self.io_loop = io_loop or IOLoop.instance()
            """
            if not hasattr(cls, "_instance"):
                cls._instance = cls()
            return cls._instance
    

    instance()是一个静态方法,代表此IOLoop是一个单实例方法,一个进程只有一个

    在add_handler()里面

        #将文件描述符发生相应的事件时的回调函数对应
        def add_handler(self, fd, handler, events):
            """Registers the given handler to receive the given events for fd."""
            self._handlers[fd] = stack_context.wrap(handler)
            #在 epoll 中注册对应事件
            #epoll_ctl
            self._impl.register(fd, events | self.ERROR)
        #更新相应的事件类型
    

    可以看到,使用字典的方式,每一个fd就对应一个handler,下次事件循环返回的时候按照返回后的fd列表,依次调用相应的callback

    |------

    在tornado中,函数是通过stack_context.wrap()包装过,可以用来记录上下文

    如果需要调用被包装过的函数,需要调用方法

    _run_callback(self, callback)  

    这个函数将包装过的callback作为参数出入,然后执行函数

        def _run_callback(self, callback):
            try:
                callback()
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                self.handle_callback_exception(callback)
    

    当函数执行发生异常时,可以记录下函数执行状态  

    -------|

    _impl.register就是被封装过的epoll的epoll_ctl,参数是EPOLL_CTL_ADD

    见同一个文件下的_EPoll类

    class _EPoll(object):
        """An epoll-based event loop using our C module for Python 2.5 systems"""
        _EPOLL_CTL_ADD = 1
        _EPOLL_CTL_DEL = 2
        _EPOLL_CTL_MOD = 3
    
        def __init__(self):
            self._epoll_fd = epoll.epoll_create()
    
        def fileno(self):
            return self._epoll_fd
    
        def register(self, fd, events):
            epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_ADD, fd, events)
    
        def modify(self, fd, events):
            epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_MOD, fd, events)
    
        def unregister(self, fd):
            epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_DEL, fd, 0)
    
        def poll(self, timeout):
            return epoll.epoll_wait(self._epoll_fd, int(timeout * 1000))
    

      

    总结:这一章讲了IOLoop中的几个重要函数,后面依次会有分析其他方法,还有其中一些细节值得平常注意的。  

      

      

    文章属原创,转载请注明出处 联系作者: Email:zhangbolinux@sina.com QQ:513364476
  • 相关阅读:
    POJ 1328 Radar Installation
    POJ 1700 Crossing River
    POJ 1700 Crossing River
    poj 3253 Fence Repair (贪心,优先队列)
    poj 3253 Fence Repair (贪心,优先队列)
    poj 3069 Saruman's Army(贪心)
    poj 3069 Saruman's Army(贪心)
    Redis 笔记与总结2 String 类型和 Hash 类型
    数据分析方法有哪些_数据分析方法
    数据分析方法有哪些_数据分析方法
  • 原文地址:https://www.cnblogs.com/Bozh/p/2597114.html
Copyright © 2011-2022 走看看