zoukankan      html  css  js  c++  java
  • Python Tornado框架(ioloop对象分析)

    网上都说nginx和lighthttpd是高性能web服务器,而tornado也是著名的高抗负载应用,它们间有什么相似处呢?上节提到的ioloop对象是如何循环的呢?往下看。

    首先关于TCP服务器的开发上节已经提过,很明显那个三段式的示例是个效率很低的(因为只有一个连接被端开新连接才能被接受)。要想开发高性能的服务器,就得在这accept上下功夫。

    首先,新连接的到来一般是经典的三次握手,只有当服务器收到一个SYN时才说明有一个新连接(还没建立),这时监听fd是可读的可以调用accept,此前服务器可以干点别的,这就是SELECT/POLL/EPOLL的思路。而只有三次握手成功后,accept才会返回,此时监听fd是读完成状态,似乎服务器在此之前可以转身去干别的,等到读完成再调用accept就不会有延迟了,这就是AIO的思路,不过在*nix平台上好像支持不是很广。。。再有,accept得到的新fd,不一定是可读的(客户端请求还没到达),所以可以等新fd可读时在read()(可能会有一点延迟),也可以用AIO等读完后再read就不会延迟了。同样类似,对于write,close也有类似的事件。

    总的思路就是,在我们关心的fd上注册关心的多个事件,事件发生了就启动回调,没发生就看点别的。这是单线程的,多线程的复杂一点,但差不多。nginx和lightttpd以及tornado都是类似的方式,只不过是多进程和多线程或单线程的区别而已。为简便,我们只分析tornado单线程的情况。

    关于ioloop.py的代码,主要有两个要点。一个是configurable机制,一个就是epoll循环。先看epoll循环吧。IOLoop 类的start是循环所在,但它必须被子类覆盖实现,因此它的start在PollIOLoop里。略过循环外部的多线程上下文环境的保存与恢复,单看循环:

    while True:
    poll_timeout = 3600.0
    
    # Prevent IO event starvation by delaying new callbacks
    # to the next iteration of the event loop.
    with self._callback_lock:
    	callbacks = self._callbacks
    	self._callbacks = []
    for callback in callbacks:
    	self._run_callback(callback)
    
    if self._timeouts:
    	now = self.time()
    	while self._timeouts:
    		if self._timeouts[0].callback is None:
    			# the timeout was cancelled
    			heapq.heappop(self._timeouts)
    		elif self._timeouts[0].deadline <= now:
    			timeout = heapq.heappop(self._timeouts)
    			self._run_callback(timeout.callback)
    		else:
    			seconds = self._timeouts[0].deadline - now
    			poll_timeout = min(seconds, poll_timeout)
    			break
    
    if self._callbacks:
    	# If any callbacks or timeouts called add_callback,
    	# we don't want to wait in poll() before we run them.
    	poll_timeout = 0.0
    
    if not self._running:
    	break
    
    if self._blocking_signal_threshold is not None:
    	# clear alarm so it doesn't fire while poll is waiting for
    	# events.
    	signal.setitimer(signal.ITIMER_REAL, 0, 0)
    
    try:
    	event_pairs = self._impl.poll(poll_timeout)
    except Exception as e:
    	# Depending on python version and IOLoop implementation,
    	# different exception types may be thrown and there are
    	# two ways EINTR might be signaled:
    	# * e.errno == errno.EINTR
    	# * e.args is like (errno.EINTR, 'Interrupted system call')
    	if (getattr(e, 'errno', None) == errno.EINTR or
    		(isinstance(getattr(e, 'args', None), tuple) and
    		 len(e.args) == 2 and e.args[0] == errno.EINTR)):
    		continue
    	else:
    		raise
    
    if self._blocking_signal_threshold is not None:
    	signal.setitimer(signal.ITIMER_REAL,
    					 self._blocking_signal_threshold, 0)
    
    # Pop one fd at a time from the set of pending fds and run
    # its handler. Since that handler may perform actions on
    # other file descriptors, there may be reentrant calls to
    # this IOLoop that update self._events
    self._events.update(event_pairs)
    while self._events:
    	fd, events = self._events.popitem()
    	try:
    		self._handlers[fd](fd, events)
    	except (OSError, IOError) as e:
    		if e.args[0] == errno.EPIPE:
    			# Happens when the client closes the connection
    			pass
    		else:
    			app_log.error("Exception in I/O handler for fd %s",
    						  fd, exc_info=True)
    	except Exception:
    		app_log.error("Exception in I/O handler for fd %s",
    					  fd, exc_info=True)

    首先是设定超时时间。然后在互斥锁下取出上次循环遗留下的回调列表(在add_callback添加对象),把这次列表置空,然后依次执行列表里的回调。这里的_run_callback就没什么好分析的了。紧接着是检查上次循环遗留的超时列表,如果列表里的项目有回调而且过了截止时间,那肯定超时了,就执行对应的超时回调。然后检查是否又有了事件回调(因为很多回调函数里可能会再添加回调),如果是,则不在poll循环里等待,如注释所述。接下来最关键的一句是event_pairs = self._impl.poll(poll_timeout),这句里的_impl是epoll,在platform/epoll.py里定义,总之就是一个等待函数,当有事件(超时也算)发生就返回。然后把事件集保存下来,对于每个事件,self._handlers[fd](fd, events)根据fd找到回调,并把fd和事件做参数回传。如果fd是监听的fd,那么这个回调handler就是accept_handler函数,详见上节代码。如果是新fd可读,一般就是_on_headers 或者 _on_requet_body了,详见前几节。我好像没看到可写时的回调?以上,就是循环的流程了。可能还是看的糊里糊涂的,因为很多对象怎么来的都不清楚,configurable也还没有看。看完下面的分析,应该就可以了。

    Configurable类在util.py里被定义。类里有一段注释,已经很明确的说明了它的设计意图和用法。它是可配置接口的父类,可配置接口对外提供一致的接口标识,但它的子类实现可以在运行时进行configure。一般在跨平台时由于子类实现有多种选择,这时候就可以使用可配置接口,例如select和epoll。首先注意 Configurable 的两个函数: configurable_base 和 configurable_default, 两函数都需要被子类(即可配置接口类)覆盖重写。其中,base函数一般返回接口类自身,default返回接口的默认子类实现,除非接口指定了__impl_class。IOLoop及其子类实现都没有初始化函数也没有构造函数,其构造函数继承于Configurable,如下:

    def __new__(cls, **kwargs):
    	base = cls.configurable_base()
    	args = {}
    	if cls is base:
    		impl = cls.configured_class()
    		if base.__impl_kwargs:
    			args.update(base.__impl_kwargs)
    	else:
    		impl = cls
    	args.update(kwargs)
    	instance = super(Configurable, cls).__new__(impl)
    	# initialize vs __init__ chosen for compatiblity with AsyncHTTPClient
    	# singleton magic.  If we get rid of that we can switch to __init__
    	# here too.
    	instance.initialize(**args)
    	return instance
    

    当子类对象被构造时,子类__new__被调用,因此参数里的cls指的是Configurabel的子类(可配置接口类,如IOLoop)。先是得到base,查看IOLoop的代码发现它返回的是自身类。由于base和cls是一样的,所以调用configured_class()得到接口的子类实现,其实就是调用base(现在是IOLoop)的configurable_default,总之就是返回了一个子类实现(epoll/kqueue/select之一),顺便把__impl_kwargs合并到args里。接着把kwargs并到args里。然后调用Configurable的父类(Object)的__new__方法,生成了一个impl的对象,紧接着把args当参数调用该对象的initialize(继承自PollIOloop,其initialize下段进行分析),返回该对象。

    所以,当构造IOLoop对象时,实际得到的是EPollIOLoop或其它相似子类。另外,Configurable 还提供configure方法来给接口指定实现子类和参数。可以看的出来,Configurable类主要提供构造方法,相当于对象工厂根据配置来生产对象,同时开放configure接口以供配置。而子类按照约定调整配置即可得到不同对象,代码得到了复用。

    解决了构造,来看看IOLoop的instance方法。先检查类是否有成员_instance,一开始肯定没有,于是就构造了一个IOLoop对象(即EPollIOLoop对象)。以后如果再调用instance,得到的则是已有的对象,这样就确保了ioloop在全局是单例。再看epoll循环时注意到self._impl,Configurable 和 IOLoop 里都没有, 这是在哪儿定义的呢? 为什么IOLoop的start跑到PollIOLoop里,应该是EPollIOLoop才对啊。 对,应该看出来了,EPollIOLoop 就是PollIOLoop的子类,所以方法被继承了是很常见的哈。

    从上一段的构造流程里可以看到,EPollIOLoop对象的initialize方法被调用了,看其代码发现它调用了其父类(PollIOLoop)的它方法, 并指定了impl=select.epoll(), 然后在父类的方法里就把它保存了下来,所以self._impl.poll就等效于select.epoll().poll().PollIOLoop里还有一些注册,修改,删除监听事件的方法,其实就是对self._impl的封装调用。就如上节的 add_accept_handler 就是调用ioloop的add_handler方法把监听fd和accept_handler方法进行关联。

    IOLoop基本是个事件循环,因此它总是被其它模块所调用。而且为了足够通用,基本上对回调没多大限制,一个可执行对象即可。事件分发就到此结束了,和IO事件密切相关的另一个部分是IOStream,看看它是如何读写的。

    IOLoop instance()方法的讲解

    Tornado 的源码写得有点难懂,需要你理解好 socket、epoll 这样的东西才能充分理解。需要深入到 Tornado 的源码,ioloop.py 这个文件很关键。

    接下来,我们继续读 ioloop.py 这个文件。

    IOLoop 是基于 epoll 实现的底层网络I/O的核心调度模块,用于处理 socket 相关的连接、响应、异步读写等网络事件。每个 Tornado 进程都会初始化一个全局唯一的 IOLoop 实例,在 IOLoop 中通过静态方法 instance() 进行封装,获取 IOLoop 实例直接调用此方法即可。

    @staticmethod
    def instance():
    	"""Returns a global `IOLoop` instance.
    
    	Most applications have a single, global `IOLoop` running on the
    	main thread.  Use this method to get this instance from
    	another thread.  To get the current thread's `IOLoop`, use `current()`.
    	"""
    	if not hasattr(IOLoop, "_instance"):
    		with IOLoop._instance_lock:
    			if not hasattr(IOLoop, "_instance"):
    				# New instance after double check
    				IOLoop._instance = IOLoop()
    	return IOLoop._instance
    

    Tornado 服务器启动时会创建监听 socket,并将 socket 的 file descriptor 注册到 IOLoop 实例中,IOLoop 添加对 socket 的IOLoop.READ 事件监听并传入回调处理函数。当某个 socket 通过 accept 接受连接请求后调用注册的回调函数进行读写。接下来主要分析IOLoop 对 epoll 的封装和 I/O 调度具体实现。

    epoll是Linux内核中实现的一种可扩展的I/O事件通知机制,是对POISX系统中 select 和 poll 的替代,具有更高的性能和扩展性,FreeBSD中类似的实现是kqueue。Tornado中基于Python C扩展实现的的epoll模块(或kqueue)对epoll(kqueue)的使用进行了封装,使得IOLoop对象可以通过相应的事件处理机制对I/O进行调度。具体可以参考前面小节的 预备知识:我读过的对epoll最好的讲解 。

    IOLoop模块对网络事件类型的封装与epoll一致,分为READ / WRITE / ERROR三类,具体在源码里呈现为:

    # Our events map exactly to the epoll events
    NONE = 0
    READ = _EPOLLIN
    WRITE = _EPOLLOUT
    ERROR = _EPOLLERR | _EPOLLHUP
    

     回到前面章节的 开始用Tornado:从Hello World开始 里面的示例,

    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()
    

    前两句是启动服务器,启动服务器之后,还需要启动 IOLoop 的实例,这样可以启动事件循环机制,配合非阻塞的 HTTP Server 工作。更多关于 IOLoop的与Http服务器的细节,在 Tornado对Web请求与响应的处理机制 这里有介绍到。

    这就是 IOLoop 的 instance() 方法的一些细节,接下来我们再看看 start() 的细节。

     IOLoop start()里的核心调度

    IOLoop的初始化

    初始化过程中选择 epoll 的实现方式,Linux 平台为 epoll,BSD 平台为 kqueue,其他平台如果安装有C模块扩展的 epoll 则使用 tornado对 epoll 的封装,否则退化为 select。

    def __init__(self, impl=None):
        self._impl = impl or _poll()
        #省略部分代码
        self._waker = Waker()
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)
    
    def add_handler(self, fd, handler, events):
        """Registers the given handler to receive the given events for fd."""
        self._handlers[fd] = stack_context.wrap(handler)
        self._impl.register(fd, events | self.ERROR)
    

    在 IOLoop 初始化的过程中创建了一个 Waker 对象,将 Waker 对象 fd 的读端注册到事件循环中并设定相应的回调函数(这样做的好处是当事件循环阻塞而没有响应描述符出现,需要在最大 timeout 时间之前返回,就可以向这个管道发送一个字符)。

    Waker 的使用:一种是在其他线程向 IOLoop 添加 callback 时使用,唤醒 IOLoop 同时会将控制权转移给 IOLoop 线程并完成特定请求。唤醒的方法向管道中写入一个字符'x'。另外,在 IOLoop的stop 函数中会调用self._waker.wake(),通过向管道写入'x'停止事件循环。

    add_handler 函数使用了stack_context 提供的 wrap 方法。wrap 返回了一个可以直接调用的对象并且保存了传入之前的堆栈信息,在执行时可以恢复,这样就保证了函数的异步调用时具有正确的运行环境。

    IOLoop的start方法

    IOLoop 的核心调度集中在 start() 方法中,IOLoop 实例对象调用 start 后开始 epoll 事件循环机制,该方法会一直运行直到 IOLoop 对象调用 stop 函数、当前所有事件循环完成。start 方法中主要分三个部分:一个部分是对超时的相关处理;一部分是 epoll 事件通知阻塞、接收;一部分是对 epoll 返回I/O事件的处理。

    • 为防止 IO event starvation,将回调函数延迟到下一轮事件循环中执行。
    • 超时的处理 heapq 维护一个最小堆,记录每个回调函数的超时时间(deadline)。每次取出 deadline 最早的回调函数,如果callback标志位为 True 并且已经超时,通过 _run_callback 调用函数;如果没有超时需要重新设定 poll_timeout 的值。
    • 通过 self._impl.poll(poll_timeout) 进行事件阻塞,当有事件通知或超时时 poll 返回特定的 event_pairs。
    • epoll 返回通知事件后将新事件加入待处理队列,将就绪事件逐个弹出,通过stack_context.wrap(handler)保存的可执行对象调用事件处理。
    while True:
        poll_timeout = 3600.0
    
        with self._callback_lock:
            callbacks = self._callbacks
            self._callbacks = []
        for callback in callbacks:
            self._run_callback(callback)
    
        # 超时处理
        if self._timeouts:
            now = time.time()
            while self._timeouts:
                if self._timeouts[0].callback is None:
                    # the timeout was cancelled
                    heapq.heappop(self._timeouts)
                elif self._timeouts[0].deadline <= now:
                    timeout = heapq.heappop(self._timeouts)
                    self._run_callback(timeout.callback)
                else:
                    seconds = self._timeouts[0].deadline - now
                    poll_timeout = min(seconds, poll_timeout)
                    break
    
        if self._callbacks:
            # If any callbacks or timeouts called add_callback,
            # we don't want to wait in poll() before we run them.
            poll_timeout = 0.0
    
        if not self._running:
            break
    
        if self._blocking_signal_threshold is not None:
            # clear alarm so it doesn't fire while poll is waiting for events.
            signal.setitimer(signal.ITIMER_REAL, 0, 0)
    
        # epoll阻塞,当有事件通知或超时返回event_pairs
        try:
            event_pairs = self._impl.poll(poll_timeout)
        except Exception, e:
            # 异常处理,省略
    
        # 对epoll返回event_pairs事件的处理
        self._events.update(event_pairs)
        while self._events:
            fd, events = self._events.popitem()
            try:
                self._handlers[fd](fd, events)
            except Exception e:
                # 异常处理,省略
    

    3.0后的一些改动

    Tornado3.0以后 IOLoop 模块的一些改动。

    IOLoop 成为 util.Configurable 的子类,IOLoop 中绝大多数成员方法都作为抽象接口,具体实现由派生类 PollIOLoop 完成。IOLoop 实现了 Configurable 中的 configurable_base 和 configurable_default 这两个抽象接口,用于初始化过程中获取类类型和类的实现方法(即 IOLoop 中 poller 的实现方式)。

    在 Tornado3.0+ 中针对不同平台,单独出 poller 相应的实现,EPollIOLoop、KQueueIOLoop、SelectIOLoop 均继承于 PollIOLoop。下边的代码是 configurable_default 方法根据平台选择相应的 epoll 实现。初始化 IOLoop 的过程中会自动根据平台选择合适的 poller 的实现方法。

    @classmethod
    def configurable_default(cls):
    	if hasattr(select, "epoll"):
    		from tornado.platform.epoll import EPollIOLoop
    		return EPollIOLoop
    	if hasattr(select, "kqueue"):
    		# Python 2.6+ on BSD or Mac
    		from tornado.platform.kqueue import KQueueIOLoop
    		return KQueueIOLoop
    	from tornado.platform.select import SelectIOLoop
    	return SelectIOLoop
    

    IOLoop与Configurable类

    IOLoop 是 tornado 的核心。程序中主函数通常调用 tornado.ioloop.IOLoop.instance().start() 来启动IOLoop,但是看了一下 IOLoop 的实现,start 方法是这样的:

    def start(self):
    	"""Starts the I/O loop.
    
    	The loop will run until one of the callbacks calls `stop()`, which
    	will make the loop stop after the current event iteration completes.
    	"""
    	raise NotImplementedError()
    

    也就是说 IOLoop 是个抽象的基类,具体工作是由它的子类负责的。由于是 Linux 平台,所以应该用 Epoll,对应的类是 PollIOLoop。PollIOLoop 的 start 方法开始了事件循环。

    问题来了,tornado.ioloop.IOLoop.instance() 是怎么返回 PollIOLoop 实例的呢?刚开始有点想不明白,后来看了一下 IOLoop 的代码就豁然开朗了。

    IOLoop 继承自 Configurable,后者位于 tornado/util.py。

    A configurable interface is an (abstract) class whose constructor acts as a factory function for one of its implementation subclasses. The implementation subclass as well as optional keyword arguments to its initializer can be set globally at runtime with configure.

    Configurable 类实现了一个工厂方法,也就是设计模式中的“工厂模式”,看一下__new__函数的实现:

    def __new__(cls, **kwargs):
    	base = cls.configurable_base()
    	args = {}
    	if cls is base:
    		impl = cls.configured_class()
    		if base.__impl_kwargs:
    			args.update(base.__impl_kwargs)
    	else:
    		impl = cls
    	args.update(kwargs)
    	instance = super(Configurable, cls).__new__(impl)
    	# initialize vs __init__ chosen for compatiblity with AsyncHTTPClient
    	# singleton magic.  If we get rid of that we can switch to __init__
    	# here too.
    	instance.initialize(**args)
    	return instance
    

     当创建一个Configurable类的实例的时候,其实创建的是configurable_class()返回的类的实例。

    @classmethod
    def configured_class(cls):
    	"""Returns the currently configured class."""
    	base = cls.configurable_base()
    	if cls.__impl_class is None:
    		base.__impl_class = cls.configurable_default()
    	return base.__impl_class
    

     最后,就是返回的configurable_default()。此函数在IOLoop中的实现如下:

    @classmethod
    def configurable_default(cls):
    	if hasattr(select, "epoll"):
    		from tornado.platform.epoll import EPollIOLoop
    		return EPollIOLoop
    	if hasattr(select, "kqueue"):
    		# Python 2.6+ on BSD or Mac
    		from tornado.platform.kqueue import KQueueIOLoop
    		return KQueueIOLoop
    	from tornado.platform.select import SelectIOLoop
    	return SelectIOLoop
    

     EPollIOLoop 是 PollIOLoop 的子类。至此,这个流程就理清楚了。

    对socket封装的IOStream机制概览

    IOStream对socket读写进行了封装,分别提供读、写缓冲区实现对socket的异步读写。当socket被accept之后HTTPServer的_handle_connection会被回调并初始化IOStream对象,进一步通过IOStream提供的功能接口完成socket的读写。文章接下来将关注IOStream实现读写的细节。

    IOStream的初始化

    IOStream初始化过程中主要完成以下操作:

    1. 绑定对应的socket
    2. 绑定ioloop
    3. 创建读缓冲区_read_buffer,一个python deque容器
    4. 创建写缓冲区_write_buffer,同样也是一个python deque容器

    IOStream提供的主要功能接口

    主要的读写接口包括以下四个:

    class IOStream(object):
    	def read_until(self, delimiter, callback): 
    	def read_bytes(self, num_bytes, callback, streaming_callback=None): 
    	def read_until_regex(self, regex, callback): 
    	def read_until_close(self, callback, streaming_callback=None): 
    	def write(self, data, callback=None):
    
    • read_until和read_bytes是最常用的读接口,它们工作的过程都是先注册读事件结束时调用的回调函数,然后调用_try_inline_read方法。_try_inline_read首先尝试_read_from_buffer,即从上一次的读缓冲区中取数据,如果有数据直接调用 self._run_callback(callback, self._consume(data_length)) 执行回调函数,_consume消耗掉了_read_buffer中的数据;否则即_read_buffer之前没有未读数据,先通过_read_to_buffer将数据从socket读入_read_buffer,然后再执行_read_from_buffer操作。read_until和read_bytes的区别在于_read_from_buffer过程中截取数据的方法不同,read_until读取到delimiter终止,而read_bytes则读取num_bytes个字节终止。执行过程如下图所示
    • read_until_regex相当于delimiter为某一正则表达式的read_until。
    • read_until_close主要用于IOStream流关闭前后的读取:如果调用read_until_close时stream已经关闭,那么将会_consume掉_read_buffer中的所有数据;否则_read_until_close标志位设为True,注册_streaming_callback回调函数,调用_add_io_state添加io_loop.READ状态。
    • write首先将data按照数据块大小WRITE_BUFFER_CHUNK_SIZE分块写入write_buffer,然后调用handle_write向socket发送数据。

    其他内部功能接口

    • def _handle_events(self, fd, events): 通常为IOLoop对象add_handler方法传入的回调函数,由IOLoop的事件机制来进行调度。
    • def _add_io_state(self, state): 为IOLoop对象的handler注册IOLoop.READ或IOLoop.WRITE状态,handler为IOStream对象的_handle_events方法。
    • def _consume(self, loc): 合并读缓冲区loc个字节,从读缓冲区删除并返回这些数据。

    Tornado的多进程管理分析

    Tornado的多进程管理我们可以参看process.py这个文件。

    在编写多进程的时候我们一般都用python自带的multiprocessing,使用方法和threading基本一致,只需要继承里面的Process类以后就可以编写多进程程序了,这次我们看看tornado是如何实现他的multiprocessing,可以说实现的功能不多,但是更加简单高效。

    我们只看fork_process里面的代码:

    global _task_id
        assert _task_id is None
        if num_processes is None or num_processes <= 0:
            num_processes = cpu_count()
        if ioloop.IOLoop.initialized():
            raise RuntimeError("Cannot run in multiple processes: IOLoop instance "
                               "has already been initialized. You cannot call "
                               "IOLoop.instance() before calling start_processes()")
        logging.info("Starting %d processes", num_processes)
        children = {}
    

     这一段很简单,就是在没有传入进程数的时候使用默认的cpu个数作为将要生成的进程个数。

    def start_child(i):
    	pid = os.fork()
    	if pid == 0:
    		# child process
    		_reseed_random()
    		global _task_id
    		_task_id = i
    		return i
    	else:
    		children[pid] = i
    		return None
    

    这是一个内函数,作用就是生成子进程。fork是个很有意思的方法,他会同时返回两种状态,为什么呢?其实fork相当于在原有的一条路(父进程)旁边又修了一条路(子进程)。如果这条路修成功了,那么在原有的路上(父进程)你就看到旁边来了另外一条路(子进程),所以也就是返回新生成的那条路的名字(子进程的pid),但是在另外一条路上(子进程),你看到的是自己本身修建成功了,也就返回自己的状态码(返回结果是0)。

    所以if pid==0表示这时候cpu已经切换到子进程了,相当于我们在新生成的这条路上面做事(返回任务id);else表示又跑到原来的路上做事了,在这里我们记录下新生成的子进程,这时候children[pid]=i里面的pid就是新生成的子进程的pid,而 i 就是刚才在子进程里面我们返回的任务id(其实就是用来代码子进程的id号)。

    for i in range(num_processes):
    	id = start_child(i)
    	if id is not None:
    		return id
    

     if id is not None表示如果我们在刚刚生成的那个子进程的上下文里面,那么就什么都不干,直接返回子进程的任务id就好了,啥都别想了,也别再折腾。如果还在父进程的上下文的话那么就继续生成子进程。

    num_restarts = 0
        while children:
            try:
                pid, status = os.wait()
            except OSError, e:
                if e.errno == errno.EINTR:
                    continue
                raise
            if pid not in children:
                continue
            id = children.pop(pid)
            if os.WIFSIGNALED(status):
                logging.warning("child %d (pid %d) killed by signal %d, restarting",
                                id, pid, os.WTERMSIG(status))
            elif os.WEXITSTATUS(status) != 0:
                logging.warning("child %d (pid %d) exited with status %d, restarting",
                                id, pid, os.WEXITSTATUS(status))
            else:
                logging.info("child %d (pid %d) exited normally", id, pid)
                continue
            num_restarts += 1
            if num_restarts > max_restarts:
                raise RuntimeError("Too many child restarts, giving up")
            new_id = start_child(id)
            if new_id is not None:
                return new_id
    

    剩下的这段代码都是在父进程里面做的事情(因为之前在子进程的上下文的时候已经返回了,当然子进程并没有结束)。

    pid, status = os.wait()的意思是等待任意子进程退出或者结束,这时候我们就把它从我们的children表里面去除掉,然后通过status判断子进程退出的原因。

    如果子进程是因为接收到kill信号或者抛出exception了,那么我们就重新启动一个子进程,用的当然还是刚刚退出的那个子进程的任务号。如果子进程是自己把事情做完了才退出的,那么就算了,等待别的子进程退出吧。

    我们看到在重新启动子进程的时候又使用了

    if new_id is not None:
        return new_id
    

     主要就是退出子进程的空间,只在父进程上面做剩下的事情,不然刚才父进程的那些代码在子进程里面也会同样的运行,就会形成无限循环了,我没试过,不如你试试?

  • 相关阅读:
    java路径两种写法"/"和"\"以及 ./和../以及/之间的区别?
    几张图轻松理解String.intern()和String
    面向对象编程三大特性------封装、继承、多态
    markdown操作手册
    index索引的一些简单理解
    Mac 上flink的安装与启动
    C语言实现俄罗斯方块游戏
    Maven_学习、搭建、应用
    PHP学习笔记---高级知识
    软件设计师笔记---寻址方式
  • 原文地址:https://www.cnblogs.com/jasonwang-2016/p/5950548.html
Copyright © 2011-2022 走看看