结合 apisix-ingress-controller 看一下自定义 CRD 的整体流程。从阿里云控制台上看,引入 apisix 后可以看到新增的 CRD 视图如下,不同的 API 版本分别对应不同的资源。
那先用yaml创建一个再说
apiVersion: apisix.apache.org/v2alpha1
kind: ApisixConsumer
metadata:
  name: keyauth
spec:
  authParameter:
    keyAuth:
      value:
        key: API
那么代码里是如何创建的呢?可以看到在 pkg/kube/apisix/apis/config/ 下各版本目录(如 v2alpha1)的 types.go 文件中有相关的类型定义。
以ApisixConsumer这个配置为例,看下是如何定义的,有哪些配置项
// ApisixConsumer is the Schema for the ApisixConsumer resource. // An ApisixConsumer is used to identify a consumer. type ApisixConsumer struct { metav1.TypeMeta `json:",inline" yaml:",inline"` metav1.ObjectMeta `json:"metadata,omitempty" yaml:"metadata,omitempty"` Spec ApisixConsumerSpec `json:"spec,omitempty" yaml:"spec,omitempty"` Status ApisixStatus `json:"status,omitempty" yaml:"status,omitempty"` } // ApisixConsumerSpec defines the desired state of ApisixConsumer. type ApisixConsumerSpec struct { AuthParameter ApisixConsumerAuthParameter `json:"authParameter" yaml:"authParameter"` } type ApisixConsumerAuthParameter struct { BasicAuth *ApisixConsumerBasicAuth `json:"basicAuth,omitempty" yaml:"basicAuth"` KeyAuth *ApisixConsumerKeyAuth `json:"keyAuth,omitempty" yaml:"keyAuth"` } // ApisixConsumerBasicAuth defines the configuration for basic auth. type ApisixConsumerBasicAuth struct { SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty" yaml:"secretRef,omitempty"` Value *ApisixConsumerBasicAuthValue `json:"value,omitempty" yaml:"value,omitempty"` } // ApisixConsumerBasicAuthValue defines the in-place username and password configuration for basic auth. type ApisixConsumerBasicAuthValue struct { Username string `json:"username" yaml:"username"` Password string `json:"password" yaml:"username"` } // ApisixConsumerKeyAuth defines the configuration for the key auth. type ApisixConsumerKeyAuth struct { SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty" yaml:"secretRef,omitempty"` Value *ApisixConsumerKeyAuthValue `json:"value,omitempty" yaml:"value,omitempty"` } // ApisixConsumerKeyAuthValue defines the in-place configuration for basic auth. type ApisixConsumerKeyAuthValue struct { Key string `json:"key" yaml:"key"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // ApisixConsumerList contains a list of ApisixConsumer. 
type ApisixConsumerList struct { metav1.TypeMeta `json:",inline" yaml:",inline"` metav1.ListMeta `json:"metadata" yaml:"metadata"` Items []ApisixConsumer `json:"items,omitempty" yaml:"items,omitempty"` }
translation/translator.go中定义了各种自定义对象转换为可用的apisix对象的转换逻辑
// Translator translates Apisix* CRD resources to the description in APISIX.
type Translator interface {
	// TranslateUpstreamNodes translates Endpoints resources to APISIX Upstream nodes
	// according to the given port. Extra labels can be passed to filter the ultimate
	// upstream nodes.
	TranslateUpstreamNodes(kube.Endpoint, int32, types.Labels) (apisixv1.UpstreamNodes, error)
	// TranslateUpstreamConfig translates ApisixUpstreamConfig (part of ApisixUpstream)
	// to APISIX Upstream; it doesn't fill the Upstream metadata and nodes.
	TranslateUpstreamConfig(*configv1.ApisixUpstreamConfig) (*apisixv1.Upstream, error)
	// TranslateUpstream composes an upstream according to the
	// given namespace, name (searching Service/Endpoints) and port (filtering Endpoints).
	// The returned Upstream doesn't have metadata info.
	// It doesn't assign any metadata fields, so it's the caller's responsibility to decide
	// the metadata.
	// Note the subset is used to filter the ultimate node list: only pods whose labels
	// match the subset labels (defined in ApisixUpstream) will be selected.
	// When the subset is not found, the node list will be empty. When the subset is empty,
	// all pod IPs will be filled.
	TranslateUpstream(string, string, string, int32) (*apisixv1.Upstream, error)
	// TranslateIngress composes a couple of APISIX Routes and upstreams according
	// to the given Ingress resource.
	TranslateIngress(kube.Ingress) (*TranslateContext, error)
	// TranslateRouteV1 translates the configv1.ApisixRoute object into several Route
	// and Upstream resources.
	TranslateRouteV1(*configv1.ApisixRoute) (*TranslateContext, error)
	// TranslateRouteV2alpha1 translates the configv2alpha1.ApisixRoute object into several Route
	// and Upstream resources.
	TranslateRouteV2alpha1(*configv2alpha1.ApisixRoute) (*TranslateContext, error)
	// TranslateRouteV2alpha1NotStrictly translates the configv2alpha1.ApisixRoute object into several Route
	// and Upstream resources not strictly, only used for delete event.
	TranslateRouteV2alpha1NotStrictly(*configv2alpha1.ApisixRoute) (*TranslateContext, error)
	// TranslateRouteV2beta1 translates the configv2beta1.ApisixRoute object into several Route
	// and Upstream resources.
	TranslateRouteV2beta1(*configv2beta1.ApisixRoute) (*TranslateContext, error)
	// TranslateRouteV2beta1NotStrictly translates the configv2beta1.ApisixRoute object into several Route
	// and Upstream resources not strictly, only used for delete event.
	TranslateRouteV2beta1NotStrictly(*configv2beta1.ApisixRoute) (*TranslateContext, error)
	// TranslateSSL translates the configv1.ApisixTls object into the APISIX SSL resource.
	TranslateSSL(*configv1.ApisixTls) (*apisixv1.Ssl, error)
	// TranslateClusterConfig translates the configv2alpha1.ApisixClusterConfig object into the APISIX
	// Global Rule resource.
	TranslateClusterConfig(*configv2alpha1.ApisixClusterConfig) (*apisixv1.GlobalRule, error)
	// TranslateApisixConsumer translates the configv2alpha1.ApisixConsumer object into the APISIX Consumer
	// resource.
	TranslateApisixConsumer(*configv2alpha1.ApisixConsumer) (*apisixv1.Consumer, error)
}
再往上找一层入口,可以看到 pkg/ingress/apisix_consumer.go:
对该资源的监听(informer 事件回调),以及新增、更新、删除等各类事件的处理逻辑,都在这个文件中。
package ingress

