cat<<EOF >informer.go package main import ( "fmt" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" ) const ( NamespaceIndexName = "namespace" NodeNameIndexName = "nodeName" ) func NamespaceIndexFunc(obj interface{}) ([]string, error) { m, err := meta.Accessor(obj) if err != nil { return []string{""}, fmt.Errorf("object has no meta: %v", err) } return []string{m.GetNamespace()}, nil } func NodeNameIndexFunc(obj interface{}) ([]string, error) { pod, ok := obj.(*v1.Pod) if !ok { return []string{}, nil } return []string{pod.Spec.NodeName}, nil } func main() { index := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{ NamespaceIndexName: NamespaceIndexFunc, NodeNameIndexName: NodeNameIndexFunc, }) pod1 := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "index-pod-1", Namespace: "default", }, Spec: v1.PodSpec{NodeName: "node1"}, } pod2 := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "index-pod-2", Namespace: "default", }, Spec: v1.PodSpec{NodeName: "node2"}, } pod3 := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "index-pod-3", Namespace: "kube-system", }, Spec: v1.PodSpec{NodeName: "node2"}, } _ = index.Add(pod1) _ = index.Add(pod2) _ = index.Add(pod3) // ByIndex 两个参数:IndexName(索引器名称)和 indexKey(需要检索的key) pods, err := index.ByIndex(NamespaceIndexName, "default") if err != nil { panic(err) } for _, pod := range pods { fmt.Println(pod.(*v1.Pod).Name) } fmt.Println("==========================") pods, err = index.ByIndex(NodeNameIndexName, "node2") if err != nil { panic(err) } for _, pod := range pods { fmt.Println(pod.(*v1.Pod).Name) } } EOF
root@ubuntu:~/go_learn/informer# go mod vendor root@ubuntu:~/go_learn/informer# go build -o informer . root@ubuntu:~/go_learn/informer# ./informer index-pod-1 index-pod-2 ========================== index-pod-2 index-pod-3
在上面的示例中首先通过 NewIndexer 函数实例化 Indexer 对象,第一个参数就是用于计算资源对象键的函数,这里我们使用的是 MetaNamespaceKeyFunc
这个默认的对象键函数;第二个参数是 Indexers,也就是存储索引器,上面我们知道 Indexers
的定义为 map[string]IndexFunc
,为什么要定义成一个 map 呢?我们可以类比数据库中,我们要查询某项数据,索引的方式是不是多种多样啊?为了扩展,Kubernetes 中就使用一个 map 来存储各种各样的存储索引器,至于存储索引器如何生成,就使用一个 IndexFunc
暴露出去,给使用者自己实现即可。
这里我们定义的了两个索引键生成函数:NamespaceIndexFunc
与 NodeNameIndexFunc
,一个根据资源对象的命名空间来进行索引,一个根据资源对象所在的节点进行索引。然后定义了3个 Pod,前两个在 default 命名空间下面,另外一个在 kube-system 命名空间下面,然后通过 index.Add
函数添加这3个 Pod 资源对象。然后通过 index.ByIndex
函数查询在名为 namespace
的索引器下面匹配索引键为 default
的 Pod 列表。也就是查询 default 这个命名空间下面的所有 Pod,这里就是前两个定义的 Pod。
对上面的示例如果我们理解了,那么就很容易理解上面定义的4个数据结构了:
- IndexFunc:索引器函数,用于计算一个资源对象的索引值列表,上面示例是指定命名空间为索引值结果,当然我们也可以根据需求定义其他的,比如根据 Label 标签、Annotation 等属性来生成索引值列表。
- Index:存储数据,对于上面的示例,我们要查找某个命名空间下面的 Pod,那就要让 Pod 按照其命名空间进行索引,对应的 Index 类型就是
map[namespace]sets.pod
。 - Indexers:存储索引器,key 为索引器名称,value 为索引器的实现函数,上面的示例就是
map["namespace"]MetaNamespaceIndexFunc
。 - Indices:存储缓存器,key 为索引器名称,value 为缓存的数据,对于上面的示例就是
map["namespace"]map[namespace]sets.pod
。
可能最容易混淆的是 Indexers 和 Indices 这两个概念,因为平时很多时候我们没有怎么区分二者的关系,这里我们可以这样理解:Indexers 是存储索引(生成索引键)的,Indices 里面是存储的真正的数据(对象键),这样可能更好理解。
按照上面的理解我们可以得到上面示例的索引数据如下所示:
// Indexers 就是包含的所有索引器(分类)以及对应实现 Indexers: { "namespace": NamespaceIndexFunc, "nodeName": NodeNameIndexFunc, } // Indices 就是包含的所有索引分类中所有的索引数据 Indices: { "namespace": { //namespace 这个索引分类下的所有索引数据 "default": ["pod-1", "pod-2"], // Index 就是一个索引键下所有的对象键列表 "kube-system": ["pod-3"] // Index }, "nodeName": { //nodeName 这个索引分类下的所有索引数据(对象键列表) "node1": ["pod-1"], // Index "node2": ["pod-2", "pod-3"] // Index } }
root@ubuntu:~/go_learn/informer# go build -o informer . root@ubuntu:~/go_learn/informer# ./informer key is default/index-pod-1 key2 is default/index-pod-1 root@ubuntu:~/go_learn/informer# cat informer.go package main import ( "fmt" v1 "k8s.io/api/core/v1" //"k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) func main() { queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) pod1 := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "index-pod-1", Namespace: "default", }, Spec: v1.PodSpec{NodeName: "node1"}, } key, err := cache.MetaNamespaceKeyFunc(pod1) if err == nil { fmt.Println("key is ", key) queue.Add(key) } key2, err2 := queue.Get() if err2 { fmt.Println("quit") } else { fmt.Println("key2 is ", key2) } }
root@ubuntu:~/go_learn/informer# ./informer -kubeconfig=$HOME/.kube/config key is default/index-pod-1 key2 is default/index-pod-1
root@ubuntu:~/go_learn/informer# cat informer.go package main import ( "fmt" "flag" "time" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" klog "k8s.io/klog/v2" "k8s.io/client-go/informers" v1 "k8s.io/api/core/v1" //"k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/kubectl/pkg/util/logs" ) var kubeconfig string func init() { flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file") } func main() { flag.Parse() logs.InitLogs() defer logs.FlushLogs() config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { panic(err.Error()) } clientset, err := kubernetes.NewForConfig(config) if err != nil { klog.Fatal(err) } factory := informers.NewSharedInformerFactory(clientset, time.Hour*24) queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) pod1 := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "index-pod-1", Namespace: "default", }, Spec: v1.PodSpec{NodeName: "node1"}, } key, err := cache.MetaNamespaceKeyFunc(pod1) if err == nil { fmt.Println("key is ", key) queue.Add(key) } key2, err2 := queue.Get() if err2 { fmt.Println("quit") } else { fmt.Println("key2 is ", key2) } informer := factory.Core().V1().Pods().Informer() stop := make(chan struct{}) go informer.Run(stop) pod,_,_:= informer.GetStore().GetByKey(key2.(string)) fmt.Println(pod.(*v1.Pod).Name) }
root@ubuntu:~/go_learn/informer# ./informer -kubeconfig=$HOME/.kube/config key is default/index-pod-1 key2 is default/index-pod-1 panic: interface conversion: interface {} is nil, not *v1.Pod goroutine 1 [running]: main.main() /root/go_learn/informer/informer.go:65 +0x7e8
root@ubuntu:~/go_learn/informer# go build -o informer . root@ubuntu:~/go_learn/informer# ./informer key is default/index-pod-1 key2 is default/index-pod-1 index-pod-1 root@ubuntu:~/go_learn/informer# cat informer.go package main import ( "fmt" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) func main() { queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) pod1 := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "index-pod-1", Namespace: "default", }, Spec: v1.PodSpec{NodeName: "node1"}, } key, err := cache.MetaNamespaceKeyFunc(pod1) if err == nil { fmt.Println("key is ", key) queue.Add(key) _ = indexer.Add(pod1) } key2, err2 := queue.Get() if err2 { fmt.Println("quit") } else { fmt.Println("key2 is ", key2) } pod,_,_ := indexer.GetByKey(key2.(string)) fmt.Println(pod.(*v1.Pod).Name) }
workqueue
https://github.com/kubernetes/client-go/blob/master/examples/workqueue/main.go