zoukankan      html  css  js  c++  java
  • DeltaFIFO reflector

    // NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
    // items. See also the comment on DeltaFIFO.
    func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
        if opts.KeyFunction == nil {
            opts.KeyFunction = MetaNamespaceKeyFunc
        }
    
        f := &DeltaFIFO{
            items:        map[string]Deltas{},
            queue:        []string{},
            keyFunc:      opts.KeyFunction,
            knownObjects: opts.KnownObjects,
    
            emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
        }
        f.cond.L = &f.lock
        return f
    }
    // Add inserts an item, and puts it in the queue. The item is only enqueued
    // if it doesn't already exist in the set.
    func (f *DeltaFIFO) Add(obj interface{}) error {
        f.lock.Lock()
        defer f.lock.Unlock()
        f.populated = true
        return f.queueActionLocked(Added, obj)
    }
    
    // Update is just like Add, but makes an Updated Delta.
    func (f *DeltaFIFO) Update(obj interface{}) error {
        f.lock.Lock()
        defer f.lock.Unlock()
        f.populated = true
        return f.queueActionLocked(Updated, obj)
    }
    
    // Delete is just like Add, but makes a Deleted Delta. If the given
    // object does not already exist, it will be ignored. (It may have
    // already been deleted by a Replace (re-list), for example.)  In this
    // method `f.knownObjects`, if not nil, provides (via GetByKey)
    // _additional_ objects that are considered to already exist.
    func (f *DeltaFIFO) Delete(obj interface{}) error {
        id, err := f.KeyOf(obj)
        if err != nil {
            return KeyError{obj, err}
        }
        f.lock.Lock()
        defer f.lock.Unlock()
        f.populated = true
        if f.knownObjects == nil {
            if _, exists := f.items[id]; !exists {
                // Presumably, this was deleted when a relist happened.
                // Don't provide a second report of the same deletion.
                return nil
            }
        } else {
            // We only want to skip the "deletion" action if the object doesn't
            // exist in knownObjects and it doesn't have corresponding item in items.
            // Note that even if there is a "deletion" action in items, we can ignore it,
            // because it will be deduped automatically in "queueActionLocked"
            _, exists, err := f.knownObjects.GetByKey(id)
            _, itemsExist := f.items[id]
            if err == nil && !exists && !itemsExist {
                // Presumably, this was deleted when a relist happened.
                // Don't provide a second report of the same deletion.
                return nil
            }
        }
    
        // exist in items and/or KnownObjects
        return f.queueActionLocked(Deleted, obj)
    }
    root@ubuntu:~/go_learn/articles/archive/dive-into-kubernetes-informer/2-reflector# cat main.go 
    package main
    
    import (
            "fmt"
            "github.com/spongeprojects/magicconch"
            corev1 "k8s.io/api/core/v1"
            "k8s.io/apimachinery/pkg/util/wait"
            "k8s.io/client-go/tools/cache"
            "time"
    )
    
    // newStore 用于创建一个 cache.Store 对象,作为当前资源状态的对象存储
    func newStore() cache.Store {
            return cache.NewStore(cache.MetaNamespaceKeyFunc)
    }
    
    // newQueue 用于创建一个 cache.Queue 对象,这里实现为 FIFO 先进先出队列,
    // 注意在初始化时 store 作为 KnownObjects 参数传入其中,
    // 因为在重新同步 (resync) 操作中 Reflector 需要知道当前的资源状态,
    // 另外在计算变更 (Delta) 时,也需要对比当前的资源状态。
    // 这个 KnownObjects 对队列,以及对 Reflector 都是只读的,用户需要自己维护好 store 的状态。
    func newQueue(store cache.Store) cache.Queue {
            return cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
                    KnownObjects:          store,
                    EmitDeltaTypeReplaced: true,
            })
    }
    
    // newConfigMapsReflector 用于创建一个 cache.Reflector 对象,
    // 当 Reflector 开始运行 (Run) 后,队列中就会推入新收到的事件。
    func newConfigMapsReflector(queue cache.Queue) *cache.Reflector {
            lw := newConfigMapsListerWatcher() // 前面有说明
            // 第 2 个参数是 expectedType, 用此参数限制进入队列的事件,
            // 当然在 List 和 Watch 操作时返回的数据就只有一种类型,这个参数只起校验的作用;
            // 第 4 个参数是 resyncPeriod,
            // 这里传了 0,表示从不重新同步(除非连接超时或者中断),
            // 如果传了非 0 值,会定期进行全量同步,避免累积和服务器的不一致,
            // 同步过程中会产生 SYNC 类型的事件。
            return cache.NewReflector(lw, &corev1.ConfigMap{}, queue, 0)
    }
    
    func main() {
            fmt.Println("----- 2-reflector -----")
    
            store := newStore()
            queue := newQueue(store)
            reflector := newConfigMapsReflector(queue)
    
            stopCh := make(chan struct{})
            defer close(stopCh)
    
            // reflector 开始运行后,队列中就会推入新收到的事件
            go reflector.Run(stopCh)
    
            // 注意处理事件过程中维护好 store 状态,包括 Add, Update, Delete 操作,
            // 否则会出现不同步问题,在 Informer 当中这些逻辑都已经被封装好了,但目前我们还需要关心一下。
            processObj := func(obj interface{}) error {
                    // 最先收到的事件会被最先处理
                    for _, d := range obj.(cache.Deltas) {
                            switch d.Type {
                            case cache.Sync, cache.Replaced, cache.Added, cache.Updated:
                                    if _, exists, err := store.Get(d.Object); err == nil && exists {
                                            if err := store.Update(d.Object); err != nil {
                                                    return err
                                            }
                                    } else {
                                            if err := store.Add(d.Object); err != nil {
                                                    return err
                                            }
                                    }
                            case cache.Deleted:
                                    if err := store.Delete(d.Object); err != nil {
                                            return err
                                    }
                            }
                            configMap, ok := d.Object.(*corev1.ConfigMap)
                            if !ok {
                                    return fmt.Errorf("not config: %T", d.Object)
                            }
                            fmt.Printf("%s: %s
    ", d.Type, configMap.Name)
                    }
                    return nil
            }
    
            fmt.Println("Start syncing...")
    
            // 持续运行直到 stopCh 关闭
            wait.Until(func() {
                    for {
                            _, err := queue.Pop(processObj)
                            magicconch.Must(err)
                    }
            }, time.Second, stopCh)
    }
    wget http://localhost:8001/api/v1/tmp/configmaps/watch
    kubectl create configmap -n tmp demo
    kubectl create namespace tmp
    kubectl delete configmaps -n tmp demo
    root@ubuntu:~# kubectl create configmap -n tmp demo
    configmap/demo created
    root@ubuntu:~# kubectl create configmap -n tmp demo2
    configmap/demo2 created
    root@ubuntu:~# kubectl delete configmap -n tmp demo2
    configmap "demo2" deleted
    root@ubuntu:~# kubectl delete configmap -n tmp demo
    configmap "demo" deleted
    root@ubuntu:~# 
    root@ubuntu:~/go_learn/articles/archive/dive-into-kubernetes-informer/2-reflector# ./main --kubeconfig=$HOME/.kube/config 
    ----- 2-reflector -----
    Start syncing...
    Added: demo
    Added: demo2
    Deleted: demo2
    Deleted: demo

    Kubernetes Informer 源码解析与深度使用 [1/4]: cache 包源码解析与 Informer 的使用

  • 相关阅读:
    Atitit.随时间变色特效 ---包厢管理系统的规划
    Atitit.request http乱码的设计防止 检测与解决最近实践p825 attilax总结.doc
    Atitit.request http乱码的设计防止 检测与解决最近实践p825 attilax总结.doc
    atitit.薄伽梵歌overview  attilax 读后感
    Atitit。 《吠陀》 《梨俱吠陀》overview 经读后感  是印度上古时期一些文献的总称
    Atitit。 《吠陀》 《梨俱吠陀》overview 经读后感  是印度上古时期一些文献的总称
    atitit.薄伽梵歌overview  attilax 读后感
    Atitit 《摩奴法典》overivew 读后感 不是由国王 颁布的,而是 僧侣编制
    Atitit 《摩奴法典》overivew 读后感 不是由国王 颁布的,而是 僧侣编制
    Atitit.执行cli cmd的原理与调试
  • 原文地址:https://www.cnblogs.com/dream397/p/14989983.html
Copyright © 2011-2022 走看看