zoukankan      html  css  js  c++  java
  • ReplicaSetController、ReplicationController

    cmd\kube-controller-manager\app\core.go
    // startReplicaSetController builds a ReplicaSet controller from the shared
    // ReplicaSet and Pod informers, a client obtained for "replicaset-controller",
    // and replicaset.BurstReplicas, then launches its Run method in a new goroutine.
    // The function does not wait for Run; it always returns (nil, true, nil).
    func startReplicaSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
    	go replicaset.NewReplicaSetController(
    		controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
    		controllerContext.InformerFactory.Core().V1().Pods(),
    		controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"),
    		replicaset.BurstReplicas,
    	// ConcurrentRSSyncs from the component config is passed as Run's second argument.
    	).Run(ctx, int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs)) --->pkg\controller\replicaset\replica_set.go
    	return nil, true, nil
    }
    
    // startReplicationController builds a ReplicationManager from the shared Pod
    // and ReplicationController informers, a client obtained for
    // "replication-controller", and replicationcontroller.BurstReplicas, then
    // launches its Run method in a new goroutine.
    // The function does not wait for Run; it always returns (nil, true, nil).
    func startReplicationController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
    	go replicationcontroller.NewReplicationManager(
    		controllerContext.InformerFactory.Core().V1().Pods(),
    		controllerContext.InformerFactory.Core().V1().ReplicationControllers(),
    		controllerContext.ClientBuilder.ClientOrDie("replication-controller"),
    		replicationcontroller.BurstReplicas,
    	// ConcurrentRCSyncs from the component config is passed as Run's second argument.
    	).Run(ctx, int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs)) --->pkg\controller\replication\replication_controller.go
    	return nil, true, nil
    }
    
    pkg\controller\replication\replication_controller.go
    // NewReplicationManager returns a ReplicationManager whose only (unkeyed) field
    // is a copy of the base controller produced by replicaset.NewBaseController.
    // The ReplicationController informer, the clientset and the pod control are
    // each wrapped in an adapter type (informerAdapter, clientsetAdapter,
    // podControlAdapter) before being handed to the base controller.
    // NOTE(review): eventBroadcaster is not defined in this excerpt; presumably it
    // is created earlier in the full function - confirm against the real source.
    func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager {
    	return &ReplicationManager{
    		*replicaset.NewBaseController(informerAdapter{rcInformer}, podInformer, clientsetAdapter{kubeClient}, burstReplicas,
    			// The base controller is told it manages the "ReplicationController" kind.
    			v1.SchemeGroupVersion.WithKind("ReplicationController"),
    			"replication_controller",
    			"replicationmanager",
    			podControlAdapter{controller.RealPodControl{
    				KubeClient: kubeClient,
    				Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replication-controller"}),
    			}},
    		),
    	}
    } --->pkg\controller\replication\replication_controller.go
    
    pkg\controller\replicaset\replica_set.go
    Run
    	// Start `workers` goroutines. Each one runs rsc.worker through
    	// wait.UntilWithContext with a one-second period argument.
    	for i := 0; i < workers; i++ {
    		go wait.UntilWithContext(ctx, rsc.worker, time.Second)
    	}
    
    
    // worker keeps processing queued work items one after another and returns as
    // soon as processNextWorkItem reports false.
    func (rsc *ReplicaSetController) worker(ctx context.Context) {
    	for {
    		if !rsc.processNextWorkItem(ctx) {
    			return
    		}
    	}
    }
    
    
    func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
    	// 通过队列get保证无锁模式不重复处理
    	key, quit := rsc.queue.Get()
    	err := rsc.syncHandler(ctx, key.(string)) --->syncReplicaSet
    	// 处理失败会重新入队,只是会有频率限制
    	if err == nil {
    		rsc.queue.Forget(key)
    		return true
    	}
    	rsc.queue.AddRateLimited(key)
    	return true
    }
    
    
    **syncReplicaSet**
    	// Split the queue key into namespace and name, fetch the ReplicaSet from the
    	// lister, and convert its spec selector into a labels.Selector.
    	// NOTE(review): this excerpt never checks the err values it assigns.
    	namespace, name, err := cache.SplitMetaNamespaceKey(key)
    	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    	// List every pod in the ReplicaSet's namespace, then keep only the active ones.
    	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    	filteredPods := controller.FilterActivePods(allPods)
    	// Narrow the active pods down to those claimed for this ReplicaSet via its selector.
    	filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)
    	// Reconcile the pod count, but only when a sync is needed and the ReplicaSet
    	// is not being deleted. (rsNeedsSync is computed in code omitted from this excerpt.)
    	if rsNeedsSync && rs.DeletionTimestamp == nil {
    		manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
    	}
    	// rs was read from the lister cache, so work on a deep copy before modifying it.
    	rs = rs.DeepCopy()
    	// Write the new status for the ReplicaSet. (newStatus is computed in code
    	// omitted from this excerpt.)
    	updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
    	// When reconciliation succeeded, MinReadySeconds is set, and all replicas are
    	// ready but not all are available yet, re-queue the key after MinReadySeconds.
    	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
    		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
    		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
    		rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
    	}
    
    
    // claimPods builds a PodControllerRefManager for rs with the given selector and
    // returns whatever its ClaimPods call yields for filteredPods.
    // Part of the body is elided in this excerpt; canAdoptFunc is defined in the
    // omitted portion.
    func (rsc *ReplicaSetController) claimPods(ctx context.Context, rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
    	...
    	cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)
    	return cm.ClaimPods(ctx, filteredPods) --->pkg\controller\controller_ref_manager.go
    }
    
    
    **manageReplicas**
    	// diff is the number of current pods minus the desired replica count.
    	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    	if diff < 0 {
    		// Negative diff: there are too few pods, so pods must be created.
    		diff *= -1
    		if diff > rsc.burstReplicas {
    			// Cap the number handled in one pass at burstReplicas
    			// (500 by default, according to the original annotation).
    			diff = rsc.burstReplicas
    		}
    		// Create the pods through slowStartBatch, passing
    		// controller.SlowStartInitialBatchSize as the initial batch size.
    		// NOTE(review): the callback is abridged here; as shown it assigns err
    		// without returning it.
    		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
    			err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
    		})
    	} else if diff > 0 {
    		// Positive diff: there are too many pods, so pods must be deleted.
    		if diff > rsc.burstReplicas {
    			diff = rsc.burstReplicas
    		}
    		// Choose which pods to delete. (relatedPods is obtained in code omitted
    		// from this excerpt.)
    		podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
    		// errCh is buffered to diff entries so a failing goroutine can report
    		// its error without blocking on the send.
    		errCh := make(chan error, diff)
    		var wg sync.WaitGroup
    		wg.Add(diff)
    		// Delete each chosen pod in its own goroutine.
    		for _, pod := range podsToDelete {
    			go func(targetPod *v1.Pod) {
    				defer wg.Done()
    				if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
    					// A NotFound error is not reported; any other error is logged and sent to errCh.
    					// (podKey is defined in code omitted from this excerpt.)
    					if !apierrors.IsNotFound(err) {
    						klog.V(2).Infof("Failed to delete %v, decremented expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
    						errCh <- err
    					}
    				}
    			}(pod)
    		}
    		// Wait until every deletion goroutine has finished.
    		wg.Wait()
    
    		// Non-blocking receive: if at least one deletion reported an error,
    		// return one of them; otherwise fall through.
    		select {
    		case err := <-errCh:
    			if err != nil {
    				return err
    			}
    		default:
    		}
    	}
    
    
    // getPodsToDelete returns the first diff entries of filteredPods. When only a
    // subset is being removed (diff < len(filteredPods)), the pods are first ranked
    // by related pods on the same node and sorted, and the sorting metric is
    // reported; when diff is not smaller than the slice length no ordering is done.
    func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {
    	if diff >= len(filteredPods) {
    		// Nothing to choose between, so skip ranking and sorting.
    		return filteredPods[:diff]
    	}
    	// The ranked wrapper holds the same slice as filteredPods, so sorting it
    	// presumably reorders filteredPods in place before the prefix is taken.
    	ranked := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)
    	sort.Sort(ranked)
    	reportSortingDeletionAgeRatioMetric(filteredPods, diff)
    	return filteredPods[:diff]
    }
    
    
    // getPodsRankedByRelatedPodsOnSameNode pairs podsToRank with a rank per pod:
    // the number of active pods in relatedPods that share the pod's node name.
    // The result also records the current time in Now.
    func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks {
    	// Count active related pods per node name.
    	activePerNode := map[string]int{}
    	for _, related := range relatedPods {
    		if !controller.IsPodActive(related) {
    			continue
    		}
    		activePerNode[related.Spec.NodeName]++
    	}
    	// A pod's rank is the count for its own node (zero when the node has none).
    	ranks := make([]int, 0, len(podsToRank))
    	for _, candidate := range podsToRank {
    		ranks = append(ranks, activePerNode[candidate.Spec.NodeName])
    	}
    	return controller.ActivePodsWithRanks{Pods: podsToRank, Rank: ranks, Now: metav1.Now()}
    }
    
    pkg\controller\controller_ref_manager.go
    // ClaimPods: matches pods by label and returns the ones that qualify.
    // Only the match predicate is excerpted here.
    ClaimPods
    	// match reports whether a pod is eligible: its labels must satisfy
    	// m.Selector and every function in filters must accept it.
    	match := func(obj metav1.Object) bool {
    		pod := obj.(*v1.Pod)
    		// Check selector first so filters only run on potentially matching Pods.
    		if !m.Selector.Matches(labels.Set(pod.Labels)) {
    			return false
    		}
    		// Any single filter rejecting the pod is enough to exclude it.
    		for _, filter := range filters {
    			if !filter(pod) {
    				return false
    			}
    		}
    		return true
    	}
    
  • 相关阅读:
    hibernate中持久化对象的生命周期(转载)
    IDEA调试技巧之条件断点
    POI中不推荐的方法与其替代的方法
    visualvm监控类是否是多例模式
    IDEA中Maven项目使用Junit4单元测试的写法
    JPQL的关联查询
    poi的cellstyle陷阱,样式覆盖
    studio 连不上远程仓库的各种原因分析
    Concurrent usage detected
    我的SSH框架实例(附源码)
  • 原文地址:https://www.cnblogs.com/bfmq/p/15720682.html
Copyright © 2011-2022 走看看