现象
用户通过在deployment中配置nodeName字段来直接绑定pod到特定的节点做相关测试验证,此时该deployment对应的pod不断被创建出来,短时间被创建出数千个pod,且状态均为Outofpods
Kubernetes release-1.16
总结
用户通过在deployment中直接配置nodeName字段尝试绑定特定节点运行pod,而default调度器会忽略nodeName字段不为空的pod,由kubelet listAndWatch到pod之后筛选nodeName为自己的hostname的pod,放到syncLoop循环处理,其中具有一个Admit过程通过把scheduler的generalPredicate算法在(node, pod)上面执行一次,如果失败且pod不是critical pod,那么这个pod会被reject,更新pod的status.phase为failed并上报event,同时rs controller的reconcile逻辑中会忽略status.phase为failed的pod,因此会创建出一个新pod,导致整个过程不断循环。
分析
default scheduler
如果pod中直接指定nodeName,default scheduler将直接忽略该pod,如下:Run方法中通过scheduler.New创建一个Scheduler对象,New方法中通过AddAllEventHandlers方法为scheduler的podInformer设置EventHandler,其中通过assignedPod方法判断的nodeName是否为空以及responsibleForPod方法判断pod是否配置其他调度器来调度
可以看到为Pod Informer设置了两次EventHandler,第一个为更新scheduler的pod缓存,第二个则为把pod添加进调度队列
informer中的sharedProcess通过持有多个processorListener,支持配置多个EventHandler,细节可参考:https://www.cnblogs.com/orchidzjl/p/14768781.html
# cmd/kube-scheduler/app/server.go @Run
func AddAllEventHandlers(
sched *Scheduler,
schedulerName string,
nodeInformer coreinformers.NodeInformer,
podInformer coreinformers.PodInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
serviceInformer coreinformers.ServiceInformer,
storageClassInformer storageinformersv1.StorageClassInformer,
csiNodeInformer storageinformersv1beta1.CSINodeInformer,
) {
// scheduled pod cache
podInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return assignedPod(t)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return assignedPod(pod)
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sched.addPodToCache,
UpdateFunc: sched.updatePodInCache,
DeleteFunc: sched.deletePodFromCache,
},
},
)
// unscheduled pod queue
podInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
//如果Pod的spec.nodeName字段不为空或者spec.schedulerName不是default
return !assignedPod(t) && responsibleForPod(t, schedulerName)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return !assignedPod(pod) && responsibleForPod(pod, schedulerName)
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sched.addPodToSchedulingQueue,
UpdateFunc: sched.updatePodInSchedulingQueue,
DeleteFunc: sched.deletePodFromSchedulingQueue,
},
},
kubelet
我们知道Kubelet结构体中的syncLoopIteration方法接收多个channel数据源的信息,并调用相应的handler处理,其中configCh是kubelet获取pod的数据源,比如通过informer从api中拉取到一个新的Pod,对应的handler为HandlePodAdditions
configCh的数据来源有多个,如下makePodSourceConfig方法为configCh添加多个数据源,其中一个就是api,通过配置spec.nodeName FieldSelector的ListOptions来向api获取所有ns下指定nodeName的pod
# pkg/kubelet/kubelet.go@makePodSourceConfig
if kubeDeps.KubeClient != nil {
klog.Infof("Watching apiserver")
if updatechannel == nil {
updatechannel = cfg.Channel(kubetypes.ApiserverSource)
}
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
}
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
newSourceApiserverFromLW(lw, updates)
}
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector.String()
}
return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}
开启 syncLoop循环处理目标pod
# pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
switch u.Op {
case kubetypes.ADD:
klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.RESTORE:
klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
// These are pods restored from the checkpoint. Treat them as new
// pods.
handler.HandlePodAdditions(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.Errorf("Kubelet does not support snapshot update")
}
return true
}
HandlePodAdditions方法中会根据CreationTime对待处理的pod排序逐个处理,并通过podIsTerminated判断该pod是否需要处理,如果需要则通过canAdmitPod调用各个handler的Admit方法判断,如果存在一个失败,则调用rejectPod直接更新pod为failed
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
existingPods := kl.podManager.GetPods()
if !kl.podIsTerminated(pod) {
// Only go through the admission process if the pod is not terminated.
// We failed pods that we rejected, so activePods include all admitted pods that are alive.
activePods := kl.filterOutTerminatedPods(existingPods)
// Check if we can admit the pod; if not, reject it.
// canAdmitPod将判断该Pod是否可以被kubelet创建,如果Admit失败,那么通过rejectPod方法处理
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
}
}
}
// rejectPod records an event about the pod with the given reason and message,
// and updates the pod to the failed phase in the status manage.
func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message)
kl.statusManager.SetPodStatus(pod, v1.PodStatus{
Phase: v1.PodFailed,
Reason: reason,
Message: "Pod " + message})
}
func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
// Check the cached pod status which was set after the last sync.
status, ok := kl.statusManager.GetPodStatus(pod.UID)
// notRunning方法检查pod中所有ContainerStatus,如果存在一个podstatus.State.Terminated == nil && status.State.Waiting == nil则认为该pod(至少一个容器)running
return status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(status.ContainerStatuses))
}
HandlePodAdditions接下来通过canAdmitPod方法判断该Pod是否可以被kubelet创建,其通过kubelet对象的admitHandlers获取注册好的handler对象,并逐一调用这些对象的Admit方法检查pod
// canAdmitPod determines if a pod can be admitted, and gives a reason if it
// cannot. "pod" is new pod, while "pods" are all admitted pods
// The function returns a boolean value indicating whether the pod
// can be admitted, a brief single-word reason and a message explaining why
// the pod cannot be admitted.
// pods为kubelet中 正在运行的 && 已经被admit确认过没问题的 && 不是terminated的 pod
func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
// the kubelet will invoke each pod admit handler in sequence
// if any handler rejects, the pod is rejected.
attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
for _, podAdmitHandler := range kl.admitHandlers {
if result := podAdmitHandler.Admit(attrs); !result.Admit {
return false, result.Reason, result.Message
}
}
return true, "", ""
}
而kubelet对象的admitHandlers是在如下NewMainKubelet方法中注册,默认注册两个admitHandlers
# pkg/kubelet/kubelet.go @NewMainKubelet
criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
lifecycle.NewPredicateAdmitHandler方法通过criticalPodAdmissionHandler生成一个admitHandler
- 在该handler中需要根据node信息来admit pod,比如node的resources allocatable是否满足pod resources、node的 labels是否match pod的nodeSelector label
klet.getNodeAnyWay:总是能返回一个Node对象的方法,先从apiserver(其实是informer cache)中获取node,如果获取不到则直接返回一个initial node:该node的信息由kubelet的启动参数初始化,比如label只包含Hostname、OS、Arch等
// getNodeAnyWay() must return a *v1.Node which is required by RunGeneralPredicates().
// The *v1.Node is obtained as follows:
// Return kubelet's nodeInfo for this node, except on error or if in standalone mode,
// in which case return a manufactured nodeInfo representing a node with no pods,
// zero capacity, and the default labels.
func (kl *Kubelet) getNodeAnyWay() (*v1.Node, error) {
if kl.kubeClient != nil {
if n, err := kl.nodeInfo.GetNodeInfo(string(kl.nodeName)); err == nil {
return n, nil
}
}
return kl.initialNode()
}
- 在该handler中admit pod失败后,如果改pod类型是critical pod,则需要根据AdmissionFailureHandler来决定如何处理该pod,否则直接reject pod(即设置pod的status.phase为failed)
preemption.NewCriticalPodAdmissionHandler创建出一个CriticalPodAdmissionHandler对象,该对象的HandleAdmissionFailure方法知道当一个critical类型的pod被admit失败之后该如何处理
predicateAdmitHandler的Admit方法是真正admit一个pod的逻辑
func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult {
//通过kulet的getNodeAnyWay获取一个Node
node, err := w.getNodeAnyWayFunc()
//待处理的pod
admitPod := attrs.Pod
//节点上已经被admit过可行的其他非terminated的pod
pods := attrs.OtherPods
//获取一个调度器中用到的NodeInfo对象,该对象知道节点的requestedResource、allocatableResource等信息
nodeInfo := schedulernodeinfo.NewNodeInfo(pods...)
//利用node的status来初始化scheduler NodeInfo对象中的allocatableResource等信息
nodeInfo.SetNode(node)
// Remove the requests of the extended resources that are missing in the
// node info. This is required to support cluster-level resources, which
// are extended resources unknown to nodes.
//
// Caveat: If a pod was manually bound to a node (e.g., static pod) where a
// node-level extended resource it requires is not found, then kubelet will
// not fail admission while it should. This issue will be addressed with
// the Resource Class API in the future.
//去除那些集群层面但是节点并不感知的extended resources
podWithoutMissingExtendedResources := removeMissingExtendedResources(admitPod, nodeInfo)
//利用调度器模块中的GeneralPredicates方法检查pod是否满足其中的所有predicate方法
fit, reasons, err := predicates.GeneralPredicates(podWithoutMissingExtendedResources, nil, nodeInfo)
if !fit {
//HandleAdmissionFailure中判断pod如果是critical pod,且失败原因仅为相关资源不足,则尝试驱逐其他pod来释放资源
fit, reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(admitPod, reasons)
if err != nil {
message := fmt.Sprintf("Unexpected error while attempting to recover from admission failure: %v", err)
klog.Warningf("Failed to admit pod %v - %s", format.Pod(admitPod), message)
return PodAdmitResult{
Admit: fit,
Reason: "UnexpectedAdmissionError",
Message: message,
}
}
}
if !fit {
// If there are failed predicates, we only return the first one as a reason.
r := reasons[0]
switch re := r.(type) {
case *predicates.PredicateFailureError:
reason = re.PredicateName
message = re.Error()
klog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message)
//如果GeneralPredicates返回不合适的原因是资源不足,那么对于非critical的pod来说就返回对应的PodAdmitResult
case *predicates.InsufficientResourceError:
reason = fmt.Sprintf("OutOf%s", re.ResourceName)
message = re.Error()
klog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message)
case *predicates.FailureReason:
reason = re.GetReason()
message = fmt.Sprintf("Failure: %s", re.GetReason())
klog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(admitPod), message)
return PodAdmitResult{
Admit: fit,
Reason: reason,
Message: message,
}
}
}
GeneralPredicates中通过noncriticalPredicates和EssentialPredicates两组预选算法来判断pod是否合适,其中noncriticalPredicates中就包含PodFitsResources方法,而EssentialPredicates包含PodFitsHost, PodFitsHostPorts, PodMatchNodeSelector
// GeneralPredicates checks whether noncriticalPredicates and EssentialPredicates pass. noncriticalPredicates are the predicates
// that only non-critical pods need and EssentialPredicates are the predicates that all pods, including critical pods, need
func GeneralPredicates(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
var predicateFails []PredicateFailureReason
for _, predicate := range []FitPredicate{noncriticalPredicates, noncriticalPredicates} {
fit, reasons, err := predicate(pod, meta, nodeInfo)
if err != nil {
return false, predicateFails, err
}
if !fit {
predicateFails = append(predicateFails, reasons...)
}
}
return len(predicateFails) == 0, predicateFails, nil
}
PodFitsResources坚持pod需要的资源和NodeInfo是否能够满足
// PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
// First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
// predicate failure reasons if the node has insufficient resources to run the pod.
func PodFitsResources(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
node := nodeInfo.Node()
// 如果node中当前运行的pod+1>最大允许的pod,则添加一个failed reason
allowedPodNumber := nodeInfo.AllowedPodNumber()
if len(nodeInfo.Pods())+1 > allowedPodNumber {
predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
}
// aliyun gpu share之类的extended resource
var podRequest *schedulernodeinfo.Resource
if predicateMeta, ok := meta.(*predicateMetadata); ok {
podRequest = predicateMeta.podRequest
if predicateMeta.ignoredExtendedResources != nil {
ignoredExtendedResources = predicateMeta.ignoredExtendedResources
}
} else {
// We couldn't parse metadata - fallback to computing it.
podRequest = GetResourceRequest(pod)
}
// cpu
allocatable := nodeInfo.AllocatableResource()
if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
}
// memory
if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
}
// ephemeralStorage
if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
}
return len(predicateFails) == 0, predicateFails, nil
}
HandleAdmissionFailure方法判断pod如果是critical pod,且失败原因仅为相关资源不足,则尝试驱逐其他pod来释放资源,其中IsCriticalPod方法判断pod是否critical pod
// IsCriticalPod returns true if pod's priority is greater than or equal to SystemCriticalPriority.
func IsCriticalPod(pod *v1.Pod) bool {
//静态pod
if IsStaticPod(pod) {
return true
}
//pod设置了优先级且大于2倍的用户配置的Highest User Definable Priority
if pod.Spec.Priority != nil && IsCriticalPodBasedOnPriority(*pod.Spec.Priority) {
return true
}
return false
}
// HandleAdmissionFailure gracefully handles admission rejection, and, in some cases,
// to allow admission of the pod despite its previous failure.
func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []predicates.PredicateFailureReason) (bool, []predicates.PredicateFailureReason, error) {
//判断是否critical pod
if !kubetypes.IsCriticalPod(admitPod) {
return false, failureReasons, nil
}
// InsufficientResourceError is not a reason to reject a critical pod.
// Instead of rejecting, we free up resources to admit it, if no other reasons for rejection exist.
nonResourceReasons := []predicates.PredicateFailureReason{}
resourceReasons := []*admissionRequirement{}
for _, reason := range failureReasons {
if r, ok := reason.(*predicates.InsufficientResourceError); ok {
resourceReasons = append(resourceReasons, &admissionRequirement{
resourceName: r.ResourceName,
quantity: r.GetInsufficientAmount(),
})
} else {
nonResourceReasons = append(nonResourceReasons, reason)
}
}
if len(nonResourceReasons) > 0 {
// Return only reasons that are not resource related, since critical pods cannot fail admission for resource reasons.
return false, nonResourceReasons, nil
}
//如果admit失败的reason都是因为InsufficientResource,那么尝试驱逐pod来释放相关资源
err := c.evictPodsToFreeRequests(admitPod, admissionRequirementList(resourceReasons))
// if no error is returned, preemption succeeded and the pod is safe to admit.
return err == nil, nil, err
}
rs controller
前面分析了kubelet中会通过scheduler的generalPredicate中的PodFitResources方法来判断pod是否合适在节点中运行,如果predicate失败且不是critical pod,那么kubelet会向api更新该pod的status.phase为failed,而现象中pod是不断的被rs controller创建出来,过程如下:
controller-manager启动所有注册的controller
# cmd/kube-controller-manager/app/controllermanager.go @Run
// Run方法通过StartControllers启动所有注册好的controller
if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
klog.Fatalf("error starting controllers: %v", err)
}
//NewControllerInitializers方法返回一个map,分别对应每个controller的启动方法
// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc. This allows for structured downstream composition and subdivision.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
controllers := map[string]InitFunc{}
......
controllers["endpoint"] = startEndpointController
controllers["endpointslice"] = startEndpointSliceController
controllers["job"] = startJobController
controllers["deployment"] = startDeploymentController
controllers["replicaset"] = startReplicaSetController
controllers["horizontalpodautoscaling"] = startHPAController
controllers["disruption"] = startDisruptionController
controllers["statefulset"] = startStatefulSetController
......
return controllers
}
启动rs controller
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
return nil, false, nil
}
go replicaset.NewReplicaSetController(
//向factor中添加rs和pod informer
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
replicaset.BurstReplicas,
).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
return nil, true, nil
}
rs controller的reconcile方法,其中会list出所有的pod,并过滤掉那些inactive的pod,如pod的Status.Phase为Succeed或者Failed的或者被标记为删除的(DeletionTimestamp != nil),所以对以一个status.phase为failed的pod,rs会忽略并创建一个新的pod
// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
//从informer cache中获取rs
namespace, name, err := cache.SplitMetaNamespaceKey(key)
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
//判断当前rs是否需要处理
rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
// list all pods to include the pods that don't match the rs`s selector
// anymore but has the stale controller ref.
// TODO: Do the List and Filter in a single pass, or use an index.
//直接列出所有的pod,而不是根据rs的selector
allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
// Ignore inactive pods.
//过滤掉inactive的pod,
filteredPods := controller.FilterActivePods(allPods)
// NOTE: filteredPods are pointing to objects from cache - if you need to
// modify them, you need to copy it first.
//rs尝试收养那些可能匹配的pod
filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
var manageReplicasErr error
//调整rs的副本数
if rsNeedsSync && rs.DeletionTimestamp == nil {
manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
}
rs = rs.DeepCopy()
// 更新rs status
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
// Always updates status as pods come up or die.
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
}
// FilterActivePods returns pods that have not terminated.
func FilterActivePods(pods []*v1.Pod) []*v1.Pod {
var result []*v1.Pod
for _, p := range pods {
if IsPodActive(p) {
result = append(result, p)
} else {
klog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp)
}
}
return result
}