import (
	"context"
	"time"

	"go.uber.org/zap"
	corev1 "k8s.io/api/core/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"

	configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
	"github.com/apache/apisix-ingress-controller/pkg/log"
	"github.com/apache/apisix-ingress-controller/pkg/types"
)

// apisixConsumerController reacts to ApisixConsumer informer events by
// queueing them and syncing the translated Consumer objects to APISIX.
type apisixConsumerController struct {
	controller *Controller
	workqueue  workqueue.RateLimitingInterface
	workers    int
}

// newApisixConsumerController builds the controller with a named,
// rate-limited work queue and a single worker, and registers the
// add/update/delete handlers on the ApisixConsumer informer.
func (c *Controller) newApisixConsumerController() *apisixConsumerController {
	ctl := &apisixConsumerController{
		controller: c,
		workqueue:  workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixConsumer"),
		workers:    1,
	}
	ctl.controller.apisixConsumerInformer.AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    ctl.onAdd,
			UpdateFunc: ctl.onUpdate,
			DeleteFunc: ctl.onDelete,
		},
	)
	return ctl
}

// run waits for the informer cache to sync, starts the workers and then
// blocks until ctx is cancelled, at which point the work queue is shut down.
func (c *apisixConsumerController) run(ctx context.Context) {
	log.Info("ApisixConsumer controller started")
	defer log.Info("ApisixConsumer controller exited")
	if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixConsumerInformer.HasSynced); !ok {
		log.Error("cache sync failed")
		return
	}
	for i := 0; i < c.workers; i++ {
		go c.runWorker(ctx)
	}
	<-ctx.Done()
	c.workqueue.ShutDown()
}

