zoukankan      html  css  js  c++  java
  • docker-containerd 启动流程分析

    一般在docker启动时,containerd的启动命令如下所示:

    root      2090  0.0  0.1 292780 11008 ?        Ssl  10月22   0:12 docker-containerd -l unix:///var/run/docker/libcontainerd/docker-containerd.sock --shim docker-containerd-shim 
    --metrics-interval=0 --start-timeout 2m --state-dir /var/run/docker/libcontainerd/containerd --runtime docker-runc

      

    1、containerd/containerd/main.go

    func daemon(context *cli.Context) error

    (1)、首先调用:

    sv, err := supervisor.New(
    
      context.String("state-dir"),
    
      context.String("runtime"),
    
      context.String("shim"),
    
      context.String("runtime-args"),
    
      context.String("start-timeout"),
    
      context.Int("retain-count"),
    
    )
    

      

    (2)、for循环10次,调用w := supervisor.NewWorker(sv, wg),再go w.Start()

    (3)、调用sv.Start(),启动supervisor

    (4)、调用server, err := startServer(listenParts[0], listenParts[1], sv),启动grpc server

    supervisor的数据结构定义如下所示:

    // Supervisor represents a container supervisor
    
    type Supervisor struct {
    
      // stateDir is the directory on the system to store container runtime state information
    
      stateDir    string
      // name of the OCI compatible runtime used to execute containers
    
      runtime    string
      runtimeArgs  []string
      shim      string
      containers   map[string]*containerInfo
      startTasks   chan *startTask
      // we need a lock around the subscribers map only because addtions and deletions from
    
      // the map via the API so we cannot really control the currency
    
      subscriberLock sync.RWMutex
      subscribers    map[chan Event]struct{}
      machine    Machine
      tasks       chan Task
      monitor      *Monitor
      eventLog    []Event
      eventLock     sync.Mutex
      timeout     time.Duration
    }
    

      

    2、containerd/supervisor/supervisor.go

    // New returns an initialized Process supervisor

    func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration, retainCount int) (*Supervisor, error)

    (1)、调用machine, err := CollectionMachineInformation(),获取当前宿主机的CPU数和RAM总量

    (2)、调用monitor, err := NewMonitor(),启动并返回一个监视器

    (3)、填充数据结构Supervisor:

    s := &Supervisor{
    
      stateDir:    stateDir,
    
      containers:   make(map[string]*ContainerInfo),
      startTasks:   startTasks,
      machine:    machine,
      subscriber:   make(map[chan Event]struct{}),
    
      tasks:      make(chan Task, defaultBufferSize),
      monitor:    monitor,
      runtime:    runtimeName,
      runtimeArgs:  runtimeArgs,
      shim:     shimName,
      timeout:    timeout,
    }
    

      

    (4)、调用setupEventLog(s, retainCount)设置event log

    (5)、生成两个goroutine,s.exitHandler()和s.oomHandler()

    (6)、最后,调用s.restore(),加载之前已经存在的容器

    3、containerd/supervisor/supervisor.go

    func (s *Supvervisor) restore() error

    (1)、遍历目录s.stateDir(其实就是/var/run/docker/libcontainerd/containerd)

    (2)、调用id := d.Name()获取容器id,再调用container, err := runtime.Load(s.stateDir, id, s.shim, s.timeout),load的作用就是加载s.stateDir/id/state.json获取容器实例。之后,再遍历s.stateDir/id/下的pid 文件,加载容器中的process。

    (3)、调用processes, err := container.Processes(),加载容器中的process,如果process的状态为running,则调用s.monitorProcess(p)对其进行监控,并对其中不在运行的process进行处理。

    4、containerd/supervisor/supervisor.go

    // Start is a non-blocking call that runs the supervisor for monitoring container processes and executing new containers

    // This event loop is the only thing that is allowed to modify state of containers and processes, therefore it is save to do operations

    // in the handlers that modify state of the system or state of the Supervisor

    func (s *Supervisor) Start() error

    该函数所做的工作很简单,就是启动一个goroutine,再for i := range s.tasks,调用s.handlerTask(i)

    Task的数据结构如下所示:

    // Task executes an action returning an error chan with either nil or the error from excuting the task
    
    type Task interface {
    
      // ErrorCh returns a channel used to report and error from an async task
    
      ErrorCh() chan error
    
    }
    

      

    5、containerd/supervisor/supervisor.go

    func (s *Supervisor) handleTask(i Task)

    该函数根据i的类型,调用相应的处理函数进行处理。例如,i.(type)为*StartTask时,则调用s.start(t),若i.(type)为*DeleteTask时,则调用s.delete(t)。

    -----------------------------------------------------------------------  worker的工作 -------------------------------------------------------------------------

    worker的数据结构如下所示:

    type Work interface {
    
      Start()
    
    }
    
    type worker struct {
    
      wg  *sync.WaitGroup
      s   *Supervisor
    }
    

    4、containerd/supervisor/worker.go

    func NewWorker(s *Supervisor, wg *sync.WaitGroup) Worker

    这个函数只是简单地填充数据结构,return &worker{s: s, wg: wg}

    5、containerd/supervisor/worker.go

    // Start runs a loop in charge of starting new containers

    func (w *worker) Start()

    (1)、遍历w.s.startTasks,调用process, err := t.container.Start(t.checkPointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))

    (2)、调用w.s.monitor.MonitorOOM(t.Container)和w.s.monitorProcess(process)对container和process进行监控

    (3)、当我们从checkpoint restore一个容器的时候,不需要start process。因此,在t.CheckpointPath == ""的时候,调用process.Start()

    (4)、调用ContainerStartTimer.UpdateSince(started),started是当前的时间

    (5)、最后,调用t.Err <- nil, t.StartResponse <- StartResponse{Container: t.Container},和w.s.notifySubscribers(Event{Timestamp: time.Now, ID: t.container.ID(), Type: StateStart}),进行消息通知

    ---------------------------------------------------------------------------- monitor 分析 -----------------------------------------------------------------------

    Monitor的数据结构如下所示:

    // Monitor represents a runtime.Process monitor
    
    type Monitor struct {
    
      m       sync.Mutext
      receivers    map[int]interface{}
      exits      chan runtime.Process
      ooms     chan string
      epollFd    int
    }
    

      

    1、containerd/supervisor/monitor_linux.go

    // NewMonitor starts a new process monitor and returns it

    (1)、首先获取一个monitor实例,m := &Monitor{receivers: make(map[int]interface{}), exits: make(chan runtime.Process, 1024), oom: make(chan string, 1024)}

    (2)、调用fd, err := archutils.EpollCreate1(0),创建一个epoll fd,接着将fd赋值给m.epollFd

    (3)、生成一个goroutine,go m.start()

    2、containerd/supervisor/monitor_linux.go

    func (m *Monitor) start()

    (1)、该函数就是对各种syscall.EpollEvent进行处理,每次通过调用n, err := archutils.EpollWait(m.epollFd, events[:], -1),获取n个EpollEvent。

    (2)、再通过fd := int(events[i].Fd),r := m.receivers[fd]找到对应的runtimeProcess或者runtime.OOM。

    (3)、最后,t := r.(type),再分别对runtime.Process和runtime.OOM进行处理

    3、containerd/supervisor/monitor_linux.go

    // Monitor adds a process to the list of the one being monitored

    func (m *Monitor) Monitor(p runtime.Process) error

    (1)、调用fd := p.ExitFD() ---> ExitFD returns the fd of the exit pipe,再根据fd新建一个event := syscall.EpollEvent{Fd: int32(fd), Events: syscall.EPOLLHUP,}

    (2)、调用archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event)

    (3)、最后,调用EpollFdCounter.Inc(1),m.receivers[fd] = p

  • 相关阅读:
    IntelliJ IDEA常用统一设置2-Inspections检查设置(Linux/Mac/Windows)
    IntelliJ IDEA版本:Ultimate、Community、EAP版本的区别
    IntelliJ IDEA重构技巧收集
    Java泛型中的类型擦除机制简单理解
    阿里巴巴Java开发手册中的DO、DTO、BO、AO、VO、POJO定义
    Java中PO、BO、VO、DTO、POJO、DAO概念及其作用和项目实例图(转)
    Java使用logback记录日志时分级别保存文件
    Java中List,Set和Map详解及其区别和使用场景(转)
    Java中泛型的Class<Object>与Class<?>的区别(转)
    Java中泛型T和Class<T>以及Class<?>的理解(转)
  • 原文地址:https://www.cnblogs.com/YaoDD/p/5996525.html
Copyright © 2011-2022 走看看