  • DeltaFIFO reflector

    // NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
    // items. See also the comment on DeltaFIFO.
    func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
        if opts.KeyFunction == nil {
            opts.KeyFunction = MetaNamespaceKeyFunc
        f := &DeltaFIFO{
            items:        map[string]Deltas{},
            queue:        []string{},
            keyFunc:      opts.KeyFunction,
            knownObjects: opts.KnownObjects,
            emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
        f.cond.L = &f.lock
        return f
    // Add inserts an item, and puts it in the queue. The item is only enqueued
    // if it doesn't already exist in the set.
    func (f *DeltaFIFO) Add(obj interface{}) error {
        defer f.lock.Unlock()
        f.populated = true
        return f.queueActionLocked(Added, obj)
    // Update is just like Add, but makes an Updated Delta.
    func (f *DeltaFIFO) Update(obj interface{}) error {
        defer f.lock.Unlock()
        f.populated = true
        return f.queueActionLocked(Updated, obj)
    // Delete is just like Add, but makes a Deleted Delta. If the given
    // object does not already exist, it will be ignored. (It may have
    // already been deleted by a Replace (re-list), for example.)  In this
    // method `f.knownObjects`, if not nil, provides (via GetByKey)
    // _additional_ objects that are considered to already exist.
    func (f *DeltaFIFO) Delete(obj interface{}) error {
        id, err := f.KeyOf(obj)
        if err != nil {
            return KeyError{obj, err}
        defer f.lock.Unlock()
        f.populated = true
        if f.knownObjects == nil {
            if _, exists := f.items[id]; !exists {
                // Presumably, this was deleted when a relist happened.
                // Don't provide a second report of the same deletion.
                return nil
        } else {
            // We only want to skip the "deletion" action if the object doesn't
            // exist in knownObjects and it doesn't have corresponding item in items.
            // Note that even if there is a "deletion" action in items, we can ignore it,
            // because it will be deduped automatically in "queueActionLocked"
            _, exists, err := f.knownObjects.GetByKey(id)
            _, itemsExist := f.items[id]
            if err == nil && !exists && !itemsExist {
                // Presumably, this was deleted when a relist happened.
                // Don't provide a second report of the same deletion.
                return nil
        // exist in items and/or KnownObjects
        return f.queueActionLocked(Deleted, obj)
    root@ubuntu:~/go_learn/articles/archive/dive-into-kubernetes-informer/2-reflector# cat main.go 
    package main
    import (
            "fmt"
            "time"

            corev1 "k8s.io/api/core/v1"
            "k8s.io/apimachinery/pkg/util/wait"
            "k8s.io/client-go/tools/cache"
    )

    // newStore 用于创建一个 cache.Store 对象,作为当前资源状态的对象存储
    func newStore() cache.Store {
            return cache.NewStore(cache.MetaNamespaceKeyFunc)
    // newQueue 用于创建一个 cache.Queue 对象,这里实现为 FIFO 先进先出队列,
    // 注意在初始化时 store 作为 KnownObjects 参数传入其中,
    // 因为在重新同步 (resync) 操作中 Reflector 需要知道当前的资源状态,
    // 另外在计算变更 (Delta) 时,也需要对比当前的资源状态。
    // 这个 KnownObjects 对队列,以及对 Reflector 都是只读的,用户需要自己维护好 store 的状态。
    func newQueue(store cache.Store) cache.Queue {
            return cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
                    KnownObjects:          store,
                    EmitDeltaTypeReplaced: true,
    // newConfigMapsReflector 用于创建一个 cache.Reflector 对象,
    // 当 Reflector 开始运行 (Run) 后,队列中就会推入新收到的事件。
    func newConfigMapsReflector(queue cache.Queue) *cache.Reflector {
            lw := newConfigMapsListerWatcher() // 前面有说明
            // 第 2 个参数是 expectedType, 用此参数限制进入队列的事件,
            // 当然在 List 和 Watch 操作时返回的数据就只有一种类型,这个参数只起校验的作用;
            // 第 4 个参数是 resyncPeriod,
            // 这里传了 0,表示从不重新同步(除非连接超时或者中断),
            // 如果传了非 0 值,会定期进行全量同步,避免累积和服务器的不一致,
            // 同步过程中会产生 SYNC 类型的事件。
            return cache.NewReflector(lw, &corev1.ConfigMap{}, queue, 0)
    func main() {
            fmt.Println("----- 2-reflector -----")
            store := newStore()
            queue := newQueue(store)
            reflector := newConfigMapsReflector(queue)
            stopCh := make(chan struct{})
            defer close(stopCh)
            // reflector 开始运行后,队列中就会推入新收到的事件
            go reflector.Run(stopCh)
            // 注意处理事件过程中维护好 store 状态,包括 Add, Update, Delete 操作,
            // 否则会出现不同步问题,在 Informer 当中这些逻辑都已经被封装好了,但目前我们还需要关心一下。
            processObj := func(obj interface{}) error {
                    // 最先收到的事件会被最先处理
                    for _, d := range obj.(cache.Deltas) {
                            switch d.Type {
                            case cache.Sync, cache.Replaced, cache.Added, cache.Updated:
                                    if _, exists, err := store.Get(d.Object); err == nil && exists {
                                            if err := store.Update(d.Object); err != nil {
                                                    return err
                                    } else {
                                            if err := store.Add(d.Object); err != nil {
                                                    return err
                            case cache.Deleted:
                                    if err := store.Delete(d.Object); err != nil {
                                            return err
                            configMap, ok := d.Object.(*corev1.ConfigMap)
                            if !ok {
                                    return fmt.Errorf("not config: %T", d.Object)
                            fmt.Printf("%s: %s
    ", d.Type, configMap.Name)
                    return nil
            fmt.Println("Start syncing...")
            // 持续运行直到 stopCh 关闭
            wait.Until(func() {
                    for {
                            _, err := queue.Pop(processObj)
            }, time.Second, stopCh)
    wget 'http://localhost:8001/api/v1/namespaces/tmp/configmaps?watch=true'
    kubectl create configmap -n tmp demo
    kubectl create namespace tmp
    kubectl delete configmaps -n tmp demo
    root@ubuntu:~# kubectl create configmap -n tmp demo
    configmap/demo created
    root@ubuntu:~# kubectl create configmap -n tmp demo2
    configmap/demo2 created
    root@ubuntu:~# kubectl delete configmap -n tmp demo2
    configmap "demo2" deleted
    root@ubuntu:~# kubectl delete configmap -n tmp demo
    configmap "demo" deleted
    root@ubuntu:~/go_learn/articles/archive/dive-into-kubernetes-informer/2-reflector# ./main --kubeconfig=$HOME/.kube/config 
    ----- 2-reflector -----
    Start syncing...
    Added: demo
    Added: demo2
    Deleted: demo2
    Deleted: demo

