zoukankan      html  css  js  c++  java
  • 如何自定义一个CRD

    结合apisix-ingress-controller看下如何自定义CRD的流程,从阿里云上看,引入apisix后可见的新增CRD视图如下,不同的api版本分别对应不同的资源

    那先用yaml创建一个再说

    # Example ApisixConsumer custom resource named "keyauth".
    apiVersion: apisix.apache.org/v2alpha1
    kind: ApisixConsumer
    metadata:
      name: keyauth
    spec:
      # authParameter selects the authentication method; here keyAuth is used.
      authParameter:
        keyAuth:
          # value carries the key in place (a secretRef is the alternative).
          value:
            key: API

    那么代码里是如何定义的呢?可以看到,在 pkg/kube/apisix/apis/config/ 下各个版本目录的 types.go 文件中有相关的类型定义

    以ApisixConsumer这个配置为例,看下是如何定义的,有哪些配置项

    // ApisixConsumer is the Schema for the ApisixConsumer resource.
    // An ApisixConsumer is used to identify a consumer.
    type ApisixConsumer struct {
        // TypeMeta is embedded and inlined into the serialized object.
        metav1.TypeMeta   `json:",inline" yaml:",inline"`
        // ObjectMeta is embedded and serialized under the "metadata" key.
        metav1.ObjectMeta `json:"metadata,omitempty" yaml:"metadata,omitempty"`
        // Spec holds the consumer configuration (see ApisixConsumerSpec).
        Spec              ApisixConsumerSpec `json:"spec,omitempty" yaml:"spec,omitempty"`
        // Status is serialized under the "status" key; ApisixStatus is declared elsewhere.
        Status            ApisixStatus       `json:"status,omitempty" yaml:"status,omitempty"`
    }
    
    // ApisixConsumerSpec defines the desired state of ApisixConsumer.
    type ApisixConsumerSpec struct {
        // AuthParameter carries the authentication settings of the consumer;
        // it is always serialized (no omitempty) under "authParameter".
        AuthParameter ApisixConsumerAuthParameter `json:"authParameter" yaml:"authParameter"`
    }
    
    // ApisixConsumerAuthParameter groups the supported authentication
    // configurations of a consumer. Both fields are optional pointers.
    // NOTE(review): the yaml tags lack the omitempty option that the json tags
    // carry, unlike the other structs in this file — confirm whether intended.
    type ApisixConsumerAuthParameter struct {
        // BasicAuth is the basic auth configuration, if any.
        BasicAuth *ApisixConsumerBasicAuth `json:"basicAuth,omitempty" yaml:"basicAuth"`
        // KeyAuth is the key auth configuration, if any.
        KeyAuth   *ApisixConsumerKeyAuth   `json:"keyAuth,omitempty" yaml:"keyAuth"`
    }
    
    // ApisixConsumerBasicAuth defines the configuration for basic auth.
    // The credentials can be given through a secret reference or in place.
    type ApisixConsumerBasicAuth struct {
        // SecretRef references an object by name in the local namespace.
        SecretRef *corev1.LocalObjectReference  `json:"secretRef,omitempty" yaml:"secretRef,omitempty"`
        // Value holds the in-place username and password.
        Value     *ApisixConsumerBasicAuthValue `json:"value,omitempty" yaml:"value,omitempty"`
    }
    
    // ApisixConsumerBasicAuthValue defines the in-place username and password configuration for basic auth.
    type ApisixConsumerBasicAuthValue struct {
        // Username is serialized under the "username" key.
        Username string `json:"username" yaml:"username"`
        // Password is serialized under the "password" key in both JSON and YAML.
        // The YAML key must differ from Username's, otherwise the two fields
        // collide when the struct is (un)marshalled as YAML.
        Password string `json:"password" yaml:"password"`
    }
    
    // ApisixConsumerKeyAuth defines the configuration for the key auth.
    // The key can be given through a secret reference or in place.
    type ApisixConsumerKeyAuth struct {
        // SecretRef references an object by name in the local namespace.
        SecretRef *corev1.LocalObjectReference `json:"secretRef,omitempty" yaml:"secretRef,omitempty"`
        // Value holds the in-place key.
        Value     *ApisixConsumerKeyAuthValue  `json:"value,omitempty" yaml:"value,omitempty"`
    }
    
    // ApisixConsumerKeyAuthValue defines the in-place configuration for key auth.
    type ApisixConsumerKeyAuthValue struct {
        // Key is the key value, serialized under "key".
        Key string `json:"key" yaml:"key"`
    }
    
    // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
    
    // ApisixConsumerList contains a list of ApisixConsumer.
    type ApisixConsumerList struct {
        // TypeMeta is embedded and inlined into the serialized list.
        metav1.TypeMeta `json:",inline" yaml:",inline"`
        // ListMeta is embedded and always serialized under "metadata".
        metav1.ListMeta `json:"metadata" yaml:"metadata"`
        // Items holds the ApisixConsumer objects of the list.
        Items           []ApisixConsumer `json:"items,omitempty" yaml:"items,omitempty"`
    }

    translation/translator.go中定义了各种自定义对象转换为可用的apisix对象的转换逻辑

    // Translator translates Apisix* CRD resources to the description in APISIX.
    type Translator interface {
        // TranslateUpstreamNodes translates Endpoints resources to APISIX Upstream nodes
        // according to the given port. Extra labels can be passed to filter the ultimate
        // upstream nodes.
        TranslateUpstreamNodes(kube.Endpoint, int32, types.Labels) (apisixv1.UpstreamNodes, error)
        // TranslateUpstreamConfig translates ApisixUpstreamConfig (part of ApisixUpstream)
        // to APISIX Upstream, it doesn't fill the Upstream metadata and nodes.
        TranslateUpstreamConfig(*configv1.ApisixUpstreamConfig) (*apisixv1.Upstream, error)
        // TranslateUpstream composes an upstream according to the
        // given namespace, name (searching Service/Endpoints) and port (filtering Endpoints).
        // The returned Upstream doesn't have metadata info.
        // It doesn't assign any metadata fields, so it's caller's responsibility to decide
        // the metadata.
        // Note the subset is used to filter the ultimate node list, only pods whose labels
        // matching the subset labels (defined in ApisixUpstream) will be selected.
        // When the subset is not found, the node list will be empty. When the subset is empty,
        // all pods IP will be filled.
        // NOTE(review): the three string parameters are presumably namespace, name and
        // subset, in that order — confirm against the implementation.
        TranslateUpstream(string, string, string, int32) (*apisixv1.Upstream, error)
        // TranslateIngress composes a couple of APISIX Routes and upstreams according
        // to the given Ingress resource.
        TranslateIngress(kube.Ingress) (*TranslateContext, error)
        // TranslateRouteV1 translates the configv1.ApisixRoute object into several Route
        // and Upstream resources.
        TranslateRouteV1(*configv1.ApisixRoute) (*TranslateContext, error)
        // TranslateRouteV2alpha1 translates the configv2alpha1.ApisixRoute object into several Route
        // and Upstream resources.
        TranslateRouteV2alpha1(*configv2alpha1.ApisixRoute) (*TranslateContext, error)
        // TranslateRouteV2alpha1NotStrictly translates the configv2alpha1.ApisixRoute object into several Route
        // and Upstream resources not strictly, only used for delete event.
        TranslateRouteV2alpha1NotStrictly(*configv2alpha1.ApisixRoute) (*TranslateContext, error)
        // TranslateRouteV2beta1 translates the configv2beta1.ApisixRoute object into several Route
        // and Upstream resources.
        TranslateRouteV2beta1(*configv2beta1.ApisixRoute) (*TranslateContext, error)
        // TranslateRouteV2beta1NotStrictly translates the configv2beta1.ApisixRoute object into several Route
        // and Upstream resources not strictly, only used for delete event.
        TranslateRouteV2beta1NotStrictly(*configv2beta1.ApisixRoute) (*TranslateContext, error)
        // TranslateSSL translates the configv1.ApisixTls object into the APISIX SSL resource.
        TranslateSSL(*configv1.ApisixTls) (*apisixv1.Ssl, error)
        // TranslateClusterConfig translates the configv2alpha1.ApisixClusterConfig object into the APISIX
        // Global Rule resource.
        TranslateClusterConfig(*configv2alpha1.ApisixClusterConfig) (*apisixv1.GlobalRule, error)
        // TranslateApisixConsumer translates the configv2alpha1.ApisixConsumer object into the APISIX Consumer
        // resource.
        TranslateApisixConsumer(*configv2alpha1.ApisixConsumer) (*apisixv1.Consumer, error)
    }

    再往上一层入口,可以看到 pkg/ingress/apisix_consumer.go 这个文件

    对该资源的监听,以及各种事件(新增、更新、删除)的处理逻辑,均在此文件中

    package ingress
    
    import (
        "context"
        "time"
    
        "go.uber.org/zap"
        corev1 "k8s.io/api/core/v1"
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/util/workqueue"
    
        configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
        "github.com/apache/apisix-ingress-controller/pkg/log"
        "github.com/apache/apisix-ingress-controller/pkg/types"
    )
    
    // apisixConsumerController watches ApisixConsumer resources and syncs
    // them to APISIX through its parent Controller.
    type apisixConsumerController struct {
        // controller is the parent Controller providing informer, lister,
        // translator and status/event recording helpers.
        controller *Controller
        // workqueue buffers *types.Event items produced by the informer handlers.
        workqueue  workqueue.RateLimitingInterface
        // workers is the number of goroutines started by run to drain workqueue.
        workers    int
    }
    
    // newApisixConsumerController creates the ApisixConsumer controller bound to c,
    // backed by a named rate-limiting work queue and a single worker, and
    // registers its add/update/delete handlers on the ApisixConsumer informer.
    func (c *Controller) newApisixConsumerController() *apisixConsumerController {
        limiter := workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5)
        queue := workqueue.NewNamedRateLimitingQueue(limiter, "ApisixConsumer")

        ctl := &apisixConsumerController{
            controller: c,
            workqueue:  queue,
            workers:    1,
        }

        handlers := cache.ResourceEventHandlerFuncs{
            AddFunc:    ctl.onAdd,
            UpdateFunc: ctl.onUpdate,
            DeleteFunc: ctl.onDelete,
        }
        c.apisixConsumerInformer.AddEventHandler(handlers)
        return ctl
    }
    
    // run waits for the ApisixConsumer informer cache to sync, starts the
    // configured number of workers, then blocks until ctx is done and shuts
    // the work queue down. It returns early if the cache sync fails.
    func (c *apisixConsumerController) run(ctx context.Context) {
        log.Info("ApisixConsumer controller started")
        defer log.Info("ApisixConsumer controller exited")

        synced := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixConsumerInformer.HasSynced)
        if !synced {
            log.Error("cache sync failed")
            return
        }

        for started := 0; started < c.workers; started++ {
            go c.runWorker(ctx)
        }

        // Block until cancellation, then stop the queue so workers exit.
        <-ctx.Done()
        c.workqueue.ShutDown()
    }
    
    // runWorker drains the work queue until it is shut down: each item is
    // synced, marked done, and then either forgotten or re-queued depending
    // on the sync result.
    func (c *apisixConsumerController) runWorker(ctx context.Context) {
        for item, shutdown := c.workqueue.Get(); !shutdown; item, shutdown = c.workqueue.Get() {
            ev := item.(*types.Event)
            syncErr := c.sync(ctx, ev)
            c.workqueue.Done(item)
            c.handleSyncErr(item, syncErr)
        }
    }
    
    // sync processes one queued event: it resolves the ApisixConsumer named by
    // the event key, translates it into an APISIX consumer and pushes it with
    // syncConsumer using the event type. It returns a non-nil error when the
    // key is invalid, the lister fails with anything but NotFound, or the
    // translation/sync fails; the caller uses that error to decide on a retry.
    func (c *apisixConsumerController) sync(ctx context.Context, ev *types.Event) error {
        // ev.Object is the namespace/name key stored by the event handlers.
        key := ev.Object.(string)
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            log.Errorf("found ApisixConsumer resource with invalid meta namespace key %s: %s", key, err)
            return err
        }
    
        ac, err := c.controller.apisixConsumerLister.ApisixConsumers(namespace).Get(name)
        if err != nil {
            if !k8serrors.IsNotFound(err) {
                log.Errorf("failed to get ApisixConsumer %s: %s", key, err)
                return err
            }
            // NotFound is only expected for delete events; for add/update the
            // object vanished in the meantime and there is nothing to do.
            if ev.Type != types.EventDelete {
                log.Warnf("ApisixConsumer %s was deleted before it can be delivered", key)
                // Don't need to retry.
                return nil
            }
        }
        if ev.Type == types.EventDelete {
            if ac != nil {
                // We still find the resource while we are processing the DELETE event,
                // that means object with same namespace and name was created, discarding
                // this stale DELETE event.
                log.Warnf("discard the stale ApisixConsumer delete event since the %s exists", key)
                return nil
            }
            // The object is gone from the lister; use the final state captured
            // in the event's tombstone instead.
            ac = ev.Tombstone.(*configv2alpha1.ApisixConsumer)
        }
    
        consumer, err := c.controller.translator.TranslateApisixConsumer(ac)
        if err != nil {
            log.Errorw("failed to translate ApisixConsumer",
                zap.Error(err),
                zap.Any("ApisixConsumer", ac),
            )
            c.controller.recorderEvent(ac, corev1.EventTypeWarning, _resourceSyncAborted, err)
            c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse)
            return err
        }
        log.Debug("got consumer object from ApisixConsumer",
            zap.Any("consumer", consumer),
            zap.Any("ApisixConsumer", ac),
        )
    
        if err := c.controller.syncConsumer(ctx, consumer, ev.Type); err != nil {
            log.Errorw("failed to sync Consumer to APISIX",
                zap.Error(err),
                zap.Any("consumer", consumer),
            )
            c.controller.recorderEvent(ac, corev1.EventTypeWarning, _resourceSyncAborted, err)
            c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse)
            return err
        }
    
        // Success: only an event is recorded here; recordStatus is not called.
        c.controller.recorderEvent(ac, corev1.EventTypeNormal, _resourceSynced, nil)
        return nil
    }
    
    // handleSyncErr finishes the processing of a queue item: on failure the
    // item is logged and re-queued with rate limiting, on success its rate
    // limiting history is dropped.
    func (c *apisixConsumerController) handleSyncErr(obj interface{}, err error) {
        if err != nil {
            log.Warnw("sync ApisixConsumer failed, will retry",
                zap.Any("object", obj),
                zap.Error(err),
            )
            c.workqueue.AddRateLimited(obj)
            return
        }
        c.workqueue.Forget(obj)
    }
    
    // onAdd is the informer add handler. It derives the namespace/name key of
    // obj and, when the namespace is being watched, enqueues an EventAdd
    // carrying that key.
    func (c *apisixConsumerController) onAdd(obj interface{}) {
        key, err := cache.MetaNamespaceKeyFunc(obj)
        switch {
        case err != nil:
            log.Errorf("found ApisixConsumer resource with bad meta namespace key: %s", err)
            return
        case !c.controller.namespaceWatching(key):
            return
        }

        log.Debugw("ApisixConsumer add event arrived",
            zap.Any("object", obj),
        )

        addEvent := &types.Event{
            Type:   types.EventAdd,
            Object: key,
        }
        c.workqueue.AddRateLimited(addEvent)
    }
    
    // onUpdate is the informer update handler. It ignores notifications whose
    // old and new objects carry the same resource version (e.g. periodic
    // resyncs) and otherwise, when the namespace is being watched, enqueues
    // an EventUpdate carrying the namespace/name key of the new object.
    func (c *apisixConsumerController) onUpdate(oldObj, newObj interface{}) {
        prev := oldObj.(*configv2alpha1.ApisixConsumer)
        curr := newObj.(*configv2alpha1.ApisixConsumer)
        // Resource versions are opaque strings: ordering them with >= is a
        // lexicographic comparison ("9" >= "10" is true) and would silently
        // drop real updates. Only equality is meaningful.
        if prev.ResourceVersion == curr.ResourceVersion {
            return
        }
        key, err := cache.MetaNamespaceKeyFunc(newObj)
        if err != nil {
            log.Errorf("found ApisixConsumer resource with bad meta namespace key: %s", err)
            return
        }
        if !c.controller.namespaceWatching(key) {
            return
        }
        log.Debugw("ApisixConsumer update event arrived",
            zap.Any("new object", curr),
            zap.Any("old object", prev),
        )
    
        c.workqueue.AddRateLimited(&types.Event{
            Type:   types.EventUpdate,
            Object: key,
        })
    }
    
    // onDelete is the informer delete handler. obj is either the deleted
    // ApisixConsumer or a cache.DeletedFinalStateUnknown tombstone wrapping it.
    // When the namespace is being watched, it enqueues an EventDelete carrying
    // the namespace/name key and the final object state as the tombstone.
    func (c *apisixConsumerController) onDelete(obj interface{}) {
        ac, ok := obj.(*configv2alpha1.ApisixConsumer)
        if !ok {
            tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
            if !ok {
                return
            }
            // The tombstone payload is not guaranteed to be an ApisixConsumer;
            // use a checked assertion so an unexpected object cannot panic the
            // informer goroutine.
            ac, ok = tombstone.Obj.(*configv2alpha1.ApisixConsumer)
            if !ok {
                log.Errorf("found ApisixConsumer tombstone with unexpected object type: %T", tombstone.Obj)
                return
            }
        }
    
        key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
        if err != nil {
            log.Errorf("found ApisixConsumer resource with bad meta namespace key: %s", err)
            return
        }
        if !c.controller.namespaceWatching(key) {
            return
        }
        log.Debugw("ApisixConsumer delete event arrived",
            zap.Any("final state", ac),
        )
        c.workqueue.AddRateLimited(&types.Event{
            Type:      types.EventDelete,
            Object:    key,
            Tombstone: ac,
        })
    }

    反向从入口看看

    cmd/ingress/ingress.go中

                // Launch the ingress controller in its own goroutine; if Run
                // returns an error it is reported through dief.
                go func() {
                    if err := ingress.Run(stop); err != nil {
                        dief("failed to run ingress controller: %s", err)
                    }
                }()

    pkg/ingress/controller.go中

    // Run launches the controller.
    // It derives a root context that is cancelled when stop is signalled,
    // starts the API server in the background, and then takes part in leader
    // election in a loop until the root context is done. It returns an error
    // only when the leader elector cannot be created.
    func (c *Controller) Run(stop chan struct{}) error {
        rootCtx, rootCancel := context.WithCancel(context.Background())
        defer rootCancel()
        // Translate the stop channel into cancellation of the root context.
        go func() {
            <-stop
            rootCancel()
        }()
        c.metricsCollector.ResetLeader(false)
    
        go func() {
            if err := c.apiServer.Run(rootCtx.Done()); err != nil {
                log.Errorf("failed to launch API Server: %s", err)
            }
        }()
    
        // The election lock is a Lease named after the configured election ID
        // in the controller's namespace; this instance is identified by c.name.
        lock := &resourcelock.LeaseLock{
            LeaseMeta: metav1.ObjectMeta{
                Namespace: c.namespace,
                Name:      c.cfg.Kubernetes.ElectionID,
            },
            Client: c.kubeClient.Client.CoordinationV1(),
            LockConfig: resourcelock.ResourceLockConfig{
                Identity:      c.name,
                EventRecorder: c,
            },
        }
        cfg := leaderelection.LeaderElectionConfig{
            Lock:          lock,
            LeaseDuration: 15 * time.Second,
            RenewDeadline: 5 * time.Second,
            RetryPeriod:   2 * time.Second,
            Callbacks: leaderelection.LeaderCallbacks{
                // c.run is the callback registered for when leading starts.
                OnStartedLeading: c.run,
                OnNewLeader: func(identity string) {
                    log.Warnf("found a new leader %s", identity)
                    if identity != c.name {
                        log.Infow("controller now is running as a candidate",
                            zap.String("namespace", c.namespace),
                            zap.String("pod", c.name),
                        )
                    }
                },
                OnStoppedLeading: func() {
                    log.Infow("controller now is running as a candidate",
                        zap.String("namespace", c.namespace),
                        zap.String("pod", c.name),
                    )
                    c.metricsCollector.ResetLeader(false)
                },
            },
            // Set it to false as current leaderelection implementation will report
            // "Failed to release lock: resource name may not be empty" error when
            // ReleaseOnCancel is true and the Run context is cancelled.
            ReleaseOnCancel: false,
            Name:            "ingress-apisix",
        }
    
        elector, err := leaderelection.NewLeaderElector(cfg)
        if err != nil {
            log.Errorf("failed to create leader elector: %s", err.Error())
            return err
        }
    
        // Election loop: each round gets a fresh child context whose cancel
        // function is stored on the controller. When elector.Run returns and
        // the root context is still alive, another round is started.
    election:
        curCtx, cancel := context.WithCancel(rootCtx)
        c.leaderContextCancelFunc = cancel
        elector.Run(curCtx)
        select {
        case <-rootCtx.Done():
            return nil
        default:
            goto election
        }
    }
  • 相关阅读:
    今天试了下lockerz,感觉国外的概念很先进
    十月一日,本人就结婚了,我和老婆选的婚礼主题曲,大家听听
    今天遇到了个奇怪的问题
    第一个博客,第一次随笔
    遇到一个奇葩的问题,could not load the assembly file XXX downloaded from the Web
    Log4net简单使用
    AutoWCFService心跳动态加载服务
    初学Service Broker
    新浪SAE云空间和SVN版本控制
    软件工程导论第一次作业
  • 原文地址:https://www.cnblogs.com/it-worker365/p/15356094.html
Copyright © 2011-2022 走看看