zoukankan      html  css  js  c++  java
  • 自己实现一个Controller——终极型

    经过前两篇的学习与实操,也大致掌握了一个k8s资源的Controller写法了,如有不熟,可回顾

    先说说CRD-controller的作用,本CR原意是记录云主机ECS及node节点映射信息,而本controller则把这个映射操作省略掉,只为所有创建成功的CR打一个Label。而本篇为达成此目的,需要执行的步骤有以下几个:

    • 往k8s创建一个CRD
    • 定义对应CRD的api,包含了struct
    • 给CRD的api注册到scheme
    • 实现这个CRD的clinet,封装其请求apiserver的相关操作
    • 实现一个Informer和Lister
    • 实现一个controller

    通过上述步骤,可以绕开ApiBuilder脚手架,自己手捏一个CRD-Controller出来。可以更好的理解整个Informer机制的架构

    创建一个CRD

    创建CRD的manifest如下所示

    apiVersion: apiextensions.k8s.io/v1beta1
    kind: CustomResourceDefinition
    metadata:
      annotations:
        controller-gen.kubebuilder.io/version: v0.2.5
      creationTimestamp: null
      name: ecsbinding.hopegi.com
    spec:
      group: hopegi.com  #此处定义api group
      names:
        kind: EcsBinding          #本行和下一行定义CRD的单体kind和集合类型的Kind
        listKind: EcsBindingList
        plural: ecsbinding       #复数写法 #本行和下行仅填写CRD的小写即可
        singular: ecsbinding    #单数写法
        shortNames:
          - ecsb          #
      preserveUnknownFields: false   #允许存储未知字段,就是下面没声明的
      additionalPrinterColumns:  #在kubectl显示的字段
        - JSONPath: .metadata.creationTimestamp
          name: Age
          type: date
        - JSONPath: .spec.nodename
          name: NodeName
          type: string
        - JSONPath: .spec.innerip
          name: InnerIp
          type: string
      scope: Cluster          #资源的作用域,一般是命名空间下和集群级别,命名空间是Namespaced,集群则Cluster
      validation:                 #本行和下行都是固定
        openAPIV3Schema:
          description: hopegi test crd
          properties:           #后面开始描述各个字段,API和metadata是固定,spec也是固定,spec往下则是自定义的字段,每个字段需要制定
                                #type,name属性,description可选,type有常用的string,object,bool,int等,object则会多一级properties
                                #用于定义下一级的子字段
            apiVersion:
              type: string
            metadata:
              type: object
            spec:
              properties:
                id:
                  type: string
                name:
                  type: string
                nodename:
                  type: string
                innerip:
                  type: string
              type: object
            status:
              type: object
          type: object
      version: v1           #到此处往下皆固定
      versions:
        - name: v1
          served: true
          storage: true
    status:
      acceptedNames:
        kind: ""
        plural: ""
      conditions: []
      storedVersions: []
    

    这里比较值得注意是ApiGroup需要定好,这个group到后续给scheme注册资源类型时用到,影响往apiserver去交互管理资源。

    定义CRD的api

    这个api可能容易给人造成误解,实际是定义CR的struct,包含什么字段,文件路径api/v1/ecs-bing.go

    type EcsBinding struct {
    	metav1.TypeMeta   `json:",inline"`
    	metav1.ObjectMeta `json:"metadata,omitempty"`
    
    	Spec   EcsBindSpec   `json:"spec,omitempty"`
    	Status EcsBindStatus `json:"status,omitempty"`
    }
    
    type EcsBindingList struct {
    	metav1.TypeMeta `json:",inline"`
    	metav1.ListMeta `json:"metadata,omitempty"`
    
    	Items []EcsBinding `json:"items"`
    }
    
    type EcsBindSpec struct {
    	Id       string `json:"id"`
    	Name     string `json:"name"`
    	NodeName string `json:"node_name"`
    	InnerIp  string `json:"inner_ip"`
    }
    
    type EcsBindStatus struct {
    }
    

    自上而下定义EcsBinding和EcsBindingList两个struct,由于要实现runtime.Object的接口,需要实现DeepCopyObject方法,如果用脚手架生成的代码,这部分实现接口的代码就不用手敲

    注册到Scheme

    scheme注册这里分两部分,一部分是定义一个scheme,另一部分是在各个api里面提供AddToScheme函数,这个函数用于把各种类型各种版本的api(也就是GVK)注册到scheme

    先看第一部分,文件路径client/versiond/scheme/register.go

    var scheme   = runtime.NewScheme()
    var Codecs = serializer.NewCodecFactory(scheme)
    var ParameterCodec = runtime.NewParameterCodec(scheme)
    
    func init()  {
    	metav1.AddToGroupVersion(scheme,schema.GroupVersion{Version:"v1"})
    	if err:=AddToScheme(scheme);err!=nil{
    		fmt.Println("error to AddToScheme ",err)
    	}
    }
    
    func AddToScheme(scheme *runtime.Scheme)error  {
    	return ecsv1.AddToScheme(scheme)
    }
    

    在AddToScheme中就是调用各个kind的AddToScheme,尽管这里只有一个Kind。第二部分的又回去api/v1/ecs-bing.go

    var (
    
    
    	// GroupVersion is group version used to register these objects
    	GroupVersion = schema.GroupVersion{Group: "hopegi.com", Version: "v1"}
    
    	// SchemeBuilder is used to add go types to the GroupVersionKind scheme
    	SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}
    	//SchemeBuilder = runtime.NewSchemeBuilder(AddKnownTypes)
    
    	// AddToScheme adds the types in this group-version to the given scheme.
    	AddToScheme = SchemeBuilder.AddToScheme
    
    	EscBindVersionKind = schema.GroupVersionKind{Group: GroupVersion.Group, Version: GroupVersion.Version, Kind: "EcsBinding"}
    )
    func init()  {
    
    	SchemeBuilder.Register(&EcsBinding{},&EcsBindingList{})
    
    }
    

    此处的Group需要和之前定义CRD时的group一致

    实现CRD的clinet

    这里实际定义了一个clientSet,clientset应该包含多个version,一个version包含多个资源类型,但是这里只有一个version,一个kind。clientSet的结构如下所示

    clientSet
    |---Discovery
    |---EcsV1
         |---RESTClient
         |---EcsBindingClient
    

    clientSet位于client/versiond/clientset.go

    EcsV1位于client/versiond/typed/ecsbinding/v1/ecs_client.go中,它的RESTClient也用于传递给EcsClient,用于EcsClient对apiserver通讯的http客户端

    EcsBindingClient位于client/version/typed/ecsbingding/v1/ecs-binding.go中,定义了client的各种操作方法,封装了对apiserver的各个http请求。

    各个client的初始化,则是由最外层把Config一层一层的往里面具体的Client传。整套client的代码不在这里展示,仅展示一下调用链

    versiond.NewForConfig->ecsbindv1.NewForConfig
    创建出clientSet          创建出EcsV1
    

    当调用EcsV1的EcsBinding方法(也就是获取EcsClient)时,才调用newEcsbindings构造函数构造一个client

    ecsbindv1.NewForConfig的代码如下:

    func NewForConfig(c *rest.Config)(*EcsV1Client,error)  {
    	config:=*c
    	if err:=setConfigDefaults(&config);err!=nil{
    		return nil,err
    	}
    	client,err:=rest.RESTClientFor(&config)
    	if err!=nil{
    		return nil,err
    	}
    	return &EcsV1Client{client},nil
    
    }
    

    在这个函数中先给config设置默认参数,最后按照这些默认参数构造出一个RESTClient,这个RESTClient传递给EcsV1Client,一个作用是把它自己的一个成员restClient,另一个作用就是用于构造EcsClient所需的RESTClient。

    setConfigDefaults函数定义如下

    func setConfigDefaults(config *rest.Config) error {
    	gv := ecsv1.GroupVersion
    	config.GroupVersion = &gv
    	config.APIPath = "/apis"
    	config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs}
    
    	if config.UserAgent == "" {
    		config.UserAgent = rest.DefaultKubernetesUserAgent()
    	}
    
    	return nil
    }
    

    函数给config指定了groudversion这个gv就是hopegi.com v1;api的地址固定是"/apis",通过这两句可以确定客户端跟apiserver通讯时的地址是/apis/hopegi.com/v1,后面设置scheme的序列化器,用于把apiserver响应的json数据反序列化成struct数据。

    EcsBindingClient接口定义的函数如下

    type EcsBindingInterface interface {
    	Create(*v1.EcsBinding)(*v1.EcsBinding,error)
    	Update(*v1.EcsBinding)(*v1.EcsBinding,error)
    	Delete(string,*metav1.DeleteOptions)error
    	DeleteCollection(options *metav1.DeleteOptions, listOptions metav1.ListOptions) error
    	List(metav1.ListOptions)(*v1.EcsBindingList,error)
    	Get(name string,options metav1.GetOptions)(*v1.EcsBinding,error)
    	Watch(options metav1.ListOptions)(watch.Interface,error)
    	Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.EcsBinding, err error)
    }
    

    以List方法实现作例子

    func (c *ecsBinding)List(opts metav1.ListOptions)(*v1.EcsBindingList,error){
    	result := &v1.EcsBindingList{}
    	err := c.client.Get().
    		Resource("ecsbinding").
    		VersionedParams(&opts, scheme.ParameterCodec).
    		Do().
    		Into(result)
    	for _,o:=range result.Items{
    		o.SetGroupVersionKind(v1.EscBindVersionKind)
    	}
    	return result,err
    }
    

    client成员则是先前构造时传入的RESTClient,Resource指定资源的名ecsbingding,当有CR返回时需要执行SetGroupVersionKind,否则拿到的CR结构体会丢失GroupVersion和Kind信息

    实现一个Informer和Lister

    在实现某个资源的Informer之前,要实现一个Informer的Factory。这个Factory的作用有几个,其一是用于构造一个Informer;另外就是在start一个Controller的时候调用它Start方法,Start方法内部就会把它管理的所有Informer Run起来。

    实现一个Informer的Factory

    SharedInformerFactory接口的定义如下所示,代码位于controller-demo/informers/factory.go

    type SharedInformerFactory interface {
    	internalinterfaces.SharedInformerFactory
    	WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
    
    	EcsBind() ecsbind.Interface
    }
    

    这里主要是暴露一个构造并获取各个Group的Interface,Start方法的接口则来源于它继承的internalinterfaces.SharedInformerFactory接口,代码位于 controller-demo/informers/internalinterface/factory_interfaces.go

    type SharedInformerFactory interface {
    	Start(stopCh <-chan struct{})
    	InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
    }
    

    除了Start方法,InformerFor跟构造一个Informer有关,实现Informer的时候会调用到factory的方法,后续会再介绍

    EcsBind()返回的是这个Group的Interface,代码位于controller-demo/informers/ecsbind/interface.go
    type Interface interface {
    	// V1 provides access to shared informers for resources in V1.
    	V1() v1.Interface
    }
    

    V1的Interface则是涵盖了这个版本下各个资源的客户端接口,代码位于controller-demo/informers/ecsbind/v1/interface.go

    type Interface interface {
    	EcsBinding() EcsBindingInformer
    }
    

    这样也刚好跟k8s的api的层级相呼应,先是ApiGroup,再到Version,最后到Kind,就是GVK

    实现一个Informer

    一个Informer的最核心逻辑是List和Watch方法,不过我们实现一个Infomer时只需要给SharedIndexInformer提供这两个方法就可以,调用这两个方法的逻辑由SharedIndexInformer统一实现

    func NewFilteredEcsBindingInformer(clientset *versiond.Clientset, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    	return cache.NewSharedIndexInformer(&cache.ListWatch{
    		ListFunc: func(options meta_v1.ListOptions) (object runtime.Object, e error) {
    			if tweakListOptions!=nil{
    				tweakListOptions(&options)
    			}
    			return clientset.EcsV1().EcsBinding().List(options)
    		},
    		WatchFunc: func(options meta_v1.ListOptions) (i watch.Interface, e error) {
    			if tweakListOptions!=nil{
    				tweakListOptions(&options)
    			}
    			return clientset.EcsV1().EcsBinding().Watch(options)
    		},
    	},&ecsv1.EcsBinding{},resyncPeriod,indexers)
    }
    

    实际上仅仅是调用了client而已,client则是来源于这个CR的Informer——EcsBindingInformer,看看它的接口定义和结构

    type EcsBindingInformer interface {
    	Informer() cache.SharedIndexInformer
    	Lister() demov1.EcsBindingLister
    }
    
    type ecsBindingInformer struct {
    	factory          internalinterfaces.SharedInformerFactory
    	tweakListOptions internalinterfaces.TweakListOptionsFunc
    }
    

    对外暴露的EcsBindingInformer仅仅是一个接口,暴露Informer和LIster两个方法,实现则交由一个内部的结构实现,纵观这个CRD的client,CR的client,clientset,Informer乃至后续的lister都是这样的模式。

    EcsBindingInformer的Informer()实现如下

    func (e *ecsBindingInformer) Informer() cache.SharedIndexInformer {
    	return e.factory.InformerFor(&ecsv1.EcsBinding{}, e.defaultInformer)
    }
    func (e *ecsBindingInformer) defaultInformer(client versiond.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    	return NewFilteredEcsBindingInformer(client, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, e.tweakListOptions)
    }
    

    如前面介绍Factory的时候所介绍的,Informer创建时需要调用factory的InformerFor方法,传入资源的指针以及一个函数回调

    func (e *ecsBindingInformer) Informer() cache.SharedIndexInformer {
    	return e.factory.InformerFor(&ecsv1.EcsBinding{}, e.defaultInformer)
    }
    

    回调的声明在internalinterface处,controller-demo/informers/internalinterface/factory_interfaces.go

    type NewInformerFunc func(versiond.Interface, time.Duration) cache.SharedIndexInformer
    

    在这里就是ecsBindingInformer.defaultInformer,调用这个方法时就会把factory的client传递到构造SharedIndexInformer函数,这样List函数和Watch函数就有client使用,相当于整个构造过程是

    • 创建一个client,将这个client传递给Factory
    • 创建一个Informer时,会通过Factory经过GVK三个层次的接口调到对应资源的Informer,同时factory的实例也会经过每一级往下传递
    • 调用Inform()方法获得SharedIndexInformer,依次经过EcsBindingInformer.Informer()-->d.defaultInformer(即:NewInformerFunc回调)-->NewFilteredEcsBindingInformer

    实现一个Lister

    EcsBindingInformer接口的另一个方法就是获取Lister,仅仅需要把SharedIndexInformer的Indexer传递过去则可,Lister的缓存机构已由SharedIndexInformer实现

    func (e *ecsBindingInformer) Lister() demov1.EcsBindingLister {
    	return demov1.NewEcsBindingLister(e.Informer().GetIndexer())
    }
    

    作为apiserver的缓存,供controller调用快速获取资源,因此它需要提供两个查询的方法,代码位于controller-demo/listers/ecsbind/v1/ecs-binding-lister.go

    type EcsBindingLister interface {
    	List(selector labels.Selector)([]*ecsv1.EcsBinding,error)
    	Get(name string)(*ecsv1.EcsBinding,error)
    }
    
    func NewEcsBindingLister(indexer cache.Indexer) EcsBindingLister  {
    	return &ecsBindingLister{
    		indexer:indexer,
    	}
    }
    
    func (e *ecsBindingLister)List(selector labels.Selector)(ret []*ecsv1.EcsBinding,err error)  {
    	err= cache.ListAll(e.indexer,selector, func(i interface{}) {
    		ret= append(ret, i.(*ecsv1.EcsBinding))
    	})
    	return
    }
    
    func (e *ecsBindingLister)Get(name string)(*ecsv1.EcsBinding,error)  {
    	obj, exists, err := e.indexer.GetByKey(name)
    	if err != nil {
    		return nil, err
    	}
    	if !exists {
    		return nil, errors.NewNotFound(v1.Resource("ecsbind"), name)
    	}
    	return obj.(*ecsv1.EcsBinding), nil
    }
    

    实现一个controller

    controller所依赖的各个组件都已经实现完毕,现在可以实现这个crd的controller,完整的实现不展示,大致跟上一篇NodeController类似。仅展示他的字段和构造函数

    type EcsBindingController struct {
    	kubeClient       *versiond.Clientset
    	clusterName      string
    	ecsbingdingLister       list_and_watch.EcsBindingLister
    	ecsbingdingListerSynced cache.InformerSynced
    	broadcaster      record.EventBroadcaster
    	recorder         record.EventRecorder
    
    	ecsQueue      workqueue.DelayingInterface 
    	lock          sync.Mutex
    }
    
    func NewEcsBindingController(kubeclient  *versiond.Clientset,informer list_and_watch.EcsBindingInformer,clusterName string)*EcsBindingController  {
    	eventBroadcaster := record.NewBroadcaster()
    	eventBroadcaster.StartLogging(glog.Infof)
    	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ecsbinding_controller"})
    
    	ec:= &EcsBindingController{
    		kubeClient:kubeclient,
    		clusterName:clusterName,
    		ecsbingdingLister:informer.Lister(),
    		ecsbingdingListerSynced:informer.Informer().HasSynced,
    		broadcaster:eventBroadcaster,
    		recorder:recorder,
    
    		ecsQueue:workqueue.NewNamedDelayingQueue("EcsBinding"),
    	}
    
    	informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    		AddFunc: func(obj interface{}) {
    			ecsbindingg:=obj.(*ecsV1.EcsBinding)
    			fmt.Printf("controller: Add event, ecsbinding [%s]
    ",ecsbindingg.Name)
    			ec.syncEcsbinding(ecsbindingg)
    		},
    		UpdateFunc: func(oldObj, newObj interface{}) {
    			ecs1,ok1:=oldObj.(*ecsV1.EcsBinding)
    			ecs2,ok2:=newObj.(*ecsV1.EcsBinding)
    			if ok1&&ok2 && !reflect.DeepEqual(ecs1,ecs2){
    				fmt.Printf("controller: Update event, ecsbinding [%s]
    ",ecs1.Name)
    				ec.syncEcsbinding(ecs1)
    			}
    		},
    		DeleteFunc: func(obj interface{}) {
    			ecsbindingg:=obj.(*ecsV1.EcsBinding)
    			fmt.Printf("controller: Delete event, ecsbinding [%s]
    ",ecsbindingg.Name)
    			ec.syncEcsbinding(ecsbindingg)
    		},
    	})
    
    	return ec
    }
    

    最后把controller加到controller的Start方法中,统一启动

    	demoCli, _ := versiond.NewForConfig(cfg)
    
    	ecsbindFactory := ecsbindInformer.NewSharedInformerFactory(demoCli, 0)
    	ecsBindingInformer := ecsbindFactory.EcsBind().V1().EcsBinding()
    	ec := controller.NewEcsBindingController(demoCli, ecsBindingInformer, "k8s-cluster")
    	go ec.Run(stopCh)
    

    小结

    本篇虽然是说定义个CRD的controller,然而却把更多的篇幅放到的controller外的一些组件上:api,client,informer。但正事如此自己编码过一次,才会加深印象,后续在查看K8S源码时遇到controller的源码抠出其核心逻辑,通过client去翻查api地址,才会快速上手。本篇的目的也就如此。

    参考

    client-go源码分析--informer机制流程分析
    kubernetes client-go解析
    深入浅出kubernetes之client-go的SharedInformer

  • 相关阅读:
    Linux自动批量增加公钥
    主机存活监控
    [Linux小技巧] 将 rm 命令删除的文件放在回收站
    Linux常见问题及命令
    数据分析职位招聘情况及发展前景分析
    SQL查询小案例
    Oracle查看表结构
    前端JSON请求转换Date问题
    Centos7最小化安装
    拓词和扇贝有何不同
  • 原文地址:https://www.cnblogs.com/HopeGi/p/15319182.html
Copyright © 2011-2022 走看看