zoukankan      html  css  js  c++  java
  • ReplicaSetController、ReplicationController

    cmd\kube-controller-manager\app\core.go
    func startReplicaSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
    	go replicaset.NewReplicaSetController(
    		controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
    		controllerContext.InformerFactory.Core().V1().Pods(),
    		controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"),
    		replicaset.BurstReplicas,
    	).Run(ctx, int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs)) --->pkg\controller\replicaset\replica_set.go
    	return nil, true, nil
    }
    
    func startReplicationController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
    	go replicationcontroller.NewReplicationManager(
    		controllerContext.InformerFactory.Core().V1().Pods(),
    		controllerContext.InformerFactory.Core().V1().ReplicationControllers(),
    		controllerContext.ClientBuilder.ClientOrDie("replication-controller"),
    		replicationcontroller.BurstReplicas,
    	).Run(ctx, int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs)) --->pkg\controller\replication\replication_controller.go
    	return nil, true, nil
    }
    
    pkg\controller\replication\replication_controller.go
    func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager {
    	return &ReplicationManager{
    		*replicaset.NewBaseController(informerAdapter{rcInformer}, podInformer, clientsetAdapter{kubeClient}, burstReplicas,
    			v1.SchemeGroupVersion.WithKind("ReplicationController"),
    			"replication_controller",
    			"replicationmanager",
    			podControlAdapter{controller.RealPodControl{
    				KubeClient: kubeClient,
    				Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replication-controller"}),
    			}},
    		),
    	}
    } --->pkg\controller\replication\replication_controller.go
    
    pkg\controller\replicaset\replica_set.go
    Run
    	// 开协程池进行任务监听
    	for i := 0; i < workers; i++ {
    		go wait.UntilWithContext(ctx, rsc.worker, time.Second)
    	}
    
    
    func (rsc *ReplicaSetController) worker(ctx context.Context) {
    	for rsc.processNextWorkItem(ctx) {
    	}
    }
    
    
    func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
    	// 通过队列get保证无锁模式不重复处理
    	key, quit := rsc.queue.Get()
    	err := rsc.syncHandler(ctx, key.(string)) --->syncReplicaSet
    	// 处理失败会重新入队,只是会有频率限制
    	if err == nil {
    		rsc.queue.Forget(key)
    		return true
    	}
    	rsc.queue.AddRateLimited(key)
    	return true
    }
    
    
    **syncReplicaSet**
    	// 获取rs配置及selector标签
    	namespace, name, err := cache.SplitMetaNamespaceKey(key)
    	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    	// 获取所有处于活跃状态pod
    	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    	filteredPods := controller.FilterActivePods(allPods)
    	// 返回符合label的pod列表
    	filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)
    	// 开始运算调整
    	if rsNeedsSync && rs.DeletionTimestamp == nil {
    		manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
    	}
    	// 因为缓存的缘故必须DeepCopy后才能修改信息
    	rs = rs.DeepCopy()
    	// 更新rs新的状态
    	updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
    	// 如果仍有不符合rs定义的副本数的情况,则会再次入队列延迟后处理
    	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
    		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
    		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
    		rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
    	}
    
    
    func (rsc *ReplicaSetController) claimPods(ctx context.Context, rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
    	...
    	cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)
    	return cm.ClaimPods(ctx, filteredPods) --->pkg\controller\controller_ref_manager.go
    }
    
    
    **manageReplicas**
    	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    	if diff < 0 {
    		// 小于0说明需要增加pod数量
    		diff *= -1
    		if diff > rsc.burstReplicas {
    			// 如果差异数量太多,则会按burstReplicas(默认500)创建
    			diff = rsc.burstReplicas
    		}
    		// 分小批单开go创建
    		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
    			err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
    		})
    	} else if diff > 0 {
    		// d大于0说明需要减少pod数量
    		if diff > rsc.burstReplicas {
    			diff = rsc.burstReplicas
    		}
    		// 获取要被清理的pod列表
    		podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
    		errCh := make(chan error, diff)
    		var wg sync.WaitGroup
    		wg.Add(diff)
    		for _, pod := range podsToDelete {
    			go func(targetPod *v1.Pod) {
    				defer wg.Done()
    				if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
    					if !apierrors.IsNotFound(err) {
    						klog.V(2).Infof("Failed to delete %v, decremented expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
    						errCh <- err
    					}
    				}
    			}(pod)
    		}
    		wg.Wait()
    
    		select {
    		case err := <-errCh:
    			// 如果任何一个删除时出现报错则直接返回
    			if err != nil {
    				return err
    			}
    		default:
    		}
    	}
    
    
    func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {
    	if diff < len(filteredPods) {
    		podsWithRanks := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)
    		sort.Sort(podsWithRanks)
    		reportSortingDeletionAgeRatioMetric(filteredPods, diff)
    	}
    	return filteredPods[:diff]
    }
    
    
    // node节点上每有一个Active的pod则加一分
    func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks {
    	podsOnNode := make(map[string]int)
    	for _, pod := range relatedPods {
    		if controller.IsPodActive(pod) {
    			podsOnNode[pod.Spec.NodeName]++
    		}
    	}
    	ranks := make([]int, len(podsToRank))
    	for i, pod := range podsToRank {
    		ranks[i] = podsOnNode[pod.Spec.NodeName]
    	}
    	return controller.ActivePodsWithRanks{Pods: podsToRank, Rank: ranks, Now: metav1.Now()}
    }
    
    pkg\controller\controller_ref_manager.go
    // 匹配label,返回符合的pod列表
    ClaimPods
    	match := func(obj metav1.Object) bool {
    		pod := obj.(*v1.Pod)
    		// Check selector first so filters only run on potentially matching Pods.
    		if !m.Selector.Matches(labels.Set(pod.Labels)) {
    			return false
    		}
    		for _, filter := range filters {
    			if !filter(pod) {
    				return false
    			}
    		}
    		return true
    	}
    
  • 相关阅读:
    Running ASP.NET Applications in Debian and Ubuntu using XSP and Mono
    .net extjs 封装
    ext direct spring
    install ubuntu tweak on ubuntu lts 10.04,this software is created by zhouding
    redis cookbook
    aptana eclipse plugin install on sts
    ubuntu open folderpath on terminal
    ubuntu install pae for the 32bit system 4g limited issue
    EXT Designer 正式版延长使用脚本
    用 Vagrant 快速建立開發環境
  • 原文地址:https://www.cnblogs.com/bfmq/p/15720682.html
Copyright © 2011-2022 走看看