zoukankan      html  css  js  c++  java
  • code-generator生成代码

    一、生成代码如下

    package main
    
    import (
       "flag"
       "fmt"
       crdv1beta1 "github.com/cnych/controller-demo/pkg/apis/stable/v1beta1"
       "k8s.io/apimachinery/pkg/api/errors"
       "k8s.io/apimachinery/pkg/util/runtime"
       "k8s.io/apimachinery/pkg/util/wait"
       "k8s.io/client-go/tools/cache"
       "k8s.io/client-go/util/workqueue"
       "os"
       "os/signal"
       "path/filepath"
       "syscall"
       "time"
    
       "github.com/golang/glog"
       "k8s.io/client-go/tools/clientcmd"
       "k8s.io/client-go/util/homedir"
       "k8s.io/klog"
    
       clientset "github.com/cnych/controller-demo/pkg/client/clientset/versioned"
       "github.com/cnych/controller-demo/pkg/client/informers/externalversions"
       informers "github.com/cnych/controller-demo/pkg/client/informers/externalversions/stable/v1beta1"
    )
    
    // Controller bundles the CronTab informer with the rate-limited work queue
    // that the informer's event handlers feed and the worker goroutines drain.
    type Controller struct {
       // informer gives access to the shared informer and lister for CronTab objects.
       informer  informers.CronTabInformer
       // workqueue holds the keys of CronTab objects that are waiting to be synced.
       workqueue workqueue.RateLimitingInterface
    }
    
    
    var (
       // onlyOneSignalHandler is closed by setupSignalHandler; calling that
       // function a second time therefore panics on the double close.
       onlyOneSignalHandler = make(chan struct{})
       // shutdownSignals lists the signals setupSignalHandler subscribes to.
       shutdownSignals      = []os.Signal{os.Interrupt, syscall.SIGTERM}
       // kubeconfig points at the value of the -kubeconfig flag registered in main.
       kubeconfig *string
    )
    
    // setupSignalHandler subscribes to the signals in shutdownSignals
    // (os.Interrupt and SIGTERM) and returns a stop channel that is closed when
    // the first such signal arrives. A second signal terminates the process
    // immediately with exit code 1.
    //
    // The function may be called only once; a second call panics.
    func setupSignalHandler() (stopCh <-chan struct{}) {
       // Closing an already-closed channel panics, which enforces single use.
       close(onlyOneSignalHandler)

       // signal.Notify forwards only the listed signals to the channel; with no
       // signals listed it would forward every incoming signal.
       signals := make(chan os.Signal, 2)
       signal.Notify(signals, shutdownSignals...)

       done := make(chan struct{})
       waitForSignals := func() {
          <-signals
          close(done) // first signal: request a graceful shutdown
          <-signals
          os.Exit(1) // second signal: exit right away
       }
       go waitForSignals()

       return done
    }
    
    // main wires the program together: it parses the -kubeconfig flag, installs
    // the signal handler, builds the generated CronTab clientset and informer
    // factory, and runs the controller with two workers until a stop signal
    // arrives.
    func main() {
       // Default to <home>/.kube/config when a home directory is known;
       // otherwise the flag defaults to the empty string.
       defaultPath, usage := "", "absolute path to the kubeconfig file"
       if home := homedir.HomeDir(); home != "" {
          defaultPath = filepath.Join(home, ".kube", "config")
          usage = "(optional) absolute path to the kubeconfig file"
       }
       kubeconfig = flag.String("kubeconfig", defaultPath, usage)
       flag.Parse()

       // Install signal handling before any long-running work starts.
       stopCh := setupSignalHandler()

       // Build the REST configuration from the kubeconfig path.
       restConfig, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
       if err != nil {
          glog.Fatalf("Error building kubeconfig: %s", err.Error())
       }

       crontabClient, err := clientset.NewForConfig(restConfig)
       if err != nil {
          glog.Fatalf("Error building example clientset: %s", err.Error())
       }

       // The informer factory receives the generated client, which it uses to
       // talk to the API server (List & Watch); 30s is passed as its resync
       // period.
       factory := externalversions.NewSharedInformerFactory(crontabClient, time.Second*30)

       // Create the custom controller on top of the CronTab informer.
       controller := NewController(factory.Stable().V1beta1().CronTabs())

       // Start the informers so they begin to List & Watch.
       go factory.Start(stopCh)

       if runErr := controller.Run(2, stopCh); runErr != nil {
          klog.Fatalf("Error running controller: %s", runErr.Error())
       }
    }
    
    
    // NewController builds a Controller around the given CronTab informer and
    // registers add, update and delete handlers on it. Every handler ends up
    // putting the key of the affected object on the controller's work queue.
    func NewController(informer informers.CronTabInformer) *Controller {
       // The work queue decouples the informer callbacks from the control loop.
       queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "CronTab")
       c := &Controller{
          informer:  informer,
          workqueue: queue,
       }

       klog.Info("Setting up crontab event handlers")

       // Updates whose ResourceVersion did not change are ignored
       // (presumably periodic resyncs that carry no real modification).
       onUpdate := func(prev, cur interface{}) {
          before := prev.(*crdv1beta1.CronTab)
          after := cur.(*crdv1beta1.CronTab)
          if before.ResourceVersion != after.ResourceVersion {
             c.enqueueCronTab(cur)
          }
       }

       informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
          AddFunc:    c.enqueueCronTab,
          UpdateFunc: onUpdate,
          DeleteFunc: c.enqueueCronTabForDelete,
       })
       return c
    }
    
    // Run waits for the informer cache to sync, starts threadiness worker
    // goroutines and then blocks until stopCh is closed. The work queue is shut
    // down when Run returns.
    //
    // It returns an error only when the cache sync does not complete.
    func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
       defer runtime.HandleCrash()
       defer c.workqueue.ShutDown()

       klog.Info("Starting CronTab control loop")
       klog.Info("Waiting for informer caches to sync")
       synced := cache.WaitForCacheSync(stopCh, c.informer.Informer().HasSynced)
       if !synced {
          return fmt.Errorf("failed to wait for caches to sync")
       }

       klog.Info("Starting workers")
       for remaining := threadiness; remaining > 0; remaining-- {
          // Each worker is re-invoked one second after it returns, until stopCh closes.
          go wait.Until(c.runWorker, time.Second, stopCh)
       }
       klog.Info("Started workers")

       <-stopCh
       klog.Info("Shutting down workers")
       return nil
    }
    
    // runWorker keeps calling processNextWorkItem to consume items from the
    // work queue and returns once processNextWorkItem reports false.
    func (c *Controller) runWorker() {
       for {
          if !c.processNextWorkItem() {
             return
          }
       }
    }
    
    
    // processNextWorkItem takes a single item off the work queue and hands its
    // key to syncHandler.
    //
    // It returns false only when the queue has been shut down, which tells the
    // calling worker to stop; in every other case (including sync failures) it
    // returns true so the worker keeps going.
    func (c *Controller) processNextWorkItem() bool {
       obj, shutdown := c.workqueue.Get()
       if shutdown {
          return false
       }

       // The closure scopes the deferred Done call to this single item.
       err := func(obj interface{}) error {
          // Done must always be called so the queue knows processing of this
          // item has finished.
          defer c.workqueue.Done(obj)

          key, ok := obj.(string)
          if !ok {
             // A non-string item can never be processed; Forget it so it is
             // not retried, and report the problem.
             c.workqueue.Forget(obj)
             runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
             return nil
          }

          if err := c.syncHandler(key); err != nil {
             // Put the key back with rate limiting so that transient
             // failures are retried instead of being silently dropped.
             c.workqueue.AddRateLimited(key)
             return fmt.Errorf("error syncing '%s': %w, requeuing", key, err)
          }

          // Success: clear the rate-limiter history for this item.
          c.workqueue.Forget(obj)
          klog.Infof("Successfully synced '%s'", key)
          return nil
       }(obj)

       if err != nil {
          runtime.HandleError(err)
       }
       return true
    }
    
    // syncHandler looks up the CronTab identified by key (namespace/name) in
    // the informer's local cache and logs what it finds.
    //
    // A malformed key is reported and dropped (nil is returned). A NotFound
    // result is treated as "the object was deleted" and also returns nil. Any
    // other lister error is reported and returned to the caller.
    func (c *Controller) syncHandler(key string) error {
       ns, name, splitErr := cache.SplitMetaNamespaceKey(key)
       if splitErr != nil {
          runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
          return nil
       }

       crontab, getErr := c.informer.Lister().CronTabs(ns).Get(name)
       switch {
       case getErr == nil:
          klog.Infof("[CronTabCRD] try to process crontab: %#v ...", crontab)
          return nil
       case errors.IsNotFound(getErr):
          // The object is missing from the cache, which means its key reached
          // the queue through a delete event: the CronTab is already gone.
          klog.Warningf("[CronTabCRD] %s/%s does not exist in local cache, will delete it from CronTab ...",
             ns, name)
          klog.Infof("[CronTabCRD] deleting crontab: %s/%s ...", ns, name)
          return nil
       default:
          runtime.HandleError(fmt.Errorf("failed to get crontab by: %s/%s", ns, name))
          return getErr
       }
    }
    
    // enqueueCronTab derives the cache key for obj and adds it to the work
    // queue through the rate limiter. If no key can be derived the error is
    // reported and nothing is enqueued.
    func (c *Controller) enqueueCronTab(obj interface{}) {
       key, err := cache.MetaNamespaceKeyFunc(obj)
       if err != nil {
          runtime.HandleError(err)
          return
       }
       c.workqueue.AddRateLimited(key)
    }
    
    // enqueueCronTabForDelete is the delete-event counterpart of
    // enqueueCronTab: it derives the key with
    // cache.DeletionHandlingMetaNamespaceKeyFunc and adds it to the work queue
    // through the rate limiter. If no key can be derived the error is reported
    // and nothing is enqueued.
    func (c *Controller) enqueueCronTabForDelete(obj interface{}) {
       if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
          runtime.HandleError(err)
       } else {
          c.workqueue.AddRateLimited(key)
       }
    }
    

      

  • 相关阅读:
    27. Remove Element
    26. Remove Duplicates from Sorted Array
    643. Maximum Average Subarray I
    674. Longest Continuous Increasing Subsequence
    1. Two Sum
    217. Contains Duplicate
    448. Find All Numbers Disappeared in an Array
    566. Reshape the Matrix
    628. Maximum Product of Three Numbers
    UVa 1349 Optimal Bus Route Design (最佳完美匹配)
  • 原文地址:https://www.cnblogs.com/wuchangblog/p/14289929.html
Copyright © 2011-2022 走看看