zoukankan      html  css  js  c++  java
  • Kubernetes源码阅读笔记——Controller Manager(之一)

    Controller Manager是Kubernetes的核心组件之一。我们知道,Kubernetes对集群的管理采用的是控制器模式,即针对各种资源运行多个controller(控制器)。控制器的逻辑是运行永不结束的循环,通过apiserver组件时刻获取集群某种资源的状态,并确保资源的当前状态与期望的状态相符合。

    下面我们就来通过阅读源码,看一下Kubernetes中Controller Manager的具体实现。

    Kubernetes中与Controller Manager相关的包有2个,分别是cmd/cotroller-manager和cmd/kube-controller-manager(暂时不明白为什么要分成两部分)。

    启动函数是kube-controller-manager下的controller-manager.go。下面我们先从启动函数入手。

    一、启动函数controller-manager.go

    启动函数很短:

    func main() {
        rand.Seed(time.Now().UnixNano())
    
        command := app.NewControllerManagerCommand()
    
        // TODO: once we switch everything over to Cobra commands, we can go back to calling
        // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
        // normalize func and add the go flag set by hand.
        // utilflag.InitFlags()
        logs.InitLogs()
        defer logs.FlushLogs()
    
        if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v
    ", err)
        os.Exit(1)
        }
    }

    其中,第一行是生成随机数的代码,log相关的两行是处理日志,最核心的内容在于command。

    进入NewControllerManagerCommand方法,我们发现这一方法位于cmd/kube-controller-manager/app/controllermanager.go中。这个方法的返回值是一个cobra.Command类型的指针。

    这里稍微提一下cobra。cobra是一个Go语言的开源项目,用于在命令行中注册新命令。可参考https://github.com/spf13/cobra。cobra的基本结构就是注册一个cobra.Command类型的指针,然后调用Execute命令执行。可以看到main函数就遵循了这样的结构。

    二、NewControllerManagerCommand方法

    cobra.Command是一个结构体,我们看到NewControllerManagerCommand方法里定义了最核心的Use、Long、Run三个字段:

    cmd/kube-controller-manager/app/controllermanager.go
    
    func NewControllerManagerCommand(){
        cmd := &cobra.Command{
            Use: "kube-controller-manager"
            Long: `...`
            Run: func(cmd *cobra.Command, args []string) {
                verflag.PrintAndExitIfRequested()
            utilflag.PrintFlags(cmd.Flags())
    
            c, err := s.Config(KnownControllers(),ControllersDisabledByDefault.List())
            if err != nil {
                fmt.Fprintf(os.Stderr, "%v
    ", err)
                os.Exit(1)
            }
    
            if err := Run(c.Complete(), wait.NeverStop); err != nil {
                fmt.Fprintf(os.Stderr, "%v
    ", err)
                os.Exit(1)
            }
            },
        }
    }

    Use是命令本身,即在命令行中输入kube-controller-manager,即可运行。Long是对命令的详细说明,而Run则是命令的具体执行内容,也是核心。

    Run后面有一段,是为kube-controller-manager命令配置flag的。

    我们来仔细解读一下Run。

    前两行PrintAndExitIfRequested和PrintFlags是处理打印版本和可用flag的,不重要。重点在后面的Config方法和Run方法。

    在创建cmd之前,NewControllerManagerCommand方法其实还有一行代码:

    cmd/kube-controller-manager/app/controllermanager.go
    
    func NewControllerManagerCommand(){
        s, err := options.NewKubeControllerManagerOptions()
    }

    进入这个NewKubeControllerManagerOptions方法看,发现方法位于cmd/kube-controller-manager/app/options/options.go内,作用是创建了一个使用默认配置的KubeControllerManagerOptions结构体,包含了DeploymentController、ReplicationController等多个controller的配置。

    利用这里创建的KubeControllerManagerOptions结构体,在Command的Run字段中执行了Config和Run两个操作。

    Config方法是配置集群的kubeconfig等基础配置,第一个参数KnownControllers()值得关注。

    三、KnownControllers方法

    KnownControllers是同一个go文件下的方法,作用是将NewControllerInitializers方法中返回的Map的键生成一个list。

    进入同文件下的NewControllerInitializers方法,我们发现:

    cmd/kube-controller-manager/app/controllermanager.go
    
    func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
        controllers := map[string]InitFunc{}
        controllers["endpoint"] = startEndpointController
        controllers["replicationcontroller"] = startReplicationController
        controllers["podgc"] = startPodGCController
        controllers["resourcequota"] = startResourceQuotaController
        controllers["namespace"] = startNamespaceController
        controllers["serviceaccount"] = startServiceAccountController
        controllers["garbagecollector"] = startGarbageCollectorController
        controllers["daemonset"] = startDaemonSetController
        ...
        ...
        return controllers
    }

    这一方法,将controller-manager中的所有controller都注册了进来。每个controller都以名字为键,启动函数为值,存储在Map中。因此可以说,这个NewControllerInitializers方法维护了controller-manager的元数据,是controller-manager的重要方法之一。

    将这些controller加载上配置后,就是下面核心的Run方法了。

    四、Run方法

    Run方法也在cmd/kube-controller-manager/app/controllermanager.go中,接收2个参数。第一个参数调用Config的Complete方法,对config再进行一次包装,第二个参数是一个单向channel,用于使方法阻塞,从而保持运行状态。

    cmd/kube-controller-manager/app/controllermanager.go
    
    func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
        // To help debugging, immediately log version
        ...
    
        // Setup any healthz checks we will want to use.
        ...
    
        // Start the controller manager HTTP server
        // unsecuredMux is the handler for these controller *after* authn/authz filters have been applied
        ...
    
       run := func(ctx context.Context) {
           rootClientBuilder := controller.SimpleControllerClientBuilder{
              ClientConfig: c.Kubeconfig,
           }
           var clientBuilder controller.ControllerClientBuilder
           if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
              if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
                 // It'c possible another controller process is creating the tokens for us.
                 // If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
                 klog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
              }
              clientBuilder = controller.SAControllerClientBuilder{
                 ClientConfig:         restclient.AnonymousClientConfig(c.Kubeconfig),
                 CoreClient:           c.Client.CoreV1(),
                 AuthenticationClient: c.Client.AuthenticationV1(),
                 Namespace:            "kube-system",
              }
           } else {
              clientBuilder = rootClientBuilder
           }
           controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
           if err != nil {
              klog.Fatalf("error building controller context: %v", err)
           }
           saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
    
           if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
              klog.Fatalf("error starting controllers: %v", err)
           }
    
           controllerContext.InformerFactory.Start(controllerContext.Stop)
           close(controllerContext.InformersStarted)
    
           select {}
        }
    
        if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
           run(context.TODO())
           panic("unreachable")
        }
    }

    前面几段分别是处理日志、健康检查、以及启动Controller Manager的HTTP server,不提。重点在后面的run。

    run中,首先调用CreateControllerContext方法,为controller的运行准备环境。方法创建了多个client,确保Controller Manager能连接上API Server,并返回一个ControllerContext结构体,为下面的controller使用。

    其次,调用StartControllers方法,开始正式运行controller。

    StartControllers方法位于同一文件中,执行逻辑很直观,就是将之前保存在NewControllerInitializers中的controller全部运行起来(除了特殊的ServiceAccountTokenController,它在前面的环境准备中先运行起来),方法是分别调用这些controller的启动函数。关于它们的启动函数,将在下一篇文章中分析。

    在所有的controller中,ServiceAccount Token Controller 最先启动,因为后面所有的controller运行都需要使用SATokenController创建的token。

    这样,controller-manager模块就算是正式启动了。

    run的后面还有一段,是处理高可用controller-manager的节点选举相关的,暂时不提。

    五、Informer

    Informer是Kubernetes中一个重要的概念。它本质上是一个client-go中的一个工具包,会将资源变化的信息通知缓存和listener。可参考https://blog.csdn.net/weixin_42663840/article/details/81980022

    每个controller都会启动自己的informer。Run方法最后controllerContext.InformerFactory.Start(controllerContext.Stop)这一行代码就是将controller的informerfactory启动起来。

    这个informerfactory,顾名思义是创建informer的工厂,在CreateControllerContext中初始化。事实上,这个factory目前还没有生成任何informer,因为在CreateControllerContext方法中,在创建这个工厂实例时并没有为这个实例配置任何额外的参数。直到后面分别启动每个Controller时,才会通过这个工厂生成具体的informer。

    进入Start方法:

    k8s.io/client-go/informers/factory.go
    
    func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    	f.lock.Lock()
    	defer f.lock.Unlock()
    
    	for informerType, informer := range f.informers {
    		if !f.startedInformers[informerType] {
    			go informer.Run(stopCh)
    			f.startedInformers[informerType] = true
    		}
    	}
    }

    方法本质上调用了informer的Run方法。

    k8s.io/client-go/tools/cache/shard_informer.go
    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()
    
        fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
    
        cfg := &Config{
            Queue:            fifo,
            ListerWatcher:    s.listerWatcher,
            ObjectType:       s.objectType,
            FullResyncPeriod: s.resyncCheckPeriod,
            RetryOnError:     false,
            ShouldResync:     s.processor.shouldResync,
    
            Process: s.HandleDeltas,
        }
    
        func() {
            s.startedLock.Lock()
            defer s.startedLock.Unlock()
    
            s.controller = New(cfg)
            s.controller.(*controller).clock = s.clock
            s.started = true
        }()
    
        // Separate stop channel because Processor should be stopped strictly after controller
        ...
    
        defer func() {
            s.startedLock.Lock()
            defer s.startedLock.Unlock()
            s.stopped = true // Don't want any new listeners
        }()
        s.controller.Run(stopCh)
    }

    此方法创建一个FIFO队列,之后根据informer的配置实例化一个client-go包中的controller对象(此controller非controller manager的controller),然后调用controller的Run方法(配置中有一条Process: s.HandleDeltas很关键,后面会用到)。

    k8s.io/client-go/tools/cache/controller.go

    func (c *controller) Run(stopCh <-chan struct{}) {
    	defer utilruntime.HandleCrash()
    	go func() {
    		<-stopCh
    		c.config.Queue.Close()
    	}()
    	r := NewReflector(
    		c.config.ListerWatcher,
    		c.config.ObjectType,
    		c.config.Queue,
    		c.config.FullResyncPeriod,
    	)
    	r.ShouldResync = c.config.ShouldResync
    	r.clock = c.clock
    
    	c.reflectorMutex.Lock()
    	c.reflector = r
    	c.reflectorMutex.Unlock()
    
    	var wg wait.Group
    	defer wg.Wait()
    
    	wg.StartWithChannel(stopCh, r.Run)
    
    	wait.Until(c.processLoop, time.Second, stopCh)
    }

    可见,controller的Run方法实例化一个Reflector对象,并将自己的reflector字段设置为这个对象,并调用对象的Run方法。Reflector也是k8s中的一个概念,作用在于通过List-Watch机制,与API Server连接,及时获取监听的k8s资源的变化。这一步通过调用reflector的Run方法来实现。Informer正是通过这一机制,在自身被动传达API Server发送的通知的同时,也会主动向API Server获取资源变化。Reflector下一篇继续研究。

    另一方面,controller调用processLoop方法,不断地从这个FIFO队列中取出元素,并调用Process方法进行处理。

    看一下processLoop方法:

    k8s.io/client-go/tools/cache/controller.go

    func (c *controller) processLoop() {
        for {
            obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
            if err != nil {
                if err == FIFOClosedError {
                    return
                }
                if c.config.RetryOnError {
                    // This is the safe way to re-enqueue.
                    c.config.Queue.AddIfNotPresent(obj)
                }
            }
        }
    }

    可以看到,processLoop方法本质上就是从FIFO队列中pop元素,并调用Process进行处理。而这个Process已经在前述sharedIndexInformer的Run方法中,通过Process: s.HandleDeltas这一行进行了定义。

    看一下HandleDeltas方法:

    k8s.io/client-go/tools/cache/shared_informers.go

    func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Added, Updated: isSync := d.Type == Sync s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, isSync) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil }

    可见,在这个方法中,informer通过indexer,与FIFO队列交互,执行添加、更新或删除等操作,并调用distribute方法,向listener传递信息(listener下一篇会提到。)

    关于Informer,下一篇文章会继续分析。

    下一篇文章请见https://www.cnblogs.com/00986014w/p/10273738.html

    五、总结

    总而言之,Controller Manager的大致逻辑就是,通过cobra创建一个kube-controller-manager命令,并运行它。这个命令的内容是启动Controller Manager。这个Controller Manager管理Kubernetes中所有的controller,在manager启动时,会调用这些controller的启动函数,启动这些controller,并启动controller对应的informer。

  • 相关阅读:
    maven项目部署到tomcat中没有classe文件的问题汇总
    Tomcat远程调试模式及利用Eclipse远程链接调试
    FastDFS 常见问题
    Linux Crontab 定时任务 命令详解
    EChart 关于图标控件的简单实用
    java 通过zxing生成二维码
    Mybatis typeAliases别名
    Mybatis 实现手机管理系统的持久化数据访问层
    Mybatis 实现传入参数是表名
    Mybatis关于like的字符串模糊处理
  • 原文地址:https://www.cnblogs.com/00986014w/p/10251844.html
Copyright © 2011-2022 走看看