zoukankan      html  css  js  c++  java
  • 使用kubebuilder工具高效构建k8s CRD

    kubebuilder是一个使用CRD构建K8s API的SDK,主要功能是:
        提供代码库,封装底层的K8s client-go
    # Scaffold a new project whose API groups live under the fluid.io domain.
    kubebuilder init --domain fluid.io
    # Scaffold a namespaced API: group "data", version v1alpha1, kind DataBackup.
    kubebuilder create api --group data --version v1alpha1 --kind DataBackup --namespaced true
    # Scaffold a defaulting + programmatic-validation webhook for kind CronJob.
    # NOTE(review): this targets group "batch"/v1 (the CronJob examples below),
    # not the data/v1alpha1 DataBackup API created above — confirm which is intended.
    kubebuilder create webhook --group batch --version v1 --kind CronJob --defaulting --programmatic-validation
    • config
    config目录下的各个子目录分别存放了Kustomize配置、CustomResourceDefinitions、RBAC配置、WebhookConfigurations等
    • api/v1alpha1
    groupversion_info.go包含了API schema定义:
    var (
         GroupVersion = schema.GroupVersion{Group: "data.fluid.io",  Version: "v1alpha1"}  //用于注册资源对象的GV
         SchemeBuilder = &scheme.Builder{GroupVersion:  GroupVersion}        //SchemeBuilder用于将go type添加到GVK scheme
         AddToScheme = SchemeBuilder.AddToScheme        //AddToScheme用于将GV中的type添加到scheme
    type CronJobSpec struct {
       Schedule string `json:"schedule"`
       StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty"`
       ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty"`
       Suspend *bool `json:"suspend,omitempty"`
       JobTemplate batchv1beta1.JobTemplateSpec `json:"jobTemplate"`
       SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty"`
       FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty"`
    // ConcurrencyPolicy describes how the controller treats a new run while
    // Jobs from earlier runs are still active. The enum marker makes the API
    // server reject values other than the three below; otherwise an unknown
    // value would silently behave like Allow.
    // +kubebuilder:validation:Enum=Allow;Forbid;Replace
    type ConcurrencyPolicy string

    const (
        // AllowConcurrent lets runs overlap.
        AllowConcurrent ConcurrencyPolicy = "Allow"

        // ForbidConcurrent skips a run while an earlier Job is still active.
        ForbidConcurrent ConcurrencyPolicy = "Forbid"

        // ReplaceConcurrent deletes still-active Jobs before starting the new run.
        ReplaceConcurrent ConcurrencyPolicy = "Replace"
    )
    type CronJobStatus struct {
       Active []corev1.ObjectReference `json:"active,omitempty"`
       LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"`
    type CronJob struct {
       metav1.TypeMeta   `json:",inline"`
       metav1.ObjectMeta `json:"metadata,omitempty"`
       Spec   CronJobSpec   `json:"spec,omitempty"`
       Status CronJobStatus `json:"status,omitempty"`
    type CronJobList struct {
       metav1.TypeMeta `json:",inline"`
       metav1.ListMeta `json:"metadata,omitempty"`
       Items           []CronJob `json:"items"`
    func init() {
       SchemeBuilder.Register(&CronJob{}, &CronJobList{})
    var cronjoblog = logf.Log.WithName("cronjob-resource")
    func (r *CronJob) SetupWebhookWithManager(mgr ctrl.Manager)  error {
        return ctrl.NewWebhookManagedBy(mgr).For(r).Complete()
    var _ webhook.Defaulter = &CronJob{}
    func (r *CronJob) Default() {
        cronjoblog.Info("default", "name", r.Name)
        if r.Spec.ConcurrencyPolicy == "" {
            r.Spec.ConcurrencyPolicy = AllowConcurrent
        if r.Spec.Suspend == nil {
            r.Spec.Suspend = new(bool)
        if r.Spec.SuccessfulJobsHistoryLimit == nil {
            r.Spec.SuccessfulJobsHistoryLimit = new(int32)
            *r.Spec.SuccessfulJobsHistoryLimit = 3
        if r.Spec.FailedJobsHistoryLimit == nil {
            r.Spec.FailedJobsHistoryLimit = new(int32)
            *r.Spec.FailedJobsHistoryLimit = 1
    使用webhook.Validator接口来验证自定义资源对象的值:
    var _ webhook.Validator = &CronJob{}
    // ValidateCreate implements webhook.Validator so a webhook will  be registered for the type
    func (r *CronJob) ValidateCreate() error {
         cronjoblog.Info("validate create", "name", r.Name)
         return r.validateCronJob()
    // ValidateUpdate implements webhook.Validator so a webhook will  be registered for the type
    func (r *CronJob) ValidateUpdate(old runtime.Object) error {
         cronjoblog.Info("validate update", "name", r.Name)
         return r.validateCronJob()
    // ValidateDelete implements webhook.Validator so a webhook will  be registered for the type
    func (r *CronJob) ValidateDelete() error {
         cronjoblog.Info("validate delete", "name", r.Name)
         // TODO(user): fill in your validation logic upon object  deletion.
         return nil
    func (r *CronJob) validateCronJob() error {
        var allErrs field.ErrorList
        if err := r.validateCronJobName(); err != nil {
            allErrs = append(allErrs, err)
        if err := r.validateCronJobSpec(); err != nil {
            allErrs = append(allErrs, err)
        if len(allErrs) == 0 {
            return nil
        return apierrors.NewInvalid(schema.GroupKind{Group:  "batch.tutorial.kubebuilder.io", Kind: "CronJob"}, r.Name, allErrs)
    func (r *CronJob) validateCronJobName() *field.Error {
        if len(r.ObjectMeta.Name) >  validationutils.DNS1035LabelMaxLength-11 {
            return  field.Invalid(field.NewPath("metadata").Child("name"), r.Name,  "must be no more than 52 characters")
        return nil
    func (r *CronJob) validateCronJobSpec() *field.Error {
        return validateScheduleFormat(
    func validateScheduleFormat(schedule string, fldPath  *field.Path) *field.Error {
        if _, err := cron.ParseStandard(schedule); err != nil {
            return field.Invalid(fldPath, schedule, err.Error())
        return nil
    • controllers
    type CronJobReconciler struct {
       Log    logr.Logger
       Scheme *runtime.Scheme
    注释中说:We can pre-assign some pairs at the top of our reconcile method to have those attached to all log lines in this reconciler.
    func (r *CronJobReconciler) Reconcile(req ctrl.Request)  (ctrl.Result, error) {
        ctx := context.Background()
        log := r.Log.WithValues("cronjob", req.NamespacedName)
    var cronJob batch.CronJob
    if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
       log.Error(err, "unable to fetch CronJob")
       return ctrl.Result{}, client.IgnoreNotFound(err)
    var childJobs kbatch.JobList
    err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name})
    if err != nil {
       log.Error(err, "unable to list child Jobs")
       return ctrl.Result{}, err
    var activeJobs []*kbatch.Job
    var successfulJobs []*kbatch.Job
    var failedJobs []*kbatch.Job
    var mostRecentTime *time.Time // find the last run so we can update the status
    isJobFinished := func(job *kbatch.Job) (bool, kbatch.JobConditionType) {
       for _, c := range job.Status.Conditions {
          if (c.Type == kbatch.JobComplete || c.Type == kbatch.JobFailed) && c.Status == corev1.ConditionTrue {
             return true, c.Type
       return false, ""
    // 从annotation中解析出调度时间
    getScheduledTimeForJob := func(job *kbatch.Job) (*time.Time, error) {
       timeRaw := job.Annotations[scheduledTimeAnnotation]
       if len(timeRaw) == 0 {
          return nil, nil
       timeParsed, err := time.Parse(time.RFC3339, timeRaw)
       if err != nil {
          return nil, err
       return &timeParsed, nil
    // 遍历
    for i, job := range childJobs.Items {
       _, finishedType := isJobFinished(&job)
       switch finishedType {
       case "": // ongoing
          activeJobs = append(activeJobs, &childJobs.Items[i])
       case kbatch.JobFailed:
          failedJobs = append(failedJobs, &childJobs.Items[i])
       case kbatch.JobComplete:
          successfulJobs = append(successfulJobs, &childJobs.Items[i])
       scheduledTimeForJob, err := getScheduledTimeForJob(&job)
       if err != nil {
          log.Error(err, "unable to parse schedule time for child job", "job", &job)
       if scheduledTimeForJob != nil {
          if mostRecentTime == nil {
             mostRecentTime = scheduledTimeForJob
          } else if mostRecentTime.Before(*scheduledTimeForJob) {
             mostRecentTime = scheduledTimeForJob
    if mostRecentTime != nil {
       cronJob.Status.LastScheduleTime = &metav1.Time{Time: *mostRecentTime}
    } else {
       cronJob.Status.LastScheduleTime = nil
    cronJob.Status.Active = nil
    for _, activeJob := range activeJobs {
       jobRef, err := ref.GetReference(r.Scheme, activeJob)
       if err != nil {
          log.Error(err, "unable to make reference to active job", "job", activeJob)
       cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
    log.V(1).Info("job count", "active jobs", len(activeJobs), "successful jobs", len(successfulJobs), "failed jobs", len(failedJobs))
    if err := r.Status().Update(ctx, &cronJob); err != nil {
       log.Error(err, "unable to update CronJob status")
       return ctrl.Result{}, err
    // 删除不一定要保证成功
    if cronJob.Spec.FailedJobsHistoryLimit != nil {
       sort.Slice(failedJobs, func(i, j int) bool {
          if failedJobs[i].Status.StartTime == nil {
             return failedJobs[j].Status.StartTime != nil
          return failedJobs[i].Status.StartTime.Before(failedJobs[j].Status.StartTime)
       for i, job := range failedJobs {
          if int32(i) >= int32(len(failedJobs))-*cronJob.Spec.FailedJobsHistoryLimit {
          if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
             log.Error(err, "unable to delete old failed job", "job", job)
          } else {
             log.V(0).Info("deleted old failed job", "job", job)
    if cronJob.Spec.SuccessfulJobsHistoryLimit != nil {
       sort.Slice(successfulJobs, func(i, j int) bool {
          if successfulJobs[i].Status.StartTime == nil {
             return successfulJobs[j].Status.StartTime != nil
          return successfulJobs[i].Status.StartTime.Before(successfulJobs[j].Status.StartTime)
       for i, job := range successfulJobs {
          if int32(i) >= int32(len(successfulJobs))-*cronJob.Spec.SuccessfulJobsHistoryLimit {
          if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); (err) != nil {
             log.Error(err, "unable to delete old successful job", "job", job)
          } else {
             log.V(0).Info("deleted old successful job", "job", job)
    if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
       log.V(1).Info("cronjob suspended, skipping")
       return ctrl.Result{}, nil
    getNextSchedule := func(cronJob *batch.CronJob, now time.Time) (lastMissed time.Time, next time.Time, err error) {
       sched, err := cron.ParseStandard(cronJob.Spec.Schedule)
       if err != nil {
          return time.Time{}, time.Time{}, fmt.Errorf("Unparseable schedule %q: %v", cronJob.Spec.Schedule, err)
       var earliestTime time.Time
       if cronJob.Status.LastScheduleTime != nil {
          earliestTime = cronJob.Status.LastScheduleTime.Time
       } else {
          earliestTime = cronJob.ObjectMeta.CreationTimestamp.Time
       if cronJob.Spec.StartingDeadlineSeconds != nil {
          // controller is not going to schedule anything below this point
          schedulingDeadline := now.Add(-time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds))
          if schedulingDeadline.After(earliestTime) {
             earliestTime = schedulingDeadline
       if earliestTime.After(now) {
          return time.Time{}, sched.Next(now), nil
       starts := 0
       for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
          lastMissed = t
          if starts > 100 {
             return time.Time{}, time.Time{}, fmt.Errorf("Too many missed start times (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew.")
       return lastMissed, sched.Next(now), nil
    missedRun, nextRun, err := getNextSchedule(&cronJob, r.Now())
    if err != nil {
       log.Error(err, "unable to figure out CronJob schedule")
       return ctrl.Result{}, nil
    scheduledResult := ctrl.Result{RequeueAfter: nextRun.Sub(r.Now())}  //保存结果
    log = log.WithValues("now", r.Now(), "next run", nextRun)
    6、Run a new job if it’s on schedule, not past the deadline, and not blocked by our concurrency policy
    If we've missed a run, and we're still within the deadline to start it, we'll need to run a job.
    if missedRun.IsZero() {
       log.V(1).Info("no upcoming scheduled times, sleeping until next")
       return scheduledResult, nil
    // make sure we're not too late to start the run
    log = log.WithValues("current run", missedRun)
    tooLate := false
    if cronJob.Spec.StartingDeadlineSeconds != nil {
       tooLate = missedRun.Add(time.Duration(*cronJob.Spec.StartingDeadlineSeconds) * time.Second).Before(r.Now())
    if tooLate {
       log.V(1).Info("missed starting deadline for last run, sleeping till next")
       // TODO(directxman12): events
       return scheduledResult, nil
    // 根据并行策略决定是否运行
    if cronJob.Spec.ConcurrencyPolicy == batch.ForbidConcurrent && len(activeJobs) > 0 {
       log.V(1).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(activeJobs))
       return scheduledResult, nil
    // 根据并行策略决定是否替换
    if cronJob.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent {
        for _, activeJob := range activeJobs {
            err := r.Delete(ctx, activeJob, client.PropagationPolicy(metav1.DeletePropagationBackground))  //删除不成功无所谓
            if client.IgnoreNotFound(err) != nil {
                log.Error(err, "unable to delete active job", "job", activeJob)
                return ctrl.Result{}, err
    constructJobForCronJob := func(cronJob *batch.CronJob, scheduledTime time.Time) (*kbatch.Job, error) {
        name := fmt.Sprintf("%s-%d", cronJob.Name, scheduledTime.Unix())  //job名包含nominal start time,避免启动两次
        job := &kbatch.Job{
            ObjectMeta: metav1.ObjectMeta{
                Labels:      make(map[string]string),
                Annotations: make(map[string]string),
                Name:        name,
                Namespace:   cronJob.Namespace,
            Spec: *cronJob.Spec.JobTemplate.Spec.DeepCopy(),
        for k, v := range cronJob.Spec.JobTemplate.Annotations {
            job.Annotations[k] = v
        job.Annotations[scheduledTimeAnnotation] = scheduledTime.Format(time.RFC3339)
        for k, v := range cronJob.Spec.JobTemplate.Labels {
            job.Labels[k] = v
        if err := ctrl.SetControllerReference(cronJob, job, r.Scheme); err != nil {
            return nil, err
        return job, nil
    // actually make the job
    job, err := constructJobForCronJob(&cronJob, missedRun)
    if err != nil {
       log.Error(err, "unable to construct job from template")
       // don't bother requeuing until we get a change to the spec
       return scheduledResult, nil
    // create the job on the cluster
    if err := r.Create(ctx, job); err != nil {
       log.Error(err, "unable to create Job for CronJob", "job", job)
       return ctrl.Result{}, err
    return scheduledResult, nil


    为Job资源对象建立索引(indexed value):indexer会自动处理namespace,我们只需从Job中提取owner的名称(owner name)
    func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
       if r.Clock == nil {
          r.Clock = realClock{}
       if err := mgr.GetFieldIndexer().IndexField(context.Background(), &kbatch.Job{}, jobOwnerKey, func(rawObj runtime.Object) []string {
          // 获取资源对象,解析owner
          job := rawObj.(*kbatch.Job)
          owner := metav1.GetControllerOf(job)
          if owner == nil {
             return nil
          if owner.APIVersion != apiGVStr || owner.Kind != "CronJob” {   //确定它是cronjob
             return nil
          return []string{owner.Name}
       }); err != nil {
          return err
       return ctrl.NewControllerManagedBy(mgr).For(&batch.CronJob{}).Owns(&kbatch.Job{}).Complete(r)


    • main.go
    scheme = runtime.NewScheme()
    setupLog = ctrl.Log.WithName("setup")




    // Body of main: create the manager and register the CronJob controller.
    // The webhooks are registered too unless ENABLE_WEBHOOKS=false. Then run
    // until a termination signal arrives. Every setup failure is fatal.
    // NOTE(review): metricsAddr and enableLeaderElection are presumably
    // command-line flags parsed earlier in main — not shown in this excerpt.
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:             scheme,
        MetricsBindAddress: metricsAddr,
        Port:               9443, // webhook server port
        LeaderElection:     enableLeaderElection,
        LeaderElectionID:   "80807133.tutorial.kubebuilder.io",
    })
    if err != nil {
        setupLog.Error(err, "unable to start manager")
        // mgr is unusable; continuing would dereference it below.
        os.Exit(1)
    }

    if err = (&controllers.CronJobReconciler{
        Client: mgr.GetClient(),
        Log:    ctrl.Log.WithName("controllers").WithName("CronJob"),
        Scheme: mgr.GetScheme(),
    }).SetupWithManager(mgr); err != nil {
        setupLog.Error(err, "unable to create controller", "controller", "CronJob")
        os.Exit(1)
    }

    // Webhooks can be switched off, e.g. when running locally without certificates.
    if os.Getenv("ENABLE_WEBHOOKS") != "false" {
        if err = (&batchv1.CronJob{}).SetupWebhookWithManager(mgr); err != nil {
            setupLog.Error(err, "unable to create webhook", "webhook", "CronJob")
            os.Exit(1)
        }
    }

    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        setupLog.Error(err, "problem running manager")
        os.Exit(1)
    }
    • makefile
    # Install the project's CRDs into the current cluster — presumably via the
    # kubebuilder-scaffolded Makefile target (the Makefile is not shown here).
    make install
    # Remove what `make install` installed (same caveat as above).
    make uninstall
    • dockerfile
    # Build stage: compile the manager binary.
    FROM golang:1.15 as builder
    WORKDIR /workspace
    # Copy the module files first and download dependencies in their own layer,
    # so that layer stays cached until go.mod/go.sum change.
    COPY go.mod go.mod
    COPY go.sum go.sum
    RUN go mod download
    # Copy the Go sources.
    COPY main.go main.go
    COPY api/ api/
    COPY controllers/ controllers/
    # CGO_ENABLED=0 gives a statically linked binary that runs on distroless/static.
    RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -o manager main.go

    # Runtime stage: minimal image, running as a non-root user.
    FROM gcr.io/distroless/static:nonroot
    # Pin the working directory so `COPY ... .` places the binary at /manager,
    # the path ENTRYPOINT runs. The base image's default WORKDIR is not
    # guaranteed to be /.
    # NOTE(review): distroless :nonroot appears to default to /home/nonroot — confirm.
    WORKDIR /
    COPY --from=builder /workspace/manager .
    USER nonroot:nonroot
    ENTRYPOINT ["/manager"]
  • 相关阅读:
    php 数组去重
    投票 页的做法 重点——学会进度条!!
    封装 类
    HPH-——>mysql 批量删除
    php->msql 多条件查询
    php-> msql 修改
    PHP ->masql 登录 增 删 改
    php 连接数据库
    Python 第十七章 序列化+os+sys+hashlib+collections
    Python 第十六章 自定义模块+time+datetime+random
  • 原文地址:https://www.cnblogs.com/yangyuliufeng/p/14217887.html
Copyright © 2011-2022 走看看