kubebuilder is an SDK for building Kubernetes APIs with CRDs. Its main features are:
Scaffolding tools that initialize a CRD project and generate the boilerplate code and configuration
A library that wraps the underlying Kubernetes go-client
Initialize the project, then create an API and a webhook:
kubebuilder init --domain fluid.io
kubebuilder create api --group data --version v1alpha1 --kind DataBackup --namespaced true
kubebuilder create webhook --group batch --version v1 --kind CronJob --defaulting --programmatic-validation
-
config
The folders under config hold the Kustomize bases, the CustomResourceDefinitions, the RBAC configuration, the WebhookConfigurations, and so on.
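A typical scaffolded layout looks roughly like this (the exact contents depend on the kubebuilder version and which options are enabled):

config/default      Kustomize base that ties the other pieces together
config/crd          generated CustomResourceDefinitions
config/rbac         RBAC rules for the controller
config/manager      Deployment manifest for the controller manager
config/webhook      WebhookConfigurations and the webhook Service
config/certmanager  certificates for serving webhooks (when enabled)
config/samples      sample custom resources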
-
api/v1alpha1
groupversion_info.go contains the API schema definitions:
var (
    // GroupVersion is the group/version used to register these objects
    GroupVersion = schema.GroupVersion{Group: "data.fluid.io", Version: "v1alpha1"}

    // SchemeBuilder is used to add Go types to the GVK scheme
    SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}

    // AddToScheme adds the types in this group-version to a scheme
    AddToScheme = SchemeBuilder.AddToScheme
)
After creating a resource xxx, xxx_types.go contains the corresponding struct definitions:
type CronJobSpec struct {
    Schedule                   string                       `json:"schedule"`
    StartingDeadlineSeconds    *int64                       `json:"startingDeadlineSeconds,omitempty"`
    ConcurrencyPolicy          ConcurrencyPolicy            `json:"concurrencyPolicy,omitempty"`
    Suspend                    *bool                        `json:"suspend,omitempty"`
    JobTemplate                batchv1beta1.JobTemplateSpec `json:"jobTemplate"`
    SuccessfulJobsHistoryLimit *int32                       `json:"successfulJobsHistoryLimit,omitempty"`
    FailedJobsHistoryLimit     *int32                       `json:"failedJobsHistoryLimit,omitempty"`
}

type ConcurrencyPolicy string

const (
    AllowConcurrent   ConcurrencyPolicy = "Allow"
    ForbidConcurrent  ConcurrencyPolicy = "Forbid"
    ReplaceConcurrent ConcurrencyPolicy = "Replace"
)

type CronJobStatus struct {
    Active           []corev1.ObjectReference `json:"active,omitempty"`
    LastScheduleTime *metav1.Time             `json:"lastScheduleTime,omitempty"`
}

type CronJob struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`
    Spec              CronJobSpec   `json:"spec,omitempty"`
    Status            CronJobStatus `json:"status,omitempty"`
}

type CronJobList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    Items           []CronJob `json:"items"`
}
Register the resource types with the Group:
func init() {
    SchemeBuilder.Register(&CronJob{}, &CronJobList{})
}
xxx_webhook.go:
First, set up a logger for the webhook (the cronjoblog variable, named "cronjob-resource"), using the controller-runtime/pkg/log package:
var cronjoblog = logf.Log.WithName("cronjob-resource")
Wire the webhook up with the manager:
func (r *CronJob) SetupWebhookWithManager(mgr ctrl.Manager) error {
    return ctrl.NewWebhookManagedBy(mgr).For(r).Complete()
}
The webhook.Defaulter interface is used to fill in default values for the custom resource:
var _ webhook.Defaulter = &CronJob{}
The Default method implements the webhook.Defaulter interface:
func (r *CronJob) Default() {
    cronjoblog.Info("default", "name", r.Name)
    if r.Spec.ConcurrencyPolicy == "" {
        r.Spec.ConcurrencyPolicy = AllowConcurrent
    }
    if r.Spec.Suspend == nil {
        r.Spec.Suspend = new(bool)
    }
    if r.Spec.SuccessfulJobsHistoryLimit == nil {
        r.Spec.SuccessfulJobsHistoryLimit = new(int32)
        *r.Spec.SuccessfulJobsHistoryLimit = 3
    }
    if r.Spec.FailedJobsHistoryLimit == nil {
        r.Spec.FailedJobsHistoryLimit = new(int32)
        *r.Spec.FailedJobsHistoryLimit = 1
    }
}
The webhook.Validator interface is used to validate the values of the custom resource:
var _ webhook.Validator = &CronJob{}
Implementation:
// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (r *CronJob) ValidateCreate() error {
    cronjoblog.Info("validate create", "name", r.Name)
    return r.validateCronJob()
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (r *CronJob) ValidateUpdate(old runtime.Object) error {
    cronjoblog.Info("validate update", "name", r.Name)
    return r.validateCronJob()
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
func (r *CronJob) ValidateDelete() error {
    cronjoblog.Info("validate delete", "name", r.Name)
    // TODO(user): fill in your validation logic upon object deletion.
    return nil
}
Create and update both call this validation logic:
// validate the name and the spec
func (r *CronJob) validateCronJob() error {
    var allErrs field.ErrorList
    if err := r.validateCronJobName(); err != nil {
        allErrs = append(allErrs, err)
    }
    if err := r.validateCronJobSpec(); err != nil {
        allErrs = append(allErrs, err)
    }
    if len(allErrs) == 0 {
        return nil
    }
    return apierrors.NewInvalid(
        schema.GroupKind{Group: "batch.tutorial.kubebuilder.io", Kind: "CronJob"},
        r.Name, allErrs)
}

func (r *CronJob) validateCronJobName() *field.Error {
    if len(r.ObjectMeta.Name) > validationutils.DNS1035LabelMaxLength-11 {
        return field.Invalid(field.NewPath("metadata").Child("name"), r.Name, "must be no more than 52 characters")
    }
    return nil
}

func (r *CronJob) validateCronJobSpec() *field.Error {
    return validateScheduleFormat(
        r.Spec.Schedule,
        field.NewPath("spec").Child("schedule"))
}

func validateScheduleFormat(schedule string, fldPath *field.Path) *field.Error {
    if _, err := cron.ParseStandard(schedule); err != nil {
        return field.Invalid(fldPath, schedule, err.Error())
    }
    return nil
}
-
controllers
xxx_controller.go contains the controller logic.
First, define the XxxReconciler struct, which reconciles an Xxx object:
type CronJobReconciler struct {
    client.Client
    Log    logr.Logger
    Scheme *runtime.Scheme
    Clock
}
The tutorial's comment says: "We can pre-assign some pairs at the top of our reconcile method to have those attached to all log lines in this reconciler."
My understanding: controller-runtime's logging is structured through the logr library. Log messages are built from key-value pairs, and pairs pre-assigned with WithValues at the top of Reconcile are attached to every line logged through that logger, as sketched below.
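A minimal, self-contained sketch of that pattern, using the same controller-runtime zap logger that main.go sets up later in this note (the logger names and the "default/my-cronjob" value are illustrative only):

package main

import (
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func main() {
    // Install a zap-backed logger, as the scaffolded main.go does.
    ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

    // WithValues pre-assigns key-value pairs; every line logged through
    // this logger afterwards carries them automatically.
    log := ctrl.Log.WithName("controllers").WithName("CronJob").
        WithValues("cronjob", "default/my-cronjob")

    log.Info("starting reconcile")               // carries the "cronjob" pair
    log.V(1).Info("job count", "active jobs", 2) // verbosity 1, plus per-call pairs
}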
The Reconcile method holds the processing logic:
func (r *CronJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    ctx := context.Background()
    log := r.Log.WithValues("cronjob", req.NamespacedName)
    ......
}
The elided part is the concrete logic to be implemented by the user. Taking CronJob as the example, it breaks down into the following steps:
1、Fetch the resource object with the client
var cronJob batch.CronJob
if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
    log.Error(err, "unable to fetch CronJob")
    return ctrl.Result{}, client.IgnoreNotFound(err)
}
The client's Get method takes a context as its first argument and a pointer to the object to populate as its third; the second argument is the namespace/name key. A sketch of the interface shape follows.
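For reference only, the Reader part of the controller-runtime client that provides Get and List looks roughly like the sketch below in the version this note tracks (around v0.5/v0.6); newer releases replace runtime.Object with client.Object, so treat the exact signatures as an assumption to verify against your go.mod:

package sketch

import (
    "context"

    "k8s.io/apimachinery/pkg/runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// reader mirrors the rough shape of controller-runtime's client.Reader.
type reader interface {
    // key is a namespace/name pair (client.ObjectKey); obj is the pointer
    // into which the fetched object is written.
    Get(ctx context.Context, key client.ObjectKey, obj runtime.Object) error
    // list is a pointer to a list type (e.g. *kbatch.JobList); opts narrow the
    // query, e.g. client.InNamespace or client.MatchingFields.
    List(ctx context.Context, list runtime.Object, opts ...client.ListOption) error
}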
2、List all active Jobs owned by this CronJob and update the CronJob's status
// Use the client's List method to fetch all Jobs owned by this CronJob
var childJobs kbatch.JobList
err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name})
if err != nil {
    log.Error(err, "unable to list child Jobs")
    return ctrl.Result{}, err
}

var activeJobs []*kbatch.Job
var successfulJobs []*kbatch.Job
var failedJobs []*kbatch.Job
var mostRecentTime *time.Time // find the last run so we can update the status

// A Job is considered finished once it is Complete or Failed
isJobFinished := func(job *kbatch.Job) (bool, kbatch.JobConditionType) {
    for _, c := range job.Status.Conditions {
        if (c.Type == kbatch.JobComplete || c.Type == kbatch.JobFailed) && c.Status == corev1.ConditionTrue {
            return true, c.Type
        }
    }
    return false, ""
}

// Parse the scheduled time out of the annotation
getScheduledTimeForJob := func(job *kbatch.Job) (*time.Time, error) {
    timeRaw := job.Annotations[scheduledTimeAnnotation]
    if len(timeRaw) == 0 {
        return nil, nil
    }
    timeParsed, err := time.Parse(time.RFC3339, timeRaw)
    if err != nil {
        return nil, err
    }
    return &timeParsed, nil
}

// Classify each child Job
for i, job := range childJobs.Items {
    _, finishedType := isJobFinished(&job)
    switch finishedType {
    case "": // ongoing
        activeJobs = append(activeJobs, &childJobs.Items[i])
    case kbatch.JobFailed:
        failedJobs = append(failedJobs, &childJobs.Items[i])
    case kbatch.JobComplete:
        successfulJobs = append(successfulJobs, &childJobs.Items[i])
    }

    scheduledTimeForJob, err := getScheduledTimeForJob(&job)
    if err != nil {
        log.Error(err, "unable to parse schedule time for child job", "job", &job)
        continue
    }
    if scheduledTimeForJob != nil {
        if mostRecentTime == nil {
            mostRecentTime = scheduledTimeForJob
        } else if mostRecentTime.Before(*scheduledTimeForJob) {
            mostRecentTime = scheduledTimeForJob
        }
    }
}

if mostRecentTime != nil {
    cronJob.Status.LastScheduleTime = &metav1.Time{Time: *mostRecentTime}
} else {
    cronJob.Status.LastScheduleTime = nil
}
cronJob.Status.Active = nil
for _, activeJob := range activeJobs {
    jobRef, err := ref.GetReference(r.Scheme, activeJob)
    if err != nil {
        log.Error(err, "unable to make reference to active job", "job", activeJob)
        continue
    }
    cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
}

log.V(1).Info("job count", "active jobs", len(activeJobs), "successful jobs", len(successfulJobs), "failed jobs", len(failedJobs))

// Use the client's Status().Update method to update this CronJob's status
if err := r.Status().Update(ctx, &cronJob); err != nil {
    log.Error(err, "unable to update CronJob status")
    return ctrl.Result{}, err
}
3、Clean up old Jobs
// Deletion is best-effort: failures are logged but do not abort the reconcile
if cronJob.Spec.FailedJobsHistoryLimit != nil {
    sort.Slice(failedJobs, func(i, j int) bool {
        if failedJobs[i].Status.StartTime == nil {
            return failedJobs[j].Status.StartTime != nil
        }
        return failedJobs[i].Status.StartTime.Before(failedJobs[j].Status.StartTime)
    })
    for i, job := range failedJobs {
        if int32(i) >= int32(len(failedJobs))-*cronJob.Spec.FailedJobsHistoryLimit {
            break
        }
        if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
            log.Error(err, "unable to delete old failed job", "job", job)
        } else {
            log.V(0).Info("deleted old failed job", "job", job)
        }
    }
}

if cronJob.Spec.SuccessfulJobsHistoryLimit != nil {
    sort.Slice(successfulJobs, func(i, j int) bool {
        if successfulJobs[i].Status.StartTime == nil {
            return successfulJobs[j].Status.StartTime != nil
        }
        return successfulJobs[i].Status.StartTime.Before(successfulJobs[j].Status.StartTime)
    })
    for i, job := range successfulJobs {
        if int32(i) >= int32(len(successfulJobs))-*cronJob.Spec.SuccessfulJobsHistoryLimit {
            break
        }
        if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
            log.Error(err, "unable to delete old successful job", "job", job)
        } else {
            log.V(0).Info("deleted old successful job", "job", job)
        }
    }
}
4、Check whether the CronJob is suspended; if it is, do not create any Jobs and return immediately
if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
    log.V(1).Info("cronjob suspended, skipping")
    return ctrl.Result{}, nil
}
5、Compute the next scheduled run
// Compute the next time a Job needs to be created
getNextSchedule := func(cronJob *batch.CronJob, now time.Time) (lastMissed time.Time, next time.Time, err error) {
    sched, err := cron.ParseStandard(cronJob.Spec.Schedule)
    if err != nil {
        return time.Time{}, time.Time{}, fmt.Errorf("Unparseable schedule %q: %v", cronJob.Spec.Schedule, err)
    }
    var earliestTime time.Time
    if cronJob.Status.LastScheduleTime != nil {
        earliestTime = cronJob.Status.LastScheduleTime.Time
    } else {
        earliestTime = cronJob.ObjectMeta.CreationTimestamp.Time
    }
    if cronJob.Spec.StartingDeadlineSeconds != nil {
        // controller is not going to schedule anything below this point
        schedulingDeadline := now.Add(-time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds))
        if schedulingDeadline.After(earliestTime) {
            earliestTime = schedulingDeadline
        }
    }
    if earliestTime.After(now) {
        return time.Time{}, sched.Next(now), nil
    }
    starts := 0
    for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
        lastMissed = t
        starts++
        if starts > 100 {
            return time.Time{}, time.Time{}, fmt.Errorf("Too many missed start times (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew.")
        }
    }
    return lastMissed, sched.Next(now), nil
}

missedRun, nextRun, err := getNextSchedule(&cronJob, r.Now())
if err != nil {
    log.Error(err, "unable to figure out CronJob schedule")
    return ctrl.Result{}, nil
}

// Save the result so we can requeue at the next run time
scheduledResult := ctrl.Result{RequeueAfter: nextRun.Sub(r.Now())}
log = log.WithValues("now", r.Now(), "next run", nextRun)
6、Run a new job if it’s on schedule, not past the deadline, and not blocked by our concurrency policy
If we've missed a run, and we're still within the deadline to start it, we'll need to run a job.
if missedRun.IsZero() {
    log.V(1).Info("no upcoming scheduled times, sleeping until next")
    return scheduledResult, nil
}

// make sure we're not too late to start the run
log = log.WithValues("current run", missedRun)
tooLate := false
if cronJob.Spec.StartingDeadlineSeconds != nil {
    tooLate = missedRun.Add(time.Duration(*cronJob.Spec.StartingDeadlineSeconds) * time.Second).Before(r.Now())
}
if tooLate {
    log.V(1).Info("missed starting deadline for last run, sleeping till next")
    // TODO(directxman12): events
    return scheduledResult, nil
}

// Decide whether we may run at all, based on the concurrency policy (Forbid)
if cronJob.Spec.ConcurrencyPolicy == batch.ForbidConcurrent && len(activeJobs) > 0 {
    log.V(1).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(activeJobs))
    return scheduledResult, nil
}

// Decide whether to replace existing runs, based on the concurrency policy (Replace)
if cronJob.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent {
    for _, activeJob := range activeJobs {
        // it does not matter if the Job was already deleted (not-found is ignored)
        err := r.Delete(ctx, activeJob, client.PropagationPolicy(metav1.DeletePropagationBackground))
        if client.IgnoreNotFound(err) != nil {
            log.Error(err, "unable to delete active job", "job", activeJob)
            return ctrl.Result{}, err
        }
    }
}

constructJobForCronJob := func(cronJob *batch.CronJob, scheduledTime time.Time) (*kbatch.Job, error) {
    // include the nominal start time in the Job name so the same run is not started twice
    name := fmt.Sprintf("%s-%d", cronJob.Name, scheduledTime.Unix())

    job := &kbatch.Job{
        ObjectMeta: metav1.ObjectMeta{
            Labels:      make(map[string]string),
            Annotations: make(map[string]string),
            Name:        name,
            Namespace:   cronJob.Namespace,
        },
        Spec: *cronJob.Spec.JobTemplate.Spec.DeepCopy(),
    }
    for k, v := range cronJob.Spec.JobTemplate.Annotations {
        job.Annotations[k] = v
    }
    job.Annotations[scheduledTimeAnnotation] = scheduledTime.Format(time.RFC3339)
    for k, v := range cronJob.Spec.JobTemplate.Labels {
        job.Labels[k] = v
    }
    if err := ctrl.SetControllerReference(cronJob, job, r.Scheme); err != nil {
        return nil, err
    }
    return job, nil
}

// actually make the job
job, err := constructJobForCronJob(&cronJob, missedRun)
if err != nil {
    log.Error(err, "unable to construct job from template")
    // don't bother requeuing until we get a change to the spec
    return scheduledResult, nil
}

// create the job on the cluster
if err := r.Create(ctx, job); err != nil {
    log.Error(err, "unable to create Job for CronJob", "job", job)
    return ctrl.Result{}, err
}
7、Return the result so that the reconcile is requeued at the next run time
return scheduledResult, nil
My understanding here: for the reconciler to find its Jobs quickly, an index is needed.
We declare the indexed value on the Job object; the indexer handles the namespace for us automatically, so we only need to extract the owner name from the Job.
And when a Job changes or is deleted, the CronJob controller must be notified automatically.
Set the controller up with the manager:
func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
    if r.Clock == nil {
        r.Clock = realClock{}
    }

    if err := mgr.GetFieldIndexer().IndexField(context.Background(), &kbatch.Job{}, jobOwnerKey, func(rawObj runtime.Object) []string {
        // grab the Job object and extract its owner
        job := rawObj.(*kbatch.Job)
        owner := metav1.GetControllerOf(job)
        if owner == nil {
            return nil
        }
        // make sure the owner is a CronJob
        if owner.APIVersion != apiGVStr || owner.Kind != "CronJob" {
            return nil
        }
        return []string{owner.Name}
    }); err != nil {
        return err
    }

    return ctrl.NewControllerManagedBy(mgr).For(&batch.CronJob{}).Owns(&kbatch.Job{}).Complete(r)
}
For the details of how controllers are added to the manager, refer to the controller-runtime framework.
-
main.go
First, create a new Scheme for the controller:
scheme = runtime.NewScheme()
Set up a logger named setup:
setupLog = ctrl.Log.WithName("setup")
The init function executes these two statements:
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(batchv1.AddToScheme(scheme))
The main function:
It first parses flags such as metricsAddr and enableLeaderElection; a sketch of this follows.
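Roughly, the scaffolded main.go does this with the standard flag package before building the manager; the flag names and defaults below match the v2-era scaffold but may differ between kubebuilder versions, so treat them as an assumption:

package main

import "flag"

func main() {
    var metricsAddr string
    var enableLeaderElection bool
    // Flag names/defaults as in the v2-era scaffold (assumption; check your project).
    flag.StringVar(&metricsAddr, "metrics-addr", ":8080",
        "The address the metric endpoint binds to.")
    flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
        "Enable leader election for the controller manager.")
    flag.Parse()

    // In the real main.go these feed ctrl.Options.MetricsBindAddress and
    // ctrl.Options.LeaderElection; placeholders keep this sketch compiling.
    _ = metricsAddr
    _ = enableLeaderElection
}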
Then it calls SetLogger:
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
Set up the manager:
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
    Scheme:             scheme,
    MetricsBindAddress: metricsAddr,
    Port:               9443,
    LeaderElection:     enableLeaderElection,
    LeaderElectionID:   "80807133.tutorial.kubebuilder.io",
})
if err != nil {
    setupLog.Error(err, "unable to start manager")
    os.Exit(1)
}
Add the controller to the manager; the controller contains an XxxReconciler struct:
if err = (&controllers.CronJobReconciler{
    Client: mgr.GetClient(),
    Log:    ctrl.Log.WithName("controllers").WithName("CronJob"),
    Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
    setupLog.Error(err, "unable to create controller", "controller", "CronJob")
    os.Exit(1)
}
This actually calls the SetupWithManager method we just wrote.
An environment variable decides whether the webhook is enabled, so the webhook can be deployed separately from the controller:
if os.Getenv("ENABLE_WEBHOOKS") != "false" { if err = (&batchv1.CronJob{}).SetupWebhookWithManager(mgr); err != nil { setupLog.Error(err, "unable to create webhook", "webhook", "Captain") os.Exit(1) } }
Start the manager (and with it the controller):
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
    setupLog.Error(err, "problem running manager")
    os.Exit(1)
}
-
makefile
The Makefile provides targets for installing and uninstalling the CRDs:
make install
make uninstall
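Under the hood these targets roughly pipe the kustomize-built CRD manifests into kubectl; the exact recipes depend on the kubebuilder version, so the commands below are an approximation:

kustomize build config/crd | kubectl apply -f -
kustomize build config/crd | kubectl delete -f -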
-
dockerfile
The default Dockerfile:
FROM golang:1.15 as builder
WORKDIR /workspace
COPY go.mod go.mod
COPY go.sum go.sum
RUN go mod download
COPY main.go main.go
COPY api/ api/
COPY controllers/ controllers/
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o manager main.go

FROM gcr.io/distroless/static:nonroot
WORKDIR /
COPY --from=builder /workspace/manager .
USER nonroot:nonroot
ENTRYPOINT ["/manager"]