一、生成代码如下
package main import ( "flag" "fmt" crdv1beta1 "github.com/cnych/controller-demo/pkg/apis/stable/v1beta1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "os" "os/signal" "path/filepath" "syscall" "time" "github.com/golang/glog" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" "k8s.io/klog" clientset "github.com/cnych/controller-demo/pkg/client/clientset/versioned" "github.com/cnych/controller-demo/pkg/client/informers/externalversions" informers "github.com/cnych/controller-demo/pkg/client/informers/externalversions/stable/v1beta1" ) type Controller struct { informer informers.CronTabInformer workqueue workqueue.RateLimitingInterface } var ( onlyOneSignalHandler = make(chan struct{}) shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} kubeconfig *string ) // SetupSignalHandler 注册 SIGTERM 和 SIGINT 信号 // 返回一个 stop channel,该通道在捕获到第一个信号时被关闭 // 如果捕捉到第二个信号,程序将直接退出 func setupSignalHandler() (stopCh <-chan struct{}) { // 当调用两次的时候 panics close(onlyOneSignalHandler) stop := make(chan struct{}) c := make(chan os.Signal, 2) // Notify 函数让 signal 包将输入信号转发到 c // 如果没有列出要传递的信号,会将所有输入信号传递到 c;否则只传递列出的输入信号 signal.Notify(c, shutdownSignals...) go func() { <-c close(stop) <-c os.Exit(1) // 第二个信号,直接退出 }() return stop } func main() { if home := homedir.HomeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") } else { kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") } flag.Parse() // 处理信号量 stopCh := setupSignalHandler() // 处理入参 cfg, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { glog.Fatalf("Error building kubeconfig: %s", err.Error()) } crontabClient, err := clientset.NewForConfig(cfg) if err != nil { glog.Fatalf("Error building example clientset: %s", err.Error()) } // informerFactory 工厂类, 这里注入我们通过代码生成的 client // clent 主要用于和 API Server 进行通信,实现 ListAndWatch crontabInformerFactory := externalversions.NewSharedInformerFactory(crontabClient, time.Second*30) // 实例化自定义控制器 controller := NewController(crontabInformerFactory.Stable().V1beta1().CronTabs()) // 启动 informer,开始List & Watch go crontabInformerFactory.Start(stopCh) if err = controller.Run(2, stopCh); err != nil { klog.Fatalf("Error running controller: %s", err.Error()) } } func NewController(informer informers.CronTabInformer) *Controller { //使用client 和前面创建的 Informer,初始化了自定义控制器 controller := &Controller{ informer: informer, // WorkQueue 的实现,负责同步 Informer 和控制循环之间的数据 workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "CronTab"), } klog.Info("Setting up crontab event handlers") // informer 注册了三个 Handler(AddFunc、UpdateFunc 和 DeleteFunc) // 分别对应 API 对象的“添加”“更新”和“删除”事件。 // 而具体的处理操作,都是将该事件对应的 API 对象加入到工作队列中 informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueCronTab, UpdateFunc: func(old, new interface{}) { oldObj := old.(*crdv1beta1.CronTab) newObj := new.(*crdv1beta1.CronTab) // 如果资源版本相同则不处理 if oldObj.ResourceVersion == newObj.ResourceVersion { return } controller.enqueueCronTab(new) }, DeleteFunc: controller.enqueueCronTabForDelete, }) return controller } func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { defer runtime.HandleCrash() defer c.workqueue.ShutDown() // 记录开始日志 klog.Info("Starting CronTab control loop") klog.Info("Waiting for informer caches to sync") if ok := cache.WaitForCacheSync(stopCh, c.informer.Informer().HasSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } klog.Info("Starting workers") for i := 0; i < threadiness; i++ { go wait.Until(c.runWorker, time.Second, stopCh) } klog.Info("Started workers") <-stopCh klog.Info("Shutting down workers") return nil } // runWorker 是一个死循环,一直会调用 c.processNextWorkItem 从workqueue读取元素 func (c *Controller) runWorker() { for c.processNextWorkItem() { } } // 从workqueue消费元素 func (c *Controller) processNextWorkItem() bool { obj, shutdown := c.workqueue.Get() if shutdown { return false } err := func(obj interface{}) error { defer c.workqueue.Done(obj) var key string var ok bool if key, ok = obj.(string); !ok { c.workqueue.Forget(obj) runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) return nil } if err := c.syncHandler(key); err != nil { return fmt.Errorf("error syncing '%s': %s", key, err.Error()) } c.workqueue.Forget(obj) klog.Infof("Successfully synced '%s'", key) return nil }(obj) if err != nil { runtime.HandleError(err) return true } return true } // 从 Informer 维护的缓存中拿到了它所对应的 CronTab 对象 func (c *Controller) syncHandler(key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) return nil } crontab, err := c.informer.Lister().CronTabs(namespace).Get(name) //从缓存中拿不到这个对象,那就意味着这个 CronTab 对象的 Key 是通过前面的“删除”事件添加进工作队列的。 if err != nil { if errors.IsNotFound(err) { // 对应的 crontab 对象已经被删除了 klog.Warningf("[CronTabCRD] %s/%s does not exist in local cache, will delete it from CronTab ...", namespace, name) klog.Infof("[CronTabCRD] deleting crontab: %s/%s ...", namespace, name) return nil } runtime.HandleError(fmt.Errorf("failed to get crontab by: %s/%s", namespace, name)) return err } klog.Infof("[CronTabCRD] try to process crontab: %#v ...", crontab) return nil } func (c *Controller) enqueueCronTab(obj interface{}) { var key string var err error if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { runtime.HandleError(err) return } c.workqueue.AddRateLimited(key) } func (c *Controller) enqueueCronTabForDelete(obj interface{}) { var key string var err error key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { runtime.HandleError(err) return } c.workqueue.AddRateLimited(key) }