网上都说nginx和lighthttpd是高性能web服务器,而tornado也是著名的高抗负载应用,它们间有什么相似处呢?上节提到的ioloop对象是如何循环的呢?往下看。
首先关于TCP服务器的开发上节已经提过,很明显那个三段式的示例是个效率很低的(因为只有一个连接被端开新连接才能被接受)。要想开发高性能的服务器,就得在这accept上下功夫。
首先,新连接的到来一般是经典的三次握手,只有当服务器收到一个SYN时才说明有一个新连接(还没建立),这时监听fd是可读的可以调用accept,此前服务器可以干点别的,这就是SELECT/POLL/EPOLL的思路。而只有三次握手成功后,accept才会返回,此时监听fd是读完成状态,似乎服务器在此之前可以转身去干别的,等到读完成再调用accept就不会有延迟了,这就是AIO的思路,不过在*nix平台上好像支持不是很广。。。再有,accept得到的新fd,不一定是可读的(客户端请求还没到达),所以可以等新fd可读时在read()(可能会有一点延迟),也可以用AIO等读完后再read就不会延迟了。同样类似,对于write,close也有类似的事件。
总的思路就是,在我们关心的fd上注册关心的多个事件,事件发生了就启动回调,没发生就看点别的。这是单线程的,多线程的复杂一点,但差不多。nginx和lightttpd以及tornado都是类似的方式,只不过是多进程和多线程或单线程的区别而已。为简便,我们只分析tornado单线程的情况。
关于ioloop.py的代码,主要有两个要点。一个是configurable机制,一个就是epoll循环。先看epoll循环吧。IOLoop 类的start是循环所在,但它必须被子类覆盖实现,因此它的start在PollIOLoop里。略过循环外部的多线程上下文环境的保存与恢复,单看循环:
while True: poll_timeout = 3600.0 # Prevent IO event starvation by delaying new callbacks # to the next iteration of the event loop. with self._callback_lock: callbacks = self._callbacks self._callbacks = [] for callback in callbacks: self._run_callback(callback) if self._timeouts: now = self.time() while self._timeouts: if self._timeouts[0].callback is None: # the timeout was cancelled heapq.heappop(self._timeouts) elif self._timeouts[0].deadline <= now: timeout = heapq.heappop(self._timeouts) self._run_callback(timeout.callback) else: seconds = self._timeouts[0].deadline - now poll_timeout = min(seconds, poll_timeout) break if self._callbacks: # If any callbacks or timeouts called add_callback, # we don't want to wait in poll() before we run them. poll_timeout = 0.0 if not self._running: break if self._blocking_signal_threshold is not None: # clear alarm so it doesn't fire while poll is waiting for # events. signal.setitimer(signal.ITIMER_REAL, 0, 0) try: event_pairs = self._impl.poll(poll_timeout) except Exception as e: # Depending on python version and IOLoop implementation, # different exception types may be thrown and there are # two ways EINTR might be signaled: # * e.errno == errno.EINTR # * e.args is like (errno.EINTR, 'Interrupted system call') if (getattr(e, 'errno', None) == errno.EINTR or (isinstance(getattr(e, 'args', None), tuple) and len(e.args) == 2 and e.args[0] == errno.EINTR)): continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) # Pop one fd at a time from the set of pending fds and run # its handler. Since that handler may perform actions on # other file descriptors, there may be reentrant calls to # this IOLoop that update self._events self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() try: self._handlers[fd](fd, events) except (OSError, IOError) as e: if e.args[0] == errno.EPIPE: # Happens when the client closes the connection pass else: app_log.error("Exception in I/O handler for fd %s", fd, exc_info=True) except Exception: app_log.error("Exception in I/O handler for fd %s", fd, exc_info=True)
首先是设定超时时间。然后在互斥锁下取出上次循环遗留下的回调列表(在add_callback添加对象),把这次列表置空,然后依次执行列表里的回调。这里的_run_callback就没什么好分析的了。紧接着是检查上次循环遗留的超时列表,如果列表里的项目有回调而且过了截止时间,那肯定超时了,就执行对应的超时回调。然后检查是否又有了事件回调(因为很多回调函数里可能会再添加回调),如果是,则不在poll循环里等待,如注释所述。接下来最关键的一句是event_pairs = self._impl.poll(poll_timeout),这句里的_impl是epoll,在platform/epoll.py里定义,总之就是一个等待函数,当有事件(超时也算)发生就返回。然后把事件集保存下来,对于每个事件,self._handlers[fd](fd, events)根据fd找到回调,并把fd和事件做参数回传。如果fd是监听的fd,那么这个回调handler就是accept_handler函数,详见上节代码。如果是新fd可读,一般就是_on_headers 或者 _on_requet_body了,详见前几节。我好像没看到可写时的回调?以上,就是循环的流程了。可能还是看的糊里糊涂的,因为很多对象怎么来的都不清楚,configurable也还没有看。看完下面的分析,应该就可以了。
Configurable类在util.py里被定义。类里有一段注释,已经很明确的说明了它的设计意图和用法。它是可配置接口的父类,可配置接口对外提供一致的接口标识,但它的子类实现可以在运行时进行configure。一般在跨平台时由于子类实现有多种选择,这时候就可以使用可配置接口,例如select和epoll。首先注意 Configurable 的两个函数: configurable_base 和 configurable_default, 两函数都需要被子类(即可配置接口类)覆盖重写。其中,base函数一般返回接口类自身,default返回接口的默认子类实现,除非接口指定了__impl_class。IOLoop及其子类实现都没有初始化函数也没有构造函数,其构造函数继承于Configurable,如下:
def __new__(cls, **kwargs): base = cls.configurable_base() args = {} if cls is base: impl = cls.configured_class() if base.__impl_kwargs: args.update(base.__impl_kwargs) else: impl = cls args.update(kwargs) instance = super(Configurable, cls).__new__(impl) # initialize vs __init__ chosen for compatiblity with AsyncHTTPClient # singleton magic. If we get rid of that we can switch to __init__ # here too. instance.initialize(**args) return instance
当子类对象被构造时,子类__new__被调用,因此参数里的cls指的是Configurabel的子类(可配置接口类,如IOLoop)。先是得到base,查看IOLoop的代码发现它返回的是自身类。由于base和cls是一样的,所以调用configured_class()得到接口的子类实现,其实就是调用base(现在是IOLoop)的configurable_default,总之就是返回了一个子类实现(epoll/kqueue/select之一),顺便把__impl_kwargs合并到args里。接着把kwargs并到args里。然后调用Configurable的父类(Object)的__new__方法,生成了一个impl的对象,紧接着把args当参数调用该对象的initialize(继承自PollIOloop,其initialize下段进行分析),返回该对象。
所以,当构造IOLoop对象时,实际得到的是EPollIOLoop或其它相似子类。另外,Configurable 还提供configure方法来给接口指定实现子类和参数。可以看的出来,Configurable类主要提供构造方法,相当于对象工厂根据配置来生产对象,同时开放configure接口以供配置。而子类按照约定调整配置即可得到不同对象,代码得到了复用。
解决了构造,来看看IOLoop的instance方法。先检查类是否有成员_instance,一开始肯定没有,于是就构造了一个IOLoop对象(即EPollIOLoop对象)。以后如果再调用instance,得到的则是已有的对象,这样就确保了ioloop在全局是单例。再看epoll循环时注意到self._impl,Configurable 和 IOLoop 里都没有, 这是在哪儿定义的呢? 为什么IOLoop的start跑到PollIOLoop里,应该是EPollIOLoop才对啊。 对,应该看出来了,EPollIOLoop 就是PollIOLoop的子类,所以方法被继承了是很常见的哈。
从上一段的构造流程里可以看到,EPollIOLoop对象的initialize方法被调用了,看其代码发现它调用了其父类(PollIOLoop)的它方法, 并指定了impl=select.epoll(), 然后在父类的方法里就把它保存了下来,所以self._impl.poll就等效于select.epoll().poll().PollIOLoop里还有一些注册,修改,删除监听事件的方法,其实就是对self._impl的封装调用。就如上节的 add_accept_handler 就是调用ioloop的add_handler方法把监听fd和accept_handler方法进行关联。
IOLoop基本是个事件循环,因此它总是被其它模块所调用。而且为了足够通用,基本上对回调没多大限制,一个可执行对象即可。事件分发就到此结束了,和IO事件密切相关的另一个部分是IOStream,看看它是如何读写的。
IOLoop instance()方法的讲解
Tornado 的源码写得有点难懂,需要你理解好 socket、epoll 这样的东西才能充分理解。需要深入到 Tornado 的源码,ioloop.py 这个文件很关键。
接下来,我们继续读 ioloop.py 这个文件。
IOLoop 是基于 epoll 实现的底层网络I/O的核心调度模块,用于处理 socket 相关的连接、响应、异步读写等网络事件。每个 Tornado 进程都会初始化一个全局唯一的 IOLoop 实例,在 IOLoop 中通过静态方法 instance() 进行封装,获取 IOLoop 实例直接调用此方法即可。
@staticmethod def instance(): """Returns a global `IOLoop` instance. Most applications have a single, global `IOLoop` running on the main thread. Use this method to get this instance from another thread. To get the current thread's `IOLoop`, use `current()`. """ if not hasattr(IOLoop, "_instance"): with IOLoop._instance_lock: if not hasattr(IOLoop, "_instance"): # New instance after double check IOLoop._instance = IOLoop() return IOLoop._instance
Tornado 服务器启动时会创建监听 socket,并将 socket 的 file descriptor 注册到 IOLoop 实例中,IOLoop 添加对 socket 的IOLoop.READ 事件监听并传入回调处理函数。当某个 socket 通过 accept 接受连接请求后调用注册的回调函数进行读写。接下来主要分析IOLoop 对 epoll 的封装和 I/O 调度具体实现。
epoll是Linux内核中实现的一种可扩展的I/O事件通知机制,是对POISX系统中 select 和 poll 的替代,具有更高的性能和扩展性,FreeBSD中类似的实现是kqueue。Tornado中基于Python C扩展实现的的epoll模块(或kqueue)对epoll(kqueue)的使用进行了封装,使得IOLoop对象可以通过相应的事件处理机制对I/O进行调度。具体可以参考前面小节的 预备知识:我读过的对epoll最好的讲解 。
IOLoop模块对网络事件类型的封装与epoll一致,分为READ / WRITE / ERROR三类,具体在源码里呈现为:
# Our events map exactly to the epoll events NONE = 0 READ = _EPOLLIN WRITE = _EPOLLOUT ERROR = _EPOLLERR | _EPOLLHUP
回到前面章节的 开始用Tornado:从Hello World开始 里面的示例,
http_server = tornado.httpserver.HTTPServer(application) http_server.listen(options.port) tornado.ioloop.IOLoop.instance().start()
前两句是启动服务器,启动服务器之后,还需要启动 IOLoop 的实例,这样可以启动事件循环机制,配合非阻塞的 HTTP Server 工作。更多关于 IOLoop的与Http服务器的细节,在 Tornado对Web请求与响应的处理机制 这里有介绍到。
这就是 IOLoop 的 instance() 方法的一些细节,接下来我们再看看 start() 的细节。
IOLoop start()里的核心调度
IOLoop的初始化
初始化过程中选择 epoll 的实现方式,Linux 平台为 epoll,BSD 平台为 kqueue,其他平台如果安装有C模块扩展的 epoll 则使用 tornado对 epoll 的封装,否则退化为 select。
def __init__(self, impl=None): self._impl = impl or _poll() #省略部分代码 self._waker = Waker() self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ) def add_handler(self, fd, handler, events): """Registers the given handler to receive the given events for fd.""" self._handlers[fd] = stack_context.wrap(handler) self._impl.register(fd, events | self.ERROR)
在 IOLoop 初始化的过程中创建了一个 Waker 对象,将 Waker 对象 fd 的读端注册到事件循环中并设定相应的回调函数(这样做的好处是当事件循环阻塞而没有响应描述符出现,需要在最大 timeout 时间之前返回,就可以向这个管道发送一个字符)。
Waker 的使用:一种是在其他线程向 IOLoop 添加 callback 时使用,唤醒 IOLoop 同时会将控制权转移给 IOLoop 线程并完成特定请求。唤醒的方法向管道中写入一个字符'x'。另外,在 IOLoop的stop 函数中会调用self._waker.wake(),通过向管道写入'x'停止事件循环。
add_handler 函数使用了stack_context 提供的 wrap 方法。wrap 返回了一个可以直接调用的对象并且保存了传入之前的堆栈信息,在执行时可以恢复,这样就保证了函数的异步调用时具有正确的运行环境。
IOLoop的start方法
IOLoop 的核心调度集中在 start() 方法中,IOLoop 实例对象调用 start 后开始 epoll 事件循环机制,该方法会一直运行直到 IOLoop 对象调用 stop 函数、当前所有事件循环完成。start 方法中主要分三个部分:一个部分是对超时的相关处理;一部分是 epoll 事件通知阻塞、接收;一部分是对 epoll 返回I/O事件的处理。
- 为防止 IO event starvation,将回调函数延迟到下一轮事件循环中执行。
- 超时的处理 heapq 维护一个最小堆,记录每个回调函数的超时时间(deadline)。每次取出 deadline 最早的回调函数,如果callback标志位为 True 并且已经超时,通过 _run_callback 调用函数;如果没有超时需要重新设定 poll_timeout 的值。
- 通过 self._impl.poll(poll_timeout) 进行事件阻塞,当有事件通知或超时时 poll 返回特定的 event_pairs。
- epoll 返回通知事件后将新事件加入待处理队列,将就绪事件逐个弹出,通过stack_context.wrap(handler)保存的可执行对象调用事件处理。
while True: poll_timeout = 3600.0 with self._callback_lock: callbacks = self._callbacks self._callbacks = [] for callback in callbacks: self._run_callback(callback) # 超时处理 if self._timeouts: now = time.time() while self._timeouts: if self._timeouts[0].callback is None: # the timeout was cancelled heapq.heappop(self._timeouts) elif self._timeouts[0].deadline <= now: timeout = heapq.heappop(self._timeouts) self._run_callback(timeout.callback) else: seconds = self._timeouts[0].deadline - now poll_timeout = min(seconds, poll_timeout) break if self._callbacks: # If any callbacks or timeouts called add_callback, # we don't want to wait in poll() before we run them. poll_timeout = 0.0 if not self._running: break if self._blocking_signal_threshold is not None: # clear alarm so it doesn't fire while poll is waiting for events. signal.setitimer(signal.ITIMER_REAL, 0, 0) # epoll阻塞,当有事件通知或超时返回event_pairs try: event_pairs = self._impl.poll(poll_timeout) except Exception, e: # 异常处理,省略 # 对epoll返回event_pairs事件的处理 self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() try: self._handlers[fd](fd, events) except Exception e: # 异常处理,省略
3.0后的一些改动
Tornado3.0以后 IOLoop 模块的一些改动。
IOLoop 成为 util.Configurable 的子类,IOLoop 中绝大多数成员方法都作为抽象接口,具体实现由派生类 PollIOLoop 完成。IOLoop 实现了 Configurable 中的 configurable_base 和 configurable_default 这两个抽象接口,用于初始化过程中获取类类型和类的实现方法(即 IOLoop 中 poller 的实现方式)。
在 Tornado3.0+ 中针对不同平台,单独出 poller 相应的实现,EPollIOLoop、KQueueIOLoop、SelectIOLoop 均继承于 PollIOLoop。下边的代码是 configurable_default 方法根据平台选择相应的 epoll 实现。初始化 IOLoop 的过程中会自动根据平台选择合适的 poller 的实现方法。
@classmethod def configurable_default(cls): if hasattr(select, "epoll"): from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop
IOLoop与Configurable类
IOLoop 是 tornado 的核心。程序中主函数通常调用 tornado.ioloop.IOLoop.instance().start() 来启动IOLoop,但是看了一下 IOLoop 的实现,start 方法是这样的:
def start(self): """Starts the I/O loop. The loop will run until one of the callbacks calls `stop()`, which will make the loop stop after the current event iteration completes. """ raise NotImplementedError()
也就是说 IOLoop 是个抽象的基类,具体工作是由它的子类负责的。由于是 Linux 平台,所以应该用 Epoll,对应的类是 PollIOLoop。PollIOLoop 的 start 方法开始了事件循环。
问题来了,tornado.ioloop.IOLoop.instance() 是怎么返回 PollIOLoop 实例的呢?刚开始有点想不明白,后来看了一下 IOLoop 的代码就豁然开朗了。
IOLoop 继承自 Configurable,后者位于 tornado/util.py。
A configurable interface is an (abstract) class whose constructor acts as a factory function for one of its implementation subclasses. The implementation subclass as well as optional keyword arguments to its initializer can be set globally at runtime with configure.
Configurable 类实现了一个工厂方法,也就是设计模式中的“工厂模式”,看一下__new__函数的实现:
def __new__(cls, **kwargs): base = cls.configurable_base() args = {} if cls is base: impl = cls.configured_class() if base.__impl_kwargs: args.update(base.__impl_kwargs) else: impl = cls args.update(kwargs) instance = super(Configurable, cls).__new__(impl) # initialize vs __init__ chosen for compatiblity with AsyncHTTPClient # singleton magic. If we get rid of that we can switch to __init__ # here too. instance.initialize(**args) return instance
当创建一个Configurable类的实例的时候,其实创建的是configurable_class()返回的类的实例。
@classmethod def configured_class(cls): """Returns the currently configured class.""" base = cls.configurable_base() if cls.__impl_class is None: base.__impl_class = cls.configurable_default() return base.__impl_class
最后,就是返回的configurable_default()。此函数在IOLoop中的实现如下:
@classmethod def configurable_default(cls): if hasattr(select, "epoll"): from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop
EPollIOLoop 是 PollIOLoop 的子类。至此,这个流程就理清楚了。
对socket封装的IOStream机制概览
IOStream对socket读写进行了封装,分别提供读、写缓冲区实现对socket的异步读写。当socket被accept之后HTTPServer的_handle_connection会被回调并初始化IOStream对象,进一步通过IOStream提供的功能接口完成socket的读写。文章接下来将关注IOStream实现读写的细节。
IOStream的初始化
IOStream初始化过程中主要完成以下操作:
- 绑定对应的socket
- 绑定ioloop
- 创建读缓冲区_read_buffer,一个python deque容器
- 创建写缓冲区_write_buffer,同样也是一个python deque容器
IOStream提供的主要功能接口
主要的读写接口包括以下四个:
class IOStream(object): def read_until(self, delimiter, callback): def read_bytes(self, num_bytes, callback, streaming_callback=None): def read_until_regex(self, regex, callback): def read_until_close(self, callback, streaming_callback=None): def write(self, data, callback=None):
- read_until和read_bytes是最常用的读接口,它们工作的过程都是先注册读事件结束时调用的回调函数,然后调用_try_inline_read方法。_try_inline_read首先尝试_read_from_buffer,即从上一次的读缓冲区中取数据,如果有数据直接调用 self._run_callback(callback, self._consume(data_length)) 执行回调函数,_consume消耗掉了_read_buffer中的数据;否则即_read_buffer之前没有未读数据,先通过_read_to_buffer将数据从socket读入_read_buffer,然后再执行_read_from_buffer操作。read_until和read_bytes的区别在于_read_from_buffer过程中截取数据的方法不同,read_until读取到delimiter终止,而read_bytes则读取num_bytes个字节终止。执行过程如下图所示
- read_until_regex相当于delimiter为某一正则表达式的read_until。
- read_until_close主要用于IOStream流关闭前后的读取:如果调用read_until_close时stream已经关闭,那么将会_consume掉_read_buffer中的所有数据;否则_read_until_close标志位设为True,注册_streaming_callback回调函数,调用_add_io_state添加io_loop.READ状态。
- write首先将data按照数据块大小WRITE_BUFFER_CHUNK_SIZE分块写入write_buffer,然后调用handle_write向socket发送数据。
其他内部功能接口
- def _handle_events(self, fd, events): 通常为IOLoop对象add_handler方法传入的回调函数,由IOLoop的事件机制来进行调度。
- def _add_io_state(self, state): 为IOLoop对象的handler注册IOLoop.READ或IOLoop.WRITE状态,handler为IOStream对象的_handle_events方法。
- def _consume(self, loc): 合并读缓冲区loc个字节,从读缓冲区删除并返回这些数据。
Tornado的多进程管理分析
Tornado的多进程管理我们可以参看process.py这个文件。
在编写多进程的时候我们一般都用python自带的multiprocessing,使用方法和threading基本一致,只需要继承里面的Process类以后就可以编写多进程程序了,这次我们看看tornado是如何实现他的multiprocessing,可以说实现的功能不多,但是更加简单高效。
我们只看fork_process里面的代码:
global _task_id assert _task_id is None if num_processes is None or num_processes <= 0: num_processes = cpu_count() if ioloop.IOLoop.initialized(): raise RuntimeError("Cannot run in multiple processes: IOLoop instance " "has already been initialized. You cannot call " "IOLoop.instance() before calling start_processes()") logging.info("Starting %d processes", num_processes) children = {}
这一段很简单,就是在没有传入进程数的时候使用默认的cpu个数作为将要生成的进程个数。
def start_child(i): pid = os.fork() if pid == 0: # child process _reseed_random() global _task_id _task_id = i return i else: children[pid] = i return None
这是一个内函数,作用就是生成子进程。fork是个很有意思的方法,他会同时返回两种状态,为什么呢?其实fork相当于在原有的一条路(父进程)旁边又修了一条路(子进程)。如果这条路修成功了,那么在原有的路上(父进程)你就看到旁边来了另外一条路(子进程),所以也就是返回新生成的那条路的名字(子进程的pid),但是在另外一条路上(子进程),你看到的是自己本身修建成功了,也就返回自己的状态码(返回结果是0)。
所以if pid==0表示这时候cpu已经切换到子进程了,相当于我们在新生成的这条路上面做事(返回任务id);else表示又跑到原来的路上做事了,在这里我们记录下新生成的子进程,这时候children[pid]=i里面的pid就是新生成的子进程的pid,而 i 就是刚才在子进程里面我们返回的任务id(其实就是用来代码子进程的id号)。
for i in range(num_processes): id = start_child(i) if id is not None: return id
if id is not None表示如果我们在刚刚生成的那个子进程的上下文里面,那么就什么都不干,直接返回子进程的任务id就好了,啥都别想了,也别再折腾。如果还在父进程的上下文的话那么就继续生成子进程。
num_restarts = 0 while children: try: pid, status = os.wait() except OSError, e: if e.errno == errno.EINTR: continue raise if pid not in children: continue id = children.pop(pid) if os.WIFSIGNALED(status): logging.warning("child %d (pid %d) killed by signal %d, restarting", id, pid, os.WTERMSIG(status)) elif os.WEXITSTATUS(status) != 0: logging.warning("child %d (pid %d) exited with status %d, restarting", id, pid, os.WEXITSTATUS(status)) else: logging.info("child %d (pid %d) exited normally", id, pid) continue num_restarts += 1 if num_restarts > max_restarts: raise RuntimeError("Too many child restarts, giving up") new_id = start_child(id) if new_id is not None: return new_id
剩下的这段代码都是在父进程里面做的事情(因为之前在子进程的上下文的时候已经返回了,当然子进程并没有结束)。
pid, status = os.wait()的意思是等待任意子进程退出或者结束,这时候我们就把它从我们的children表里面去除掉,然后通过status判断子进程退出的原因。
如果子进程是因为接收到kill信号或者抛出exception了,那么我们就重新启动一个子进程,用的当然还是刚刚退出的那个子进程的任务号。如果子进程是自己把事情做完了才退出的,那么就算了,等待别的子进程退出吧。
我们看到在重新启动子进程的时候又使用了
if new_id is not None: return new_id
主要就是退出子进程的空间,只在父进程上面做剩下的事情,不然刚才父进程的那些代码在子进程里面也会同样的运行,就会形成无限循环了,我没试过,不如你试试?