zoukankan      html  css  js  c++  java
  • 7.深入k8s:任务调用Job与CronJob及源码分析

    转载请声明出处哦~,本篇文章发布于luozhiyun的博客:https://www.luozhiyun.com

    在使用job中,我会结合源码进行一定的讲解,我们也可以从源码中一窥究竟,一些细节k8s是如何处理的,从而感受k8s的魅力。源码版本是1.19

    img

    Job

    Job的基本使用

    Job主要是用来任务调用,可以一个或多个 Pod,并确保指定数量的 Pod 可以成功执行到进程正常结束。

    创建一个Job:

    apiVersion: batch/v1
    kind: Job
    metadata:
      name: pi
    spec:
      template:
        spec:
          containers:
          - name: pi
            image: perl
            command: ["perl",  "-Mbignum=bpi", "-wle", "print bpi(2000)"]
          restartPolicy: Never
      backoffLimit: 4
    

    这个Job会创建一个容器,然后执行命令进行π的计算,

    然后我们创建这个pod:

    $ kubectl create -f job.yaml
    
    $ kubectl describe jobs/pi
    
    Name:           pi
    Namespace:      default
    Selector:       controller-uid=cf78ebe4-07f9-4234-b8f9-2fe92df352ea
    Labels:         controller-uid=cf78ebe4-07f9-4234-b8f9-2fe92df352ea
                    job-name=pi
    Annotations:    Parallelism:  1
    Completions:    1
    ...
    Pods Statuses:  0 Running / 1 Succeeded / 0 Failed
    Pod Template:
      Labels:  controller-uid=cf78ebe4-07f9-4234-b8f9-2fe92df352ea
               job-name=pi
      Containers:
       pi:
        Image:      resouer/ubuntu-bc
        ...
    Events:
      Type    Reason            Age   From            Message
      ----    ------            ----  ----            -------
      Normal  SuccessfulCreate  29m   job-controller  Created pod: pi-g9fs4
      Normal  Completed         27m   job-controller  Job completed
    

    可以看到创建对象后,Pod模板中,被自动加上了一个controller-uid=< 一个随机字符串 > 这样的 Label。而这个 Job 对象本身,则被自动加上了这个 Label 对应的 Selector,从而 保证了 Job 与它所管理的 Pod 之间的匹配关系。这个uid避免了不同Job对象的Pod不会重合。

    $ kubectl get pod
    NAME           READY   STATUS      RESTARTS   AGE
    pi-g9fs4       0/1     Completed   0          33m
    
    $ kubectl describe pod pi-g9fs4
    ...
    Events:
      Type    Reason     Age   From                     Message
      ----    ------     ----  ----                     -------
      Normal  Scheduled  35m   default-scheduler        Successfully assigned default/pi-g9fs4 to 192.168.13.130
      Normal  Pulling    35m   kubelet, 192.168.13.130  Pulling image "resouer/ubuntu-bc"
      Normal  Pulled     35m   kubelet, 192.168.13.130  Successfully pulled image "resouer/ubuntu-bc"
      Normal  Created    35m   kubelet, 192.168.13.130  Created container pi
      Normal  Started    35m   kubelet, 192.168.13.130  Started container pi
    

    我们可以看到Pod在创建好运行完毕之后会进入到Completed状态。上面的yaml定义中restartPolicy=Never也保证了这个Pod只会运行一次。

    如果创建的Pod运行失败了,那么Job Controller会不断创建一个新的Pod:

    $ kubectl get pods
    NAME                                READY     STATUS              RESTARTS   AGE
    pi-55h89                            0/1       ContainerCreating   0          2s
    pi-tqbcz                            0/1       Error               0          5s
    

    参数说明

    spec.backoffLimit

    我们在上面的字段中定义了为4,表示重试次数为4。

    restartPolicy

    在运行过程中,可能发生各种系统问题导致的Pod运行失败,如果设置restartPolicy为OnFailure,那么在运行中发生的失败后Job Controller会重启Pod里面的容器,而不是创建新的Pod。

    还可以设置为Never,表示容器运行失败之后不会重启。更多具体的参见Pod生命周期

    spec.activeDeadlineSeconds

    表示最长运行时间,单位是秒。如:

    spec:
     backoffLimit: 5
     activeDeadlineSeconds: 100
    

    这样设置之后会进入pastActiveDeadline进行校验job.Spec.ActiveDeadlineSeconds是不是为空,不是空的话,会比较Pod的运行时间duration是否大于job.Spec.ActiveDeadlineSeconds设置的值,如果大于,那么会标记Pod终止的原因是DeadlineExceeded。

    在job Controller的源码中,我们可以看到这部分的逻辑:

    job Controller首先会去校验任务是不是处理次数是不是超过了BackoffLimit设置,如果没有超过的话就校验有没有设置ActiveDeadlineSeconds,如果设置了的话,就校验当前job运行时间是否超过了ActiveDeadlineSeconds设置的的时间,超过了那么会打上标记,表示这个job运行失败。

    ...
    	jobHaveNewFailure := failed > job.Status.Failed
    
    	exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&
    		(int32(previousRetry)+1 > *job.Spec.BackoffLimit)
    
    	if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
    		// check if the number of pod restart exceeds backoff (for restart OnFailure only)
    		// OR if the number of failed jobs increased since the last syncJob
    		jobFailed = true
    		failureReason = "BackoffLimitExceeded"
    		failureMessage = "Job has reached the specified backoff limit"
    	} else if pastActiveDeadline(&job) {
    		jobFailed = true
    		failureReason = "DeadlineExceeded"
    		failureMessage = "Job was active longer than specified deadline"
    	}
    ...
    
    
    func pastActiveDeadline(job *batch.Job) bool {
    	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
    		return false
    	}
    	now := metav1.Now()
    	start := job.Status.StartTime.Time
    	duration := now.Time.Sub(start)
    	allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
    	return duration >= allowedDuration
    }
    

    Job的并行任务

    在 Job 对象中,负责并行控制的参数有两个:

    1. spec.parallelism表示一个 Job 在任意时间最多可以启动多少个 Pod 同时运行;
    2. spec.completions表示Job 的最小完成数。

    举例:

    apiVersion: batch/v1
    kind: Job
    metadata:
      name: pi
    spec:
      parallelism: 2
      completions: 4
      template:
        spec:
          containers:
          - name: pi
            image: perl
            command: ["perl",  "-Mbignum=bpi", "-wle", "print bpi(2000)"]
          restartPolicy: Never
      backoffLimit: 4
    

    在创建任务之后,我们可以看到最多只会有两个Pod同时运行:

    $ kubectl get pod
    
    NAME           READY   STATUS              RESTARTS   AGE
    pi-8fsrn       0/1     ContainerCreating   0          30s
    pi-job-67kwg   0/1     Completed           0          14h
    pi-wlbm5       0/1     ContainerCreating   0          30s
    

    每当有一个 Pod 完成计算进入 Completed 状态时,就会有一个新的 Pod 被自动创建出来,并且快速地从 Pending 状态进入到 ContainerCreating 状态。

    最终我们可以看到job的COMPLETIONS会标记全部完成:

    $ kubectl get job
    NAME     COMPLETIONS   DURATION   AGE
    pi       4/4           2m52s      2m52s
    

    Job Controller中会会根据配置的并发数来确认当前处于 active 的 pods 数量是否合理,如果不合理的话则进行调整。

    如果处于 active 状态的 pods 数大于 job 设置的并发数 job.Spec.Parallelism,则并发删除多余的 active pods。

    Job源码分析

    通过上面的使用例子,我们可以看到job的使用时非常的简单的,下面我们通过源码来理解一下这job的运行逻辑。

    核心源码位置在job_controller.go中Controller类的syncJob方法中:

    syncJob方法很长,我还是想要将这个方法拆开来进行说明。

    Controller#syncJob

    func (jm *Controller) syncJob(key string) (bool, error) {
    	...
    	job := *sharedJob
    
    	// if job was finished previously, we don't want to redo the termination
    	// 如果job已经跑完了,那么直接返回,避免重跑
    	if IsJobFinished(&job) {
    		return true, nil
    	}
    
    	// retrieve the previous number of retry
    	// 获取job的重试次数
    	previousRetry := jm.queue.NumRequeues(key)
    
    	
    	jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
    	//获取这个job的pod列表
    	pods, err := jm.getPodsForJob(&job)
    	if err != nil {
    		return false, err
    	}
    	//找到这个job中仍然活跃的pod
    	activePods := controller.FilterActivePods(pods)
    	active := int32(len(activePods))
    	//获取job中运行成功的pod数和运行失败的pod数
    	succeeded, failed := getStatus(pods)
    	conditions := len(job.Status.Conditions)
    	// job first start
    	//设置job 的启动时间
    	if job.Status.StartTime == nil {
    		now := metav1.Now()
    		job.Status.StartTime = &now
    		// enqueue a sync to check if job past ActiveDeadlineSeconds
    		if job.Spec.ActiveDeadlineSeconds != nil {
    			klog.V(4).Infof("Job %s has ActiveDeadlineSeconds will sync after %d seconds",
    				key, *job.Spec.ActiveDeadlineSeconds)
    			jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
    		}
    	}
    	...
    }
    

    这部分的代码会校验job是否已经跑完了,如果跑完了直接返回;

    然后获取job的重试次数,以及与job关联的pod列表,并计算出活跃的pod数量、运行成功的pod数量、以及失败的pod数量;

    接下来如果job是首次启动,那么需要设置job的启动时间。

    继续:

    func (jm *Controller) syncJob(key string) (bool, error) {
    	...
    	var manageJobErr error
    	jobFailed := false
    	var failureReason string
    	var failureMessage string
    	//failed次数超过了job.Status.Failed说明有新的pod运行失败了
    	jobHaveNewFailure := failed > job.Status.Failed
    	// new failures happen when status does not reflect the failures and active
    	// is different than parallelism, otherwise the previous controller loop
    	// failed updating status so even if we pick up failure it is not a new one
    	//如果有新的pod运行失败,并且活跃的pod不等于并行Parallelism数
    	//并且重试次数超过了BackoffLimit
    	exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&
    		(int32(previousRetry)+1 > *job.Spec.BackoffLimit)
    	//重试次数是否超标
    	if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
    		// check if the number of pod restart exceeds backoff (for restart OnFailure only)
    		// OR if the number of failed jobs increased since the last syncJob
    		jobFailed = true
    		failureReason = "BackoffLimitExceeded"
    		failureMessage = "Job has reached the specified backoff limit"
    	//	job运行时间是否超过了ActiveDeadlineSeconds
    	} else if pastActiveDeadline(&job) {
    		jobFailed = true
    		failureReason = "DeadlineExceeded"
    		failureMessage = "Job was active longer than specified deadline"
    	}
    	...
    }
    

    这段代码是用来判断job是否运行失败,判断依据是job重试次数是否超过了BackoffLimit,以及job的运行时间是否超过了设置的ActiveDeadlineSeconds。

    上面这里会获取上一次运行的Failed次数和这次的job的failed次数进行比较,如果failed多了表示又产生了新的运行失败的pod。如果运行失败会标识出失败原因,以及设置jobFailed为true。

    在上面的代码中调用了pastBackoffLimitOnFailure方法和pastActiveDeadline方法,我们分别看一下:

    pastBackoffLimitOnFailure

    func pastBackoffLimitOnFailure(job *batch.Job, pods []*v1.Pod) bool {
    	//如果RestartPolicy为OnFailure,那么直接返回
    	if job.Spec.Template.Spec.RestartPolicy != v1.RestartPolicyOnFailure {
    		return false
    	}
    	result := int32(0)
    	for i := range pods {
    		po := pods[i]
    		//如果pod状态为Running或Pending
    		//获取到pod对应的重启次数以及Container状态,包含pod中的InitContainer
    		if po.Status.Phase == v1.PodRunning || po.Status.Phase == v1.PodPending {
    			for j := range po.Status.InitContainerStatuses {
    				stat := po.Status.InitContainerStatuses[j]
    				result += stat.RestartCount
    			}
    			for j := range po.Status.ContainerStatuses {
    				stat := po.Status.ContainerStatuses[j]
    				result += stat.RestartCount
    			}
    		}
    	}
    	//如果BackoffLimit等于,那么只要重启了一次,则返回true
    	if *job.Spec.BackoffLimit == 0 {
    		return result > 0
    	}
    	//比较重启次数是否超过了BackoffLimit
    	return result >= *job.Spec.BackoffLimit
    }
    

    这个方法会校验job的RestartPolicy策略,不是OnFailure才继续往下执行。然后会遍历pod列表,将pod列表中的重启次数累加并与BackoffLimit进行比较,超过了则返回true。

    pastActiveDeadline

    func pastActiveDeadline(job *batch.Job) bool {
    	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
    		return false
    	}
    	now := metav1.Now()
    	start := job.Status.StartTime.Time
    	duration := now.Time.Sub(start)
    	allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
    	return duration >= allowedDuration
    }
    

    这个方法会算出job的运行时间duration,然后和ActiveDeadlineSeconds进行比较,如果超过了则返回true。

    我们回到syncJob中继续往下:

    func (jm *Controller) syncJob(key string) (bool, error) {
    	...
    	//job运行失败
    	if jobFailed {
    		errCh := make(chan error, active)
    		//将job里面的active的pod删除
    		jm.deleteJobPods(&job, activePods, errCh)
    		select {
    		case manageJobErr = <-errCh:
    			if manageJobErr != nil {
    				break
    			}
    		default:
    		}
    
    		// update status values accordingly
    		//清空active数
    		failed += active
    		active = 0
    		job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
    		jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
    	} else {
    		//如果job需要同步,并且job没有被删除,则调用manageJob进行同步工作
    		if jobNeedsSync && job.DeletionTimestamp == nil {
    			active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
    		}
    		//完成数等于pod 运行成功的数量
    		completions := succeeded
    		complete := false
    		//如果没有设置Completions,那么只要有pod完成,那么job就算完成
    		if job.Spec.Completions == nil {
    			if succeeded > 0 && active == 0 {
    				complete = true
    			}
    		} else {
    			//如果实际完成数大于或等于Completions
    			if completions >= *job.Spec.Completions {
    				complete = true
    				//如果还有pod处于active状态,发送EventTypeWarning事件
    				if active > 0 {
    					jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
    				}
    				//如果实际完成数大于Completions,发送EventTypeWarning事件
    				if completions > *job.Spec.Completions {
    					jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
    				}
    			}
    		}
    		//job完成了则更新 job.Status.Conditions 和 job.Status.CompletionTime 字段
    		if complete {
    			job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
    			now := metav1.Now()
    			job.Status.CompletionTime = &now
    			jm.recorder.Event(&job, v1.EventTypeNormal, "Completed", "Job completed")
    		}
    	}
    	...
    }
    

    这一段中会根据jobFailed的状态进行判断。

    如果jobFailed为true则表示这个job运行失败,需要删除这个job关联的所有pod,并且清空active数。

    如果jobFailed为false则表示这个job处于非false状态。如果job需要同步,并且job没有被删除,则调用manageJob进行同步工作;

    接下来会对设置的Completions进行处理,如果Completions没有设置,那么只要有一个pod运行完毕,那么这个pod就算完成;

    如果实际完成的pod数量大于completions或仍然有pod处于active中,则发送相应的事件信息。最后更新job的状态为完成。

    我们接下来一口气看看manageJob中这个同步方法里面做了什么,这个方法是job管理pod运行数量的核心方法:

    Controller#manageJob

    func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
    	...
    	//如果处于 active 状态的 pods 数大于 job 设置的并发数 job.Spec.Parallelism
    	if active > parallelism {
    		//多出的个数
    		diff := active - parallelism
    		errCh = make(chan error, diff)
    		jm.expectations.ExpectDeletions(jobKey, int(diff))
    		klog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff) 
    		//pods 排序,以便可以优先删除一些pod:
    		// 判断 pod 状态:Not ready < ready
    		// 是否已经被调度:unscheduled< scheduled
    		//判断 pod phase :pending < running
    		sort.Sort(controller.ActivePods(activePods))
    
    		active -= diff
    		wait := sync.WaitGroup{}
    		wait.Add(int(diff))
    		for i := int32(0); i < diff; i++ {
    			//并发删除多余的 active pods
    			go func(ix int32) {
    				defer wait.Done()
    				if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
    					// Decrement the expected number of deletes because the informer won't observe this deletion
    					jm.expectations.DeletionObserved(jobKey)
    					if !apierrors.IsNotFound(err) {
    						klog.V(2).Infof("Failed to delete %v, decremented expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)
    						activeLock.Lock()
    						active++
    						activeLock.Unlock()
    						errCh <- err
    						utilruntime.HandleError(err)
    					}
    
    				}
    			}(i)
    		}
    		wait.Wait()
    	//若处于 active 状态的 pods 数小于 job 设置的并发数,则需要创建出新的 pod
    	} else if active < parallelism {
    		wantActive := int32(0)
    		//如果没有声明Completions,那么active的pod应该等于parallelism,如果有pod已经完成了,那么不再创建新的。
    		if job.Spec.Completions == nil { 
    			if succeeded > 0 {
    				wantActive = active
    			} else {
    				wantActive = parallelism
    			}
    		//	如果声明了Completions,那么需要比较Completions和succeeded
    		// 如果wantActive大于parallelism,那么需要创建的Pod数等于parallelism
    		} else {
    			// Job specifies a specific number of completions.  Therefore, number
    			// active should not ever exceed number of remaining completions.
    			wantActive = *job.Spec.Completions - succeeded
    			if wantActive > parallelism {
    				wantActive = parallelism
    			}
    		}
    		//计算出 diff 数
    		diff := wantActive - active
    		if diff < 0 {
    			utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
    			diff = 0
    		}
    		//表示已经有足够的pod,不需要再创建了
    		if diff == 0 {
    			return active, nil
    		}
    		jm.expectations.ExpectCreations(jobKey, int(diff))
    		errCh = make(chan error, diff)
    		klog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)
    
    		active += diff
    		wait := sync.WaitGroup{}
     
    		//创建的 pod 数依次为 1、2、4、8......,呈指数级增长
    		for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
    			errorCount := len(errCh)
    			wait.Add(int(batchSize))
    			for i := int32(0); i < batchSize; i++ {
    				//并发程创建pod
    				go func() {
    					defer wait.Done()
    					//创建pod
    					err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
    					if err != nil {
    						...
    					}
    					//创建失败的处理
    					if err != nil {
    						defer utilruntime.HandleError(err) 
    						klog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
    						jm.expectations.CreationObserved(jobKey)
    						activeLock.Lock()
    						active--
    						activeLock.Unlock()
    						errCh <- err
    					}
    				}()
    			}
    			wait.Wait()
    			...
    			diff -= batchSize
    		}
    	} 
    	... 
    	return active, nil
    }
    

    这个方法的逻辑十分的清晰,我们下面撸一撸~

    这段代码在开始用一个if判断来校验active的pod是否超过了parallelism,如果超过了需要算出超过了多少,存在diff字段中;然后需要删除多余的pod,不过这个时候有个细节的地方,这里会根据pod的状态进行排序,会首先删除一些不是ready状态、unscheduled、pending状态的pod;

    若active的pod小于parallelism,那么首先需要判断Completions,如果没有被设置,并且已经有pod运行成功了,那么不需要创建新的pod,否则还是需要创建pod至parallelism指定个数;如果设置了Completions,那么还需要根据pod完成的数量来做一个判断需要创建多少新的pod;

    如果需要创建的pod数小于active的pod数,那么直接返回即可;

    接下来会在一个for循环中循环并发创建pod,不过创建的数量是依次指数递增,避免一下子创建太多pod。

    定时任务CronJob

    基本使用

    我们从一个例子开始,如下:

    apiVersion: batch/v1beta1
    kind: CronJob
    metadata:
      name: hello
    spec:
      schedule: "*/1 * * * *"
      jobTemplate:
        spec:
          template:
            spec:
              containers:
              - name: hello
                image: busybox
                args:
                - /bin/sh
                - -c
                - date; echo Hello from the Kubernetes cluster
              restartPolicy: OnFailure
    

    这个CronJob会每分钟创建一个Pod:

    $ kubectl get pod
    
    NAME                     READY   STATUS              RESTARTS   AGE
    hello-1596406740-tqnlb   0/1     ContainerCreating   0          8s
    

    cronjob会记录最近的调度时间:

    $ kubectl get cronjob hello
    
    NAME    SCHEDULE      SUSPEND   ACTIVE   LAST SCHEDULE   AGE
    hello   */1 * * * *   False     1        16s             2m33s
    

    spec.concurrencyPolicy

    如果设置的间隔时间太短,那么可能会导致任务还没执行完成又创建了新的Pod。所以我们可以通过修改spec.concurrencyPolicy来定义处理策略:

    • Allow,这也是默认情况,这意味着这些 Job 可以同时存在;
    • Forbid,这意味着不会创建新的 Pod,该创建周期被跳过;
    • Replace,这意味着新产生的 Job 会替换旧的、没有执行完的 Job。

    如果某一次 Job 创建失败,这次创建就会被标记为“miss”。当在指定的时间窗口内,miss 的数目达到 100 时,那么 CronJob 会停止再创建这个 Job。

    spec.startingDeadlineSeconds可以指定这个时间窗口。startingDeadlineSeconds=200意味着过去 200 s 里,如果 miss 的数目达到了 100 次,那么这个 Job 就不会被创建执行了。

    cronjob源码分析

    CronJob的源码在cronjob_controller.go中,主要实现是在Controller的syncAll方法中。

    下面我们看看CronJob是在源码中如何创建运行的:

    Controller#syncAll

    func (jm *Controller) syncAll() { 
    	//列出所有的job
    	jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
    		return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(context.TODO(), opts)
    	} 
    	js := make([]batchv1.Job, 0)
    	//遍历jobListFunc然后将状态正常的job放入到js集合中
    	err := pager.New(pager.SimplePageFunc(jobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
    		jobTmp, ok := object.(*batchv1.Job)
    		if !ok {
    			return fmt.Errorf("expected type *batchv1.Job, got type %T", jobTmp)
    		}
    		js = append(js, *jobTmp)
    		return nil
    	})
    	...
    	//列出所有的cronJobs
    	cronJobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
    		return jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(context.TODO(), opts)
    	}
    	//遍历所有的jobs,根据ObjectMeta.OwnerReference字段确定该job是否由cronJob所创建
    	//key为uid,value为job集合
    	jobsByCj := groupJobsByParent(js)
    	klog.V(4).Infof("Found %d groups", len(jobsByCj))
    	//遍历cronJobs
    	err = pager.New(pager.SimplePageFunc(cronJobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
    		cj, ok := object.(*batchv1beta1.CronJob)
    		if !ok {
    			return fmt.Errorf("expected type *batchv1beta1.CronJob, got type %T", cj)
    		}
    		//进行同步
    		syncOne(cj, jobsByCj[cj.UID], time.Now(), jm.jobControl, jm.cjControl, jm.recorder)
    		//清理所有已经完成的jobs
    		cleanupFinishedJobs(cj, jobsByCj[cj.UID], jm.jobControl, jm.cjControl, jm.recorder)
    		return nil
    	})
    
    	if err != nil {
    		utilruntime.HandleError(fmt.Errorf("Failed to extract cronJobs list: %v", err))
    		return
    	}
    }
    

    syncAll方法会列出所有job以及对应的cronJobs,然后按照cronJobs来进行归类,然后遍历这个列表调用syncOne方法进行同步,之后再调用cleanupFinishedJobs清理所有已经完成的jobs。

    然后我们在看看syncOne是具体怎么处理job的:

    syncOne

    func syncOne(cj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, cjc cjControlInterface, recorder record.EventRecorder) {
    	nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
    
    	childrenJobs := make(map[types.UID]bool)
    	//遍历job列表
    	for _, j := range js {
    		childrenJobs[j.ObjectMeta.UID] = true
    		//查看这个job是否是在Active列表中
    		found := inActiveList(*cj, j.ObjectMeta.UID)
    		//如果这个job不是在Active列表中,并且这个job还没有跑完,发送一个异常事件。
    		if !found && !IsJobFinished(&j) {
    			recorder.Eventf(cj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name) 
    		//	如果该job在Active列表中,并且已经跑完了,那么从Active列表移除
    		} else if found && IsJobFinished(&j) {
    			_, status := getFinishedStatus(&j)
    			deleteFromActiveList(cj, j.ObjectMeta.UID)
    			recorder.Eventf(cj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
    		}
    	}
     
    	//反向再遍历Active列表,如果存在上面记录的jobs,那么就移除
    	for _, j := range cj.Status.Active {
    		if found := childrenJobs[j.UID]; !found {
    			recorder.Eventf(cj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
    			deleteFromActiveList(cj, j.UID)
    		}
    	}
    	//上面做了cronJob的Active列表的修改,所以需要更新一下状态
    	updatedCJ, err := cjc.UpdateStatus(cj)
    	if err != nil {
    		klog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
    		return
    	}
    	*cj = *updatedCJ
    	//cronJob已经被删除了,直接返回
    	if cj.DeletionTimestamp != nil { 
    		return
    	}
    	//cronJob处于suspend,直接返回
    	if cj.Spec.Suspend != nil && *cj.Spec.Suspend {
    		klog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
    		return
    	}
    	//获取最近的调度时间
    	times, err := getRecentUnmetScheduleTimes(*cj, now)
    	if err != nil {
    		recorder.Eventf(cj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
    		klog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
    		return
    	} 
    	//等于0说明还没有开始调度
    	if len(times) == 0 {
    		klog.V(4).Infof("No unmet start times for %s", nameForLog)
    		return
    	}
    	if len(times) > 1 {
    		klog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
    	}
    	//获取列表中的最后一次时间
    	scheduledTime := times[len(times)-1]
    	tooLate := false
    	//如果设置了StartingDeadlineSeconds,那么计算是否满足条件
    	if cj.Spec.StartingDeadlineSeconds != nil {
    		tooLate = scheduledTime.Add(time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)).Before(now)
    	}
    	if tooLate {
    		klog.V(4).Infof("Missed starting window for %s", nameForLog)
    		recorder.Eventf(cj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.Format(time.RFC1123Z)) 
    		return
    	}
    	//处理concurrencyPolicy策略
    	//如果设置的是Forbid,并且Active列表大于0,直接return
    	if cj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(cj.Status.Active) > 0 { 
    		klog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
    		return
    	}
    	//如果设置的是Replace,则删除所有的Active列表,等后面重新创建
    	if cj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
    		for _, j := range cj.Status.Active {
    			klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)
    
    			job, err := jc.GetJob(j.Namespace, j.Name)
    			if err != nil {
    				recorder.Eventf(cj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
    				return
    			}
    			if !deleteJob(cj, job, jc, recorder) {
    				return
    			}
    		}
    	}
    	//根据cronJob.spec.JobTemplate填充job的完整信息
    	jobReq, err := getJobFromTemplate(cj, scheduledTime)
    	if err != nil {
    		klog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
    		return
    	}
    	//创建job
    	jobResp, err := jc.CreateJob(cj.Namespace, jobReq)
    	if err != nil { 
    		if !errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
    			recorder.Eventf(cj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
    		}
    		return
    	}
    	klog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
    	recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
    
      
    	ref, err := getRef(jobResp)
    	if err != nil {
    		klog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
    	} else {
    		//把创建好的job信息放入到Active列表中
    		cj.Status.Active = append(cj.Status.Active, *ref)
    	}
    	cj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
    	if _, err := cjc.UpdateStatus(cj); err != nil {
    		klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
    	}
    
    	return
    }
    

    在syncOne维护了cronJob的Active列表,在遍历cronJob对应的job列表的时候会判断该job是不是应该从Active列表中删除,操作完之后会更新cronJob的状态。

    然后会查看当千的cronJob是否已被删除、是否处于suspend状态、判断是否最近有job被调度,并获取最后一次调度时间判断是否满足StartingDeadlineSeconds条件等。

    接下来会根据ConcurrencyPolicy来判断是Forbid还是Replace。如果是Forbid那么直接略过此次调度,如果是Replace那么会删除所有的Active列表,等后面重新创建。

    最后调用CreateJob创建job。

    总结

    这篇文章我们首先介绍了Job和CronJob的具体使用方法,以及其中需要注意的参数配置,然后通过源码来解释相应的配置会产生什么样的结果。例如job来说,如果我们设置的completions小于parallelism,那么在实际运行的时候实际完成的pod数量是可能超过completions的等等。通过源码我们对job以及cronjob也有了一个更好的理解。

    Reference

    https://kubernetes.io/docs/concepts/workloads/controllers/job/

    https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/

    https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#example-states

    https://kubernetes.feisky.xyz/concepts/objects/cronjob

    https://kubernetes.feisky.xyz/concepts/objects/job

    《深入理解k8s》

    《k8s in Action》

  • 相关阅读:
    netty(4)高级篇-Websocket协议开发
    netty高级篇(3)-HTTP协议开发
    netty中级篇(2)
    netty入门篇(1)
    nio简介
    总账科目如何添加自定义属性?
    如何切换组织初次打开界面时,默认显示财务组织?
    超好用的免费Redis客户端
    Postman如何测试Webservice接口?
    创建Maven project 提示pom.xml 首行错误
  • 原文地址:https://www.cnblogs.com/luozhiyun/p/13549614.html
Copyright © 2011-2022 走看看