    Kubernetes CRD Study Notes

    Preface

    I recently subscribed to the Kubernetes column on Geektime. This post records my process of learning CRDs (Custom Resource Definitions), to help the material sink in.

    Preparation

    First, install the Go dependencies we will use:

    cd $GOPATH/src/
    mkdir resouer && cd resouer
    git clone https://github.com/resouer/k8s-controller-custom-resource.git
    cd k8s-controller-custom-resource
    godep restore
    

    Then, following the GitHub repo above, I typed out a copy of my own (it lives in my own repository, without comments) to deepen the impression. All of the code and directories below were created by me; the directory and file layout is as follows (from the project root):

    hongzhi:k8s-crd hongzhi.wang$ tree
    .
    ├── controller.go
    ├── crd
    │   └── network.yaml
    ├── example
    │   └── example-network.yaml
    ├── main.go
    └── pkg
        └── apis
            └── samplecrd
                ├── register.go
                └── v1
                    ├── doc.go
                    ├── register.go
                    └── types.go
    

    Fill in these files by following the original article, then install code-generator:

    go get -u k8s.io/code-generator
    cd $GOPATH/src/k8s.io/code-generator
    godep restore
    

    Then generate the code:

    ROOT_PACKAGE="k8s-crd"
    CUSTOM_RESOURCE_NAME="samplecrd"
    CUSTOM_RESOURCE_VERSION="v1"
    cd $GOPATH/src/k8s.io/code-generator
    ./generate-groups.sh all "$ROOT_PACKAGE/pkg/client" "$ROOT_PACKAGE/pkg/apis" "$CUSTOM_RESOURCE_NAME:$CUSTOM_RESOURCE_VERSION"
    

    Script output:

    hongzhi:code-generator hongzhi.wang$ ./generate-groups.sh all "$ROOT_PACKAGE/pkg/client" "$ROOT_PACKAGE/pkg/apis" "$CUSTOM_RESOURCE_NAME:$CUSTOM_RESOURCE_VERSION"
    Generating deepcopy funcs
    Generating clientset for samplecrd:v1 at k8s-crd/pkg/client/clientset
    Generating listers for samplecrd:v1 at k8s-crd/pkg/client/listers
    Generating informers for samplecrd:v1 at k8s-crd/pkg/client/informers
    
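    Once the generator has run, the clientset can already be used on its own. Here is a minimal sketch that lists Network objects, assuming the layout above and a kubeconfig in the default location; with the client-go of this era, List takes only a ListOptions (newer versions also take a context.Context):

    package main
    
    import (
    	"fmt"
    
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/client-go/tools/clientcmd"
    
    	clientset "k8s-crd/pkg/client/clientset/versioned"
    )
    
    func main() {
    	// RecommendedHomeFile is ~/.kube/config.
    	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    	if err != nil {
    		panic(err)
    	}
    
    	// The generated constructor mirrors kubernetes.NewForConfig.
    	nc, err := clientset.NewForConfig(cfg)
    	if err != nil {
    		panic(err)
    	}
    
    	// Generated accessors follow group/version/resource: Samplecrd -> V1 -> Networks.
    	networks, err := nc.SamplecrdV1().Networks("default").List(metav1.ListOptions{})
    	if err != nil {
    		panic(err)
    	}
    	for _, n := range networks.Items {
    		fmt.Println(n.Name)
    	}
    }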

    After generating, the code is as follows. (The comment I wrote at line 37 of my own types.go was wrong. After generation, GoLand only flagged NetworkList as not being a runtime.Object type; Network had been flagged before generation too, but after generation that error disappeared. I looked into it: runtime.Object is an interface type, defined in interface.go under the k8s.io/apimachinery/pkg/runtime directory, and GoLand complains because, before the code is generated, our custom types do not fully implement the runtime.Object interface; the generated DeepCopyObject method is what completes it. A sketch of types.go follows the tree below.)

    hongzhi:k8s-crd hongzhi.wang$ tree
    .
    ├── controller.go
    ├── crd
    │   └── network.yaml
    ├── example
    │   └── example-network.yaml
    ├── main.go
    └── pkg
        ├── apis
        │   └── samplecrd
        │       ├── register.go
        │       └── v1
        │           ├── doc.go
        │           ├── register.go
        │           ├── types.go
        │           └── zz_generated.deepcopy.go
        ├── client
        │   ├── clientset
        │   │   └── versioned
        │   │       ├── clientset.go
        │   │       ├── doc.go
        │   │       ├── fake
        │   │       │   ├── clientset_generated.go
        │   │       │   ├── doc.go
        │   │       │   └── register.go
        │   │       ├── scheme
        │   │       │   ├── doc.go
        │   │       │   └── register.go
        │   │       └── typed
        │   │           └── samplecrd
        │   │               └── v1
        │   │                   ├── doc.go
        │   │                   ├── fake
        │   │                   │   ├── doc.go
        │   │                   │   ├── fake_network.go
        │   │                   │   └── fake_samplecrd_client.go
        │   │                   ├── generated_expansion.go
        │   │                   ├── network.go
        │   │                   └── samplecrd_client.go
        │   ├── informers
        │   │   └── externalversions
        │   │       ├── factory.go
        │   │       ├── generic.go
        │   │       ├── internalinterfaces
        │   │       │   └── factory_interfaces.go
        │   │       └── samplecrd
        │   │           ├── interface.go
        │   │           └── v1
        │   │               ├── interface.go
        │   │               └── network.go
        │   └── listers
        │       └── samplecrd
        │           └── v1
        │               ├── expansion_generated.go
        │               └── network.go
        └── signals
            ├── signal.go
            ├── signal_posix.go
            └── signal_windows.go
    
    23 directories, 31 files
    
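    As referenced above, here is a sketch of types.go consistent with this layout, following the sample-controller conventions (the cidr/gateway spec fields are taken from the reference repo; treat this as a sketch rather than the canonical file). The +k8s:deepcopy-gen tags are what make the generator emit DeepCopyObject, which, together with GetObjectKind from the embedded TypeMeta, completes the runtime.Object interface:

    package v1
    
    import (
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )
    
    // +genclient
    // +genclient:noStatus
    // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
    
    // Network describes a Network resource.
    type Network struct {
    	metav1.TypeMeta   `json:",inline"`
    	metav1.ObjectMeta `json:"metadata,omitempty"`
    
    	Spec NetworkSpec `json:"spec"`
    }
    
    // NetworkSpec is the spec for a Network resource.
    type NetworkSpec struct {
    	Cidr    string `json:"cidr"`
    	Gateway string `json:"gateway"`
    }
    
    // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
    
    // NetworkList is a list of Network resources.
    type NetworkList struct {
    	metav1.TypeMeta `json:",inline"`
    	metav1.ListMeta `json:"metadata"`
    
    	Items []Network `json:"items"`
    }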

    The signals directory and its contents have to be created by hand (a sketch appears after the kubectl output below). Next, create the CRD and a CRD object:

    cd $GOPATH/src/k8s-crd
    hongzhi:k8s-crd hongzhi.wang$ kubectl apply -f crd/network.yaml
    customresourcedefinition.apiextensions.k8s.io/networks.samplecrd.k8s.io created
    
    hongzhi:k8s-crd hongzhi.wang$ kubectl apply -f example/example-network.yaml
    network.samplecrd.k8s.io/example-network created
    
    hongzhi:k8s-crd hongzhi.wang$ kubectl get crd
    NAME                        CREATED AT
    networks.samplecrd.k8s.io   2018-10-22T07:17:56Z
    hongzhi:k8s-crd hongzhi.wang$ kubectl get network
    NAME              AGE
    example-network   18s
    
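    The signals package referenced from main.go follows the kubernetes/sample-controller convention: a SetupSignalHandler that turns SIGINT/SIGTERM into a closed stop channel, plus per-platform files defining the signal list. A minimal sketch (the Windows variant would define shutdownSignals as just os.Interrupt):

    // pkg/signals/signal.go
    package signals
    
    import (
    	"os"
    	"os/signal"
    )
    
    var onlyOneSignalHandler = make(chan struct{})
    
    // SetupSignalHandler returns a channel that is closed on the first
    // shutdown signal; a second signal terminates the program immediately.
    func SetupSignalHandler() (stopCh <-chan struct{}) {
    	close(onlyOneSignalHandler) // panics when called twice: only one handler allowed
    
    	stop := make(chan struct{})
    	c := make(chan os.Signal, 2)
    	signal.Notify(c, shutdownSignals...)
    	go func() {
    		<-c
    		close(stop) // first signal: ask everything to stop
    		<-c
    		os.Exit(1) // second signal: exit directly
    	}()
    
    	return stop
    }
    
    // pkg/signals/signal_posix.go
    // +build !windows
    
    package signals
    
    import (
    	"os"
    	"syscall"
    )
    
    var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}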

    The flow of the whole custom controller (image from the slides Zhang Lei shared at AS Shenzhen 2018):

    [Figure: custom controller flow diagram]

    Next, let's analyze main.go and controller.go.
    main.go first needs the kube-apiserver configuration to connect to our Kubernetes cluster; then come the key lines:

    	networkInformerFactory := informers.NewSharedInformerFactory(networkClient, time.Second*30)
    	controller := NewController(kubeClient, networkClient,
    		networkInformerFactory.Samplecrd().V1().Networks())
    	go networkInformerFactory.Start(stopCh)
    	if err = controller.Run(2, stopCh); err != nil {
    		glog.Fatalf("Error running controller: %s", err.Error())
    	}
    

    informers.NewSharedInformerFactory returns the SharedInformerFactory interface.
    Then controller := NewController(kubeClient, networkClient, networkInformerFactory.Samplecrd().V1().Networks()) mainly registers the add, update, and delete handlers for this CRD with the informer's event handler.

    A note here: these add/update/delete events fire when the reflector receives events from the apiserver. time.Second*30 is the periodic sync interval between the local cache and the apiserver; when a sync completes, UpdateFunc is triggered for every Network object, so the handler compares resource versions and skips the workqueue when they are identical. Normally, though, the events simply reflect requests the apiserver has received: for example, if we delete a Network object via kubectl, the deletion passes through the apiserver and triggers DeleteFunc.

    Then the Start method of the SharedInformerFactory interface is called; this just starts the reflector, which watches the apiserver for changes to Network objects.
    After that, func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) is called to start the controller.

    	for i := 0; i < threadiness; i++ {
    		go wait.Until(c.runWorker, time.Second, stopCh)
    	}
    

    This starts threadiness goroutines to run the reconcile loop. The desired state comes from the cache, while the actual state is fetched live from the real system. The whole flow: the apiserver receives a request (add/update/delete) and hands it to the reflector; the reflector invokes the corresponding handler, which puts the object's key (the namespace/name string) onto the workqueue and updates the cache according to the operation. The reconcile loop takes a key from the workqueue and looks it up in the cache: if the lookup returns a NotFound error, the reconcile loop deletes the corresponding Network object from the actual cluster. If the cache does have the key, the loop checks the actual state: if the object does not exist, create it; if it does, compare it with the desired state and update it when they differ.
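
    The reference syncHandler below only logs what it would do; to make the compare-and-update step concrete, here is a hedged sketch of the same handler against a hypothetical backend client. Everything named neutron* is an assumption for illustration and is not part of the original code (it assumes controller.go's imports and the types.go sketch above):

    // Sketch only: neutronClient and its GetNetwork/CreateNetwork/UpdateNetwork/
    // DeleteNetwork methods are hypothetical stand-ins for the real backend API.
    func (c *Controller) syncHandler(key string) error {
    	namespace, name, err := cache.SplitMetaNamespaceKey(key)
    	if err != nil {
    		return nil // malformed key: nothing sensible to retry
    	}
    
    	// Desired state, served from the informer's local cache.
    	desired, err := c.networksLister.Networks(namespace).Get(name)
    	if errors.IsNotFound(err) {
    		// Gone from the cache: delete it from the real world.
    		return neutronClient.DeleteNetwork(namespace, name)
    	}
    	if err != nil {
    		return err
    	}
    
    	// Actual state, fetched live from the backend (assume nil means "absent").
    	actual, err := neutronClient.GetNetwork(namespace, name)
    	if err != nil {
    		return err
    	}
    	switch {
    	case actual == nil:
    		// Missing in the real world: create it.
    		return neutronClient.CreateNetwork(desired)
    	case actual.Cidr != desired.Spec.Cidr || actual.Gateway != desired.Spec.Gateway:
    		// Drifted: update the real world to match the desired spec.
    		return neutronClient.UpdateNetwork(desired)
    	default:
    		return nil
    	}
    }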

    Update (2020-10-15)

    Resync is not the informer periodically talking to the apiserver; rather, the informer replays the result of its most recent relist. Once I have fully understood this part, I will write a dedicated article on informer resync.
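
    One practical note while experimenting: client-go treats a resync period of 0 as "never resync", so the periodic flood of UpdateFunc calls can be switched off entirely:

    	// A resync period of 0 disables the periodic resync; UpdateFunc then
    	// fires only for real changes delivered by the apiserver watch.
    	networkInformerFactory := informers.NewSharedInformerFactory(networkClient, 0)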

    Reference code

    main.go
    
    package main
    
    import (
    	"flag"
    	"time"
    
    	"github.com/golang/glog"
    	"k8s.io/client-go/kubernetes"
    	"k8s.io/client-go/tools/clientcmd"
    
    	clientset "k8s-crd/pkg/client/clientset/versioned"
    	informers "k8s-crd/pkg/client/informers/externalversions"
    	"k8s-crd/pkg/signals"
    )
    
    var (
    	masterURL string
    	kubeconfig string
    )
    
    func main() {
    	flag.Parse()
    	stopCh := signals.SetupSignalHandler()
    
    	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
    	if err != nil {
    		glog.Fatalf("Error building kubeconfig: %s", err.Error())
    	}
    
    	kubeClient, err := kubernetes.NewForConfig(cfg)
    	if err != nil {
    		glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
    	}
    
    	networkClient, err := clientset.NewForConfig(cfg)
    	if err != nil {
    		glog.Fatalf("Error building example clientset: %s", err.Error())
    	}
    
    	networkInformerFactory := informers.NewSharedInformerFactory(networkClient, time.Second*30)
    
    	controller := NewController(kubeClient, networkClient,
    		networkInformerFactory.Samplecrd().V1().Networks())
    
    	go networkInformerFactory.Start(stopCh)
    
    	if err = controller.Run(2, stopCh); err != nil {
    		glog.Fatalf("Error running controller: %s", err.Error())
    	}
    }
    
    func init() {
    	flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to kubeconfig")
    	flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server")
    }
    
    
    
    controller.go
    
    package main
    
    
    import (
    	"fmt"
    	"time"
    
    	"github.com/golang/glog"
    	corev1 "k8s.io/api/core/v1"
    	"k8s.io/apimachinery/pkg/api/errors"
    	"k8s.io/apimachinery/pkg/util/runtime"
    	"k8s.io/apimachinery/pkg/util/wait"
    	"k8s.io/client-go/kubernetes"
    	"k8s.io/client-go/kubernetes/scheme"
    	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    	"k8s.io/client-go/tools/cache"
    	"k8s.io/client-go/tools/record"
    	"k8s.io/client-go/util/workqueue"
    
    	samplecrdv1 "k8s-crd/pkg/apis/samplecrd/v1"
    	clientset "k8s-crd/pkg/client/clientset/versioned"
    	networkscheme "k8s-crd/pkg/client/clientset/versioned/scheme"
    	informers "k8s-crd/pkg/client/informers/externalversions/samplecrd/v1"
    	listers "k8s-crd/pkg/client/listers/samplecrd/v1"
    )
    
    const controllerAgentName = "network-controller"
    
    const (
    	// SuccessSynced is the Event reason recorded when a Network is synced.
    	SuccessSynced = "Synced"
    	// MessageResourceSynced is the Event message recorded when a Network is synced.
    	MessageResourceSynced = "Network synced successfully"
    )
    
    // Controller is the controller implementation for Network resources.
    type Controller struct {
    	// kubeclientset is a standard Kubernetes clientset.
    	kubeclientset kubernetes.Interface
    	// networkclientset is a clientset for our own API group.
    	networkclientset clientset.Interface
    
    	networksLister listers.NetworkLister
    	networksSynced cache.InformerSynced
    
    	// workqueue is a rate-limited work queue of namespace/name keys.
    	workqueue workqueue.RateLimitingInterface
    
    	// recorder records Kubernetes Events for this controller.
    	recorder record.EventRecorder
    }
    
    func NewController(
    	kubeclientset kubernetes.Interface,
    	networkclientset clientset.Interface,
    	networkInformer informers.NetworkInformer) *Controller  {
    
    
    	// Register our CRD types into the global scheme so events can be recorded for them.
    	runtime.Must(networkscheme.AddToScheme(scheme.Scheme))
    	glog.V(4).Info("Creating event broadcaster")
    	eventBroadcaster := record.NewBroadcaster()
    	eventBroadcaster.StartLogging(glog.Infof)
    	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
    	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
    
    	controller := &Controller{
    		kubeclientset: kubeclientset,
    		networkclientset: networkclientset,
    		networksLister: networkInformer.Lister(),
    		networksSynced: networkInformer.Informer().HasSynced,
    		workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Networks"),
    		recorder: recorder,
    	}
    
    	glog.Info("Setting up event handlers")
    	networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    		AddFunc: controller.enqueueNetwork,
    		UpdateFunc: func(old, new interface{}) {
    			oldNetwork := old.(*samplecrdv1.Network)
    			newNetwork := new.(*samplecrdv1.Network)
    			if oldNetwork.ResourceVersion == newNetwork.ResourceVersion {
    				return
    			}
    			controller.enqueueNetwork(new)
    		},
    		DeleteFunc: controller.enqueueNetworkForDelete,
    	})
    	return controller
    }
    
    func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
    	defer runtime.HandleCrash()
    	defer c.workqueue.ShutDown()
    
    	glog.Info("Starting Network control loop")
    
    	glog.Info("Waiting for informer caches to sync")
    	if ok := cache.WaitForCacheSync(stopCh, c.networksSynced); !ok {
    		return fmt.Errorf("failed to wait for caches to sync")
    	}
    
    	glog.Info("Starting workers")
    	for i := 0; i < threadiness; i++ {
    		go wait.Until(c.runWorker, time.Second, stopCh)
    	}
    
    	glog.Info("Started workers")
    	<-stopCh
    	glog.Info("shutting down workers")
    
    	return nil
    }
    
    func (c *Controller) runWorker() {
    	for c.processNextWorkItem() {
    	}
    }
    
    func (c *Controller) processNextWorkItem() bool {
    	obj, shutdown := c.workqueue.Get()
    	if shutdown {
    		return false
    	}
    
    	err := func(obj interface{}) error {
    		// Done tells the workqueue we are finished processing this item.
    		defer c.workqueue.Done(obj)
    		var key string
    		var ok bool
    
    		if key, ok = obj.(string); !ok {
    			// Invalid item: Forget it so it is not re-queued.
    			c.workqueue.Forget(obj)
    			runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
    			return nil
    		}
    
    		if err := c.syncHandler(key); err != nil {
    			// Re-queue with rate-limited back-off so the sync is retried.
    			c.workqueue.AddRateLimited(key)
    			return fmt.Errorf("error syncing '%s': %s", key, err.Error())
    		}
    
    		c.workqueue.Forget(obj)
    		glog.Infof("Successfully synced '%s'", key)
    		return nil
    	}(obj)
    	if err != nil {
    		runtime.HandleError(err)
    		return true
    	}
    	return true
    }
    
    
    func (c *Controller) syncHandler(key string) error {
    	// Split the namespace/name key back into its parts.
    	namespace, name, err := cache.SplitMetaNamespaceKey(key)
    	if err != nil {
    		runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
    		return nil
    	}
    	network, err := c.networksLister.Networks(namespace).Get(name)
    	if err != nil {
    		if errors.IsNotFound(err) {
    			glog.Warningf("Network: %s/%s does not exist in local cache, will delete it from Neutron ...",
    				namespace, name)
    			glog.Infof("[Neutron] Deleting network: %s/%s ...", namespace, name)
    			return nil
    		}
    		runtime.HandleError(fmt.Errorf("failed to get network by key: %s/%s", namespace, name))
    		return nil
    	}
    	glog.Infof("[Neutron] Try to process network: %#v ...", network)
    
    	c.recorder.Event(network, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
    	return nil
    }
    
    
    func (c *Controller) enqueueNetwork(obj interface{}) {
    	var key string
    	var err error
    	// MetaNamespaceKeyFunc turns the object into a namespace/name key.
    	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
    		runtime.HandleError(err)
    		return
    	}
    	c.workqueue.AddRateLimited(key)
    }
    
    
    func (c *Controller) enqueueNetworkForDelete(obj interface{}) {
    	// DeletionHandlingMetaNamespaceKeyFunc also handles the DeletedFinalStateUnknown
    	// tombstones that the informer hands out when a delete was missed.
    	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
    	if err != nil {
    		runtime.HandleError(err)
    		return
    	}
    	c.workqueue.AddRateLimited(key)
    }
    
    