zoukankan      html  css  js  c++  java
  • MetaNamespaceKeyFunc

    cat<<EOF >informer.go
    package main
    
    import (
     "fmt"
    
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/tools/cache"
    )
    
    const (
     NamespaceIndexName = "namespace"
     NodeNameIndexName  = "nodeName"
    )
    
    func NamespaceIndexFunc(obj interface{}) ([]string, error) {
     m, err := meta.Accessor(obj)
     if err != nil {
      return []string{""}, fmt.Errorf("object has no meta: %v", err)
     }
     return []string{m.GetNamespace()}, nil
    }
    
    func NodeNameIndexFunc(obj interface{}) ([]string, error) {
     pod, ok := obj.(*v1.Pod)
     if !ok {
      return []string{}, nil
     }
     return []string{pod.Spec.NodeName}, nil
    }
    
    func main() {
     index := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
      NamespaceIndexName: NamespaceIndexFunc,
      NodeNameIndexName:  NodeNameIndexFunc,
     })
    
     pod1 := &v1.Pod{
      ObjectMeta: metav1.ObjectMeta{
       Name:      "index-pod-1",
       Namespace: "default",
      },
      Spec: v1.PodSpec{NodeName: "node1"},
     }
     pod2 := &v1.Pod{
      ObjectMeta: metav1.ObjectMeta{
       Name:      "index-pod-2",
       Namespace: "default",
      },
      Spec: v1.PodSpec{NodeName: "node2"},
     }
     pod3 := &v1.Pod{
      ObjectMeta: metav1.ObjectMeta{
       Name:      "index-pod-3",
       Namespace: "kube-system",
      },
      Spec: v1.PodSpec{NodeName: "node2"},
     }
    
     _ = index.Add(pod1)
     _ = index.Add(pod2)
     _ = index.Add(pod3)
    
     // ByIndex 两个参数:IndexName(索引器名称)和 indexKey(需要检索的key)
     pods, err := index.ByIndex(NamespaceIndexName, "default")
     if err != nil {
      panic(err)
     }
     for _, pod := range pods {
      fmt.Println(pod.(*v1.Pod).Name)
     }
    
     fmt.Println("==========================")
    
     pods, err = index.ByIndex(NodeNameIndexName, "node2")
     if err != nil {
      panic(err)
     }
     for _, pod := range pods {
      fmt.Println(pod.(*v1.Pod).Name)
     }
    
    }
    EOF
    root@ubuntu:~/go_learn/informer# go mod  vendor
    root@ubuntu:~/go_learn/informer# go build -o informer .
    root@ubuntu:~/go_learn/informer# ./informer 
    index-pod-1
    index-pod-2
    ==========================
    index-pod-2
    index-pod-3

    在上面的示例中首先通过 NewIndexer 函数实例化 Indexer 对象,第一个参数就是用于计算资源对象键的函数,这里我们使用的是 MetaNamespaceKeyFunc 这个默认的对象键函数;第二个参数是 Indexers,也就是存储索引器,上面我们知道 Indexers 的定义为 map[string]IndexFunc,为什么要定义成一个 map 呢?我们可以类比数据库中,我们要查询某项数据,索引的方式是不是多种多样啊?为了扩展,Kubernetes 中就使用一个 map 来存储各种各样的存储索引器,至于存储索引器如何生成,就使用一个 IndexFunc 暴露出去,给使用者自己实现即可。

    这里我们定义的了两个索引键生成函数:NamespaceIndexFunc 与 NodeNameIndexFunc,一个根据资源对象的命名空间来进行索引,一个根据资源对象所在的节点进行索引。然后定义了3个 Pod,前两个在 default 命名空间下面,另外一个在 kube-system 命名空间下面,然后通过 index.Add 函数添加这3个 Pod 资源对象。然后通过 index.ByIndex 函数查询在名为 namespace 的索引器下面匹配索引键为 default 的 Pod 列表。也就是查询 default 这个命名空间下面的所有 Pod,这里就是前两个定义的 Pod。

    对上面的示例如果我们理解了,那么就很容易理解上面定义的4个数据结构了:

    • IndexFunc:索引器函数,用于计算一个资源对象的索引值列表,上面示例是指定命名空间为索引值结果,当然我们也可以根据需求定义其他的,比如根据 Label 标签、Annotation 等属性来生成索引值列表。
    • Index:存储数据,对于上面的示例,我们要查找某个命名空间下面的 Pod,那就要让 Pod 按照其命名空间进行索引,对应的 Index 类型就是 map[namespace]sets.pod
    • Indexers:存储索引器,key 为索引器名称,value 为索引器的实现函数,上面的示例就是 map["namespace"]MetaNamespaceIndexFunc
    • Indices:存储缓存器,key 为索引器名称,value 为缓存的数据,对于上面的示例就是 map["namespace"]map[namespace]sets.pod

    可能最容易混淆的是 Indexers 和 Indices 这两个概念,因为平时很多时候我们没有怎么区分二者的关系,这里我们可以这样理解:Indexers 是存储索引(生成索引键)的,Indices 里面是存储的真正的数据(对象键),这样可能更好理解。

     

    按照上面的理解我们可以得到上面示例的索引数据如下所示:

    // Indexers 就是包含的所有索引器(分类)以及对应实现
    Indexers: {  
      "namespace": NamespaceIndexFunc,
      "nodeName": NodeNameIndexFunc,
    }
    // Indices 就是包含的所有索引分类中所有的索引数据
    Indices: {
     "namespace": {  //namespace 这个索引分类下的所有索引数据
      "default": ["pod-1", "pod-2"],  // Index 就是一个索引键下所有的对象键列表
      "kube-system": ["pod-3"]   // Index
     },
     "nodeName": {  //nodeName 这个索引分类下的所有索引数据(对象键列表)
      "node1": ["pod-1"],  // Index
      "node2": ["pod-2", "pod-3"]  // Index
     }
    }
    root@ubuntu:~/go_learn/informer# go build -o informer .
    root@ubuntu:~/go_learn/informer# ./informer 
    key is  default/index-pod-1
    key2 is  default/index-pod-1
    root@ubuntu:~/go_learn/informer# cat informer.go
    package main
    
    import (
     "fmt"
    
     v1 "k8s.io/api/core/v1"
     //"k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/util/workqueue"
    )
    
    func main() {
        queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
        pod1 := &v1.Pod{
         ObjectMeta: metav1.ObjectMeta{
         Name:      "index-pod-1",
         Namespace: "default",
        },
        Spec: v1.PodSpec{NodeName: "node1"},
        }
         key, err := cache.MetaNamespaceKeyFunc(pod1)
       if err == nil {
        fmt.Println("key is ", key)
        queue.Add(key)
       }
    
       key2, err2  := queue.Get()
      if err2  {
        fmt.Println("quit")
       } else {
        fmt.Println("key2 is ", key2)
      }
    }
    root@ubuntu:~/go_learn/informer# ./informer  -kubeconfig=$HOME/.kube/config
    key is  default/index-pod-1
    key2 is  default/index-pod-1
    root@ubuntu:~/go_learn/informer# cat informer.go
    package main
    
    import (
     "fmt"
     "flag"
     "time"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/tools/clientcmd"
     klog "k8s.io/klog/v2"
     "k8s.io/client-go/informers"
     v1 "k8s.io/api/core/v1"
     //"k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/util/workqueue"
     "k8s.io/kubectl/pkg/util/logs"
    )
    var kubeconfig string
    
    func init() {
        flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
    }
    
    
    func main() {
        flag.Parse()
        logs.InitLogs()
        defer logs.FlushLogs()
    
        config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
        if err != nil {
            panic(err.Error())
        }
    
        clientset, err := kubernetes.NewForConfig(config)
        if err != nil {
            klog.Fatal(err)
        }
    
        factory := informers.NewSharedInformerFactory(clientset, time.Hour*24)
        queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
        pod1 := &v1.Pod{
         ObjectMeta: metav1.ObjectMeta{
         Name:      "index-pod-1",
         Namespace: "default",
        },
        Spec: v1.PodSpec{NodeName: "node1"},
        }
         key, err := cache.MetaNamespaceKeyFunc(pod1)
       if err == nil {
        fmt.Println("key is ", key)
        queue.Add(key)
       }
    
       key2, err2  := queue.Get()
      if err2  {
        fmt.Println("quit")
       } else {
        fmt.Println("key2 is ", key2)
      }
      informer :=  factory.Core().V1().Pods().Informer()
      stop := make(chan struct{})
      go informer.Run(stop)
      pod,_,_:= informer.GetStore().GetByKey(key2.(string))
      fmt.Println(pod.(*v1.Pod).Name)
    }
    root@ubuntu:~/go_learn/informer# ./informer  -kubeconfig=$HOME/.kube/config
    key is  default/index-pod-1
    key2 is  default/index-pod-1
    panic: interface conversion: interface {} is nil, not *v1.Pod
    
    goroutine 1 [running]:
    main.main()
            /root/go_learn/informer/informer.go:65 +0x7e8
    root@ubuntu:~/go_learn/informer# go build -o informer . 
    root@ubuntu:~/go_learn/informer# ./informer 
    key is  default/index-pod-1
    key2 is  default/index-pod-1
    index-pod-1
    root@ubuntu:~/go_learn/informer# cat informer.go
    package main
    
    import (
     "fmt"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/util/workqueue"
    )
    
    
    func main() {
        queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
        indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
        pod1 := &v1.Pod{
         ObjectMeta: metav1.ObjectMeta{
         Name:      "index-pod-1",
         Namespace: "default",
        },
        Spec: v1.PodSpec{NodeName: "node1"},
        }
         key, err := cache.MetaNamespaceKeyFunc(pod1)
       if err == nil {
        fmt.Println("key is ", key)
        queue.Add(key)
        _ = indexer.Add(pod1)
       }
    
       key2, err2  := queue.Get()
      if err2  {
        fmt.Println("quit")
       } else {
        fmt.Println("key2 is ", key2)
      }
    
      pod,_,_ := indexer.GetByKey(key2.(string))
      fmt.Println(pod.(*v1.Pod).Name)
    }

    https://github.com/kubernetes/client-go/blob/master/examples/workqueue/main.go

    What I learnt about Kubernetes Controllers

    client-go 之 Indexer 的理解

  • 相关阅读:
    Scoket简介
    AOP
    Windows服务
    Nginx 教程 (1):基本概念
    异步编程
    并发编程
    常用排序
    序列化
    MSBuild/Projectjson
    不汇报是职场发展的绊脚石
  • 原文地址:https://www.cnblogs.com/dream397/p/14986806.html
Copyright © 2011-2022 走看看