zoukankan      html  css  js  c++  java
  • kubernetes源码阅读笔记——Kubelet(之一)

    Kubelet是Kubernetes集群中node节点的核心组件之一,其作用是管理运行在Pod中的容器,使其处于正常运行状态。

    Kubelet的启动函数代码位于cmd/kubelet/kubelet.go中,仍是通过cobra注册。

    cmd/kubelet/kubelet.go

    func main() { rand.Seed(time.Now().UnixNano()) command := app.NewKubeletCommand(server.SetupSignalHandler()) logs.InitLogs() defer logs.FlushLogs() if err := command.Execute(); err != nil { fmt.Fprintf(os.Stderr, "%v ", err) os.Exit(1) } }

    一、NewKubeletCommand

    在NewKubeletCommand命令中注册了kubelet命令。进入NewKubeletCommand方法:

    cmd/kubelet/app/server.go
    
    func NewKubeletCommand(stopCh <-chan struct{}) *cobra.Command {
        cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
        cleanFlagSet.SetNormalizeFunc(flag.WordSepNormalizeFunc)
        kubeletFlags := options.NewKubeletFlags()
        kubeletConfig, err := options.NewKubeletConfiguration()
        // programmer error
        if err != nil {
            klog.Fatal(err)
        }
    
        cmd := &cobra.Command{
            Use: componentKubelet,
            Long: `...`,
            // The Kubelet has special flag parsing requirements to enforce flag precedence rules,
            // so we do all our parsing manually in Run, below.
            // DisableFlagParsing=true provides the full set of flags passed to the kubelet in the
            // `args` arg to Run, without Cobra's interference.
            DisableFlagParsing: true,
            Run: func(cmd *cobra.Command, args []string) {
                // initial flag parse, since we disable cobra's flag parsing
                if err := cleanFlagSet.Parse(args); err != nil {
                    cmd.Usage()
                    klog.Fatal(err)
                }
    
                // check if there are non-flag arguments in the command line
                cmds := cleanFlagSet.Args()
                if len(cmds) > 0 {
                    cmd.Usage()
                    klog.Fatalf("unknown command: %s", cmds[0])
                }
    
                // short-circuit on help
                help, err := cleanFlagSet.GetBool("help")
                if err != nil {
                    klog.Fatal(`"help" flag is non-bool, programmer error, please correct`)
                }
                if help {
                    cmd.Help()
                    return
                }
    
                ...
    
                // load kubelet config file, if provided
                if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
                    kubeletConfig, err = loadConfigFile(configFile)
                    if err != nil {
                        klog.Fatal(err)
                    }
                    // We must enforce flag precedence by re-parsing the command line into the new object.
                    // This is necessary to preserve backwards-compatibility across binary upgrades.
                    // See issue #56171 for more details.
                    if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
                        klog.Fatal(err)
                    }
                    // update feature gates based on new config
                    if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
                        klog.Fatal(err)
                    }
                }
    
                // We always validate the local configuration (command line + config file).
                // This is the default "last-known-good" config for dynamic config, and must always remain valid.
                if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
                    klog.Fatal(err)
                }
    
                // use dynamic kubelet config, if enabled
                var kubeletConfigController *dynamickubeletconfig.Controller
                if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
                    var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
                    dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
                        func(kc *kubeletconfiginternal.KubeletConfiguration) error {
                            // Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
                            // so that we get a complete validation at the same point where we can decide to reject dynamic config.
                            // This fixes the flag-precedence component of issue #63305.
                            // See issue #56171 for general details on flag precedence.
                            return kubeletConfigFlagPrecedence(kc, args)
                        })
                    if err != nil {
                        klog.Fatal(err)
                    }
                    // If we should just use our existing, local config, the controller will return a nil config
                    if dynamicKubeletConfig != nil {
                        kubeletConfig = dynamicKubeletConfig
                        // Note: flag precedence was already enforced in the controller, prior to validation,
                        // by our above transform function. Now we simply update feature gates from the new config.
                        if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
                            klog.Fatal(err)
                        }
                    }
                }
    
                // construct a KubeletServer from kubeletFlags and kubeletConfig
                kubeletServer := &options.KubeletServer{
                    KubeletFlags:         *kubeletFlags,
                    KubeletConfiguration: *kubeletConfig,
                }
    
                // use kubeletServer to construct the default KubeletDeps
                kubeletDeps, err := UnsecuredDependencies(kubeletServer)
                if err != nil {
                    klog.Fatal(err)
                }
    
                // add the kubelet config controller to kubeletDeps
                kubeletDeps.KubeletConfigController = kubeletConfigController
    
                ...
    
                // run the kubelet
                klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
                if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
                    klog.Fatal(err)
                }
            },
        }
    
        // keep cleanFlagSet separate, so Cobra doesn't pollute it with the global flags
        kubeletFlags.AddFlags(cleanFlagSet)
        options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
        options.AddGlobalFlags(cleanFlagSet)
        cleanFlagSet.BoolP("help", "h", false, fmt.Sprintf("help for %s", cmd.Name()))
    
        // ugly, but necessary, because Cobra's default UsageFunc and HelpFunc pollute the flagset with global flags
        const usageFmt = "Usage:
      %s
    
    Flags:
    %s"
        cmd.SetUsageFunc(func(cmd *cobra.Command) error {
            fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
            return nil
        })
        cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
            fmt.Fprintf(cmd.OutOrStdout(), "%s
    
    "+usageFmt, cmd.Long, cmd.UseLine(), cleanFlagSet.FlagUsagesWrapped(2))
        })
    
        return cmd
    }

    核心仍在于后面的Run字段。可以看到,kubelet在cobra默认的flag和help function处理之外,又在Run中添加了自己的flag和help function处理。

    在这些内容之后,有一段加载kubelet config的代码,因为kubelet在启动时可以使用本地的config,也可以动态加载自定义的config。

    之后,通过kubeletServer := &options.KubeletServer{ }这行代码,利用之前设置的config和flag创建一个kubeletServer的依赖配置。

    最后,调用Run方法,将依赖配置和依赖组件传入,运行这个kubelet。

    Run方法的核心则是调用同一个文件下的run方法。

    二、run

    进入run方法:

    cmd/kubelet/app/server.go
    
    func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
        ...
    
        // About to get clients and such, detect standaloneMode
        standaloneMode := true
        if len(s.KubeConfig) > 0 {
            standaloneMode = false
        }
    
        if kubeDeps == nil {
            kubeDeps, err = UnsecuredDependencies(s)
            if err != nil {
                return err
            }
        }
    
        if kubeDeps.Cloud == nil {
            if !cloudprovider.IsExternal(s.CloudProvider) {
                cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
                if err != nil {
                    return err
                }
                if cloud == nil {
                    klog.V(2).Infof("No cloud provider specified: %q from the config file: %q
    ", s.CloudProvider, s.CloudConfigFile)
                } else {
                    klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q
    ", s.CloudProvider, s.CloudConfigFile)
                }
                kubeDeps.Cloud = cloud
            }
        }
    
        ...
    
        // if in standalone mode, indicate as much by setting all clients to nil
        switch {
        case standaloneMode:
            kubeDeps.KubeClient = nil
            kubeDeps.DynamicKubeClient = nil
            kubeDeps.EventClient = nil
            kubeDeps.HeartbeatClient = nil
            klog.Warningf("standalone mode, no API client")
    
        case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil, kubeDeps.DynamicKubeClient == nil:
            clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
            if err != nil {
                return err
            }
            kubeDeps.OnHeartbeatFailure = closeAllConns
    
            kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
            if err != nil {
                return fmt.Errorf("failed to initialize kubelet client: %v", err)
            }
    
            kubeDeps.DynamicKubeClient, err = dynamic.NewForConfig(clientConfig)
            if err != nil {
                return fmt.Errorf("failed to initialize kubelet dynamic client: %v", err)
            }
    
            // make a separate client for events
            eventClientConfig := *clientConfig
            eventClientConfig.QPS = float32(s.EventRecordQPS)
            eventClientConfig.Burst = int(s.EventBurst)
            kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
            if err != nil {
                return fmt.Errorf("failed to initialize kubelet event client: %v", err)
            }
    
            // make a separate client for heartbeat with throttling disabled and a timeout attached
            heartbeatClientConfig := *clientConfig
            heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
            // if the NodeLease feature is enabled, the timeout is the minimum of the lease duration and status update frequency
            if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
                leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
                if heartbeatClientConfig.Timeout > leaseTimeout {
                    heartbeatClientConfig.Timeout = leaseTimeout
                }
            }
            heartbeatClientConfig.QPS = float32(-1)
            kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
            if err != nil {
                return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
            }
    
            // CRDs are JSON only, and client renegotiation for streaming is not correct as per #67803
            csiClientConfig := restclient.CopyConfig(clientConfig)
            csiClientConfig.ContentType = "application/json"
            kubeDeps.CSIClient, err = csiclientset.NewForConfig(csiClientConfig)
            if err != nil {
                return fmt.Errorf("failed to initialize kubelet storage client: %v", err)
            }
        }
         if kubeDeps.Auth == nil {
               auth, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
               if err != nil {
                     return err
               }
               kubeDeps.Auth = auth
         }
    
         if kubeDeps.CAdvisorInterface == nil {
               imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
               kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
               if err != nil {
                     return err
               }
         }
    
    
        if kubeDeps.ContainerManager == nil {
            
               ...
    
            kubeDeps.ContainerManager, err = cm.NewContainerManager(......)
    
            if err != nil {
                return err
            }
        }
    
        if err := checkPermissions(); err != nil {
            klog.Error(err)
        }
    
        ...
    
        if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
            return err
        }
    
        if s.HealthzPort > 0 {
            healthz.DefaultHealthz()
            go wait.Until(func() {
                err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)
                if err != nil {
                    klog.Errorf("Starting health server failed: %v", err)
                }
            }, 5*time.Second, wait.NeverStop)
        }
    
        if s.RunOnce {
            return nil
        }
    
        // If systemd is used, notify it that we have started
        go daemon.SdNotify(false, "READY=1")
    
        select {
        case <-done:
            break
        case <-stopCh:
            break
        }
    
        return nil
    }

    方法很长,这里捡重点说。开始一段是加载配置,略去。然后做了以下几件事:

    (1)如果kubelet的依赖组件还没配置全,则调用UnsecuredDepencencies方法为kubelet配置默认依赖组件。方法会返回一个Dependencies的结构体:

    cmd/kubelet/app/server.go
    
    // UnsecuredDependencies returns a Dependencies suitable for being run, or an error if the server setup
    // is not valid.  It will not start any background processes, and does not include authentication/authorization
    func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, error) {
        // Initialize the TLS Options
        tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
        if err != nil {
            return nil, err
        }
    
        mounter := mount.New(s.ExperimentalMounterPath)
        var pluginRunner = exec.New()
        if s.Containerized {
            klog.V(2).Info("Running kubelet in containerized mode")
            ne, err := nsenter.NewNsenter(nsenter.DefaultHostRootFsPath, exec.New())
            if err != nil {
                return nil, err
            }
            mounter = mount.NewNsenterMounter(s.RootDirectory, ne)
            // an exec interface which can use nsenter for flex plugin calls
            pluginRunner = nsenter.NewNsenterExecutor(nsenter.DefaultHostRootFsPath, exec.New())
        }
    
        var dockerClientConfig *dockershim.ClientConfig
        if s.ContainerRuntime == kubetypes.DockerContainerRuntime {
            dockerClientConfig = &dockershim.ClientConfig{
                DockerEndpoint:            s.DockerEndpoint,
                RuntimeRequestTimeout:     s.RuntimeRequestTimeout.Duration,
                ImagePullProgressDeadline: s.ImagePullProgressDeadline.Duration,
            }
        }
    
        return &kubelet.Dependencies{
            Auth:                nil, // default does not enforce auth[nz]
            CAdvisorInterface:   nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
            Cloud:               nil, // cloud provider might start background processes
            ContainerManager:    nil,
            DockerClientConfig:  dockerClientConfig,
            KubeClient:          nil,
            HeartbeatClient:     nil,
            CSIClient:           nil,
            EventClient:         nil,
            Mounter:             mounter,
            OOMAdjuster:         oom.NewOOMAdjuster(),
            OSInterface:         kubecontainer.RealOS{},
            VolumePlugins:       ProbeVolumePlugins(),
            DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir, pluginRunner),
            TLSOptions:          tlsOptions}, nil
    }

    可以看到,Dependencies中包含了kubelet的所有依赖组件,包括与API Server交互的KubeClient,管理容器的ContainerManager,与Docker交互的DockerClient的配置等等;

    (2)判断kubelet是否运行在standalone模式,如果是,则将kubelet运行时依赖的4个client都设为空;如果不是且这4个client皆为空,则为这4个client创建配置;

    (3)为kubelet的Dependencies创建Auth、CAdvisorInterface和ContainerManager。

    (4)运行kubelet。这一步通过调用RunKubelet方法实现,后面详细分析。

    (5)通过select设置为持续运行。

    三、RunKubelet

    下面仔细看一看RunKubelet方法:

    cmd/kubelet/app/server.go
    
    // RunKubelet is responsible for setting up and running a kubelet.  It is used in three different applications:
    //   1 Integration tests
    //   2 Kubelet binary
    //   3 Standalone 'kubernetes' binary
    // Eventually, #2 will be replaced with instances of #3
    func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
        ...
    
        hostNetworkSources, err := kubetypes.GetValidatedSources(kubeServer.HostNetworkSources)
        if err != nil {
            return err
        }
    
        hostPIDSources, err := kubetypes.GetValidatedSources(kubeServer.HostPIDSources)
        if err != nil {
            return err
        }
    
        hostIPCSources, err := kubetypes.GetValidatedSources(kubeServer.HostIPCSources)
        if err != nil {
            return err
        }
    
        privilegedSources := capabilities.PrivilegedSources{
            HostNetworkSources: hostNetworkSources,
            HostPIDSources:     hostPIDSources,
            HostIPCSources:     hostIPCSources,
        }
        capabilities.Setup(kubeServer.AllowPrivileged, privilegedSources, 0)
    
        credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
        klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory)
    
        if kubeDeps.OSInterface == nil {
            kubeDeps.OSInterface = kubecontainer.RealOS{}
        }
    
        k, err := CreateAndInitKubelet(......)
        if err != nil {
            return fmt.Errorf("failed to create kubelet: %v", err)
        }
         if runOnce {
               if _, err := k.RunOnce(podCfg.Updates()); err != nil {
                     return fmt.Errorf("runonce failed: %v", err)
               }
               klog.Info("Started kubelet as runonce")
         } else {
               startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
               klog.Info("Started kubelet")
         }
         ...
    
        return nil
    }

    方法的核心就在于中间的创建并初始化这一段以及后面的运行这一段。runOnce是只运行一次kubelet,将特定的一组pod创建后即退出的运行模式,已基本废弃,目前主要是调用startKubelet方法持续运行kubelet。startKubelet将在后面的文章中涉及。

    进入CreateAndInitKubelet,发现它调用了NewMainKubelet、BirthCry、StartGarbageCollection三个方法。BirthCry做的事情就是记录一个启动的event,不提。StartGarbageCollection则是运行容器和镜像的垃圾回收机制,后面会提到。下一篇将重点对NewMainKubelet方法进行分析。https://www.cnblogs.com/00986014w/p/10895532.html

  • 相关阅读:
    Hibernate缓存策略
    Hibernate初探之一对多映射 及 myeclipse自动生成hibernate文件方法
    Hibernate初探之单表映射
    01箱包问题
    oracle备份恢复
    旅游参考
    Linux中find常见用法示例
    oracle 比较两个用户表结构的区别。
    SSH自动断开连接的原因
    在Oracle中查看客户端连接的IP信息 .
  • 原文地址:https://www.cnblogs.com/00986014w/p/10458231.html
Copyright © 2011-2022 走看看