zoukankan      html  css  js  c++  java
  • overseer 热重启流程分析

      overseer 是一个用于在 Go 中创建可监控、可重启、可自升级二进制程序的包。进程分为 master 模式和 slave 模式:master 为父进程,负责创建监听套接字、设置 isSlave 标记等初始化工作,之后调用 fork(底层是 exec.Command,并非真正的 fork)创建 slave 子进程来执行二进制可执行文件;热升级时向 master 进程发送 SIGUSR2 信号,master 会把该信号转给 slave 使其停止,之后 master 再创建新的 slave 子进程来执行升级后的 bin 文件;虽然子进程被重新创建,但监听套接字始终由父进程持有,此时仍然可以接受新的连接请求,保证业务不中断。

      不论是Run还是RunErr,底层调用都是runErr,热重启流程分析如下,见注释

    // runErr is the implementation that both Run and RunErr end up calling.
    // It rejects unsupported platforms and invalid configurations, then hands
    // control to the master or the slave implementation of currentProcess.
    func runErr(c *Config) error {
        // overseer cannot operate on this OS
        if !supported {
            return fmt.Errorf("os (%s) not supported", runtime.GOOS)
        }
        // configuration check
        err := validate(c)
        if err != nil {
            return err
        }
        // environment sanity check; a true result means there is nothing left to run
        if sanityCheck() {
            return nil
        }
        // the master marks every child it starts with envIsSlave=1, so an
        // unmarked process (the very first launch) takes the master role
        switch os.Getenv(envIsSlave) {
        case "1":
            currentProcess = &slave{Config: c}
        default:
            currentProcess = &master{Config: c}
        }
        // master and slave both implement the interface behind currentProcess
        return currentProcess.run()
    }

      master的run

    // run is the master's main routine: verify the binary, initialise the
    // optional fetcher, install signal handling, set up the listening sockets
    // and finally enter the loop that starts (and restarts) the slave.
    func (mp *master) run() error {
        mp.debugf("run")
        err := mp.checkBinary()
        if err != nil {
            return err
        }
        // a fetcher that fails to initialise is disabled instead of being fatal
        if f := mp.Config.Fetcher; f != nil {
            if initErr := f.Init(); initErr != nil {
                mp.warnf("fetcher init failed (%s). fetcher disabled.", initErr)
                mp.Config.Fetcher = nil
            }
        }
        // install signal handling: a goroutine dispatches every received
        // signal to handleSignal
        mp.setupSignalling()
        // set up the listening sockets that are later handed to the slave
        if err = mp.retreiveFileDescriptors(); err != nil {
            return err
        }
        // without a fetcher there is nothing to poll for updates
        if mp.Config.Fetcher == nil {
            return mp.forkLoop()
        }
        // fetch once up front, then keep fetching in the background
        mp.printCheckUpdate = true
        mp.fetch()
        go mp.fetchLoop()
        return mp.forkLoop()
    }
    
    // forkLoop keeps launching the slave: every time fork returns without an
    // error a new slave is started. It only returns once fork reports an error.
    // Note that this is not a real fork; fork runs the binary as a new command.
    func (mp *master) forkLoop() error {
        var err error
        //loop, restart command
        for err == nil {
            err = mp.fork()
        }
        return err
    }
    
    // fork starts one slave process and blocks until that slave either exits
    // or reports that it has released its socket descriptors.
    //
    // It returns an error only when the slave could not be started. It returns
    // nil when a replacement slave should be started by forkLoop: either the
    // descriptors were released, or the slave exited while a restart was in
    // progress. Any other slave exit terminates the master via os.Exit with
    // the slave's exit code.
    func (mp *master) fork() error {
        mp.debugf("starting %s", mp.binPath)
        cmd := exec.Command(mp.binPath)
        //mark this new process as the "active" slave process.
        //this process is assumed to be holding the socket files.
        mp.slaveCmd = cmd
        mp.slaveID++
        //provide the slave process with some state
        e := os.Environ()
        e = append(e, envBinID+"="+hex.EncodeToString(mp.binHash))
        e = append(e, envBinPath+"="+mp.binPath)
        e = append(e, envSlaveID+"="+strconv.Itoa(mp.slaveID))
        e = append(e, envIsSlave+"=1")//marks the new process as a slave (runErr checks this)
        e = append(e, envNumFDs+"="+strconv.Itoa(len(mp.slaveExtraFiles)))
        cmd.Env = e
        //inherit master args/stdfiles
        cmd.Args = os.Args
        cmd.Stdin = os.Stdin
        cmd.Stdout = os.Stdout
        cmd.Stderr = os.Stderr
        //include socket files
        cmd.ExtraFiles = mp.slaveExtraFiles
        if err := cmd.Start(); err != nil {//start the child process, which executes the binary
            return fmt.Errorf("Failed to start slave process: %s", err)
        }
        //was scheduled to restart, notify success.
        //mp.restarted is unbuffered, so this send blocks until the
        //receiver in triggerRestart takes the value.
        if mp.restarting {
            mp.restartedAt = time.Now()
            mp.restarting = false
            mp.restarted <- true
        }
        //convert wait into channel
        cmdwait := make(chan error)
        go func() {
            cmdwait <- cmd.Wait()
        }()
        //wait....
        select {
        case err := <-cmdwait:
            //program exited before releasing descriptors
            //proxy exit code out to master:
            //0 on a clean exit, the slave's real exit status when it can be
            //extracted from the ExitError, and 1 for any other failure
            code := 0
            if err != nil {
                code = 1
                if exiterr, ok := err.(*exec.ExitError); ok {
                    if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
                        code = status.ExitStatus()
                    }
                }
            }
            mp.debugf("prog exited with %d", code)
            //if restarts are disabled or if it was an
            //unexpected crash, proxy this exit straight
            //through to the main process
            if mp.NoRestart || !mp.restarting {
                os.Exit(code)
            }
        case <-mp.descriptorsReleased://the slave released its sockets and sent SIGUSR1; handleSignal then sends on this channel, so fork returns nil and forkLoop starts a new slave
            //if descriptors are released, the program
            //has yielded control of its sockets and
            //a parallel instance of the program can be
            //started safely. it should serve state.Listeners
            //to ensure downtime is kept at <1sec. The previous
            //cmd.Wait() will still be consumed though the
            //result will be discarded.
        }
        return nil
    }

      收到重启信号的处理

    // setupSignalling creates the channels used between the restart logic and
    // fork, subscribes the master to every incoming signal and starts a
    // goroutine that feeds each signal to handleSignal.
    func (mp *master) setupSignalling() {
        //updater-forker comms
        mp.restarted = make(chan bool)
        mp.descriptorsReleased = make(chan bool)
        //read all master process signals.
        //signal.Notify never blocks while delivering: with an unbuffered
        //channel any signal that arrives while handleSignal is still busy
        //(for example blocked sending on descriptorsReleased) is silently
        //dropped, so the channel must be buffered.
        const signalBacklog = 16
        signals := make(chan os.Signal, signalBacklog)
        signal.Notify(signals) //no signal list given: the master receives all signals
        go func() {
            for s := range signals {
                mp.handleSignal(s) //run the handling that matches the signal
            }
        }()
    }
    
    // handleSignal decides what the master does with a single incoming signal.
    // The cases are checked in order and the first match wins.
    func (mp *master) handleSignal(s os.Signal) {
        switch {
        case s == mp.RestartSignal:
            //user initiated manual restart. the restart signal is
            //configurable by the caller; SIGUSR2 is the usual choice
            go mp.triggerRestart()
        case s.String() == "child exited":
            // will occur on every restart, ignore it
        case mp.awaitingUSR1 && s == SIGUSR1:
            //**during a restart** a SIGUSR1 signals
            //to the master process that, the file
            //descriptors have been released.
            //the send below unblocks the select in fork, which then
            //returns nil so that a new slave gets started
            mp.debugf("signaled, sockets ready")
            mp.awaitingUSR1 = false
            mp.descriptorsReleased <- true
        case mp.slaveCmd != nil && mp.slaveCmd.Process != nil:
            //while the slave process is running, proxy
            //all signals through
            mp.debugf("proxy signal (%s)", s)
            mp.sendSignal(s)
        case s == os.Interrupt:
            //otherwise if not running, kill on CTRL+c
            mp.debugf("interupt with no slave")
            os.Exit(1)
        default:
            mp.debugf("signal discarded (%s), no slave process", s)
        }
    }
    
    // triggerRestart asks the running slave to shut down gracefully and waits
    // for fork to confirm that a replacement slave has been started. If that
    // confirmation does not arrive within TerminateTimeout the slave is killed.
    // Calls made while a restart is already in progress, or before any slave
    // has been started, are ignored.
    func (mp *master) triggerRestart() {
        if mp.restarting {
            mp.debugf("already graceful restarting")
            return //skip
        }
        if mp.slaveCmd == nil {
            mp.debugf("no slave process")
            return //skip
        }
        mp.debugf("graceful restart triggered")
        mp.restarting = true
        mp.awaitingUSR1 = true
        mp.signalledAt = time.Now()
        mp.sendSignal(mp.Config.RestartSignal) //ask nicely to terminate: forwards the restart signal to the slave
        select {
        case <-mp.restarted:
            //success
            mp.debugf("restart success")
        case <-time.After(mp.TerminateTimeout):
            //times up mr. process, we did ask nicely!
            mp.debugf("graceful timeout, forcing exit")
            mp.sendSignal(os.Kill)
            //restarting is still set, so once the killed slave has exited
            //fork starts a replacement and then performs an unbuffered send
            //on mp.restarted. Keep receiving here, otherwise that send would
            //block fork forever and the master would stop supervising the
            //new slave.
            <-mp.restarted
        }
    }

      总的来说:master fork出slave执行bin文件后,在另外开启的协程中循环处理信号

    1. 如果是RestartSignal信号,那么就开启一个协程调用triggerRestart方法,即执行重启逻辑。triggerRestart方法向slave发送一个RestartSignal信号
    2. 如果是其它信号,并且overseer的slave进程已在运行,那么就把这个信号转发给slave进程("child exited"信号会被直接忽略;重启期间收到的SIGUSR1表示slave已释放文件描述符,用于唤醒fork,不会被转发)
    3. 如果slave进程没有开启,并且是Ctrl+c操作,那么就退出overseer
    4. 如果slave进程没有开启,并且是非Ctrl+c操作发出的信号,那么就打印日志
    slave
    // run is the slave's main routine: it fills in the state handed to the
    // user program, hooks up the link to the master, the inherited file
    // descriptors and the restart signal, and then runs Config.Program.
    func (sp *slave) run() error {
        sp.id = os.Getenv(envSlaveID)
        sp.debugf("run")
        // values passed down by the master through the environment
        sp.state.ID = os.Getenv(envBinID)
        sp.state.BinPath = os.Getenv(envBinPath)
        // values taken from the configuration and the current process
        sp.state.Enabled = true
        sp.state.StartedAt = time.Now()
        sp.state.Address = sp.Config.Address
        sp.state.Addresses = sp.Config.Addresses
        sp.state.GracefulShutdown = make(chan bool, 1)
        // look up the master process so that SIGUSR1 can be sent to it
        // when this slave shuts down
        err := sp.watchParent()
        if err != nil {
            return err
        }
        // initialise the file descriptors
        if err = sp.initFileDescriptors(); err != nil {
            return err
        }
        // start listening for the restart signal
        sp.watchSignal()
        //run program with state
        sp.debugf("start program")
        sp.Config.Program(sp.state)
        return nil
    }
    
    // watchSignal waits in the background for the configured restart signal.
    // When it arrives the slave closes state.GracefulShutdown, releases its
    // listeners, tells the master (SIGUSR1) that the sockets are free and
    // starts a timer that force-exits the process after TerminateTimeout.
    func (sp *slave) watchSignal() {
        //signal.Notify never blocks while delivering, so the channel needs
        //room for the one signal we wait for; with an unbuffered channel the
        //signal is lost if it arrives before the goroutine below is receiving.
        signals := make(chan os.Signal, 1)
        signal.Notify(signals, sp.Config.RestartSignal)
        go func() {
            <-signals
            signal.Stop(signals)
            sp.debugf("graceful shutdown requested")
            //master wants to restart,
            close(sp.state.GracefulShutdown)
            //release any sockets and notify master
            if len(sp.listeners) > 0 {
                //perform graceful shutdown
                for _, l := range sp.listeners {
                    l.release(sp.Config.TerminateTimeout)
                }
                //signal release of held sockets, allows master to start
                //a new process before this child has actually exited.
                //early restarts not supported with restarts disabled.
                if !sp.NoRestart {
                    sp.masterProc.Signal(SIGUSR1) //sockets are released: notify the master
                }
                //listeners should be waiting on connections to close...
            }
            //start death-timer
            go func() {
                time.Sleep(sp.Config.TerminateTimeout)
                sp.debugf("timeout. forceful shutdown")
                os.Exit(1)
            }()
        }()
    }

      总的来说:slave收到重启信号并释放完监听套接字后,向master发送SIGUSR1;master的信号处理协程收到SIGUSR1后执行mp.descriptorsReleased <- true;而master的fork此时正阻塞在select上等待从descriptorsReleased读取数据,读到之后select结束,fork返回nil,forkLoop随即再次调用fork启动新的slave。

    实践

    //create another main() to run the overseer process
    //and then convert your old main() into a 'prog(state)'
    overseer.Run(overseer.Config{
        Program:   prog,//the slave process runs prog
        Addresses: []string{":5005", ":5006"},
        //Fetcher: &fetcher.File{Path: "my_app_next"},
        Debug: true, //display log of overseer actions
      RestartSignal: syscall.SIGUSR2, //signal that makes the master trigger a graceful restart
    })
    
    //prog(state) runs in a child process
    
    //convert your 'main()' into a 'prog(state)'
    //'prog()' is run in a child process
    func prog(state overseer.State) {
        fmt.Printf("app#%s (%s) listening...
    ", BuildID, state.ID)
        http.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            duration, err := time.ParseDuration(r.FormValue("duration"))
            if err != nil {
                http.Error(w, err.Error(), 400)
                return
            }
            time.Sleep(duration)
            w.Write([]byte("Hello World"))
            fmt.Fprintf(w, "app#%s (%s) says hello
    ", BuildID, state.ID)
        }))
        http.Serve(state.Listener, nil)
        fmt.Printf("app#%s (%s) exiting...
    ", BuildID, state.ID)
    }

    日志如下,可对照上述流程梳理下逻辑

    /main &
    [1] 63416
    2021/08/10 14:34:59 [overseer master] run
    2021/08/10 14:34:59 [overseer master] starting /Users/user/test_go/main
    2021/08/10 14:34:59 [overseer slave#1] run
    2021/08/10 14:34:59 [overseer slave#1] start program
    app#1 (b12eb45d8f6dfa86a8529a6aa25f59569d74f16b) listening...

    //获取旧的api数据
    test_go % curl "http://127.0.0.1:5005/sleep?duration=60s" &
    [2] 63423


    //替换bin文件,热升级
    test_go % rm -rf main
    test_go % go build -ldflags '-X main.BuildID=2' main.go
    test_go % kill -USR2 63416


    2021/08/10 14:36:21 [overseer master] graceful restart triggered
    2021/08/10 14:36:21 [overseer slave#1] graceful shutdown requested
    2021/08/10 14:36:21 [overseer master] signaled, sockets ready
    2021/08/10 14:36:21 [overseer master] starting /Users/user/test_go/main
    2021/08/10 14:36:21 [overseer master] restart success
    2021/08/10 14:36:21 [overseer master] signal discarded (urgent I/O condition), no slave process


    //旧的api数据输出
    Hello Worldapp#1 (b12eb45d8f6dfa86a8529a6aa25f59569d74f16b) says hello
    app#1 (b12eb45d8f6dfa86a8529a6aa25f59569d74f16b) exiting...
    [2] + done curl "http://127.0.0.1:5005/sleep?duration=60s"


    2021/08/10 14:36:22 [overseer master] proxy signal (urgent I/O condition)
    2021/08/10 14:36:22 [overseer master] proxy signal (urgent I/O condition)
    2021/08/10 14:36:22 [overseer master] proxy signal (urgent I/O condition)
    2021/08/10 14:36:22 [overseer master] proxy signal (urgent I/O condition)
    2021/08/10 14:36:23 [overseer slave#2] run
    2021/08/10 14:36:23 [overseer slave#2] start program
    app#2 (b12eb45d8f6dfa86a8529a6aa25f59569d74f16b) listening...


    //获取新的api数据
    test_go % curl "http://127.0.0.1:5005/sleep?duration=1s"
    Hello Worldapp#2 (b12eb45d8f6dfa86a8529a6aa25f59569d74f16b) says hello

    overseer github https://github.com/jpillora/overseer

  • 相关阅读:
    linux下文件结束符
    【转】跟我学Kafka之NIO通信机制
    【转】 详解Kafka生产者Producer配置
    【转】项目延期的⑦大因素
    (转)EOSIO开发(三)钱包、账户与账户权限之概念篇
    CentOS里alias命令
    (转)EOSIO开发(一)使用Docker构建本地环境
    Marathon自动扩缩容(marathon-lb-autoscale)
    (转)Springboot日志配置(超详细,推荐)
    Spring Boot下的lombok安装以及使用简介
  • 原文地址:https://www.cnblogs.com/tianzeng/p/15125562.html
Copyright © 2011-2022 走看看