zoukankan      html  css  js  c++  java
  • kurbenetes中的informer使用方法

    informer用途

    获取 kubernetes 中某个资源,同步k8s中的数据到本地缓存,并watch各种资源变化,触发相应的eventHandler.

    1. 在访问 k8s apiserver 的客户端作为一个 client 缓存对象使用
    2. 在一些自定义 controller 中使用,比如 operator 的开发

    informer定义

    informers是一个代码库,实现了一种类似通知的功能,k8s.io/client-go/informers,Informer 是 client-go 中的核心工具包。

    informer用法

    作为 client 的使用示例

    package main
    
    import (
        "flag"
        "fmt"
        "log"
        "path/filepath"
    
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/labels"
        "k8s.io/apimachinery/pkg/util/runtime"
    
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/clientcmd"
        "k8s.io/client-go/util/homedir"
    )
    
    func main() {
        var kubeconfig *string
        if home := homedir.HomeDir(); home != "" {
            kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
        } else {
            kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
        }
        flag.Parse()
    
        config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
        if err != nil {
            panic(err)
        }
    
        // 初始化 client
        clientset, err := kubernetes.NewForConfig(config)
        if err != nil {
            log.Panic(err.Error())
        }
    
        stopper := make(chan struct{})
        defer close(stopper)
        
        // 初始化 informer
        factory := informers.NewSharedInformerFactory(clientset, 0)
        nodeInformer := factory.Core().V1().Nodes()
        informer := nodeInformer.Informer()
        defer runtime.HandleCrash()
        
        // 启动 informer,list & watch
        go factory.Start(stopper)
        
        // 从 apiserver 同步资源,必不可少
        if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
            runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
            return
        }
    
        // 使用自定义 handler
        informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    onAdd,
            UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // 此处省略 workqueue 的使用
            DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
        })
        
        // 创建 lister
        nodeLister := nodeInformer.Lister()
        // 从 lister 中获取所有 items
        nodeList, err := nodeLister.List(labels.Everything())
        if err != nil {
            fmt.Println(err)
        }
        fmt.Println("nodelist:", nodeList)
        <-stopper
    }
    
    func onAdd(obj interface{}) {
        node := obj.(*corev1.Node)
        fmt.Println("add a node:", node.Name)
    }
    

    另外,还可以使用带消息队列的处理方法。如下所示,监测pod上的annotation标签。如果带宽注解变化则调用接口设置新的带宽:

    func main() {
    ...
        stopChan := make(chan struct{})
        go NewPodInformer().Run(1, stopChan)
    ...
    }
    
    func NewPodInformer() *PodInformer {
    	k8sclient := k8smgmt.GetK8sAPIClient()
    	if k8sclient == nil {
    		log.Fatal("k8s client can not be null")		
    	}
    
    	factory := informers.NewSharedInformerFactory(k8sclient.Client, 0)
    	podif := &PodInformer{
    		kubeClient:       k8sclient,
    		informerFactory:       factory,
    		informer:      factory.Core().V1().Pods().Informer(),
    		lister:        factory.Core().V1().Pods().Lister(),
    		listerSynced:  factory.Core().V1().Pods().Informer().HasSynced,
    		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "podUpdateQueue"),
    	}
    
    	// use customized handler
    	podif.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    		AddFunc:    podif.addPod,
    		UpdateFunc: podif.updatePod,
    		DeleteFunc: podif.deletePod,
    	})
    
    	podif.syncHandler = podif.syncPod
    
    	// create lister, you can get pod from the lister
    	cacher.SetPodLister(podif.lister)
    	return podif
    }
    
    // Run begins watching and syncing.
    func (podinf *PodInformer) Run(workers int, stopCh <-chan struct{}) {
    	defer runtime.HandleCrash()
    	defer podinf.queue.ShutDown()
    
    	log.Infof("Starting sync pod bandwidth...")
    	defer log.Infof("Shutting down sync pod bandwidth")
    
    	go podinf.informerFactory.Start(stopCh)
    	if !WaitForCacheSync("pod", stopCh, podinf.listerSynced) {
    		return
    	}
    
    	for i := 0; i < workers; i++ {
    		// Why does it not matter if I change the period parameter?
    		go wait.Until(podinf.worker, time.Second, stopCh)
    	}
    
    	<-stopCh
    }
    
    // WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages
    // indicating that the controller identified by controllerName is waiting for syncs, followed by
    // either a successful or failed sync.
    func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
    	klog.Infof("Waiting for caches to sync for %s controller", controllerName)
    
    	if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
    		runtime.HandleError(fmt.Errorf("unable to sync caches for %s controller", controllerName))
    		return false
    	}
    
    	klog.Infof("Caches are synced for %s controller", controllerName)
    	return true
    }
    
    // worker runs a worker thread that just dequeues items, processes them, and marks them done.
    // It enforces that the syncHandler is never invoked concurrently with the same key.
    func (podinf *PodInformer) worker() {
    	for podinf.processNextWorkItem() {
    	}
    }
    
    func (podinf *PodInformer) processNextWorkItem() bool {
    	key, quit := podinf.queue.Get()
    	if quit {
    		return false
    	}
    	defer podinf.queue.Done(key)
    
    	err := podinf.syncHandler(key.(string))
    	if err == nil {
    		podinf.queue.Forget(key)
    		return true
    	}
    
    	runtime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
    	podinf.queue.AddRateLimited(key)
    
    	return true
    }
    
    // syncPod will sync the Pod with the given key if it has had its expectations fulfilled,
    // This function is not meant to be invoked concurrently with the same key.
    func (podinf *PodInformer) syncPod(key string) error {
    	startTime := time.Now()
    	defer func() {
    		log.Infof("Finished syncing pod %s bandwidth (%v)", key, time.Since(startTime))
    	}()
    
    	namespace, name, err := cacher.SplitPodKey(key)
    	if err != nil {
    		log.Errorf("fail to get pod key: %v", err)
    		return err
    	}
    
    	pod, err := podinf.lister.Pods(namespace).Get(name)
    	if errors.IsNotFound(err) {
    		log.Infof("pod %v has been deleted", key)
    		return nil
    	}
    	if err != nil {
    		return err
    	}
    
    	// Always updates pod bandwidth as pods come up or die.
    	bwinfo, err := cacher.GetPodBandwidth(pod)
    	if err != nil {
    		return err
    	}
    	if bwinfo != nil {
    		// set new bandwidth...
    	}
    	return nil
    }
    
    func (podinf *PodInformer) updatePod(old interface{}, cur interface{}) {
    	if k8sutils.IsLeader() == false {
    		return
    	}
    	enabelBw := os.Getenv(VOYAGE_SERVER_ENABLE_PODLIMIT)
    	if strings.ToLower(enabelBw) != "true" {
    		return
    	}
    	curPod := cur.(*v1.Pod)
    	oldPod := old.(*v1.Pod)
    	if curPod.ResourceVersion == oldPod.ResourceVersion {
    		// Periodic resync will send update events for all known pods.
    		// Two different versions of the same pod will always have different RVs.
    		return
    	}
    
    	podid := cacher.CaculateID(curPod.Namespace, curPod.Name)
    	bwinfo, err := cacher.GetUpdatedBandwidth(oldPod, curPod)
    	if err != nil {
    		log.WithField("pod-update-inform", podid).Errorf("fail to handle update pod: %v", err)
    		return
    	} else if bwinfo != nil {
    		log.Infof("enqueue update pod bandwidth event %s: %v", podid, bwinfo)
    		podinf.enqueuePod(podid)
    	}
    	return
    }
    
    // obj could be an *apps.ReplicaSet, or a DeletionFinalStateUnknown marker item.
    func (podinf *PodInformer) enqueuePod(key string) {
    	podinf.queue.Add(key)
    }
    
    
  • 相关阅读:
    Android 下拉刷新上啦加载SmartRefreshLayout + RecyclerView
    Maven 本地打war包
    数据库性能优化一
    SessionStateMode之Redis共享session
    SessionStateMode之SQL Server共享session
    iframe跨域
    Jenkins发布MVC应用程序
    Docker入门之一Docker在Window下安装
    Window下SVN服务器搭建以及客户端使用
    Window下Jenkins的安装
  • 原文地址:https://www.cnblogs.com/janeysj/p/12957892.html
Copyright © 2011-2022 走看看