引言:由于都是在工作当中抽出时间看源代码,所以更新速度比较慢,但是还是希望通过对好的源码的分析和探讨,大家相互学习,发现不好的地方共同讨论。
上次讲了IOLoop中的几个重要的方法,inistance() 和 add_handler() .. 今天看看Demo中一个最重要的方法,start(),顺带用stop()收尾
def start(self): """Starts the I/O loop. The loop will run until one of the I/O handlers calls stop(), which will make the loop stop after the current event iteration completes. """ if self._stopped: self._stopped = False return self._running = True while True: # Never use an infinite timeout here - it can stall epoll poll_timeout = 0.2 # Prevent IO event starvation by delaying new callbacks # to the next iteration of the event loop. callbacks = self._callbacks self._callbacks = [] #先运行注册了的回调函数 for callback in callbacks: self._run_callback(callback) if self._callbacks: poll_timeout = 0.0 #检查超时事件 #方法是,在timeout这个bisect的排序的列表,每次取出头部最小的一个 #将deadline与当前时间比较,如果 <= 当前时间,就认为超时,然后调用相应的超时处理的回调函数 #这里不好理解的是deadline <= 当前时间 , 如果说deadline 大于当前时间,就代表还没有到 #超时条件 #循环检查,直到超时事件处理完成 #值得一说的是在libevent中是使用了最小堆每次取出当前的最小deadline #由于最小堆的特性,每次从头取出的都是最小的 #Nginx的网络模块是用的红黑树来做,原理也是一样的 if self._timeouts: now = time.time() while self._timeouts and self._timeouts[0].deadline <= now: timeout = self._timeouts.pop(0) self._run_callback(timeout.callback) #处理完了超时时间之后,需要将epoll最大阻塞时间改为小于当前最小超时时间的绝对值 #不然可能在epoll返回后,本来不属于超时事件的事件被超时 if self._timeouts: milliseconds = self._timeouts[0].deadline - now poll_timeout = min(milliseconds, poll_timeout) #判断“反应堆”是否结束 #结束有两个方式,一个是设置_running 标志位,第二个就是往写管道写入"x" if not self._running: break #从注释中可以看出,每次进入epoll等待事件之前都需要把sigalrm清空,以免在 #epoll阻塞期间收到信号,在epoll完成后重新设置 if self._blocking_signal_threshold is not None: # clear alarm so it doesn't fire while poll is waiting for # events. signal.setitimer(signal.ITIMER_REAL, 0, 0) #进入epoll循环 try: event_pairs = self._impl.poll(poll_timeout) except Exception, e: #在 epoll和 select 阻塞过程当中,经常会收到系统或者其他方式发过来的信号,这 #时候系统的 errno 会被设置为 EINTR ,如果将遇到这样的情况,直接重启epoll就可以 #如果不是这样的错误,则看做是致命错误 # Depending on python version and IOLoop implementation, # different exception types may be thrown and there are # two ways EINTR might be signaled: # * e.errno == errno.EINTR # * e.args is like (errno.EINTR, 'Interrupted system call') if (getattr(e, 'errno', None) == errno.EINTR or (isinstance(getattr(e, 'args', None), tuple) and len(e.args) == 2 and e.args[0] == errno.EINTR)): continue else: raise #将被阻塞的sigalarm 还原 , 第二个参数是最大阻塞阈值 if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) # Pop one fd at a time from the set of pending fds and run # its handler. Since that handler may perform actions on # other file descriptors, there may be reentrant calls to # this IOLoop that update self._events #将新的事件加入到待处理队列中,现代非阻塞的网络库都使用的是这种方式 self._events.update(event_pairs) #作者在写这段代码的过程当中不是使用的简单的顺序遍历这个队列,而使用的方式是 #将就绪事件逐个弹出,以防止在处理过程当中就绪事件发生改变 while self._events: fd, events = self._events.popitem() #在处理过程当中,常常会遇到客户端异常终止的情况 #一般情况下如果读取错误,服务端会产生一个 sigpipe信号 #这时候需要忽略这个信号 #这里我有一个疑问就是为什么在add_handler 的时候 handler是经过 context.wrap包装过的 #而在这里是直接调用,按道理应该是通过_running_callback调用,不过这里显然处理了异常情况了 try: self._handlers[fd](fd, events) except (KeyboardInterrupt, SystemExit): raise except (OSError, IOError), e: if e.args[0] == errno.EPIPE: # Happens when the client closes the connection pass else: logging.error("Exception in I/O handler for fd %d", fd, exc_info=True) except: logging.error("Exception in I/O handler for fd %d", fd, exc_info=True) # reset the stopped flag so another start/stop pair can be issued self._stopped = False #将定时事件清空 if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0)
这段代码中值得注意的部分就是在几个方面:
1.超时事件的处理,timeout是一个排序后的列表,每次都是取得最前面最小的一个
2.在开始epoll循环的过程当中,设置阻塞sigalarm
3.在处理事件过程当中忽略sigpipe信号
4.在处理就绪事件过程当中,是通过每次pop一个来处理,而不是一次遍历
stop()函数
def stop(self): """Stop the loop after the current event loop iteration is complete. If the event loop is not currently running, the next call to start() will return immediately. To use asynchronous methods from otherwise-synchronous code (such as unit tests), you can start and stop the event loop like this: ioloop = IOLoop() async_method(ioloop=ioloop, callback=ioloop.stop) ioloop.start() ioloop.start() will return after async_method has run its callback, whether that callback was invoked before or after ioloop.start. """ self._running = False self._stopped = True self._wake()
简单的设置标志位后,向管道发送"x"停止事件循环
总结:IOLoop差不多就是这些内容,利用python简单和高可读性,看网络模块的实现会让我们更加的专注于
实现,而不是繁琐的基础代码的使用过程。
后面将看看IOStream类,是建立在IOLoop的一个上层封装,实现了基本的buffer事件