zoukankan      html  css  js  c++  java
  • 使用kubebuilder工具高效构建k8s CRD

    kubebuilder是一个使用CRD构建K8s API的SDK,主要功能是:
        提供脚手架工具初始化CRD工程,自动生成boilerplate代码和配置
        提供代码库封装底层的K8s go-client
     
    初始化并创建api、webhook:
    kubebuilder init --domain fluid.io
    kubebuilder create api --group data --version v1alpha1 --kind DataBackup --namespaced=true
    kubebuilder create webhook --group batch --version v1 --kind CronJob --defaulting --programmatic-validation
    • config
    config中各个文件夹分别存了Kustomize、CustomResourceDefinitions、RBAC configuration、WebhookConfigurations等
    • api/v1alpha1
    groupversion_info.go包含了API schema定义:
    var (
         GroupVersion = schema.GroupVersion{Group: "data.fluid.io",  Version: "v1alpha1"}  // GroupVersion identifies the Group/Version used to register resource objects
         SchemeBuilder = &scheme.Builder{GroupVersion:  GroupVersion}        // SchemeBuilder adds Go types to the GroupVersionKind scheme
         AddToScheme = SchemeBuilder.AddToScheme        // AddToScheme adds the types in this GroupVersion to the given scheme
    )
    创建资源对象xxx后,xxx_type.go中会有相关的结构体定义:
    // CronJobSpec defines the desired state of a CronJob.
    type CronJobSpec struct {
       // Schedule is a standard cron expression (validated with cron.ParseStandard by the webhook).
       Schedule string `json:"schedule"`
       // StartingDeadlineSeconds is an optional deadline (in seconds) for starting a missed run.
       StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty"`
       // ConcurrencyPolicy controls how concurrent runs are handled; defaulted to AllowConcurrent by the webhook.
       ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty"`
       // Suspend, when true, stops new Jobs from being scheduled; defaulted to false by the webhook.
       Suspend *bool `json:"suspend,omitempty"`
       // JobTemplate is the template the controller copies when constructing child Jobs.
       JobTemplate batchv1beta1.JobTemplateSpec `json:"jobTemplate"`
       // SuccessfulJobsHistoryLimit bounds how many finished successful Jobs are kept; defaulted to 3 by the webhook.
       SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty"`
       // FailedJobsHistoryLimit bounds how many finished failed Jobs are kept; defaulted to 1 by the webhook.
       FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty"`
    }
     
    // ConcurrencyPolicy describes how overlapping runs of a CronJob are handled.
    type ConcurrencyPolicy string
     
    const (
       // AllowConcurrent lets new Jobs start even while older ones are still active.
       AllowConcurrent ConcurrencyPolicy = "Allow"
       // ForbidConcurrent skips a new run while any active Job exists (see the reconcile logic).
       ForbidConcurrent ConcurrencyPolicy = "Forbid"
       // ReplaceConcurrent deletes active Jobs before starting the new run (see the reconcile logic).
       ReplaceConcurrent ConcurrencyPolicy = "Replace"
    )
     
    // CronJobStatus defines the observed state of a CronJob.
    type CronJobStatus struct {
       // Active holds references to the currently running child Jobs.
       Active []corev1.ObjectReference `json:"active,omitempty"`
       // LastScheduleTime records when a Job was last scheduled for this CronJob.
       LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"`
    }
     
    // CronJob is the Schema for the cronjobs API.
    type CronJob struct {
       metav1.TypeMeta   `json:",inline"`
       metav1.ObjectMeta `json:"metadata,omitempty"`
       Spec   CronJobSpec   `json:"spec,omitempty"`
       Status CronJobStatus `json:"status,omitempty"`
    }
     
    // CronJobList contains a list of CronJob.
    type CronJobList struct {
       metav1.TypeMeta `json:",inline"`
       metav1.ListMeta `json:"metadata,omitempty"`
       Items           []CronJob `json:"items"`
    }
    将资源对象添加到Group:
    // init registers the CronJob types with the SchemeBuilder so they are
    // added to the GroupVersion's scheme.
    func init() {
       SchemeBuilder.Register(&CronJob{}, &CronJobList{})
    }
     
    xxx_webhook.go:
    首先为webhook设置一个名为cronjoblog的logger,此处使用controller-runtime/pkg/log包
    var cronjoblog = logf.Log.WithName("cronjob-resource")
    设置webhook
    // SetupWebhookWithManager registers the CronJob admission webhooks with mgr.
    func (r *CronJob) SetupWebhookWithManager(mgr ctrl.Manager)  error {
        builder := ctrl.NewWebhookManagedBy(mgr).For(r)
        return builder.Complete()
    }
    使用webhook.Defaulter接口来补全自定义资源对象的默认值:
    var _ webhook.Defaulter = &CronJob{}
    Default方法实现了webhook.Defaulter接口:
    // Default implements webhook.Defaulter: it fills in unset spec fields
    // (concurrency policy Allow, suspend false, history limits 3/1).
    func (r *CronJob) Default() {
        cronjoblog.Info("default", "name", r.Name)
        if r.Spec.ConcurrencyPolicy == "" {
            r.Spec.ConcurrencyPolicy = AllowConcurrent
        }
        if r.Spec.Suspend == nil {
            suspend := false
            r.Spec.Suspend = &suspend
        }
        if r.Spec.SuccessfulJobsHistoryLimit == nil {
            successLimit := int32(3)
            r.Spec.SuccessfulJobsHistoryLimit = &successLimit
        }
        if r.Spec.FailedJobsHistoryLimit == nil {
            failedLimit := int32(1)
            r.Spec.FailedJobsHistoryLimit = &failedLimit
        }
    }
    使用webhook.Validator接口来验证自定义资源对象的值:
    var _ webhook.Validator = &CronJob{}
    实现:
    // ValidateCreate implements webhook.Validator so a webhook will  be registered for the type.
    // It runs the shared name/spec validation on object creation.
    func (r *CronJob) ValidateCreate() error {
         cronjoblog.Info("validate create", "name", r.Name)
         return r.validateCronJob()
    }
    // ValidateUpdate implements webhook.Validator so a webhook will  be registered for the type.
    // The previous object state (old) is ignored; updates run the same checks as creation.
    func (r *CronJob) ValidateUpdate(old runtime.Object) error {
         cronjoblog.Info("validate update", "name", r.Name)
         return r.validateCronJob()
    }
    // ValidateDelete implements webhook.Validator so a webhook will  be registered for the type.
    // Deletion is always allowed; no validation is performed.
    func (r *CronJob) ValidateDelete() error {
         cronjoblog.Info("validate delete", "name", r.Name)
         // TODO(user): fill in your validation logic upon object  deletion.
         return nil
    }
    创建、更新时调用了此逻辑:
    //验证name和spec
    // validateCronJob runs the name and spec checks and, if any fail,
    // aggregates them into a single apierrors.Invalid error.
    func (r *CronJob) validateCronJob() error {
        var errs field.ErrorList
        checks := []func() *field.Error{
            r.validateCronJobName,
            r.validateCronJobSpec,
        }
        for _, check := range checks {
            if fieldErr := check(); fieldErr != nil {
                errs = append(errs, fieldErr)
            }
        }
        if len(errs) > 0 {
            return apierrors.NewInvalid(schema.GroupKind{Group:  "batch.tutorial.kubebuilder.io", Kind: "CronJob"}, r.Name, errs)
        }
        return nil
    }
     
    // validateCronJobName rejects names that would make generated child Job
    // names exceed the DNS-1035 label limit. The controller builds Job names as
    // "<cronjob-name>-<unix-timestamp>" (see constructJobForCronJob), which adds
    // up to 11 characters, hence the -11 headroom (63-11 = 52).
    func (r *CronJob) validateCronJobName() *field.Error {
        if len(r.ObjectMeta.Name) >  validationutils.DNS1035LabelMaxLength-11 {
            return  field.Invalid(field.NewPath("metadata").Child("name"), r.Name,  "must be no more than 52 characters")
        }
        return nil
    }
     
    // validateCronJobSpec validates the spec; currently only the cron schedule
    // string at spec.schedule is checked.
    func (r *CronJob) validateCronJobSpec() *field.Error {
        return validateScheduleFormat(
            r.Spec.Schedule,
            field.NewPath("spec").Child("schedule"))
    }
     
    // validateScheduleFormat parses schedule as a standard cron expression and
    // returns a field.Invalid error (carrying the parser's message) on failure.
    func validateScheduleFormat(schedule string, fldPath  *field.Path) *field.Error {
        if _, err := cron.ParseStandard(schedule); err != nil {
            return field.Invalid(fldPath, schedule, err.Error())
        }
        return nil
    }
    • controllers
    xxx_controller.go中是控制器相关逻辑
    首先需要定义XxxReconciler结构体,用于reconcile一个Xxx对象
    // CronJobReconciler reconciles a CronJob object.
    type CronJobReconciler struct {
       client.Client                 // embedded client used for Get/List/Create/Delete and status updates
       Log    logr.Logger            // base logger; per-request values are attached in Reconcile
       Scheme *runtime.Scheme        // used for owner references (SetControllerReference, ref.GetReference)
       Clock                         // provides Now() (used as r.Now() below) — presumably injectable for tests; confirm Clock's definition
    }
    注释中说:We can pre-assign some pairs at the top of our reconcile method to have those attached to all log lines in this reconciler.
    我理解的是controller-runtime的运行逻辑通过库logr被组织成了固定逻辑顺序,主要工作是向消息中添加kv对
     
    Reconcile方法中是处理逻辑:
    func (r *CronJobReconciler) Reconcile(req ctrl.Request)  (ctrl.Result, error) {
        ctx := context.Background()
        log := r.Log.WithValues("cronjob", req.NamespacedName)
        ......
    }
    省略处为待用户实现的具体逻辑,以Cronjob为例,其逻辑分为以下步骤:
    1、使用客户端获取资源对象
    var cronJob batch.CronJob
    if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
       log.Error(err, "unable to fetch CronJob")
       return ctrl.Result{}, client.IgnoreNotFound(err)
    }
    客户端的Get方法,第一个参数是ctx,第三个参数是用于存放的地址
    2、列出所有属于此Cronjob的active的job,更新它们的状态
    //使用客户端的List方法获取所有属于此Cronjob的job
    var childJobs kbatch.JobList
    err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name})
    if err != nil {
       log.Error(err, "unable to list child Jobs")
       return ctrl.Result{}, err
    }
     
    var activeJobs []*kbatch.Job
    var successfulJobs []*kbatch.Job
    var failedJobs []*kbatch.Job
    var mostRecentTime *time.Time // find the last run so we can update the status
     
    //如果Job是complete或者failed,就认为已经finish
    // isJobFinished reports whether the Job has a Complete or Failed condition
    // with status True, returning which condition type ended it ("" if still running).
    isJobFinished := func(job *kbatch.Job) (bool, kbatch.JobConditionType) {
       for _, c := range job.Status.Conditions {
          if (c.Type == kbatch.JobComplete || c.Type == kbatch.JobFailed) && c.Status == corev1.ConditionTrue {
             return true, c.Type
          }
       }
       return false, ""
    }
     
    // 从annotation中解析出调度时间
    // getScheduledTimeForJob parses the scheduled-time annotation on a child Job
    // (written in RFC3339 by constructJobForCronJob). It returns (nil, nil) when
    // the annotation is absent, and an error when it is present but unparsable.
    getScheduledTimeForJob := func(job *kbatch.Job) (*time.Time, error) {
       timeRaw := job.Annotations[scheduledTimeAnnotation]
       if len(timeRaw) == 0 {
          return nil, nil
       }
       timeParsed, err := time.Parse(time.RFC3339, timeRaw)
       if err != nil {
          return nil, err
       }
       return &timeParsed, nil
    }
     
    // 遍历
    for i, job := range childJobs.Items {
       _, finishedType := isJobFinished(&job)
       switch finishedType {
       case "": // ongoing
          activeJobs = append(activeJobs, &childJobs.Items[i])
       case kbatch.JobFailed:
          failedJobs = append(failedJobs, &childJobs.Items[i])
       case kbatch.JobComplete:
          successfulJobs = append(successfulJobs, &childJobs.Items[i])
       }
       scheduledTimeForJob, err := getScheduledTimeForJob(&job)
       if err != nil {
          log.Error(err, "unable to parse schedule time for child job", "job", &job)
          continue
       }
       if scheduledTimeForJob != nil {
          if mostRecentTime == nil {
             mostRecentTime = scheduledTimeForJob
          } else if mostRecentTime.Before(*scheduledTimeForJob) {
             mostRecentTime = scheduledTimeForJob
          }
       }
    }
     
    if mostRecentTime != nil {
       cronJob.Status.LastScheduleTime = &metav1.Time{Time: *mostRecentTime}
    } else {
       cronJob.Status.LastScheduleTime = nil
    }
     
    cronJob.Status.Active = nil
     
    for _, activeJob := range activeJobs {
       jobRef, err := ref.GetReference(r.Scheme, activeJob)
       if err != nil {
          log.Error(err, "unable to make reference to active job", "job", activeJob)
          continue
       }
       cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
    }
     
    log.V(1).Info("job count", "active jobs", len(activeJobs), "successful jobs", len(successfulJobs), "failed jobs", len(failedJobs))
     
    //使用客户端的Update方法更新此Cronjob的status
    if err := r.Status().Update(ctx, &cronJob); err != nil {
       log.Error(err, "unable to update CronJob status")
       return ctrl.Result{}, err
    }
    3、清理旧的job
    // 删除不一定要保证成功
    if cronJob.Spec.FailedJobsHistoryLimit != nil {
       sort.Slice(failedJobs, func(i, j int) bool {
          if failedJobs[i].Status.StartTime == nil {
             return failedJobs[j].Status.StartTime != nil
          }
          return failedJobs[i].Status.StartTime.Before(failedJobs[j].Status.StartTime)
       })
       for i, job := range failedJobs {
          if int32(i) >= int32(len(failedJobs))-*cronJob.Spec.FailedJobsHistoryLimit {
             break
          }
          if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
             log.Error(err, "unable to delete old failed job", "job", job)
          } else {
             log.V(0).Info("deleted old failed job", "job", job)
          }
       }
    }
     
    // Trim old successful Jobs down to the configured history limit, deleting
    // the oldest first. Deletion is best-effort.
    if cronJob.Spec.SuccessfulJobsHistoryLimit != nil {
       // Oldest first; Jobs without a StartTime sort before those with one.
       sort.Slice(successfulJobs, func(i, j int) bool {
          if successfulJobs[i].Status.StartTime == nil {
             return successfulJobs[j].Status.StartTime != nil
          }
          return successfulJobs[i].Status.StartTime.Before(successfulJobs[j].Status.StartTime)
       })
       for i, job := range successfulJobs {
          if int32(i) >= int32(len(successfulJobs))-*cronJob.Spec.SuccessfulJobsHistoryLimit {
             break
          }
          // A NotFound error means the Job is already gone, so it is not a
          // failure; ignore it the same way the failed-jobs cleanup does
          // (the original checked the bare err here, logging spurious errors).
          if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
             log.Error(err, "unable to delete old successful job", "job", job)
          } else {
             log.V(0).Info("deleted old successful job", "job", job)
          }
       }
    }
    4、检查CronJob是否处于suspend状态,如果是则不进行任何job的创建,直接返回
    if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
       log.V(1).Info("cronjob suspended, skipping")
       return ctrl.Result{}, nil
    }
    5、进行下一个调度工作
    //计算下一次需要创建Job的时间
    // getNextSchedule returns the most recent missed run time (zero if none was
    // missed) and the next scheduled run time after now, based on the CronJob's
    // schedule, its last scheduled run, and the optional starting deadline.
    getNextSchedule := func(cronJob *batch.CronJob, now time.Time) (lastMissed time.Time, next time.Time, err error) {
       sched, err := cron.ParseStandard(cronJob.Spec.Schedule)
       if err != nil {
          return time.Time{}, time.Time{}, fmt.Errorf("Unparseable schedule %q: %v", cronJob.Spec.Schedule, err)
       }
       // Start scanning from the last scheduled run, or the object's creation
       // time if it has never been scheduled.
       var earliestTime time.Time
       if cronJob.Status.LastScheduleTime != nil {
          earliestTime = cronJob.Status.LastScheduleTime.Time
       } else {
          earliestTime = cronJob.ObjectMeta.CreationTimestamp.Time
       }
       if cronJob.Spec.StartingDeadlineSeconds != nil {
          // controller is not going to schedule anything below this point
          schedulingDeadline := now.Add(-time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds))
          if schedulingDeadline.After(earliestTime) {
             earliestTime = schedulingDeadline
          }
       }
       if earliestTime.After(now) {
          // Nothing could have been missed yet; only report the next run.
          return time.Time{}, sched.Next(now), nil
       }
      
       // Walk every schedule point in (earliestTime, now]; the last one seen is
       // the most recent missed run. Bail out after 100 to guard against a far
       // past earliestTime or clock skew producing an excessive scan.
       starts := 0
       for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
          lastMissed = t
          starts++
          if starts > 100 {
             return time.Time{}, time.Time{}, fmt.Errorf("Too many missed start times (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew.")
          }
       }
       return lastMissed, sched.Next(now), nil
    }
     
    missedRun, nextRun, err := getNextSchedule(&cronJob, r.Now())
    if err != nil {
       log.Error(err, "unable to figure out CronJob schedule")
       return ctrl.Result{}, nil
    }
     
    scheduledResult := ctrl.Result{RequeueAfter: nextRun.Sub(r.Now())}  //保存结果
    log = log.WithValues("now", r.Now(), "next run", nextRun)
    6、Run a new job if it’s on schedule, not past the deadline, and not blocked by our concurrency policy
    If we've missed a run, and we're still within the deadline to start it, we'll need to run a job.
    if missedRun.IsZero() {
       log.V(1).Info("no upcoming scheduled times, sleeping until next")
       return scheduledResult, nil
    }
     
    // make sure we're not too late to start the run
    log = log.WithValues("current run", missedRun)
    tooLate := false
    if cronJob.Spec.StartingDeadlineSeconds != nil {
       tooLate = missedRun.Add(time.Duration(*cronJob.Spec.StartingDeadlineSeconds) * time.Second).Before(r.Now())
    }
    if tooLate {
       log.V(1).Info("missed starting deadline for last run, sleeping till next")
       // TODO(directxman12): events
       return scheduledResult, nil
    }
     
    // 根据并行策略决定是否运行
    if cronJob.Spec.ConcurrencyPolicy == batch.ForbidConcurrent && len(activeJobs) > 0 {
       log.V(1).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(activeJobs))
       return scheduledResult, nil
    }
     
    // 根据并行策略决定是否替换
    if cronJob.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent {
        for _, activeJob := range activeJobs {
            err := r.Delete(ctx, activeJob, client.PropagationPolicy(metav1.DeletePropagationBackground))  //删除不成功无所谓
            if client.IgnoreNotFound(err) != nil {
                log.Error(err, "unable to delete active job", "job", activeJob)
                return ctrl.Result{}, err
            }
       }
    }
     
    // constructJobForCronJob builds a child Job from the CronJob's JobTemplate
    // for the given scheduled time. Labels/annotations from the template are
    // copied, the scheduled time is recorded in an annotation (read back by
    // getScheduledTimeForJob), and an owner reference is set so the Job is
    // garbage-collected with (and watched via) its CronJob.
    constructJobForCronJob := func(cronJob *batch.CronJob, scheduledTime time.Time) (*kbatch.Job, error) {
        // Include the nominal start time in the name so the same run is never started twice.
        name := fmt.Sprintf("%s-%d", cronJob.Name, scheduledTime.Unix())
        job := &kbatch.Job{
            ObjectMeta: metav1.ObjectMeta{
                Labels:      make(map[string]string),
                Annotations: make(map[string]string),
                Name:        name,
                Namespace:   cronJob.Namespace,
            },
            Spec: *cronJob.Spec.JobTemplate.Spec.DeepCopy(),
       }
        for k, v := range cronJob.Spec.JobTemplate.Annotations {
            job.Annotations[k] = v
        }
        job.Annotations[scheduledTimeAnnotation] = scheduledTime.Format(time.RFC3339)
        for k, v := range cronJob.Spec.JobTemplate.Labels {
            job.Labels[k] = v
        }
        if err := ctrl.SetControllerReference(cronJob, job, r.Scheme); err != nil {
            return nil, err
        }
        return job, nil
    }
     
    // actually make the job
    job, err := constructJobForCronJob(&cronJob, missedRun)
    if err != nil {
       log.Error(err, "unable to construct job from template")
       // don't bother requeuing until we get a change to the spec
       return scheduledResult, nil
    }
     
    // create the job on the cluster
    if err := r.Create(ctx, job); err != nil {
       log.Error(err, "unable to create Job for CronJob", "job", job)
       return ctrl.Result{}, err
    }
    7、返回结果,进行Requeue
    return scheduledResult, nil
    

    这里我理解的是:为了让reconciler快速找到job,需要index。

    描述Job资源对象中的indexed value,indexer会自动关注namespace,我们只需从Job中提取owner name
    当Job改变、删除的时候,也要能自动通知CronJob的控制器
     
    新建manager:
    func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
       if r.Clock == nil {
          r.Clock = realClock{}
       }
       if err := mgr.GetFieldIndexer().IndexField(context.Background(), &kbatch.Job{}, jobOwnerKey, func(rawObj runtime.Object) []string {
          // 获取资源对象,解析owner
          job := rawObj.(*kbatch.Job)
          owner := metav1.GetControllerOf(job)
          if owner == nil {
             return nil
          }
          if owner.APIVersion != apiGVStr || owner.Kind != "CronJob” {   //确定它是cronjob
             return nil
          }
          return []string{owner.Name}
       }); err != nil {
          return err
       }
       return ctrl.NewControllerManagedBy(mgr).For(&batch.CronJob{}).Owns(&kbatch.Job{}).Complete(r)
    }
    

    具体向manager添加controller的逻辑参考controller-runtime框架

    • main.go
    首先为Controller新建Scheme:
    scheme = runtime.NewScheme()
    设置一个名为setupLog的logger
    setupLog = ctrl.Log.WithName("setup")
    在init函数中执行这两句话:
    utilruntime.Must(clientgoscheme.AddToScheme(scheme))
    utilruntime.Must(batchv1.AddToScheme(scheme))
    

      

    main函数:
    首先解析metricsAddr、enableLeaderElection等参数
    调用SetLogger函数:
    ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
    

      

    设置mgr:
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
         Scheme:             scheme,
         MetricsBindAddress: metricsAddr,
         Port:               9443,
         LeaderElection:     enableLeaderElection,
         LeaderElectionID:   "80807133.tutorial.kubebuilder.io",
    })
    if err != nil {
         setupLog.Error(err, "unable to start manager")
         os.Exit(1)
    }
    向manager添加controller,该controller包含一个XxxReconciler结构体:
    if err = (&controllers.CronJobReconciler{
        Client: mgr.GetClient(),
        Log:    ctrl.Log.WithName("controllers").WithName("CronJob"),
        Scheme: mgr.GetScheme(),
    }).SetupWithManager(mgr); err != nil {
        setupLog.Error(err, "unable to create controller", "controller", "CronJob")
        os.Exit(1)
    }
    实际调用了刚刚编写的SetupWithManager方法
     
    根据环境变量判断是否启用webhook
    webhook可以和controller分离
    // Register the CronJob webhook unless explicitly disabled via env var.
    if os.Getenv("ENABLE_WEBHOOKS") != "false" {
        if err = (&batchv1.CronJob{}).SetupWebhookWithManager(mgr); err != nil {
            // Fixed: the log previously named the webhook "Captain" — a
            // copy-paste leftover from a different project; this is CronJob.
            setupLog.Error(err, "unable to create webhook", "webhook", "CronJob")
            os.Exit(1)
        }
    }
    启动controller:
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        setupLog.Error(err, "problem running manager")
        os.Exit(1)
        }
    }
    • makefile
    makefile中集成了安装、卸载CRD的功能
    make install
    make uninstall
    • dockerfile
    默认的dockerfile:
    # Build stage: compile a static manager binary.
    FROM golang:1.15 as builder
    WORKDIR /workspace
    # Copy module files first so dependency download is cached separately.
    COPY go.mod go.mod
    COPY go.sum go.sum
    RUN go mod download
    COPY main.go main.go
    COPY api/ api/
    COPY controllers/ controllers/
    # CGO disabled so the binary runs in the distroless static image below.
    RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go  build -a -o manager main.go
     
    # Runtime stage: minimal distroless image running as a non-root user.
    FROM gcr.io/distroless/static:nonroot
    WORKDIR /
    COPY --from=builder /workspace/manager .
    USER nonroot:nonroot
    ENTRYPOINT ["/manager"]
     
  • 相关阅读:
    Django实现自定义template页面并在admin site的app模块中加入自定义跳转链接
    django中将model转换为dict的方法
    django后台显示图片 而不是图片地址
    Django admin 继承user表后密码为明文,继承UserAdmin,重写其方法
    Android API之Telephony.Sms
    com.android.providers.telephony.MmsSmsDatabaseHelper
    在发送信息时应用PendingIntent.FLAG_UPDATE_CURRENT
    Android开发之旅(吴秦)
    Android API之android.content.BroadcastReceiver
    Receiver not registered.
  • 原文地址:https://www.cnblogs.com/yangyuliufeng/p/14217887.html
Copyright © 2011-2022 走看看