controller-runtime框架是社区封装的一个控制器处理的框架
pkg/controllers/controller.go中,定义了Controller接口:
type Controller interface { reconcile.Reconciler Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error Start(ctx context.Context) error GetLogger() logr.Logger }
以及创建controller时的参数:
type Options struct { MaxConcurrentReconciles int Reconciler reconcile.Reconciler RateLimiter ratelimiter.RateLimiter //限速队列的速率限制 Log logr.Logger //controller使用的logger }
pkg/internal/controller/controller.go中定义的Controller结构体,实现了Controller接口:
type Controller struct { Name string //用于跟踪、记录和监控中控制器的唯一标识 MaxConcurrentReconciles int //可以运行的最大并发Reconciles数量,默认值为1 Do reconcile.Reconciler MakeQueue func() workqueue.RateLimitingInterface //一旦控制器准备好启动,MakeQueue就会为这个控制器构造工作队列。队列通过监听来自Infomer的事件,添加对象到队列中进行处理 Queue workqueue.RateLimitingInterface SetFields func(i interface{}) error //SetFields将依赖关系注入到其他对象,比如Sources、EventHandlers以及Predicates mu sync.Mutex // 控制器同步信号量 JitterPeriod time.Duration // 允许测试减少JitterPeriod,使其更快完成 Started bool //控制器是否已经启动 ctx context.Context startWatches []watchDescription Log logr.Logger }
①Do是pkg/reconcile/reconcile.go中定义的Reconcile接口:
type Reconciler interface { Reconcile(context.Context, Request) (Result, error) }
②Informer、Indexer的数据通过startWatches属性做了一层封装,以方便在控制器启动的时候启动,该属性是一个watchDescription切片
一个watchDescription包含所有启动watch操作所需的信息(一个sources、一个handler以及predicates切片):
type watchDescription struct { src source.Source handler handler.EventHandler predicates []predicate.Predicate }
③Queue是client-go中提供的限速队列,放入到队列中的元素不是以前默认的元素唯一的KEY,而是经过封装的reconcile.Request对象:
type Request struct { types.NamespacedName } type NamespacedName struct { Namespace string Name string }
pkg/client/client.go中的client结构体,提供了Create、List、Delete等方法,通过它们可以很方便地得到资源对象
Controller最核心的方法Watch:
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error { c.mu.Lock() defer c.mu.Unlock() // 注入Cache到参数中 if err := c.SetFields(src); err != nil { return err } if err := c.SetFields(evthdler); err != nil { return err } for _, pr := range prct { if err := c.SetFields(pr); err != nil { return err } } if !c.Started { // Controller还没启动 c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct}) // watches会被保存到控制器结构体中,直到调用Start(...) 函数 return nil //把watches存放到本地然后返回 } c.Log.Info("Starting EventSource", "source", src) return src.Start(c.ctx, evthdler, c.Queue, prct...) }
第一个参数是pkg/source/source.go中定义的Source接口,它是事件的源:
type Source interface { Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error }
Source的具体实现在pkg/source/source.go中的Kind结构体:
type Kind struct { Type client.Object //要watch的资源对象类型,例如&v1.Pod{} cache cache.Cache //watch时使用的cache }
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { if ks.Type == nil { //必须指明Kind.Type return fmt.Errorf("must specify Kind.Type") } if ks.cache == nil { //start前cache必须准备完成 return fmt.Errorf("must call CacheInto on Kind before calling Start") } i, err := ks.cache.GetInformer(ctx, ks.Type) //从cache中获取informer if err != nil { if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok { log.Error(err, "if kind is a CRD, it should be installed before calling Start", "kind", kindMatchErr.GroupKind) } return err } i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) //添加EventHandler return nil }
AddEventHandler方法中的参数是一个internal.EventHandler结构体,它的成员结构体EventHandler实现了client-go中提供的 ResourceEventHandler接口,也就是实现了OnAdd、OnUpdate、OnDelete等回调函数
pkg/builder/controller.go中的doWatch()方法实际调用了Watch,调用时传入的就是Kind类型的对象
Watch最终就是调用了传入的Kind类型的对象的Start方法
func (blder *Builder) doWatch() error { // Reconcile type typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection) if err != nil { return err } src := &source.Kind{Type: typeForSrc} hdler := &handler.EnqueueRequestForObject{} allPredicates := append(blder.globalPredicates, blder.forInput.predicates...) if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { return err } // Watches the managed types for _, own := range blder.ownsInput { typeForSrc, err := blder.project(own.object, own.objectProjection) if err != nil { return err } src := &source.Kind{Type: typeForSrc} hdler := &handler.EnqueueRequestForOwner{ OwnerType: blder.forInput.object, IsController: true, } allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, own.predicates...) if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { return err } } // Do the watch requests for _, w := range blder.watchesInput { allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, w.predicates...) // If the source of this watch is of type *source.Kind, project it. if srckind, ok := w.src.(*source.Kind); ok { typeForSrc, err := blder.project(srckind.Type, w.objectProjection) if err != nil { return err } srckind.Type = typeForSrc } if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil { return err } } return nil }
调用Watch时传入的handler.EventHandler是一个handler.EnqueueRequestForObject结构体,它实现了Create、Update、Delete、Generic四个方法。
type EnqueueRequestForObject struct{} func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) return } q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), }}) } // Update implements EventHandler func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { if evt.ObjectOld != nil { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.ObjectOld.GetName(), Namespace: evt.ObjectOld.GetNamespace(), }}) } else { enqueueLog.Error(nil, "UpdateEvent received with no old metadata", "event", evt) } if evt.ObjectNew != nil { q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.ObjectNew.GetName(), Namespace: evt.ObjectNew.GetNamespace(), }}) } else { enqueueLog.Error(nil, "UpdateEvent received with no new metadata", "event", evt) } } // Delete implements EventHandler func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt) return } q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), }}) } // Generic implements EventHandler func (e *EnqueueRequestForObject) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt) return } q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), }}) }
四个方法均会将资源对象的Name和Namespace组成reconcile.Request放入队列中
Controller的核心方法Start:
func (c *Controller) Start(ctx context.Context) error { c.mu.Lock() if c.Started { return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times") } // Set the internal context. c.ctx = ctx c.Queue = c.MakeQueue() defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed err := func() error { defer c.mu.Unlock() defer utilruntime.HandleCrash() for _, watch := range c.startWatches { c.Log.Info("Starting EventSource", "source", watch.src) if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { return err } } // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches c.Log.Info("Starting Controller") for _, watch := range c.startWatches { syncingSource, ok := watch.src.(source.SyncingSource) if !ok { continue } if err := syncingSource.WaitForSync(ctx); err != nil { // This code is unreachable in case of kube watches since WaitForCacheSync will never return an error // Leaving it here because that could happen in the future err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err) c.Log.Error(err, "Could not wait for Cache to sync") return err } } c.startWatches = nil if c.JitterPeriod == 0 { c.JitterPeriod = 1 * time.Second } // Launch workers to process resources c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles) ctrlmetrics.WorkerCount.WithLabelValues(c.Name).Set(float64(c.MaxConcurrentReconciles)) for i := 0; i < c.MaxConcurrentReconciles; i++ { go wait.UntilWithContext(ctx, func(ctx context.Context) { for c.processNextWorkItem(ctx) { } }, c.JitterPeriod) } c.Started = true return nil }() if err != nil { return err } <-ctx.Done() c.Log.Info("Stopping workers") return nil }
先等待资源对象的Informer同步完成,然后启动workers来处理资源对象,而且worker函数都是一样的实现方式:
func (c *Controller) processNextWorkItem(ctx context.Context) bool { obj, shutdown := c.Queue.Get() if shutdown { return false // Stop working } defer c.Queue.Done(obj) ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1) defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1) c.reconcileHandler(ctx, obj) return true } processNextWorkItem从工作队列中弹出一个元素,并尝试通过调用reconcileHandler来处理它 reconcileHandler方法是真正执行元素业务处理的地方,包含了事件处理以及错误处理: func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { // Update metrics after processing each item reconcileStartTS := time.Now() defer func() { c.updateMetrics(time.Since(reconcileStartTS)) }() // Make sure that the the object is a valid request. req, ok := obj.(reconcile.Request) if !ok { c.Queue.Forget(obj) c.Log.Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj) return } log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace) ctx = logf.IntoContext(ctx, log) if result, err := c.Do.Reconcile(ctx, req); err != nil { c.Queue.AddRateLimited(req) ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "error").Inc() return } else if result.RequeueAfter > 0 { c.Queue.Forget(obj) c.Queue.AddAfter(req, result.RequeueAfter) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "requeue_after").Inc() return } else if result.Requeue { c.Queue.AddRateLimited(req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "requeue").Inc() return } c.Queue.Forget(obj) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, "success").Inc() }
真正的事件处理通过c.Do.Reconcile(ctx, req)暴露给开发者;就算直接调用c.Reconcile(ctx, req),还是会调用c.Do.Reconcile(ctx, req)
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace) ctx = logf.IntoContext(ctx, log) return c.Do.Reconcile(ctx, req) }
每种CRD必须定义一个实现了reconcile.Reconcile接口的XxxReconcile结构体,结构体需要包含client.Client(manager定义了GetClient方法),便于从req中获取资源对象;包含runtime.Scheme,以提供资源对象Kind和Go type的映射(manager定义了runtime.GetScheme方法)
所以对于开发者来说,只需要在此结构体的Reconcile方法中去处理业务逻辑就可以了:
根据方法的返回值来判断是否需要将元素重新加入限速队列进行处理
方法的返回值为reconcile.Result类型:
type Result struct { Requeue bool RequeueAfter time.Duration }
如果返回error!=nil,则将元素重新添加到队列中
如果返回的result.RequeueAfter > 0,则先将元素忘记,然后在result.RequeueAfter时间后加入到队列中
如果返回result.Requeue = true,则直接将元素重新加入到限速队列中
如果正常返回reconcile.Result{},则直接忘记这个元素
controller-runtime中的Manager是一个用于初始化共享依赖关系的接口
type Manager interface { // Add will set requested dependencies on the component, and cause the component to be // started when Start is called. Add will inject any dependencies for which the argument // implements the inject interface - e.g. inject.Client. // Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either // non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled). Add(Runnable) error // Elected is closed when this manager is elected leader of a group of // managers, either because it won a leader election or because no leader // election was configured. Elected() <-chan struct{} // SetFields will set any dependencies on an object for which the object has implemented the inject // interface - e.g. inject.Client. SetFields(interface{}) error // AddMetricsExtraHandler adds an extra handler served on path to the http server that serves metrics. // Might be useful to register some diagnostic endpoints e.g. pprof. Note that these endpoints meant to be // sensitive and shouldn't be exposed publicly. // If the simple path -> handler mapping offered here is not enough, a new http server/listener should be added as // Runnable to the manager via Add method. AddMetricsExtraHandler(path string, handler http.Handler) error // AddHealthzCheck allows you to add Healthz checker AddHealthzCheck(name string, check healthz.Checker) error // AddReadyzCheck allows you to add Readyz checker AddReadyzCheck(name string, check healthz.Checker) error // Start starts all registered Controllers and blocks until the context is cancelled. // Returns an error if there is an error starting any controller. // // If LeaderElection is used, the binary must be exited immediately after this returns, // otherwise components that need leader election might continue to run after the leader // lock was lost. Start(ctx context.Context) error // GetConfig returns an initialized Config GetConfig() *rest.Config // GetScheme returns an initialized Scheme GetScheme() *runtime.Scheme // GetClient returns a client configured with the Config. This client may // not be a fully "direct" client -- it may read from a cache, for // instance. See Options.NewClient for more information on how the default // implementation works. GetClient() client.Client // GetFieldIndexer returns a client.FieldIndexer configured with the client GetFieldIndexer() client.FieldIndexer // GetCache returns a cache.Cache GetCache() cache.Cache // GetEventRecorderFor returns a new EventRecorder for the provided name GetEventRecorderFor(name string) record.EventRecorder // GetRESTMapper returns a RESTMapper GetRESTMapper() meta.RESTMapper // GetAPIReader returns a reader that will be configured to use the API server. // This should be used sparingly and only when the client does not fit your // use case. GetAPIReader() client.Reader // GetWebhookServer returns a webhook.Server GetWebhookServer() *webhook.Server // GetLogger returns this manager's logger. GetLogger() logr.Logger }
具体实现在controllerManager结构体:
type controllerManager struct { // config is the rest.config used to talk to the apiserver. Required. config *rest.Config // scheme is the scheme injected into Controllers, EventHandlers, Sources and Predicates. Defaults // to scheme.scheme. scheme *runtime.Scheme // leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts. // These Runnables are managed by lead election. leaderElectionRunnables []Runnable // nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts. // These Runnables will not be blocked by lead election. nonLeaderElectionRunnables []Runnable cache cache.Cache // TODO(directxman12): Provide an escape hatch to get individual indexers // client is the client injected into Controllers (and EventHandlers, Sources and Predicates). client client.Client // apiReader is the reader that will make requests to the api server and not the cache. apiReader client.Reader // fieldIndexes knows how to add field indexes over the Cache used by this controller, // which can later be consumed via field selectors from the injected client. fieldIndexes client.FieldIndexer // recorderProvider is used to generate event recorders that will be injected into Controllers // (and EventHandlers, Sources and Predicates). recorderProvider *intrec.Provider // resourceLock forms the basis for leader election resourceLock resourcelock.Interface // leaderElectionReleaseOnCancel defines if the manager should step back from the leader lease // on shutdown leaderElectionReleaseOnCancel bool // mapper is used to map resources to kind, and map kind and version. mapper meta.RESTMapper // metricsListener is used to serve prometheus metrics metricsListener net.Listener // metricsExtraHandlers contains extra handlers to register on http server that serves metrics. metricsExtraHandlers map[string]http.Handler // healthProbeListener is used to serve liveness probe healthProbeListener net.Listener // Readiness probe endpoint name readinessEndpointName string // Liveness probe endpoint name livenessEndpointName string // Readyz probe handler readyzHandler *healthz.Handler // Healthz probe handler healthzHandler *healthz.Handler mu sync.Mutex started bool startedLeader bool healthzStarted bool errChan chan error // Logger is the logger that should be used by this manager. // If none is set, it defaults to log.Log global logger. logger logr.Logger // leaderElectionCancel is used to cancel the leader election. It is distinct from internalStopper, // because for safety reasons we need to os.Exit() when we lose the leader election, meaning that // it must be deferred until after gracefulShutdown is done. leaderElectionCancel context.CancelFunc // stop procedure engaged. In other words, we should not add anything else to the manager stopProcedureEngaged bool // elected is closed when this manager becomes the leader of a group of // managers, either because it won a leader election or because no leader // election was configured. elected chan struct{} startCache func(ctx context.Context) error // port is the port that the webhook server serves at. port int // host is the hostname that the webhook server binds to. host string // CertDir is the directory that contains the server key and certificate. // if not set, webhook server would look up the server key and certificate in // {TempDir}/k8s-webhook-server/serving-certs certDir string webhookServer *webhook.Server // leaseDuration is the duration that non-leader candidates will // wait to force acquire leadership. leaseDuration time.Duration // renewDeadline is the duration that the acting controlplane will retry // refreshing leadership before giving up. renewDeadline time.Duration // retryPeriod is the duration the LeaderElector clients should wait // between tries of actions. retryPeriod time.Duration // waitForRunnable is holding the number of runnables currently running so that // we can wait for them to exit before quitting the manager waitForRunnable sync.WaitGroup // gracefulShutdownTimeout is the duration given to runnable to stop // before the manager actually returns on stop. gracefulShutdownTimeout time.Duration // onStoppedLeading is callled when the leader election lease is lost. // It can be overridden for tests. onStoppedLeading func() // shutdownCtx is the context that can be used during shutdown. It will be cancelled // after the gracefulShutdownTimeout ended. It must not be accessed before internalStop // is closed because it will be nil. shutdownCtx context.Context internalCtx context.Context internalCancel context.CancelFunc // internalProceduresStop channel is used internally to the manager when coordinating // the proper shutdown of servers. This channel is also used for dependency injection. internalProceduresStop chan struct{} }
通过manager创建controller的步骤:
(1)实例化 manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
第一个参数是client-go的rest/config.go中定义的Config结构体
ctrl.GetConfigOrDie()会调用GetConfigWithContext(),使用~/.kube/config作为默认配置文件生成Config对象
第二个参数是pkg/manager/manager.go中定义的Options结构体,包含了新建manager时候的参数
通过在ctrl.Options结构体中设置namespace字符串,可以设置controller可以管理的namespace
也可以通过设置NewCache为cache.MultiNamespacedCacheBuilder(namespaces字符串切片)让其管理一系列namespace
NewManager实际调用了manager.new函数,为Manager执行初始化工作:
func New(config *rest.Config, options Options) (Manager, error) { // Initialize a rest.config if none was specified if config == nil { return nil, fmt.Errorf("must specify Config") } options = setOptionsDefaults(options) // 为Options设置一些默认的参数值 // Create the mapper provider mapper, err := options.MapperProvider(config) if err != nil { log.Error(err, "Failed to get API Group-Resources") return nil, err } // Create the cache for the cached read client and registering informers cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace}) if err != nil { return nil, err } apiReader, err := client.New(config, client.Options{Scheme: options.Scheme, Mapper: mapper}) if err != nil { return nil, err } writeObj, err := options.NewClient(cache, config, client.Options{Scheme: options.Scheme, Mapper: mapper}) if err != nil { return nil, err } if options.DryRunClient { writeObj = client.NewDryRunClient(writeObj) } // Create the recorder provider to inject event recorders for the components. // TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific // to the particular controller that it's being injected into, rather than a generic one like is here. recorderProvider, err := options.newRecorderProvider(config, options.Scheme, log.WithName("events"), options.makeBroadcaster) if err != nil { return nil, err } // Create the resource lock to enable leader election) leaderConfig := config if options.LeaderElectionConfig != nil { leaderConfig = options.LeaderElectionConfig } resourceLock, err := options.newResourceLock(leaderConfig, recorderProvider, leaderelection.Options{ LeaderElection: options.LeaderElection, LeaderElectionResourceLock: options.LeaderElectionResourceLock, LeaderElectionID: options.LeaderElectionID, LeaderElectionNamespace: options.LeaderElectionNamespace, }) if err != nil { return nil, err } // Create the metrics listener. This will throw an error if the metrics bind // address is invalid or already in use. metricsListener, err := options.newMetricsListener(options.MetricsBindAddress) if err != nil { return nil, err } // By default we have no extra endpoints to expose on metrics http server. metricsExtraHandlers := make(map[string]http.Handler) // Create health probes listener. This will throw an error if the bind // address is invalid or already in use. healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress) if err != nil { return nil, err } return &controllerManager{ config: config, scheme: options.Scheme, cache: cache, fieldIndexes: cache, client: writeObj, apiReader: apiReader, recorderProvider: recorderProvider, resourceLock: resourceLock, mapper: mapper, metricsListener: metricsListener, metricsExtraHandlers: metricsExtraHandlers, logger: options.Logger, elected: make(chan struct{}), port: options.Port, host: options.Host, certDir: options.CertDir, leaseDuration: *options.LeaseDuration, renewDeadline: *options.RenewDeadline, retryPeriod: *options.RetryPeriod, healthProbeListener: healthProbeListener, readinessEndpointName: options.ReadinessEndpointName, livenessEndpointName: options.LivenessEndpointName, gracefulShutdownTimeout: *options.GracefulShutdownTimeout, internalProceduresStop: make(chan struct{}), }, nil }
(2)向manager添加scheme,以将api注册到scheme,Scheme 提供了GVK 到go type的映射。
如果多个crd,需要多次调用 AddToScheme
err = api.AddToScheme(mgr.GetScheme())
实际调用了api/vxxx/groupversion_info.go中API schema的相关定义:
var ( GroupVersion = schema.GroupVersion{Group: "data.fluid.io", Version: "v1alpha1"} //用于注册资源对象的GV SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion} //SchemeBuilder用于将go type添加到GVK scheme AddToScheme = SchemeBuilder.AddToScheme //AddToScheme用于将GV中的type添加到scheme )
(3)向manager添加controller:
err = ctrl.NewControllerManagedBy(mgr). For(&api.ChaosPod{}). Owns(&corev1.Pod{}). Complete(&reconciler{ Client: mgr.GetClient(), scheme: mgr.GetScheme(), })
①NewControllerManagedBy实际调用了builder.ControllerManagedBy函数,它会返回一个新的控制器构造器Builder对象,生成的控制器将管理器Manager启动,
type Builder struct { forInput ForInput ownsInput []OwnsInput watchesInput []WatchesInput mgr manager.Manager globalPredicates []predicate.Predicate config *rest.Config ctrl controller.Controller ctrlOptions controller.Options name string } func ControllerManagedBy(m manager.Manager) *Builder { return &Builder{mgr: m} }
②For函数定义了被调谐的对象类型并配置 ControllerManagedBy 通过调谐对象来响应 create/delete/update 事件
调用For函数相当于调用Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{})
func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder { if blder.forInput.object != nil { blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation") return blder } input := ForInput{object: object} for _, opt := range opts { opt.ApplyToFor(&input) } blder.forInput = input return blder }
③Owns函数就是来配置监听的资源对象的子资源:
func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder { input := OwnsInput{object: object} for _, opt := range opts { opt.ApplyToOwns(&input) } blder.ownsInput = append(blder.ownsInput, input) return blder }
例如此处,ChaosPod拥有Pod作为子资源
没有子资源的话,此部分可省略
④Complete 函数:
func (blder *Builder) Complete(r reconcile.Reconciler) error { _, err := blder.Build(r) return err } //Build根据用户传入的自己自己实现的Reconciler结构体,构建应用程序ControllerManagedBy并返回它创建的 Controller func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) { if r == nil { return nil, fmt.Errorf("must provide a non-nil Reconciler") } if blder.mgr == nil { return nil, fmt.Errorf("must provide a non-nil Manager") } if blder.forInput.err != nil { return nil, blder.forInput.err } if blder.forInput.object == nil { // Checking the reconcile type exist or not return nil, fmt.Errorf("must provide an object for reconciliation") } blder.loadRestConfig() // Set the Config if err := blder.doController(r); err != nil { // Set the ControllerManagedBy return nil, err } if err := blder.doWatch(); err != nil { // Set the Watch return nil, err } return blder.ctrl, nil } doController方法会将用户实现的XxxReconciler结构体传入为Controller的成员Do,因此可以调用c.Do.Reconcile(ctx, req): func (blder *Builder) doController(r reconcile.Reconciler) error { name, err := blder.getControllerName() if err != nil { return err } ctrlOptions := blder.ctrlOptions ctrlOptions.Reconciler = r blder.ctrl, err = newController(name, blder.mgr, ctrlOptions) return err }
会根据Builder.ctrlOptions和用户传入的XxxReconciler结构体构建创建controller所用的参数ctrlOptions
再通过newController创建新controller
(4)向manager添加webhook,同样需要实现逻辑处理
err = ctrl.NewWebhookManagedBy(mgr). For(&api.ChaosPod{}). Complete()
(5)启动 manager.start()
mgr.Start(ctrl.SetupSignalHandler())
用户对CRD的数据结构的定义:
type ChaosPod struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` Spec ChaosPodSpec `json:"spec,omitempty"` Status ChaosPodStatus `json:"status,omitempty"` } type ChaosPodList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` Items []ChaosPod `json:"items"` }
需要在zz_generated.deepcopy.go中自动生成deepcopy的相关反法
将资源对象添加到Group的方法:
SchemeBuilder.Register(&ChaosPod{}, &ChaosPodList{})