zoukankan      html  css  js  c++  java
  • ReplicaSetController、ReplicationController

    cmd\kube-controller-manager\app\core.go
    func startReplicaSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
    	go replicaset.NewReplicaSetController(
    		controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
    		controllerContext.InformerFactory.Core().V1().Pods(),
    		controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"),
    		replicaset.BurstReplicas,
    	).Run(ctx, int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs)) --->pkg\controller\replicaset\replica_set.go
    	return nil, true, nil
    }
    
    func startReplicationController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
    	go replicationcontroller.NewReplicationManager(
    		controllerContext.InformerFactory.Core().V1().Pods(),
    		controllerContext.InformerFactory.Core().V1().ReplicationControllers(),
    		controllerContext.ClientBuilder.ClientOrDie("replication-controller"),
    		replicationcontroller.BurstReplicas,
    	).Run(ctx, int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs)) --->pkg\controller\replication\replication_controller.go
    	return nil, true, nil
    }
    
    pkg\controller\replication\replication_controller.go
    func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager {
    	return &ReplicationManager{
    		*replicaset.NewBaseController(informerAdapter{rcInformer}, podInformer, clientsetAdapter{kubeClient}, burstReplicas,
    			v1.SchemeGroupVersion.WithKind("ReplicationController"),
    			"replication_controller",
    			"replicationmanager",
    			podControlAdapter{controller.RealPodControl{
    				KubeClient: kubeClient,
    				Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replication-controller"}),
    			}},
    		),
    	}
    } --->pkg\controller\replication\replication_controller.go
    
    pkg\controller\replicaset\replica_set.go
    Run
    	// 开协程池进行任务监听
    	for i := 0; i < workers; i++ {
    		go wait.UntilWithContext(ctx, rsc.worker, time.Second)
    	}
    
    
    func (rsc *ReplicaSetController) worker(ctx context.Context) {
    	for rsc.processNextWorkItem(ctx) {
    	}
    }
    
    
    func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
    	// 通过队列get保证无锁模式不重复处理
    	key, quit := rsc.queue.Get()
    	err := rsc.syncHandler(ctx, key.(string)) --->syncReplicaSet
    	// 处理失败会重新入队,只是会有频率限制
    	if err == nil {
    		rsc.queue.Forget(key)
    		return true
    	}
    	rsc.queue.AddRateLimited(key)
    	return true
    }
    
    
    **syncReplicaSet**
    	// 获取rs配置及selector标签
    	namespace, name, err := cache.SplitMetaNamespaceKey(key)
    	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    	// 获取所有处于活跃状态pod
    	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    	filteredPods := controller.FilterActivePods(allPods)
    	// 返回符合label的pod列表
    	filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)
    	// 开始运算调整
    	if rsNeedsSync && rs.DeletionTimestamp == nil {
    		manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
    	}
    	// 因为缓存的缘故必须DeepCopy后才能修改信息
    	rs = rs.DeepCopy()
    	// 更新rs新的状态
    	updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
    	// 如果仍有不符合rs定义的副本数的情况,则会再次入队列延迟后处理
    	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
    		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
    		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
    		rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
    	}
    
    
    func (rsc *ReplicaSetController) claimPods(ctx context.Context, rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
    	...
    	cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)
    	return cm.ClaimPods(ctx, filteredPods) --->pkg\controller\controller_ref_manager.go
    }
    
    
    **manageReplicas**
    	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    	if diff < 0 {
    		// 小于0说明需要增加pod数量
    		diff *= -1
    		if diff > rsc.burstReplicas {
    			// 如果差异数量太多,则会按burstReplicas(默认500)创建
    			diff = rsc.burstReplicas
    		}
    		// 分小批单开go创建
    		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
    			err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
    		})
    	} else if diff > 0 {
    		// d大于0说明需要减少pod数量
    		if diff > rsc.burstReplicas {
    			diff = rsc.burstReplicas
    		}
    		// 获取要被清理的pod列表
    		podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
    		errCh := make(chan error, diff)
    		var wg sync.WaitGroup
    		wg.Add(diff)
    		for _, pod := range podsToDelete {
    			go func(targetPod *v1.Pod) {
    				defer wg.Done()
    				if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
    					if !apierrors.IsNotFound(err) {
    						klog.V(2).Infof("Failed to delete %v, decremented expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
    						errCh <- err
    					}
    				}
    			}(pod)
    		}
    		wg.Wait()
    
    		select {
    		case err := <-errCh:
    			// 如果任何一个删除时出现报错则直接返回
    			if err != nil {
    				return err
    			}
    		default:
    		}
    	}
    
    
    func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {
    	if diff < len(filteredPods) {
    		podsWithRanks := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)
    		sort.Sort(podsWithRanks)
    		reportSortingDeletionAgeRatioMetric(filteredPods, diff)
    	}
    	return filteredPods[:diff]
    }
    
    
    // node节点上每有一个Active的pod则加一分
    func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks {
    	podsOnNode := make(map[string]int)
    	for _, pod := range relatedPods {
    		if controller.IsPodActive(pod) {
    			podsOnNode[pod.Spec.NodeName]++
    		}
    	}
    	ranks := make([]int, len(podsToRank))
    	for i, pod := range podsToRank {
    		ranks[i] = podsOnNode[pod.Spec.NodeName]
    	}
    	return controller.ActivePodsWithRanks{Pods: podsToRank, Rank: ranks, Now: metav1.Now()}
    }
    
    pkg\controller\controller_ref_manager.go
    // 匹配label,返回符合的pod列表
    ClaimPods
    	match := func(obj metav1.Object) bool {
    		pod := obj.(*v1.Pod)
    		// Check selector first so filters only run on potentially matching Pods.
    		if !m.Selector.Matches(labels.Set(pod.Labels)) {
    			return false
    		}
    		for _, filter := range filters {
    			if !filter(pod) {
    				return false
    			}
    		}
    		return true
    	}
    
  • 相关阅读:
    ICONS-图标库
    图形资源
    vue项目中,如果修改了组件名称,vscode编辑器会在引入修改组件的名字处提示红色波浪线 The file is in the program because:Imported via xxx Root file specified for compilation .
    接口在dev环境报跨域问题(has been blocked by CORS policy:Response to preflight request doesn't pass access control check:No 'Access-Control-Allow-Origin' header ispresent on the requested resource.),qa环境正常
    阿里云occ的图片文件URL用浏览器直接打开无法访问,提示This XML file does noe appear to have any style information associated with it. The document tree is shown below.
    vue 项目使用element ui 中tree组件 check-strictly 用法(父子不互相关联的反显情况)
    高德地图进行线路规划绘制标记点操作(vue)
    vue中实现拖拽调整顺序功能
    2021-01-22 浏览器相关知识
    2021-01-22 js 相关知识点
  • 原文地址:https://www.cnblogs.com/bfmq/p/15720682.html
Copyright © 2011-2022 走看看