一、informer机制

根据流程图来解释一下 Informer 中几个组件的作用:
-
Reflector:称之为反射器,实现对 apiserver 指定类型对象的监控(ListAndWatch),其中反射实现的就是把监控的结果实例化成具体的对象,最终也是调用 Kubernetes 的 List/Watch API;
-
DeltaIFIFO Queue:一个增量队列,将 Reflector 监控变化的对象形成一个 FIFO 队列,此处的 Delta 就是变化;
-
LocalStore:就是 informer 的 cache,这里面缓存的是 apiserver 中的对象(其中有一部分可能还在DeltaFIFO 中),此时使用者再查询对象的时候就直接从 cache 中查找,减少了 apiserver 的压力,LocalStore 只会被 Lister 的 List/Get 方法访问。
-
WorkQueue:DeltaIFIFO 收到事件后会先将事件存储在自己的数据结构中,然后直接操作 Store 中存储的数据,更新完 store 后 DeltaIFIFO 会将该事件 pop 到 WorkQueue 中,Controller 收到 WorkQueue 中的事件会根据对应的类型触发对应的回调函数。
2、Informer 的工作流程
- Informer 首先会 list/watch apiserver,Informer 所使用的 Reflector 包负责与 apiserver 建立连接,Reflector 使用 ListAndWatch 的方法,会先从 apiserver 中 list 该资源的所有实例,list 会拿到该对象最新的 resourceVersion,然后使用 watch 方法监听该 resourceVersion 之后的所有变化,若中途出现异常,reflector 则会从断开的 resourceVersion 处重现尝试监听所有变化,一旦该对象的实例有创建、删除、更新动作,Reflector 都会收到"事件通知",这时,该事件及它对应的 API 对象这个组合,被称为增量(Delta),它会被放进 DeltaFIFO 中。
- Informer 会不断地从这个 DeltaFIFO 中读取增量,每拿出一个对象,Informer 就会判断这个增量的时间类型,然后创建或更新本地的缓存,也就是 store。
- 如果事件类型是 Added(添加对象),那么 Informer 会通过 Indexer 的库把这个增量里的 API 对象保存到本地的缓存中,并为它创建索引,若为删除操作,则在本地缓存中删除该对象。
- DeltaFIFO 再 pop 这个事件到 controller 中,controller 会调用事先注册的 ResourceEventHandler 回调函数进行处理。
- 在 ResourceEventHandler 回调函数中,其实只是做了一些很简单的过滤,然后将关心变更的 Object 放到 workqueue 里面。
- Controller 从 workqueue 里面取出 Object,启动一个 worker 来执行自己的业务逻辑,业务逻辑通常是计算目前集群的状态和用户希望达到的状态有多大的区别,然后孜孜不倦地让 apiserver 将状态演化到用户希望达到的状态,比如为 deployment 创建新的 pods,或者是扩容/缩容 deployment。
- 在worker中就可以使用 lister 来获取 resource,而不用频繁的访问 apiserver,因为 apiserver 中 resource 的变更都会反映到本地的 cache 中。
Informer 在使用时需要先初始化一个 InformerFactory,目前主要推荐使用的是 SharedInformerFactory,Shared 指的是在多个 Informer 中共享一个本地 cache。
三、Informer 使用示例
在实际的开发工作中,Informer 主要用在两处:
- 在访问 k8s apiserver 的客户端作为一个 client 缓存对象使用;
- 在一些自定义 controller 中使用,比如 operator 的开发;
1、下面是一个作为 client 的使用示例:
package main
import (
"flag"
"fmt"
"log"
"path/filepath"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
func main() {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err)
}
// 初始化 client
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Panic(err.Error())
}
stopper := make(chan struct{})
defer close(stopper)
// 初始化 informer
factory := informers.NewSharedInformerFactory(clientset, 0)
nodeInformer := factory.Core().V1().Nodes()
informer := nodeInformer.Informer()
defer runtime.HandleCrash()
// 启动 informer,list & watch
go factory.Start(stopper)
// 从 apiserver 同步资源,即 list
if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
// 使用自定义 handler
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // 此处省略 workqueue 的使用
DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
})
// 创建 lister
nodeLister := nodeInformer.Lister()
// 从 lister 中获取所有 items
nodeList, err := nodeLister.List(labels.Everything())
if err != nil {
fmt.Println(err)
}
fmt.Println("nodelist:", nodeList)
<-stopper
}
func onAdd(obj interface{}) {
node := obj.(*corev1.Node)
fmt.Println("add a node:", node.Name)
}
2、以下是作为 controller 使用的一个整体工作流程
(1) 创建一个控制器
- 为控制器创建 workqueue
- 创建 informer, 为 informer 添加 callback 函数,创建 lister
(2) 启动控制器
- 启动 informer
- 等待本地 cache sync 完成后, 启动 workers
(3) 当收到变更事件后,执行 callback
- 等待事件触发
- 从事件中获取变更的 Object
- 做一些必要的检查
- 生成 object key,一般是 namespace/name 的形式
- 将 key 放入 workqueue 中
(4) worker loop
- 等待从 workqueue 中获取到 item,一般为 object key
- 用 object key 通过 lister 从本地 cache 中获取到真正的 object 对象
- 做一些检查
- 执行真正的业务逻辑
- 处理下一个 item
下面是自定义 controller 使用的一个参考:
var (
masterURL string
kubeconfig string
)
func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}
func main() {
flag.Parse()
stopCh := signals.SetupSignalHandler()
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
glog.Fatalf("Error building kubeconfig: %s", err.Error())
}
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}
// 所谓 Informer,其实就是一个带有本地缓存和索引机制的、可以注册 EventHandler 的 client
// informer watch apiserver,每隔 30 秒 resync 一次(list)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Second*30)
controller := controller.NewController(kubeClient, kubeInformerFactory.Core().V1().Nodes())
// 启动 informer
go kubeInformerFactory.Start(stopCh)
// start controller
if err = controller.Run(2, stopCh); err != nil {
glog.Fatalf("Error running controller: %s", err.Error())
}
}
// NewController returns a new network controller
func NewController(
kubeclientset kubernetes.Interface,
networkclientset clientset.Interface,
networkInformer informers.NetworkInformer) *Controller {
// Create event broadcaster
// Add sample-controller types to the default Kubernetes Scheme so Events can be
// logged for sample-controller types.
utilruntime.Must(networkscheme.AddToScheme(scheme.Scheme))
glog.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
controller := &Controller{
kubeclientset: kubeclientset,
networkclientset: networkclientset,
networksLister: networkInformer.Lister(),
networksSynced: networkInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Networks"),
recorder: recorder,
}
glog.Info("Setting up event handlers")
// Set up an event handler for when Network resources change
networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueNetwork,
UpdateFunc: func(old, new interface{}) {
oldNetwork := old.(*samplecrdv1.Network)
newNetwork := new.(*samplecrdv1.Network)
if oldNetwork.ResourceVersion == newNetwork.ResourceVersion {
// Periodic resync will send update events for all known Networks.
// Two different versions of the same Network will always have different RVs.
return
}
controller.enqueueNetwork(new)
},
DeleteFunc: controller.enqueueNetworkForDelete,
})
return controller
}
自定义controller参考:https://github.com/resouer/k8s-controller-custom-resource