gunicorn Arbiter Source Code Analysis

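      As mentioned in the previous article, Arbiter is the core of the gunicorn master process. Arbiter's main job is managing the worker processes: starting, monitoring and killing them. When certain signals arrive, Arbiter can also hot-reload the App or upgrade gunicorn on the fly. Arbiter's core code lives in a single file and is fairly small. The source is here: https://github.com/benoitc/gunicorn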
      
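    Arbiter has the following main methods:
    setup:
        processes the configuration; the most important settings are the number of workers and the worker model (worker class)

    init_signals:
        registers the signal handlers

    handle_xxx:
        the handler for each individual signal

    kill_worker, kill_workers:
        send a signal to one worker process, or to all of them

    spawn_worker, spawn_workers:
        fork new worker processes

    murder_workers:
        kill worker processes that have not responded within the timeout

    manage_workers:
        compares the configured number of workers with the number of workers currently alive, and decides whether to fork new workers or kill existing ones

    reexec:
        called on SIGUSR2 to upgrade gunicorn on the fly

    reload:
        called on SIGHUP; starts new worker processes with the new configuration and kills the old ones

    sleep:
        when there is no signal to handle, sleeps using the timeout of select; the sleep can be interrupted early

    wakeup:
        wakes the process up by writing to a pipe

    run:
        the main loop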
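The decision that manage_workers makes can be separated from the actual forking and signalling. The sketch below uses plan_workers, a hypothetical pure helper that is not part of gunicorn. It assumes each worker is represented only by its pid and its age, meaning its spawn order (like worker_age in spawn_worker). When there are too many workers, the helper picks the oldest ones for killing, as gunicorn's manage_workers does.

```python
def plan_workers(workers, num_workers):
    """Hypothetical helper: decide what manage_workers should do.

    workers     -- dict mapping pid -> age (spawn order; larger means newer)
    num_workers -- the configured number of workers

    Returns (how_many_to_spawn, pids_to_kill). When there are too many
    workers, the oldest ones are chosen for killing.
    """
    to_spawn = max(num_workers - len(workers), 0)
    excess = max(len(workers) - num_workers, 0)
    # Sort pids by age so the oldest workers come first
    oldest_first = sorted(workers, key=workers.get)
    return to_spawn, oldest_first[:excess]
```

In the real Arbiter, the spawn count turns into spawn_worker calls. Each pid chosen for killing is sent SIGTERM through kill_worker.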
     
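      The only Arbiter methods actually called by outside code (the Application) are __init__ and run, and both calls happen in a single line:
        Arbiter(self).run()
      Here self is the Application instance, and __init__ calls setup to apply the configuration. Below is simplified pseudocode for run:
    def run(self):
        self.init_signals()
        self.LISTENERS = create_sockets(self.cfg, self.log)
        self.manage_workers()
        while True:
            sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
            if sig is None:
                # no pending signal: wait on the pipe, then check on the workers
                self.sleep()
                self.murder_workers()
                self.manage_workers()
                continue
            # dispatch to handle_xxx, e.g. SIGHUP -> handle_hup
            handler = getattr(self, "handle_%s" % self.SIG_NAMES[sig], None)
            if handler:
                handler()
            self.wakeup()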
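The signal handler itself does almost nothing: it only records the signal. The main loop then turns each recorded signal into a call to the matching handle_xxx method. The sketch below models that split with MiniArbiter, a hypothetical stripped-down class that is not gunicorn's Arbiter:

- on_signal queues the signal number, with a small cap of 5 here. gunicorn names its handler signal, and that handler also calls wakeup; both details are left out here.
- run_once performs one iteration of the loop, mapping the number to a handler name through SIG_NAMES and getattr.
- Only HUP, TTIN and TTOU are modelled. handle_hup just counts reloads instead of reloading anything.

```python
import signal


class MiniArbiter:
    """Hypothetical stand-in for gunicorn's Arbiter: signal queue + dispatch only."""

    SIGNALS = [signal.SIGHUP, signal.SIGTTIN, signal.SIGTTOU]
    # Map signal numbers to handler suffixes, e.g. SIGTTIN -> "ttin"
    SIG_NAMES = {sig: sig.name[3:].lower() for sig in SIGNALS}

    def __init__(self, num_workers=1):
        self.SIG_QUEUE = []
        self.num_workers = num_workers
        self.reloaded = 0

    def init_signals(self):
        # Install one generic handler for every signal we care about
        for sig in self.SIGNALS:
            signal.signal(sig, self.on_signal)

    def on_signal(self, sig, frame):
        # Keep the handler tiny: just remember which signal arrived
        if len(self.SIG_QUEUE) < 5:
            self.SIG_QUEUE.append(sig)

    def handle_hup(self):
        self.reloaded += 1  # placeholder for a real reload

    def handle_ttin(self):
        self.num_workers += 1

    def handle_ttou(self):
        if self.num_workers > 1:
            self.num_workers -= 1

    def run_once(self):
        """One iteration of the main loop; returns the handled name or None."""
        sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
        if sig is None:
            # The real loop would sleep(), murder_workers(), manage_workers() here
            return None
        name = self.SIG_NAMES.get(sig)
        handler = getattr(self, "handle_%s" % name, None)
        if handler is not None:
            handler()
        return name
```

Calling on_signal directly simulates a delivered signal. After init_signals, the same path runs for real when, for example, `kill -TTIN` is sent to the process.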
     About forking worker processes
      The code that forks a worker process is in spawn_worker. Here is the source:
      
    def spawn_worker(self):
        self.worker_age += 1
        worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                                   self.app, self.timeout / 2.0,
                                   self.cfg, self.log)
        self.cfg.pre_fork(self, worker)
        pid = os.fork()
        if pid != 0:
            # Parent (master): remember the new worker and return
            self.WORKERS[pid] = worker
            return pid

        # Process Child
        worker_pid = os.getpid()
        try:
            util._setproctitle("worker [%s]" % self.proc_name)
            self.log.info("Booting worker with pid: %s", worker_pid)
            self.cfg.post_fork(self, worker)
            # The worker's whole life is spent inside init_process
            worker.init_process()
            sys.exit(0)
        except SystemExit:
            raise
        except AppImportError as e:
            self.log.debug("Exception while loading the application",
                           exc_info=True)
            print("%s" % e, file=sys.stderr)
            sys.stderr.flush()
            sys.exit(self.APP_LOAD_ERROR)
        except:
            self.log.exception("Exception in worker process")
            if not worker.booted:
                sys.exit(self.WORKER_BOOT_ERROR)
            sys.exit(-1)
        finally:
            self.log.info("Worker exiting (pid: %s)", worker_pid)
            try:
                worker.tmp.close()
                self.cfg.worker_exit(self, worker)
            except:
                self.log.warning("Exception during worker exit:\n%s",
                                 traceback.format_exc())
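      Main flow:
        (1) Instantiate worker_class (the default is the synchronous model, SyncWorker), then call the pre_fork hook
        (2) Fork. The parent (master) records the worker in WORKERS and returns; everything after that runs only in the child
        (3) The child calls the post_fork hook and then worker.init_process, which enters the worker's loop; all of the worker's work happens inside this loop
        (4) When the loop ends, call sys.exit(0)
        (5) Finally, the finally block logs the worker's exit, closes worker.tmp and calls the worker_exit hook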
        
        Below is a small script of my own that strips the fork flow down to its essentials:
     1 # prefork.py
     2 import sys
     3 import socket
     4 import select
     5 import os
     6 import time
     7
     8 def do_sub_process():
     9     pid = os.fork()
    10     if pid < 0:  # never true in Python: os.fork() raises OSError on failure
    11         print('fork error')
    12         sys.exit(-1)
    13     elif pid > 0:
    14         print('fork sub process %d' % pid, flush=True)  # flush so later children don't inherit buffered output
    15         return
    16
    17     # must be child process
    18     time.sleep(1)
    19     print('sub process will exit', os.getpid(), os.getppid())
    20     sys.exit(0)
    21
    22 def main():
    23     sub_num = 2
    24     for i in range(sub_num):
    25         do_sub_process()
    26     time.sleep(10)
    27     print('main process will exit', os.getpid())
    28
    29 if __name__ == '__main__':
    30     main()
    Output in the test environment:
      fork sub process 9601
      fork sub process 9602
      sub process will exit 9601 9600
      sub process will exit 9602 9600
      main process will exit 9600
     
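      Note that line 20 calls sys.exit(0) to make sure the child process ends there. Without it, the child would return into the for loop in main and go on to run the rest of main as well. Comment out line 20 and run the script again; the output makes this obvious.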
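You can count the effect of the missing exit. If children are not stopped, each one keeps running the loop. After n iterations, 2**n processes reach the end of main instead of one.

The helper below, count_finishers, is a hypothetical Unix-only sketch; it is not part of gunicorn or prefork.py. The children call os._exit rather than sys.exit, so a forked copy can never unwind back into the caller's code. Each process that gets past the loop writes one byte into a shared pipe, and the original process counts the bytes.

```python
import os


def count_finishers(n, child_exits):
    """Fork once per loop iteration (like main() in prefork.py) and count
    how many processes get past the loop. Hypothetical helper, Unix only.
    """
    r, w = os.pipe()
    original = os.getpid()
    children = []
    for _ in range(n):
        pid = os.fork()
        if pid == 0:
            children = []        # a fresh child has no children of its own yet
            if child_exits:
                os._exit(0)      # plays the role of sys.exit(0) on line 20
            continue             # without an exit, the child keeps looping
        children.append(pid)
    os.write(w, b'.')            # this process reached "main process will exit"
    for pid in children:
        os.waitpid(pid, 0)       # each process reaps its own children
    if os.getpid() != original:
        os._exit(0)              # descendants stop here, never returning to the caller
    os.close(w)
    data = b''
    while True:
        chunk = os.read(r, 4096)
        if not chunk:            # EOF: every copy of the write end is closed
            break
        data += chunk
    os.close(r)
    return len(data)
```

With the exit in place only the original process finishes, so `count_finishers(2, True)` is 1. Without it, `count_finishers(2, False)` is 4: the original, two children, and one grandchild.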
     
    About killing worker processes
      Killing a worker from the master is simple: the master just sends it a signal. Here is the source:
      
    def kill_worker(self, pid, sig):
        """
        Kill a worker

        :attr pid: int, worker pid
        :attr sig: `signal.SIG*` value
        """
        try:
            os.kill(pid, sig)
        except OSError as e:
            if e.errno == errno.ESRCH:
                try:
                    worker = self.WORKERS.pop(pid)
                    worker.tmp.close()
                    self.cfg.worker_exit(self, worker)
                    return
                except (KeyError, OSError):
                    return
            raise
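The interesting part of kill_worker is the ESRCH branch. If the worker process no longer exists, sending the signal fails, and the only thing left to do is clean up the master's bookkeeping. Here is a minimal standalone sketch, assuming workers is a plain dict keyed by pid (a hypothetical stand-in for Arbiter.WORKERS). gunicorn also closes worker.tmp and runs the worker_exit hook in that branch; the sketch omits both.

```python
import errno
import os


def kill_worker(workers, pid, sig):
    """Send `sig` to `pid`; forget the worker if the process no longer exists.

    `workers` is a plain dict {pid: anything}, a hypothetical stand-in for
    Arbiter.WORKERS.
    """
    try:
        os.kill(pid, sig)
    except OSError as e:
        if e.errno == errno.ESRCH:
            # No such process: it already exited, so only bookkeeping is left
            workers.pop(pid, None)
            return
        raise  # e.g. EPERM: a real error the caller should see
```

Signal 0 is useful for checking: it performs the existence and permission checks without delivering anything.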
     
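    About sleep and wakeup
      Now let's look at Arbiter's sleep and wakeup. When there is no signal to handle, Arbiter "sleeps". It does not actually call time.sleep, because then it could not react to a signal as soon as one arrives. The implementation is quite clever and relies on a pipe plus the timeout of select. The code explains it best: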
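    def sleep(self):
        """
        Sleep until PIPE is readable or we timeout.
        A readable PIPE means a signal occurred.
        """
        try:
            # self.PIPE = os.pipe(); both ends are made non-blocking in init_signals
            ready = select.select([self.PIPE[0]], [], [], 1.0)
            if not ready[0]:
                return
            # drain the pipe; a non-blocking read raises EAGAIN once it is empty
            while os.read(self.PIPE[0], 1):
                pass
        except (select.error, OSError) as e:
            if e.errno not in [errno.EAGAIN, errno.EINTR]:
                raise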

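      The docstring says it all: sleep returns as soon as PIPE becomes readable, or when the 1-second timeout expires. PIPE becomes readable because a signal occurred. Here is the documentation of os.pipe: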

      os.pipe()

    Create a pipe. Return a pair of file descriptors (r,w) usable for reading and writing, respectively.

     
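      So when does the pipe become readable? Only when something has been written into it, and that is exactly what wakeup does. gunicorn's signal handler (the Arbiter.signal method) appends the signal to SIG_QUEUE and then calls wakeup, so the select in sleep returns right away: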
    def wakeup(self):
        """
        Wake up the arbiter by writing to the PIPE
        """
        os.write(self.PIPE[1], b'.')
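sleep and wakeup together form the so-called self-pipe trick. The standalone sketch below shows it outside gunicorn. The class name SelfPipe is hypothetical, and so is the boolean that sleep returns here; gunicorn's Arbiter.sleep returns None. The sketch uses os.set_blocking, whereas gunicorn makes its pipe non-blocking with its own utility function. On Python 3.5+, select retries by itself after EINTR, so only the empty-pipe case (EAGAIN, raised as BlockingIOError) needs handling.

```python
import os
import select


class SelfPipe:
    """Minimal sketch of Arbiter-style sleep/wakeup built on a pipe."""

    def __init__(self):
        self.PIPE = os.pipe()
        for fd in self.PIPE:
            os.set_blocking(fd, False)   # the drain loop relies on EAGAIN

    def wakeup(self):
        # Safe to call from a signal handler: one small write
        os.write(self.PIPE[1], b'.')

    def sleep(self, timeout=1.0):
        """Block until the pipe is readable or `timeout` seconds pass.

        Returns True if woken by wakeup(), False on timeout.
        """
        ready, _, _ = select.select([self.PIPE[0]], [], [], timeout)
        if not ready:
            return False
        try:
            while os.read(self.PIPE[0], 1):   # drain every pending byte
                pass
        except BlockingIOError:               # pipe is empty (EAGAIN)
            pass
        return True
```

Draining the whole pipe matters. If several wakeups arrive before the loop runs, one pass consumes them all, and the next sleep blocks again instead of spinning.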

    Finally, here is how Arbiter reacts to each signal, as described in the gunicorn documentation:

    • QUIT, INT: Quick shutdown
    • TERM: Graceful shutdown. Waits for workers to finish their current requests up to the graceful timeout.
    • HUP: Reload the configuration, start the new worker processes with a new configuration and gracefully shutdown older workers. If the application is not preloaded (using the --preload option), Gunicorn will also load the new version.
    • TTIN: Increment the number of processes by one
    • TTOU: Decrement the number of processes by one
    • USR1: Reopen the log files
    • USR2: Upgrade the Gunicorn on the fly. A separate TERM signal should be used to kill the old process. This signal can also be used to use the new versions of pre-loaded applications.
    • WINCH: Gracefully shutdown the worker processes when Gunicorn is daemonized.
     
     
  Original article: https://www.cnblogs.com/xybaby/p/6297089.html