zoukankan      html  css  js  c++  java
  • [原]tornado源码分析系列(二)[网络层 IOLoop类]

    引言:上一章起了个头,讲了tornado的源码结构和IOLoop的简单Demo,这一章就IOLoop类的方法来看看IOLoop提供了哪些功能。

    看看IOLoop的类组织结构

    	|---IOLoop
    			---__init__(self, impl=None)
    			---instance(cls)
    			---initialized(cls)
    			---add_handler(self, fd, handler, events)
    			---update_handler(self, fd, events)
    			---remove_handler(self, fd)
    			---set_blocking_signal_threshold(self, seconds, action)
    			---set_blocking_log_threshold(self, seconds)
    			---log_stack(self, signal, frame)
    			---start(self)
    			---stop(self)
    			---running(self)
    			---add_timeout(self, deadline, callback)
    			---remove_timeout(self, timeout)
    			---add_callback(self, callback)
    			---_wake(self)
    			---_run_callback(self, callback)
    			---handle_callback_exception(self, callback)
    			---_read_waker(self, fd, events)
    			---_set_nonblocking(self, fd)
    			---_set_close_exec(self, fd)
    	---|
    

    从上一章的Demo里面可以看到最重要的对外提供的方法有

    0.instance() @classmethod

    1.add_handler(...)

    2.start()

    类似于传统的事件驱动方式,这里的使用方式也很简单

    从IOLoop类中看起:

    先是自己定义了几个EPOLL的宏,就是EPOLL的事件类型

    #epoll 的事件类型,类似于这里的宏定义
    _EPOLLIN = 0x001
    _EPOLLPRI = 0x002
    _EPOLLOUT = 0x004
    _EPOLLERR = 0x008
    _EPOLLHUP = 0x010
    _EPOLLRDHUP = 0x2000
    _EPOLLONESHOT = (1 << 30)
    _EPOLLET = (1 << 31)

    # Our events map exactly to the epoll events
    #将这几个事件类型重定义一番
    NONE = 0
    READ = _EPOLLIN
    WRITE = _EPOLLOUT
    ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP

    常用的就是三种,READ,WRITE,ERROR

    #ioloop的构造函数
        def __init__(self, impl=None):
            #选择异步事件循环监听方式,默认是epoll,后面的_impl都是指的是epoll
            self._impl = impl or _poll()
            #自省,查看 self._impl 中是否有 fileno
            #如果有,就关闭起exec性质
            if hasattr(self._impl, 'fileno'):
                self._set_close_exec(self._impl.fileno())
            # _set_close_exec 是一个类方法,下面有定义
      	# 当 FD_CLOEXEC 设置了以后,exec() 函数执行的时候会自动关闭描述符
    """     def _set_close_exec(self, fd):
                flags = fcntl.fcntl(fd, fcntl.F_GETFD)
                fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)   """
            #handlers 是一个函数集字典
            self._handlers = {}
            self._events = {}
            #回调函数使用的是列表
            self._callbacks = []
            #用来记录链接超时
            self._timeouts = []
            self._running = False
            self._stopped = False
            self._blocking_signal_threshold = None
    
            # Create a pipe that we send bogus data to when we want to wake
            # the I/O loop when it is idle
            #判断是否是 NT 操作系统
            if os.name != 'nt':
                #创建一个管道 ,返回的为读写两端的文件描述符
                r, w = os.pipe()
                #设置为非阻塞
                self._set_nonblocking(r)
                self._set_nonblocking(w)
               
                self._set_close_exec(r)
                self._set_close_exec(w)
                #分别以读方式和写方式打开管道
                self._waker_reader = os.fdopen(r, "rb", 0)
                self._waker_writer = os.fdopen(w, "wb", 0)
            else:
                #如若不是 NT 系统,改用win32 支持的管道类型
                self._waker_reader = self._waker_writer = win32_support.Pipe()
                r = self._waker_writer.reader_fd
            #将 管道的  read端与 函数 _read_waker  关联,事件类型为 READ
            #这里也是IO 多路复用的一种机制,将管道的描述符也添加进多路复用的IO 管理
            self.add_handler(r, self._read_waker, self.READ)
    

    注意最后的几点,将管道描述符的读端也加入事件循环检查,并设置相应的回调函数,这样做的好处是以便事件循环阻塞而没有相应描述符出现,需要在最大timeout时间之前返回,就可以向这个管道发送一个字符,用来终止阻塞在监听阶段的事件循环监听函数。

    看看waker是这样定义的:

        def _wake(self):
            try:
                self._waker_writer.write("x")
            except IOError:
                pass
    

    需要唤醒阻塞中的事件循环监听函数的时候,只需要向管道写入一个字符,就可以提前结束循环

    instance就是简单的返回一个实例:

        def instance(cls):
            """Returns a global IOLoop instance.
    
            Most single-threaded applications have a single, global IOLoop.
            Use this method instead of passing around IOLoop instances
            throughout your code.
    
            A common pattern for classes that depend on IOLoops is to use
            a default argument to enable programs with multiple IOLoops
            but not require the argument for simpler applications:
    
                class MyClass(object):
                    def __init__(self, io_loop=None):
                        self.io_loop = io_loop or IOLoop.instance()
            """
            if not hasattr(cls, "_instance"):
                cls._instance = cls()
            return cls._instance
    

    instance()是一个静态方法,代表此IOLoop是一个单实例方法,一个进程只有一个

    在add_handler()里面

        #将文件描述符发生相应的事件时的回调函数对应
        def add_handler(self, fd, handler, events):
            """Registers the given handler to receive the given events for fd."""
            self._handlers[fd] = stack_context.wrap(handler)
            #在 epoll 中注册对应事件
            #epoll_ctl
            self._impl.register(fd, events | self.ERROR)
        #更新相应的事件类型
    

    可以看到,使用字典的方式,每一个fd就对应一个handler,下次事件循环返回的时候按照返回后的fd列表,依次调用相应的callback

    |------

    在tornado中,函数是通过stack_context.wrap()包装过,可以用来记录上下文

    如果需要调用被包装过的函数,需要调用方法

    _run_callback(self, callback)  

    这个函数将包装过的callback作为参数出入,然后执行函数

        def _run_callback(self, callback):
            try:
                callback()
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                self.handle_callback_exception(callback)
    

    当函数执行发生异常时,可以记录下函数执行状态  

    -------|

    _impl.register就是被封装过的epoll的epoll_ctl,参数是EPOLL_CTL_ADD

    见同一个文件下的_EPoll类

    class _EPoll(object):
        """An epoll-based event loop using our C module for Python 2.5 systems"""
        _EPOLL_CTL_ADD = 1
        _EPOLL_CTL_DEL = 2
        _EPOLL_CTL_MOD = 3
    
        def __init__(self):
            self._epoll_fd = epoll.epoll_create()
    
        def fileno(self):
            return self._epoll_fd
    
        def register(self, fd, events):
            epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_ADD, fd, events)
    
        def modify(self, fd, events):
            epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_MOD, fd, events)
    
        def unregister(self, fd):
            epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_DEL, fd, 0)
    
        def poll(self, timeout):
            return epoll.epoll_wait(self._epoll_fd, int(timeout * 1000))
    

      

    总结:这一章讲了IOLoop中的几个重要函数,后面依次会有分析其他方法,还有其中一些细节值得平常注意的。  

      

      

    文章属原创,转载请注明出处 联系作者: Email:zhangbolinux@sina.com QQ:513364476
  • 相关阅读:
    【zookpeer】Failed to check the status of the service com.xxx.UserSerivce. No provider available for
    【solr】Spring data solr Document is missing mandatory uniqueKey field: id 解决
    【ssm】springmvc-spring-mybatis框架的搭建
    【jdbc】jdbc连接池理解
    【java基础】接口的理解
    【java基础】private protect的理解
    Single Number II
    Single Number I
    Candy
    Gas Station
  • 原文地址:https://www.cnblogs.com/Bozh/p/2597114.html
Copyright © 2011-2022 走看看