// runWorker drains the work queue until it is shut down, syncing each
// event and handing the outcome to handleSyncErr.
func (c *apisixConsumerController) runWorker(ctx context.Context) {
	for {
		obj, quit := c.workqueue.Get()
		if quit {
			return
		}
		err := c.sync(ctx, obj.(*types.Event))
		c.workqueue.Done(obj)
		c.handleSyncErr(obj, err)
	}
}

// sync processes a single queued event: it resolves the ApisixConsumer
// named by the event key (or falls back to the tombstone for deletions),
// translates it into an APISIX Consumer and pushes it to APISIX.
// A non-nil error causes the event to be retried by handleSyncErr.
func (c *apisixConsumerController) sync(ctx context.Context, ev *types.Event) error {
	key := ev.Object.(string)
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		log.Errorf("found ApisixConsumer resource with invalid meta namespace key %s: %s", key, err)
		return err
	}

	ac, err := c.controller.apisixConsumerLister.ApisixConsumers(namespace).Get(name)
	if err != nil {
		if !k8serrors.IsNotFound(err) {
			log.Errorf("failed to get ApisixConsumer %s: %s", key, err)
			return err
		}
		if ev.Type != types.EventDelete {
			log.Warnf("ApisixConsumer %s was deleted before it can be delivered", key)
			// Don't need to retry.
			return nil
		}
	}
	if ev.Type == types.EventDelete {
		if ac != nil {
			// We still find the resource while we are processing the DELETE event,
			// that means object with same namespace and name was created, discarding
			// this stale DELETE event.
			log.Warnf("discard the stale ApisixConsumer delete event since the %s exists", key)
			return nil
		}
		// The object is gone from the cache; use the final state captured by onDelete.
		ac = ev.Tombstone.(*configv2alpha1.ApisixConsumer)
	}

	consumer, err := c.controller.translator.TranslateApisixConsumer(ac)
	if err != nil {
		log.Errorw("failed to translate ApisixConsumer",
			zap.Error(err),
			zap.Any("ApisixConsumer", ac),
		)
		c.controller.recorderEvent(ac, corev1.EventTypeWarning, _resourceSyncAborted, err)
		c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse)
		return err
	}
	log.Debug("got consumer object from ApisixConsumer",
		zap.Any("consumer", consumer),
		zap.Any("ApisixConsumer", ac),
	)

	if err := c.controller.syncConsumer(ctx, consumer, ev.Type); err != nil {
		log.Errorw("failed to sync Consumer to APISIX",
			zap.Error(err),
			zap.Any("consumer", consumer),
		)
		c.controller.recorderEvent(ac, corev1.EventTypeWarning, _resourceSyncAborted, err)
		c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse)
		return err
	}

	c.controller.recorderEvent(ac, corev1.EventTypeNormal, _resourceSynced, nil)
	return nil
}

// handleSyncErr clears the rate-limit history of obj on success, or logs
// the failure and re-queues obj with rate limiting for a retry.
func (c *apisixConsumerController) handleSyncErr(obj interface{}, err error) {
	if err == nil {
		c.workqueue.Forget(obj)
		return
	}
	log.Warnw("sync ApisixConsumer failed, will retry",
		zap.Any("object", obj),
		zap.Error(err),
	)
	c.workqueue.AddRateLimited(obj)
}

// onAdd queues an add event for objects in watched namespaces.
func (c *apisixConsumerController) onAdd(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		log.Errorf("found ApisixConsumer resource with bad meta namespace key: %s", err)
		return
	}
	if !c.controller.namespaceWatching(key) {
		return
	}
	log.Debugw("ApisixConsumer add event arrived",
		zap.Any("object", obj),
	)

	c.workqueue.AddRateLimited(&types.Event{
		Type:   types.EventAdd,
		Object: key,
	})
}

