  Kubernetes Source Code Reading Notes: kube-proxy (Part 2)

    In this article, we start with the creation and running of ServiceConfig.

    1. Creating the ServiceConfig

    ServiceConfig is the kube-proxy component that watches for Service changes. It is essentially an informer, as the NewServiceConfig method shows.

    pkg/proxy/config/config.go
    
    // NewServiceConfig creates a new ServiceConfig.
    func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
        result := &ServiceConfig{
            lister:       serviceInformer.Lister(),
            listerSynced: serviceInformer.Informer().HasSynced,
        }
    
        serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
            cache.ResourceEventHandlerFuncs{
                AddFunc:    result.handleAddService,
                UpdateFunc: result.handleUpdateService,
                DeleteFunc: result.handleDeleteService,
            },
            resyncPeriod,
        )
    
        return result
    }

    The method registers three callback functions on the serviceInformer; whenever a Service changes, the corresponding callback is invoked.

    2. The Callback Functions

    The add and delete callbacks ultimately go through the same path as the update callback: an add is an update from nil to the target Service, and a delete is an update from the target Service to nil. We therefore take update as our example.
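    This nil-as-absence convention can be illustrated with a small standalone sketch. The service type and onUpdate function below are invented for illustration; they are not from the kube-proxy source:

```go
package main

import "fmt"

type service struct{ name string }

// onUpdate treats <nil, svc> as an add and <svc, nil> as a delete, mirroring
// how the add and delete callbacks funnel into the update path.
func onUpdate(previous, current *service) string {
	switch {
	case previous == nil && current != nil:
		return "add " + current.name
	case previous != nil && current == nil:
		return "delete " + previous.name
	case previous != nil && current != nil:
		return "update " + current.name
	}
	return "no-op" // both nil: nothing to do
}

func main() {
	svc := &service{name: "web"}
	fmt.Println(onUpdate(nil, svc)) // add web
	fmt.Println(onUpdate(svc, svc)) // update web
	fmt.Println(onUpdate(svc, nil)) // delete web
}
```

    Collapsing three event types into one handler keeps the change-tracking logic in a single place, which is exactly what the Update method below exploits.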

    Its callback, handleUpdateService, essentially calls the OnServiceUpdate method:

    pkg/proxy/iptables/proxier.go
    
    func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
        if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
            proxier.syncRunner.Run()
        }
    }

    The method is short: it calls Update, and if Update reports pending changes and the proxier has been initialized, it calls Run.

    Let's look at the Update method:

    pkg/proxy/service.go
    
    // Update updates given service's change map based on the <previous, current> service pair.  It returns true if items changed,
    // otherwise return false.  Update can be used to add/update/delete items of ServiceChangeMap.  For example,
    // Add item
    //   - pass <nil, service> as the <previous, current> pair.
    // Update item
    //   - pass <oldService, service> as the <previous, current> pair.
    // Delete item
    //   - pass <service, nil> as the <previous, current> pair.
    func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
        svc := current
        if svc == nil {
            svc = previous
        }
        // previous == nil && current == nil is unexpected, we should return false directly.
        if svc == nil {
            return false
        }
        namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}
    
        sct.lock.Lock()
        defer sct.lock.Unlock()
    
        change, exists := sct.items[namespacedName]
        if !exists {
            change = &serviceChange{}
            change.previous = sct.serviceToServiceMap(previous)
            sct.items[namespacedName] = change
        }
        change.current = sct.serviceToServiceMap(current)
        // if change.previous equal to change.current, it means no change
        if reflect.DeepEqual(change.previous, change.current) {
            delete(sct.items, namespacedName)
        }
        return len(sct.items) > 0
    }

    As you can see, this one method is enough to handle all three cases: add, update, and delete. The update itself boils down to calling serviceToServiceMap to store the Service's before and after states in the items map of the ServiceChangeTracker struct, whose keys are the Services' NamespacedNames and whose values are serviceChange structs. The serviceChange struct has only two fields:

    pkg/proxy/service.go
    
    // serviceChange contains all changes to services that happened since proxy rules were synced.  For a single object,
    // changes are accumulated, i.e. previous is state from before applying the changes,
    // current is state after applying all of the changes.
    type serviceChange struct {
        previous ServiceMap
        current  ServiceMap
    }

    They hold the ServiceMap before and after the state change, where a ServiceMap is simply a map from ServicePortName to ServicePort:

    pkg/proxy/service.go
    
    type ServiceMap map[ServicePortName]ServicePort

    In summary, the Update method records a Service's before and after states in a map.

    Once Update reports pending changes and the proxier has been initialized, the syncRunner.Run method is called. Let's step inside:

    pkg/util/async/bounded_frequency_runner.go
    
    func (bfr *BoundedFrequencyRunner) Run() {
        // If it takes a lot of time to run the underlying function, noone is really
        // processing elements from <run> channel. So to avoid blocking here on the
        // putting element to it, we simply skip it if there is already an element
        // in it.
        select {
        case bfr.run <- struct{}{}:
        default:
        }
    }

    If the run channel has room, an empty struct is written into it; otherwise the call simply returns. Once that struct is in the channel, it prompts bfr to invoke its registered function.
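    The non-blocking send is worth a closer look. Because the run channel is buffered with capacity 1, a burst of requests collapses into a single pending wake-up and the sender never blocks. A minimal sketch (the signal function is a made-up name for illustration):

```go
package main

import "fmt"

// signal mimics BoundedFrequencyRunner.Run: try to queue a wake-up, but skip
// if one is already pending, so senders never block.
func signal(run chan struct{}) bool {
	select {
	case run <- struct{}{}:
		return true // queued a wake-up for the loop
	default:
		return false // one is already pending; skip
	}
}

func main() {
	run := make(chan struct{}, 1)
	fmt.Println(signal(run)) // true: channel was empty
	fmt.Println(signal(run)) // false: a signal is already pending
	<-run                    // the loop consumes the signal...
	fmt.Println(signal(run)) // true: ...so the next request queues again
}
```

    This is a common Go idiom for coalescing events: a thousand Service changes while a sync is running still result in just one follow-up sync.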

    3. Running the ServiceConfig

    Recall the Run method in server.go from the previous article:

    cmd/kube-proxy/app/server.go
    
    // Run runs the specified ProxyServer.  This should never exit (unless CleanupAndExit is set).
    func (s *ProxyServer) Run() error {
        ...
        serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
        serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
        go serviceConfig.Run(wait.NeverStop)
    
        endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
        endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
        go endpointsConfig.Run(wait.NeverStop)
    
        ...// Just loop forever for now...
        s.Proxier.SyncLoop()
        return nil
    }

    We focused on NewServiceConfig above; now let's look at the subsequent Run and SyncLoop.

    Entering the Run method:

    pkg/proxy/config/config.go

    // Run starts the goroutine responsible for calling
    // registered handlers.
    func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()
    
        klog.Info("Starting service config controller")
        defer klog.Info("Shutting down service config controller")
    
        if !controller.WaitForCacheSync("service config", stopCh, c.listerSynced) {
            return
        }
    
        for i := range c.eventHandlers {
            klog.V(3).Info("Calling handler.OnServiceSynced()")
            c.eventHandlers[i].OnServiceSynced()
        }
    
        <-stopCh
    }

    The core of this method is calling each handler's OnServiceSynced method. Entering OnServiceSynced:

    pkg/proxy/iptables/proxier.go
    
    func (proxier *Proxier) OnServiceSynced() {
        proxier.mu.Lock()
        proxier.servicesSynced = true
        proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
        proxier.mu.Unlock()
    
        // Sync unconditionally - this is called once per lifetime.
        proxier.syncProxyRules()
    }

    It first records that the services have synced, then calls setInitialized to mark the proxier as initialized once both services and endpoints have synced. Only after initialization will the proxier act on its callbacks. Finally, it executes syncProxyRules once.

    In short, the Run method initializes the freshly created ServiceConfig and then calls syncProxyRules once. That method performs the concrete work of maintaining iptables in kube-proxy; we will analyze it in detail later.
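    The initialization gate can be condensed into a small sketch. The gate type and its method names below are simplified stand-ins; the real proxier carries much more state, but the services-synced AND endpoints-synced condition is the same:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// gate condenses the proxier's initialization logic: callbacks only trigger
// syncs once BOTH caches have synced.
type gate struct {
	mu              sync.Mutex
	servicesSynced  bool
	endpointsSynced bool
	initialized     int32
}

func (g *gate) setInitialized(value bool) {
	var v int32
	if value {
		v = 1
	}
	atomic.StoreInt32(&g.initialized, v)
}

func (g *gate) isInitialized() bool {
	return atomic.LoadInt32(&g.initialized) > 0
}

// onServicesSynced mirrors OnServiceSynced: record the sync, re-evaluate.
func (g *gate) onServicesSynced() {
	g.mu.Lock()
	g.servicesSynced = true
	g.setInitialized(g.servicesSynced && g.endpointsSynced)
	g.mu.Unlock()
}

func (g *gate) onEndpointsSynced() {
	g.mu.Lock()
	g.endpointsSynced = true
	g.setInitialized(g.servicesSynced && g.endpointsSynced)
	g.mu.Unlock()
}

func main() {
	g := &gate{}
	g.onServicesSynced()
	fmt.Println(g.isInitialized()) // false: endpoints cache not yet synced
	g.onEndpointsSynced()
	fmt.Println(g.isInitialized()) // true: both caches have synced
}
```

    Gating on both caches prevents the proxier from writing iptables rules based on a half-populated view of the cluster.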

    4. SyncLoop

    Next, let's look at SyncLoop. SyncLoop essentially calls the Loop method in bounded_frequency_runner.go:

    pkg/util/async/bounded_frequency_runner.go

    // Loop handles the periodic timer and run requests. This is expected to be
    // called as a goroutine.
    func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
        klog.V(3).Infof("%s Loop running", bfr.name)
        bfr.timer.Reset(bfr.maxInterval)
        for {
            select {
            case <-stop:
                bfr.stop()
                klog.V(3).Infof("%s Loop stopping", bfr.name)
                return
            case <-bfr.timer.C():
                bfr.tryRun()
            case <-bfr.run:
                bfr.tryRun()
            }
        }
    }

    As you can see, this method runs an infinite loop, invoking tryRun on a timer. In addition, whenever a message arrives on bfr's run channel, tryRun is also executed. So when does a message arrive on that channel? As we saw earlier: whenever a ServiceConfig callback is invoked. In other words, every time a Service changes and the callbacks fire, a tryRun eventually follows.

    Entering the tryRun method:

    pkg/util/async/bounded_frequency_runner.go
    
    // assumes the lock is not held
    func (bfr *BoundedFrequencyRunner) tryRun() {
        bfr.mu.Lock()
        defer bfr.mu.Unlock()
    
        if bfr.limiter.TryAccept() {
            // We're allowed to run the function right now.
            bfr.fn()
            bfr.lastRun = bfr.timer.Now()
            bfr.timer.Stop()
            bfr.timer.Reset(bfr.maxInterval)
            klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
            return
        }
    
        // It can't run right now, figure out when it can run next.
    
        elapsed := bfr.timer.Since(bfr.lastRun)    // how long since last run
        nextPossible := bfr.minInterval - elapsed  // time to next possible run
        nextScheduled := bfr.maxInterval - elapsed // time to next periodic run
        klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)
    
        if nextPossible < nextScheduled {
            // Set the timer for ASAP, but don't drain here.  Assuming Loop is running,
            // it might get a delivery in the mean time, but that is OK.
            bfr.timer.Stop()
            bfr.timer.Reset(nextPossible)
            klog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible)
        }
    }

    All the rest of the code is in service of bfr.fn; the heart of this method is running bfr.fn at the right moment. That function is registered when the proxier is created. Recall from the previous article this line in the NewProxier method:

    pkg/proxy/iptables/proxier.go
    
    func NewProxier(...) (*Proxier, error) {
        
        ......
    
        proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
        return proxier, nil
    }

    This is where the registration happens: the proxier.syncProxyRules method mentioned above is registered as bfr.fn.

    Putting it all together, syncProxyRules can be triggered along three paths:

    (1) when the service and endpoint Configs have just been created and initialized;

    (2) when a service or endpoint changes;

    (3) periodically, on a timer.

    5. syncProxyRules

    syncProxyRules is the core method of the kube-proxy component. It runs more than 700 lines and touches a great deal of networking knowledge:

    pkg/proxy/iptables/proxier.go
    
    // This is where all of the iptables-save/restore calls happen.
    // The only other iptables rules are those that are setup in iptablesInit()
    // This assumes proxier.mu is NOT held
    func (proxier *Proxier) syncProxyRules() {
        proxier.mu.Lock()
        defer proxier.mu.Unlock()
    
        ...// We assume that if this was called, we really want to sync them,
        // even if nothing changed in the meantime. In other words, callers are
        // responsible for detecting no-op changes and not calling this function.
        serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
        endpointUpdateResult := proxy.UpdateEndpointsMap(proxier.endpointsMap, proxier.endpointsChanges)
    
        ...
    
        klog.V(3).Info("Syncing iptables rules")
    
        // Create and link the kube chains.
        for _, chain := range iptablesJumpChains {
            if _, err := proxier.iptables.EnsureChain(chain.table, chain.chain); err != nil {
                klog.Errorf("Failed to ensure that %s chain %s exists: %v", chain.table, kubeServicesChain, err)
                return
            }
            args := append(chain.extraArgs,
                "-m", "comment", "--comment", chain.comment,
                "-j", string(chain.chain),
            )
            if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, chain.table, chain.sourceChain, args...); err != nil {
                klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", chain.table, chain.sourceChain, chain.chain, err)
                return
            }
        }
    
        ...

        // Build rules for each service.
        for svcName, svc := range proxier.serviceMap {
            svcInfo, ok := svc.(*serviceInfo)
            if !ok {
                klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
                continue
            }
            isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP)
            protocol := strings.ToLower(string(svcInfo.Protocol))
            svcNameString := svcInfo.serviceNameString
            hasEndpoints := len(proxier.endpointsMap[svcName]) > 0
    
            svcChain := svcInfo.servicePortChainName
            if hasEndpoints {
                // Create the per-service chain, retaining counters if possible.
                if chain, ok := existingNATChains[svcChain]; ok {
                    writeBytesLine(proxier.natChains, chain)
                } else {
                    writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain))
                }
                activeNATChains[svcChain] = true
            }
    
            ...
    
            // Capture the clusterIP.
            if hasEndpoints {
                args = append(args[:0],
                    "-A", string(kubeServicesChain),
                    "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
                    "-m", protocol, "-p", protocol,
                    "-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
                    "--dport", strconv.Itoa(svcInfo.Port),
                )
                ...
            } else {
                writeLine(proxier.filterRules,
                    "-A", string(kubeServicesChain),
                    "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                    "-m", protocol, "-p", protocol,
                    "-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
                    "--dport", strconv.Itoa(svcInfo.Port),
                    "-j", "REJECT",
                )
            }
    
            // Capture externalIPs.
            for _, externalIP := range svcInfo.ExternalIPs {
                ...
            }
    
            // Capture load-balancer ingress.
            if hasEndpoints {
                ...
            }
    
            // Capture nodeports.  If we had more than 2 rules it might be
            // worthwhile to make a new per-service chain for nodeport rules, but
            // with just 2 rules it ends up being a waste and a cognitive burden.
            if svcInfo.NodePort != 0 {
                ...
            }
    
            if !hasEndpoints {
                continue
            }
    
            // Now write loadbalancing & DNAT rules.
            ...
    
            // Now write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic.
            ...
        }
    
        // Delete chains no longer in use.
        ...
    
        // Finally, tail-call to the nodeports chain.  This needs to be after all
        // other service portal rules.
        ...
    
        // Sync rules.
        // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table
        proxier.iptablesData.Reset()
        proxier.iptablesData.Write(proxier.filterChains.Bytes())
        proxier.iptablesData.Write(proxier.filterRules.Bytes())
        proxier.iptablesData.Write(proxier.natChains.Bytes())
        proxier.iptablesData.Write(proxier.natRules.Bytes())
    
        klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
        err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
        if err != nil {
            klog.Errorf("Failed to execute iptables-restore: %v", err)
            // Revert new local ports.
            klog.V(2).Infof("Closing local ports after iptables-restore failure")
            utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
            return
        }
        ...
    }

    Only a small part of it is shown here. Overall, the method does the following:

    (1) Calls UpdateServiceMap and UpdateEndpointsMap to apply the pending Service and Endpoint updates.

    (2) Creates and links the iptables chains.

    (3) Iterates over all services, determines each service's type, and adds the corresponding rules.

    (4) Deletes chains that are no longer in use and rebuilds the iptables rules via iptables-restore.
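    To make step (4) concrete, the buffer handed to iptables-restore has a fixed shape: a table header, chain declarations, rules, and a COMMIT line. The function below builds a miniature example of such a payload; the KUBE- chain naming follows kube-proxy's convention, but the service, IP, and hash suffix are invented for illustration:

```go
package main

import (
	"bytes"
	"fmt"
)

// buildNATPayload shows, in miniature, the shape of the buffer syncProxyRules
// assembles for iptables-restore: table header, chain declarations with
// [packets:bytes] counters, rules, then COMMIT.
func buildNATPayload() string {
	var nat bytes.Buffer
	nat.WriteString("*nat\n")
	nat.WriteString(":KUBE-SERVICES - [0:0]\n")
	nat.WriteString(":KUBE-SVC-EXAMPLE1234 - [0:0]\n")
	fmt.Fprintf(&nat,
		"-A KUBE-SERVICES -m comment --comment %q -m tcp -p tcp -d %s --dport %d -j KUBE-SVC-EXAMPLE1234\n",
		"default/web cluster IP", "10.96.0.10/32", 80)
	nat.WriteString("COMMIT\n")
	return nat.String()
}

func main() {
	// With NoFlushTables (iptables-restore --noflush), restoring this buffer
	// only replaces the chains listed in it, leaving other chains in the nat
	// table untouched.
	fmt.Print(buildNATPayload())
}
```

    Writing the full rule set into one buffer and restoring it atomically is much faster than issuing one iptables command per rule, which matters at cluster scale.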

    For more details, see https://blog.csdn.net/zhangxiangui40542/article/details/79486995, https://www.jianshu.com/p/a978af8e5dd8, and https://blog.csdn.net/zhonglinzhang/article/details/80185053.

    6. Summary

    The logic of the kube-proxy component (in iptables mode) is relatively simple: it uses informers to watch Service and Endpoints resources in real time and updates iptables promptly as they change.

  Original article: https://www.cnblogs.com/00986014w/p/11018314.html