zoukankan      html  css  js  c++  java
  • Kubernetes源码阅读笔记——Controller Manager(之二)

    上一篇文章中,我们看到了Controller Manager的基本运行逻辑,但是还有一些问题没有解决,我们将在本篇文章中进行分析。

    一、ListAndWatch

    首先是Informer。上一篇中写道,启动Informer本质上是调用了controller的reflector的Run方法。下面我们进入reflector的Run方法看看:

    k8s.io/cient-go/tools/cache/reflector.go
    
    func (r *Reflector) Run(stopCh <-chan struct{}) {
        glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
        wait.Until(func() {
            if err := r.ListAndWatch(stopCh); err != nil {
                utilruntime.HandleError(err)
            }
        }, r.period, stopCh)
    }

    可以看到,方法通过调用wait.Until,每过period时间段就执行一次ListAndWatch方法。

    进入ListAndWatch方法:

    k8s.io/client-go/tools/cache/reflector.go
    
    // ListAndWatch first lists all items and get the resource version at the moment of call,
    // and then use the resource version to watch.
    // It returns error if ListAndWatch didn't even try to initialize watch.
    func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
        glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
        var resourceVersion string
    
        // Explicitly set "0" as resource version - it's fine for the List()
        // to be served from cache and potentially be delayed relative to
        // etcd contents. Reflector framework will catch up via Watch() eventually.
        options := metav1.ListOptions{ResourceVersion: "0"}
        r.metrics.numberOfLists.Inc()
        start := r.clock.Now()
        list, err := r.listerWatcher.List(options)
        if err != nil {
            return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
        }
        r.metrics.listDuration.Observe(time.Since(start).Seconds())
        listMetaInterface, err := meta.ListAccessor(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
        }
        resourceVersion = listMetaInterface.GetResourceVersion()
        items, err := meta.ExtractList(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
        }
        r.metrics.numberOfItemsInList.Observe(float64(len(items)))
        if err := r.syncWith(items, resourceVersion); err != nil {
            return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
        }
        r.setLastSyncResourceVersion(resourceVersion)
    
        resyncerrc := make(chan error, 1)
        cancelCh := make(chan struct{})
        defer close(cancelCh)
        go func() {
            resyncCh, cleanup := r.resyncChan()
            defer func() {
                cleanup() // Call the last one written into cleanup
            }()
            for {
                select {
                case <-resyncCh:
                case <-stopCh:
                    return
                case <-cancelCh:
                    return
                }
                if r.ShouldResync == nil || r.ShouldResync() {
                    glog.V(4).Infof("%s: forcing resync", r.name)
                    if err := r.store.Resync(); err != nil {
                        resyncerrc <- err
                        return
                    }
                }
                cleanup()
                resyncCh, cleanup = r.resyncChan()
            }
        }()
    
        for {
            // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
            select {
            case <-stopCh:
                return nil
            default:
            }
    
            timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
            options = metav1.ListOptions{
                ResourceVersion: resourceVersion,
                // We want to avoid situations of hanging watchers. Stop any wachers that do not
                // receive any events within the timeout window.
                TimeoutSeconds: &timeoutSeconds,
            }
    
            r.metrics.numberOfWatches.Inc()
            w, err := r.listerWatcher.Watch(options)
            if err != nil {
                switch err {
                case io.EOF:
                    // watch closed normally
                case io.ErrUnexpectedEOF:
                    glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
                default:
                    utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
                }
                // If this is "connection refused" error, it means that most likely apiserver is not responsive.
                // It doesn't make sense to re-list all objects because most likely we will be able to restart
                // watch where we ended.
                // If that's the case wait and resend watch request.
                if urlError, ok := err.(*url.Error); ok {
                    if opError, ok := urlError.Err.(*net.OpError); ok {
                        if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
                            time.Sleep(time.Second)
                            continue
                        }
                    }
                }
                return nil
            }
    
            if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
                if err != errorStopRequested {
                    glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
                }
                return nil
            }
        }
    }

    这个方法的注释已经写明。首先,用默认的ResourceVersion “0” 调用List方法,列出资源,并调用ListAccessor方法进行验证。

    其次,调用GetResourceVersion方法取得资源的实际的ResourceVersion,并调用syncWith方法进行同步。

    再次,用go func()后面的一大段代码处理reflector缓存的同步。

    最后,在一个for循环里调用Watch方法,对资源的变化进行watch,并调用watchHandler方法对变化进行相应处理。

    Watch方法本质上是在调用ListWatch的WatchFunc。这一字段是在创建具体的Controller对象时向informer添加的,后面会详细介绍。总之,Watch的本质与用户通过client-go工具连接API Server是一样的。

    进入watchHandler方法:

    k8s.io/client-go/tools/cache/reflector.go
    
    // watchHandler watches w and keeps *resourceVersion up to date.
    func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
        start := r.clock.Now()
        eventCount := 0
    
        defer w.Stop()
        // update metrics
        ...
    
    loop:
        for {
            select {
            case <-stopCh:
                return errorStopRequested
            case err := <-errc:
                return err
            case event, ok := <-w.ResultChan():
                if !ok {
                    break loop
                }
                if event.Type == watch.Error {
                    return apierrs.FromObject(event.Object)
                }
                if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                    continue
                }
                meta, err := meta.Accessor(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                    continue
                }
                newResourceVersion := meta.GetResourceVersion()
                switch event.Type {
                case watch.Added:
                    err := r.store.Add(event.Object)
                    if err != nil {
                        utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                    }
                case watch.Modified:
                    err := r.store.Update(event.Object)
                    if err != nil {
                        utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                    }
                case watch.Deleted:
                    // TODO: Will any consumers need access to the "last known
                    // state", which is passed in event.Object? If so, may need
                    // to change this.
                    err := r.store.Delete(event.Object)
                    if err != nil {
                        utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                    }
                default:
                    utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                }
                *resourceVersion = newResourceVersion
                r.setLastSyncResourceVersion(newResourceVersion)
                eventCount++
            }
        }
    
        watchDuration := r.clock.Now().Sub(start)
        ...
        return nil
    }

    这个方法中,最重要的是中间的case选择。对于watch到的结果,按照Added、Modified、Deleted分别调用store的Add、Update、Delete方法,在FIFO队列中进行更新。这个FIFO队列就是之前在执行informer的Run方法时所创建的。

    Informer的大致逻辑就是这样,通过list-watch机制,与API Server建立连接,监听资源的变化,在缓存中进行更新。

    二、StartControllers

    上一篇文章中提到,Controller Manager通过在StartControllers方法中调用启动函数来启动所有的Controller。我们先来看看StartControllers方法:

    cmd/kube-controller-manager/app/controllermanager.go
    
    func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
        ...
    
        for controllerName, initFn := range controllers {
        if !ctx.IsControllerEnabled(controllerName) {
            klog.Warningf("%q is disabled", controllerName)
            continue
        }
    
        time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
    
        klog.V(1).Infof("Starting %q", controllerName)
        debugHandler, started, err := initFn(ctx)
        if err != nil {
            klog.Errorf("Error starting %q", controllerName)
            return err
        }
        if !started {
            klog.Warningf("Skipping %q", controllerName)
            continue
        }
        ...
    
        return nil
    }

    我们看到,方法通过对controllers中每个元素执行各自的initFn来执行启动函数,因为每个Controller都是以[string]function的格式保存在map中的,所以直接执行自身的function即可。

    下面我们进入一个Controller的启动函数看看。以deployment为例。从NewControllerInitializers中找到controllers["deployment"]=startDeploymentController,进入startDeploymentController,可以看到方法位于app/apps.go中:

    cmd/kube-controller-manager/app/apps.go
    
    func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
        if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
        return nil, false, nil
        }
        dc, err := deployment.NewDeploymentController(
        ctx.InformerFactory.Apps().V1().Deployments(),
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("deployment-controller"),
        )
        if err != nil {
        return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
        }
        go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
        return nil, true, nil
    }

    我们看到,方法调用了NewDeploymentController方法创建一个Deployment,并通过GO协程调用Run方法运行它。

    我们先来看一下NewDeploymentController方法。

    三、NewDeploymentController

    pkg/controller/deployment/deployment_controller.go
    
    func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
        eventBroadcaster := record.NewBroadcaster()
        eventBroadcaster.StartLogging(klog.Infof)
        eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
    
        if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
        if err := metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
            return nil, err
        }
        }
        dc := &DeploymentController{
        client:        client,
        eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
        }
        dc.rsControl = controller.RealRSControl{
        KubeClient: client,
        Recorder:   dc.eventRecorder,
        }
    
        dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addDeployment,
        UpdateFunc: dc.updateDeployment,
        // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
        DeleteFunc: dc.deleteDeployment,
        })
        rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addReplicaSet,
        UpdateFunc: dc.updateReplicaSet,
        DeleteFunc: dc.deleteReplicaSet,
        })
        podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: dc.deletePod,
        })
    
        dc.syncHandler = dc.syncDeployment
        dc.enqueueDeployment = dc.enqueue
    
        dc.dLister = dInformer.Lister()
        dc.rsLister = rsInformer.Lister()
        dc.podLister = podInformer.Lister()
        dc.dListerSynced = dInformer.Informer().HasSynced
        dc.rsListerSynced = rsInformer.Informer().HasSynced
        dc.podListerSynced = podInformer.Informer().HasSynced
        return dc, nil
    }

    我们看到,这个方法大致做了三件事。

    (1)创建了一个Broadcaster,用于做kubernetes中event资源相关的处理。

    (2)创建DeploymentController

    我们知道,Deployment并不直接操作集群中的pod,而是通过操作ReplicaSet来间接操作pod,因此我们可以看到,DeploymentController结构体中也包含rsControl这一字段,用于操作ReplicaSet。而queue字段则维护了一个deployment的队列,用于将需要更新状态的deployment元素存入,后面会讲到。

    (3)添加回调函数

    Informer的AddEventHandler方法为deployment controller的所有informer添加了回调函数,对deployment和ReplicaSet的添加、更新、删除进行相应的处理。

    下面我们详细分析一下添加回调函数操作,这里分成三部分说,一是Informer的具体创建,二是函数被调用的逻辑,三是回调函数本身。

    四、Informer()

    在Deployment Controller中包含了deployment、replicaset、pod三个informer,都是各自的Informer()方法创建各自的informer。

    以方法中dInformer.Informer()方法为例。方法本质上调用了InformerFor方法:

    k8s.io/client-go/informers/apps/v1/deployment.go
    
    func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
    	return f.factory.InformerFor(&apps_v1.Deployment{}, f.defaultInformer)
    }

    InformerFor方法比较直观,就是调用defaultInformer方法,将生成的informer存入factory的informer字段中。

    defaultInformer方法则是调用了NewFilteredDeploymentInformer方法:

    k8s.io/client-go/informers/apps/v1/deployment.go

    func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { return cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.AppsV1().Deployments(namespace).List(options) }, WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.AppsV1().Deployments(namespace).Watch(options) }, }, &apps_v1.Deployment{}, resyncPeriod, indexers, ) }

    这个方法直接就返回了一个NewSharedIndexInformer结构体。可以看到,这个结构体内部,定义了这个Informer的List和Watch函数,与一般用户通过client-go连接API Server并无二致。

    五、AddEventHandler

    首先看调用的逻辑。AddEventHandler方法除了为Informer添加新的回调函数外,还同时向这些新回调函数如何分发消息,使这些回调函数被调用。AddEventHandler方法只有一行,即调用AddEventHandlerWithResyncPeriod方法。

    进入AddEventHandlerWithResyncPeriod方法:

    k8s.io/client-go/tools/cache/shared_informer.go
    
    func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
    
        if s.stopped {
            glog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
            return
        }
    
        if resyncPeriod > 0 {
              ...
        }
    
        listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
    
        if !s.started {
            s.processor.addListener(listener)
            return
        }
    
        // in order to safely join, we have to
        // 1. stop sending add/update/delete notifications
        // 2. do a list against the store
        // 3. send synthetic "Add" events to the new handler
        // 4. unblock
        s.blockDeltas.Lock()
        defer s.blockDeltas.Unlock()
    
        s.processor.addListener(listener)
        for _, item := range s.indexer.List() {
            listener.add(addNotification{newObj: item})
        }
    }

    我们看到,这里引入了一个listener的概念。listener是informer用于获取Thread Safe Store元素并处理的机制。通过调用addListener方法,为informer添加并运行listener。

    随后,又遍历Thread Safe Store中的元素,将所有的消息都发送给新添加的EventHandler。这里的目的是让新EventHandler先处理一遍Thread Safe Store中已有的元素,不然它们就只能接收到新消息,没有机会处理旧消息了。

    六、addListener

    listener的添加和运行都通过addListener方法进行。进入addListener方法:

    k8s.io/client-go/tools/cache/shared_informer.go
    
    func (p *sharedProcessor) addListener(listener *processorListener) {
    	p.listenersLock.Lock()
    	defer p.listenersLock.Unlock()
    
    	p.addListenerLocked(listener)
    	if p.listenersStarted {
    		p.wg.Start(listener.run)
    		p.wg.Start(listener.pop)
    	}
    }

    addListener方法通过goroutine的方式,调用listener的run和pop方法,运行listener。

    分别进入run和pop方法:

    k8s.io/client-go/tools/cache/shared_informer.go
    
    func (p *processorListener) run() {
        stopCh := make(chan struct{})
        wait.Until(func() {
            // this gives us a few quick retries before a long pause and then a few more quick retries
            err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
                for next := range p.nextCh {
                    switch notification := next.(type) {
                    case updateNotification:
                        p.handler.OnUpdate(notification.oldObj, notification.newObj)
                    case addNotification:
                        p.handler.OnAdd(notification.newObj)
                    case deleteNotification:
                        p.handler.OnDelete(notification.oldObj)
                    default:
                        utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
                    }
                }
                // the only way to get here is if the p.nextCh is empty and closed
                return true, nil
            })
    
            // the only way to get here is if the p.nextCh is empty and closed
            if err == nil {
                close(stopCh)
            }
        }, 1*time.Minute, stopCh)
    }
    k8s.io/client-go/tools/cache/shared_informer.go
    
    func (p *processorListener) pop() {
        defer utilruntime.HandleCrash()
        defer close(p.nextCh) // Tell .run() to stop
    
        var nextCh chan<- interface{}
        var notification interface{}
        for {
            select {
            case nextCh <- notification:
                // Notification dispatched
                var ok bool
                notification, ok = p.pendingNotifications.ReadOne()
                if !ok { // Nothing to pop
                    nextCh = nil // Disable this select case
                }
            case notificationToAdd, ok := <-p.addCh:
                if !ok {
                    return
                }
                if notification == nil { // No notification to pop (and pendingNotifications is empty)
                    // Optimize the case - skip adding to pendingNotifications
                    notification = notificationToAdd
                    nextCh = p.nextCh
                } else { // There is already a notification waiting to be dispatched
                    p.pendingNotifications.WriteOne(notificationToAdd)
                }
            }
        }
    }

    pop方法用到好几个channel,本质上仍是从channel中取出一个notification,并通过run方法进行处理。可以看到,在run方法中有对于notification的类型选择,并分别调用OnUpdate、OnAdd、OnDelete方法进行处理,而这三个方法则分别调用了前面AddEventHandler方法中传进来的回调函数,即:

    pkg/controller/deployment/deployment_controller.go

    func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) { ... dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addDeployment, UpdateFunc: dc.updateDeployment, DeleteFunc: dc.deleteDeployment, }) rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addReplicaSet, UpdateFunc: dc.updateReplicaSet, DeleteFunc: dc.deleteReplicaSet, }) podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: dc.deletePod, }) ... }

    那么这些notification从何而来呢?答案是,通过listener的add方法添加,即前述AddEventHandlerWithResyncPeriod方法的最后一句:

    k8s.io/client-go/tools/cache/shared_informer.go
    
    func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
            ...
    	for _, item := range s.indexer.List() {
    		listener.add(addNotification{newObj: item})
    	}
    }

    add方法只有一行,就是将信息传入listener的channel中。而信息则是通过indexer的List方法,从Thread Safe Store中获得。此外,上一篇文章中也提到过,informer会不断从FIFO中pop元素,进行处理后向listener发送信息。所以对于EventHandler来说,一方面在刚注册时,从Thread Safe Store中一次性获得所有旧的消息,一方面从FIFO队列中一个一个地获得消息。

    至此,informer的逻辑就很清晰了。首先通过ListAndWatch方法,将watch到的资源状态的变化存入FIFO队列。之后,在启动controller后,通过factory生成相应的informer,并通过AddEventHandler方法添加相应的回调函数。AddEventHandler方法会为informer添加listener,将Thread Safe Store队列中的内容取出,根据其状态调用相应的回调函数进行处理。同时FIFO队列中的元素也会一个一个pop出来,供informer消费。

    剩下的内容,将在下一篇文章中分析。下一篇文章链接https://www.cnblogs.com/00986014w/p/10570488.html

  • 相关阅读:
    使用 HtmlInputHidden 控件在本页面保持状态和跨页面传值
    asp.net页面回传与js调用服务端事件、PostBack的原理详解
    关于.net委托的一篇妙文
    C# 基础25问
    存储过程分页
    C#中的格式化字符串
    大批量数据的插入之终极性能提升SqlBulkCopy
    统计某个字符串中指定字符串出现的次数
    powerdesigner 15打开pdm文件弹出安装打印机窗口的解决方法
    Convert.ToInt32(),Int.Parse(),Int.TryParse()的区别
  • 原文地址:https://www.cnblogs.com/00986014w/p/10273738.html
Copyright © 2011-2022 走看看