// onUpdate queues an update event when the object actually changed and
// lives in a watched namespace.
func (c *apisixConsumerController) onUpdate(oldObj, newObj interface{}) {
	prev := oldObj.(*configv2alpha1.ApisixConsumer)
	curr := newObj.(*configv2alpha1.ApisixConsumer)
	// Resource versions are opaque strings: an ordered (lexicographic)
	// comparison would treat "999" as newer than "1000" and drop real
	// updates. Only equality is meaningful, and it is enough to skip the
	// no-op notifications delivered on informer resync.
	if prev.ResourceVersion == curr.ResourceVersion {
		return
	}
	key, err := cache.MetaNamespaceKeyFunc(newObj)
	if err != nil {
		log.Errorf("found ApisixConsumer resource with bad meta namespace key: %s", err)
		return
	}
	if !c.controller.namespaceWatching(key) {
		return
	}
	log.Debugw("ApisixConsumer update event arrived",
		zap.Any("new object", curr),
		zap.Any("old object", prev),
	)

	c.workqueue.AddRateLimited(&types.Event{
		Type:   types.EventUpdate,
		Object: key,
	})
}

// onDelete queues a delete event carrying the final known state of the
// object, unwrapping a cache.DeletedFinalStateUnknown tombstone if needed.
func (c *apisixConsumerController) onDelete(obj interface{}) {
	ac, ok := obj.(*configv2alpha1.ApisixConsumer)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			return
		}
		// Guard the assertion: a tombstone wrapping an unexpected type must
		// not panic the informer's event-handler goroutine.
		ac, ok = tombstone.Obj.(*configv2alpha1.ApisixConsumer)
		if !ok {
			log.Errorf("found ApisixConsumer tombstone with unexpected object type %T", tombstone.Obj)
			return
		}
	}

	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		log.Errorf("found ApisixConsumer resource with bad meta namespace key: %s", err)
		return
	}
	if !c.controller.namespaceWatching(key) {
		return
	}
	log.Debugw("ApisixConsumer delete event arrived",
		zap.Any("final state", ac),
	)
	c.workqueue.AddRateLimited(&types.Event{
		Type:      types.EventDelete,
		Object:    key,
		Tombstone: ac,
	})
}
反向从入口看看
cmd/ingress/ingress.go中
// Run the ingress controller in the background; a failure to run it is
// reported through dief.
go func() {
	runErr := ingress.Run(stop)
	if runErr == nil {
		return
	}
	dief("failed to run ingress controller: %s", runErr)
}()
pkg/ingress/controller.go中
// Run launches the controller. func (c *Controller) Run(stop chan struct{}) error { rootCtx, rootCancel := context.WithCancel(context.Background()) defer rootCancel() go func() { <-stop rootCancel() }() c.metricsCollector.ResetLeader(false) go func() { if err := c.apiServer.Run(rootCtx.Done()); err != nil { log.Errorf("failed to launch API Server: %s", err) } }() lock := &resourcelock.LeaseLock{ LeaseMeta: metav1.ObjectMeta{ Namespace: c.namespace, Name: c.cfg.Kubernetes.ElectionID, }, Client: c.kubeClient.Client.CoordinationV1(), LockConfig: resourcelock.ResourceLockConfig{ Identity: c.name, EventRecorder: c, }, } cfg := leaderelection.LeaderElectionConfig{ Lock: lock, LeaseDuration: 15 * time.Second, RenewDeadline: 5 * time.Second, RetryPeriod: 2 * time.Second, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: c.run, OnNewLeader: func(identity string) { log.Warnf("found a new leader %s", identity) if identity != c.name { log.Infow("controller now is running as a candidate", zap.String("namespace", c.namespace), zap.String("pod", c.name), ) } }, OnStoppedLeading: func() { log.Infow("controller now is running as a candidate", zap.String("namespace", c.namespace), zap.String("pod", c.name), ) c.metricsCollector.ResetLeader(false) }, }, // Set it to false as current leaderelection implementation will report // "Failed to release lock: resource name may not be empty" error when // ReleaseOnCancel is true and the Run context is cancelled. ReleaseOnCancel: false, Name: "ingress-apisix", } elector, err := leaderelection.NewLeaderElector(cfg) if err != nil { log.Errorf("failed to create leader elector: %s", err.Error()) return err } election: curCtx, cancel := context.WithCancel(rootCtx) c.leaderContextCancelFunc = cancel elector.Run(curCtx) select { case <-rootCtx.Done(): return nil default: goto election } }