zoukankan      html  css  js  c++  java
  • weave-npc 介绍(适配k8s v1.6)

    Weave-npc使用iptables来实现network policy。由于network policy中的namespace selector、pod selector会匹配多个pod,所以weave-npc借助linux ipset功能,用一条iptables规则批量匹配这些pod,如下所示:
    -A WEAVE-NPC-INGRESS -p tcp -m set --match-set weave-?!9~5$SRrA]EcONWS|ZaRex{v src -m set --match-set weave-*J?Cn/g0cU{zEyOeo;P6+_DD2 dst -m tcp --dport 80 -j ACCEPT
    其中ipset名以“weave-”为前缀,后面是经过sha1处理的字符串。
    Weave按照如下方式初始化iptables规则。Network policy中的ingress规则会以白名单的形式添加到WEAVE-NPC-INGRESS链中,所以如果报文既没有匹配默认规则(WEAVE-NPC-DEFAULT)也没有匹配白名单规则,就会被drop掉。
    -A FORWARD -o weave -j WEAVE-NPC
    -A FORWARD -o weave -m state --state NEW -j NFLOG --nflog-group 86
    -A FORWARD -o weave -j DROP
    … …
    -A WEAVE-NPC -m state --state RELATED,ESTABLISHED -j ACCEPT
    -A WEAVE-NPC -d 224.0.0.0/4 -j ACCEPT
    -A WEAVE-NPC -m state --state NEW -j WEAVE-NPC-DEFAULT
    -A WEAVE-NPC -m state --state NEW -j WEAVE-NPC-INGRESS
    -A WEAVE-NPC -m set ! --match-set weave-local-pods dst -j ACCEPT

    // root is the cobra command entry point for weave-npc. It resolves the node
    // name, starts metrics and ulogd, builds the in-cluster Kubernetes client,
    // resets the weave-npc iptables chains and ipsets, installs the base rules,
    // and then runs informer-based controllers for namespaces, pods and
    // networkpolicies until SIGINT/SIGTERM is received.
    func root(cmd *cobra.Command, args []string) {
       common.SetLogLevel(logLevel)
       // nodeName identifies this host; it is passed to npc.New below and is used
       // to generate node-local rules.
       if nodeName == "" {
          // HOSTNAME is set by Kubernetes for pods in the host network namespace
          nodeName = os.Getenv("HOSTNAME")
       }
       if nodeName == "" {
          common.Log.Fatalf("Must set node name via --node-name or $HOSTNAME")
       }
       common.Log.Infof("Starting Weaveworks NPC %s; node name %q", version, nodeName)
    
    
       if err := metrics.Start(metricsAddr); err != nil {
          common.Log.Fatalf("Failed to start metrics: %v", err)
       }
    
       if err := ulogd.Start(); err != nil {
          common.Log.Fatalf("Failed to start ulogd: %v", err)
       }
    
       config, err := rest.InClusterConfig()
       handleError(err)
    
       client, err := kubernetes.NewForConfig(config)
       handleError(err)
       // Create the iptables handle used to manage and apply iptables rules.
       ipt, err := iptables.New()
       handleError(err)
       // Create the ipset handle used to manage ipset resources.
       ips := ipset.New(common.LogLogger())
       // resetIPTables adds the WEAVE-NPC-INGRESS, WEAVE-NPC-DEFAULT and
       // WEAVE-NPC chains to the filter table, flushing them if they already exist.
       handleError(resetIPTables(ipt))
       // resetIPSets removes the members of the ipsets created by weave-npc,
       // i.e. the ipsets whose names start with "weave-".
       handleError(resetIPSets(ips))
       // createBaseRules installs the initial iptables rules; in the WEAVE-NPC
       // chain it adds:
       // -A WEAVE-NPC -m state --state RELATED,ESTABLISHED -j ACCEPT
       // -A WEAVE-NPC -d 224.0.0.0/4 -j ACCEPT
       // -A WEAVE-NPC -m state --state NEW -j WEAVE-NPC-DEFAULT
       // -A WEAVE-NPC -m state --state NEW -j WEAVE-NPC-INGRESS
       // -A WEAVE-NPC -m set ! --match-set weave-local-pods dst -j ACCEPT
       // These must run after the resets above so they land in empty chains.
       handleError(createBaseRules(ipt, ips))
    
       // NOTE: from here on the local variable npc shadows the npc package.
       npc := npc.New(nodeName, ipt, ips)
       // Start three controllers, built on client-go informers, that handle
       // create/update/delete events for namespaces, pods and networkpolicies
       // respectively.
       nsController := makeController(client.Core().RESTClient(), "namespaces", &coreapi.Namespace{},
          cache.ResourceEventHandlerFuncs{
             AddFunc: func(obj interface{}) {
                handleError(npc.AddNamespace(obj.(*coreapi.Namespace)))
             },
             DeleteFunc: func(obj interface{}) {
                switch obj := obj.(type) {
                case *coreapi.Namespace:
                   handleError(npc.DeleteNamespace(obj))
                case cache.DeletedFinalStateUnknown:
                   // We know this object has gone away, but its final state is no longer
                   // available from the API server. Instead we use the last copy of it
                   // that we have, which is good enough for our cleanup.
                   handleError(npc.DeleteNamespace(obj.Obj.(*coreapi.Namespace)))
                }
             },
             UpdateFunc: func(old, new interface{}) {
                handleError(npc.UpdateNamespace(old.(*coreapi.Namespace), new.(*coreapi.Namespace)))
             }})
    
       podController := makeController(client.Core().RESTClient(), "pods", &coreapi.Pod{},
          cache.ResourceEventHandlerFuncs{
             AddFunc: func(obj interface{}) {
                handleError(npc.AddPod(obj.(*coreapi.Pod)))
             },
             DeleteFunc: func(obj interface{}) {
                switch obj := obj.(type) {
                case *coreapi.Pod:
                   handleError(npc.DeletePod(obj))
                case cache.DeletedFinalStateUnknown:
                   // We know this object has gone away, but its final state is no longer
                   // available from the API server. Instead we use the last copy of it
                   // that we have, which is good enough for our cleanup.
                   handleError(npc.DeletePod(obj.Obj.(*coreapi.Pod)))
                }
             },
             UpdateFunc: func(old, new interface{}) {
                handleError(npc.UpdatePod(old.(*coreapi.Pod), new.(*coreapi.Pod)))
             }})
    
       npController := makeController(client.Extensions().RESTClient(), "networkpolicies", &extnapi.NetworkPolicy{},
          cache.ResourceEventHandlerFuncs{
             AddFunc: func(obj interface{}) {
                handleError(npc.AddNetworkPolicy(obj.(*extnapi.NetworkPolicy)))
             },
             DeleteFunc: func(obj interface{}) {
                switch obj := obj.(type) {
                case *extnapi.NetworkPolicy:
                   handleError(npc.DeleteNetworkPolicy(obj))
                case cache.DeletedFinalStateUnknown:
                   // We know this object has gone away, but its final state is no longer
                   // available from the API server. Instead we use the last copy of it
                   // that we have, which is good enough for our cleanup.
                   handleError(npc.DeleteNetworkPolicy(obj.Obj.(*extnapi.NetworkPolicy)))
                }
             },
             UpdateFunc: func(old, new interface{}) {
                handleError(npc.UpdateNetworkPolicy(old.(*extnapi.NetworkPolicy), new.(*extnapi.NetworkPolicy)))
             }})
    
       go nsController.Run(wait.NeverStop)
       go podController.Run(wait.NeverStop)
       go npController.Run(wait.NeverStop)
    
       // Block until SIGINT or SIGTERM arrives, then log it via Fatalf
       // (presumably terminating the process — confirm common.Log's Fatalf semantics).
       signals := make(chan os.Signal, 1)
       signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
       common.Log.Fatalf("Exiting: %v", <-signals)
    }
    

    network policy是namespace scoped,所以资源的处理都在namespace的基础上去实现的。

    // AddNamespace handles an "add" event for a namespace. It holds the
    // controller lock for the duration of the call, logs the event, and runs
    // addNamespace on the per-namespace state that withNS supplies for the
    // namespace's name; any error is wrapped with "add namespace".
    func (npc *controller) AddNamespace(obj *coreapi.Namespace) error {
       npc.Lock()
       defer npc.Unlock()
    
       common.Log.Infof("EVENT AddNamespace %s", js(obj))
       name := obj.ObjectMeta.Name
       apply := func(target *ns) error {
          err := target.addNamespace(obj)
          return errors.Wrap(err, "add namespace")
       }
       return npc.withNS(name, apply)
    }
    

      

     

    每个namespace都有一个podSelectors(selectorSet),其entries中包含多个selector。每次对一个pod selector spec执行provision时,若该spec之前没有处理过,则会创建一个新的selector,并通过onNewPodSelector将匹配的pod ip加入该selector对应的ipset中。

    // newNS builds the per-namespace state for namespace `name` on node
    // `nodeName`. It creates an "all pods" selector spec (an empty
    // LabelSelector backed by a hash:ip ipset) and provisions it immediately.
    // Because an empty selector matches every pod, that ipset ends up holding
    // all pods of the namespace.
    func newNS(name, nodeName string, ipt iptables.Interface, ips ipset.Interface, nsSelectors *selectorSet) (*ns, error) {
       everyPod, err := newSelectorSpec(&metav1.LabelSelector{}, name, ipset.HashIP)
       if err != nil {
          return nil, err
       }
    
       state := &ns{
          ipt:         ipt,
          ips:         ips,
          name:        name,
          nodeName:    nodeName,
          pods:        make(map[types.UID]*coreapi.Pod),
          policies:    make(map[types.UID]*extnapi.NetworkPolicy),
          uid:         uuid.NewUUID(),
          allPods:     everyPod,
          nsSelectors: nsSelectors,
          rules:       newRuleSet(ipt),
       }
       // onNewPodSelector is handed to the selector set as its callback; per the
       // article it fills a newly created selector's ipset with matching pod IPs.
       state.podSelectors = newSelectorSet(ips, state.onNewPodSelector)
    
       initial := map[string]*selectorSpec{everyPod.key: everyPod}
       if err = state.podSelectors.provision(state.uid, nil, initial); err != nil {
          return nil, err
       }
       return state, nil
    }
    

     

    创建namespace
    k8s v1.7之前 network policy不支持default deny,default deny是使用annotation的方式实现的,yaml如下:

    kind: Namespace
    apiVersion: v1
    metadata:
      name: myns
      annotations:
        net.beta.kubernetes.io/network-policy: |
          {
            "ingress": {
            "isolation": "DefaultDeny"
            }
          }

    ipsetName为weave-$(sha1_on_namespacename)

    -A WEAVE-NPC-DEFAULT -m set --match-set weave-$(sha1_on_namespacename) dst -j ACCEPT -m comment --comment "DefaultAllow isolation for namespace: $(namespace_name)"

    weave默认会drop报文,即下面FORWARD链中的最后一条规则(-j DROP)。所以如果报文没有在WEAVE-NPC链中被匹配并ACCEPT,就会被drop掉。

    -A FORWARD -o weave -j WEAVE-NPC

    -A FORWARD -o weave -m state --state NEW -j NFLOG --nflog-group 86

    -A FORWARD -o weave -j DROP

    func (ns *ns) addNamespace(obj *coreapi.Namespace) error {
       ns.namespace = obj
      
       if !isDefaultDeny(obj) {
          if err := ns.ensureBypassRule(ns.allPods.ipsetName); err != nil {
             return err
          }
       }
    
       // Add namespace ipset to matching namespace selectors
       // 遍历所有的namespace selector,若某个namespace selector与刚添加的namespace label匹配,则将此namespace添加到该selector对应的ipset下。
       return ns.nsSelectors.addToMatching(obj.ObjectMeta.Labels, string(ns.allPods.ipsetName), namespaceComment(ns))
    

    更新namespace

    // updateNamespace applies a namespace update:
    //   - if the DefaultDeny setting flipped, the bypass rule for the namespace's
    //     all-pods ipset is ensured (deny -> allow) or deleted (allow -> deny);
    //   - if the labels changed, the all-pods ipset is added to / removed from
    //     each namespace selector whose match result changed.
    func (ns *ns) updateNamespace(oldObj, newObj *coreapi.Namespace) error {
       ns.namespace = newObj
       setName := ns.allPods.ipsetName
    
       // The two flags differ inside this branch, so exactly one action applies.
       if wasDeny, isDeny := isDefaultDeny(oldObj), isDefaultDeny(newObj); wasDeny != isDeny {
          common.Log.Infof("namespace DefaultDeny changed from %t to %t", wasDeny, isDeny)
          var err error
          if isDeny {
             err = ns.deleteBypassRule(setName)
          } else {
             err = ns.ensureBypassRule(setName)
          }
          if err != nil {
             return err
          }
       }
    
       oldLabels, newLabels := oldObj.ObjectMeta.Labels, newObj.ObjectMeta.Labels
       if equals(oldLabels, newLabels) {
          return nil
       }
    
       // Labels changed: move the namespace ipset in or out of each selector
       // whose verdict flipped; selectors with an unchanged verdict are left alone.
       member := string(setName)
       for _, sel := range ns.nsSelectors.entries {
          matchedBefore, matchesNow := sel.matches(oldLabels), sel.matches(newLabels)
          switch {
          case matchedBefore && !matchesNow:
             if err := sel.delEntry(member); err != nil {
                return err
             }
          case matchesNow && !matchedBefore:
             if err := sel.addEntry(member, namespaceComment(ns)); err != nil {
                return err
             }
          }
       }
       return nil
    }
    

    删除namespace,操作与增加相反,删除bypass rule规则,将namespace从匹配的 nsselector中删除

    // deleteNamespace is the inverse of addNamespace: it clears the stored
    // namespace object, deletes the bypass rule when obj is not DefaultDeny, and
    // removes the namespace's all-pods ipset from every namespace selector that
    // matches obj's labels.
    func (ns *ns) deleteNamespace(obj *coreapi.Namespace) error {
       ns.namespace = nil
       setName := ns.allPods.ipsetName
    
       var err error
       if !isDefaultDeny(obj) {
          err = ns.deleteBypassRule(setName)
       }
       if err != nil {
          return err
       }
    
       labels := obj.ObjectMeta.Labels
       return ns.nsSelectors.delFromMatching(labels, string(setName))
    }

     创建pod

    // addPod records obj under its UID. If the pod has an IP (hasIP; per the
    // original note it is false for pods that are not running or that use
    // HostNetwork), the IP is added to:
    //   - the weave-local-pods ipset (LocalIpset) when checkLocalPod reports the
    //     pod as local to this node;
    //   - the ipset of every pod selector matching the pod's labels. This always
    //     includes the namespace's all-pods selector, whose spec matches everything.
    // NOTE(review): the result of ns.ips.AddEntry is discarded, so a failure to
    // add to weave-local-pods is silent; confirm this best-effort is intended.
    func (ns *ns) addPod(obj *coreapi.Pod) error {
       ns.pods[obj.ObjectMeta.UID] = obj
       if hasIP(obj) {
          podIP := obj.Status.PodIP
          if ns.checkLocalPod(obj) {
             ns.ips.AddEntry(LocalIpset, podIP, podComment(obj))
          }
          return ns.podSelectors.addToMatching(obj.ObjectMeta.Labels, podIP, podComment(obj))
       }
       return nil
    }
    
    // updatePod reconciles the ipsets after a pod update. The pods map is
    // re-keyed to the new object first, then one of four cases applies:
    //   - neither object has an IP (see hasIP): nothing to do;
    //   - the IP went away: remove oldObj's IP from weave-local-pods (LocalIpset,
    //     only if checkLocalPod reports it local) and from every pod selector
    //     ipset matching the old labels;
    //   - an IP appeared: add newObj's IP to weave-local-pods (if local) and to
    //     every pod selector ipset matching the new labels;
    //   - an IP was kept but labels and/or the IP changed: weave-local-pods is
    //     updated when the IP changed, and the pod is moved between pod selector
    //     ipsets according to the old and new match results.
    //
    // NOTE(review): results of ns.ips.AddEntry/DelEntry on LocalIpset are
    // discarded, as in the other pod handlers; confirm best-effort is intended.
    func (ns *ns) updatePod(oldObj, newObj *coreapi.Pod) error {
       delete(ns.pods, oldObj.ObjectMeta.UID)
       ns.pods[newObj.ObjectMeta.UID] = newObj
    
       oldHasIP, newHasIP := hasIP(oldObj), hasIP(newObj)
       switch {
       case !oldHasIP && !newHasIP:
          return nil
    
       case oldHasIP && !newHasIP:
          if ns.checkLocalPod(oldObj) {
             ns.ips.DelEntry(LocalIpset, oldObj.Status.PodIP)
          }
          return ns.podSelectors.delFromMatching(oldObj.ObjectMeta.Labels, oldObj.Status.PodIP)
    
       case !oldHasIP && newHasIP:
          if ns.checkLocalPod(newObj) {
             ns.ips.AddEntry(LocalIpset, newObj.Status.PodIP, podComment(newObj))
          }
          return ns.podSelectors.addToMatching(newObj.ObjectMeta.Labels, newObj.Status.PodIP, podComment(newObj))
       }
    
       // Both objects have an IP from here on.
       oldIP, newIP := oldObj.Status.PodIP, newObj.Status.PodIP
       ipChanged := oldIP != newIP
    
       // Keep weave-local-pods in step with the pod's IP. Without this, an IP
       // change would only be reflected in the selector ipsets, leaving the stale
       // IP in weave-local-pods and the new IP missing from it.
       if ipChanged {
          if ns.checkLocalPod(oldObj) {
             ns.ips.DelEntry(LocalIpset, oldIP)
          }
          if ns.checkLocalPod(newObj) {
             ns.ips.AddEntry(LocalIpset, newIP, podComment(newObj))
          }
       }
    
       oldLabels, newLabels := oldObj.ObjectMeta.Labels, newObj.ObjectMeta.Labels
       if !ipChanged && equals(oldLabels, newLabels) {
          return nil
       }
    
       for _, ps := range ns.podSelectors.entries {
          oldMatch := ps.matches(oldLabels)
          newMatch := ps.matches(newLabels)
          // Only touch a selector if its verdict flipped, or if it holds the pod
          // and the stored IP must be replaced.
          if oldMatch == newMatch && !ipChanged {
             continue
          }
          if oldMatch {
             if err := ps.delEntry(oldIP); err != nil {
                return err
             }
          }
          if newMatch {
             if err := ps.addEntry(newIP, podComment(newObj)); err != nil {
                return err
             }
          }
       }
       return nil
    }

    从ipset weave-local-pods中删除该pod ip,并从匹配的pod selector ipset中删除

    // deletePod forgets obj and, if the pod had an IP (hasIP), removes that IP
    // from the weave-local-pods ipset (when checkLocalPod reports it local) and
    // from every pod selector ipset matching the pod's labels.
    // NOTE(review): the result of ns.ips.DelEntry is discarded, so a failure to
    // remove from weave-local-pods is silent; confirm this best-effort is intended.
    func (ns *ns) deletePod(obj *coreapi.Pod) error {
       delete(ns.pods, obj.ObjectMeta.UID)
       if hasIP(obj) {
          podIP := obj.Status.PodIP
          if ns.checkLocalPod(obj) {
             ns.ips.DelEntry(LocalIpset, podIP)
          }
          return ns.podSelectors.delFromMatching(obj.ObjectMeta.Labels, podIP)
       }
       return nil
    }
    
    
    // addNetworkPolicy records obj and provisions what analysePolicy says it
    // needs, in dependency order: namespace selector ipsets, then pod selector
    // ipsets, then the iptables rules that reference those ipsets. Every
    // provision call is keyed by the policy's UID (presumably as the owner of
    // the resources — confirm against selectorSet/ruleSet.provision).
    func (ns *ns) addNetworkPolicy(obj *extnapi.NetworkPolicy) error {
       uid := obj.ObjectMeta.UID
       ns.policies[uid] = obj
    
       rules, nsSelectors, podSelectors, err := ns.analysePolicy(obj)
       if err != nil {
          return err
       }
    
       // The ipsets must exist before any rule that matches against them.
       if err = ns.nsSelectors.provision(uid, nil, nsSelectors); err != nil {
          return err
       }
       if err = ns.podSelectors.provision(uid, nil, podSelectors); err != nil {
          return err
       }
       return ns.rules.provision(uid, nil, rules)
    }
    

      

    // NOTE(review): this snippet repeats the addNetworkPolicy function shown
    // earlier in the article. Placed in one Go file together, the two would be a
    // duplicate declaration. It was presumably meant to show another handler
    // (e.g. update/delete of a network policy) — confirm against the upstream source.
    //
    // addNetworkPolicy records obj, analyses it, and provisions namespace
    // selectors, pod selectors and rules (in that order) keyed by the policy UID.
    func (ns *ns) addNetworkPolicy(obj *extnapi.NetworkPolicy) error {
       ns.policies[obj.ObjectMeta.UID] = obj
    
       // Analyse policy, determine which rules and ipsets are required
       rules, nsSelectors, podSelectors, err := ns.analysePolicy(obj)
       if err != nil {
          return err
       }
    
       // Provision required resources in dependency order
       if err := ns.nsSelectors.provision(obj.ObjectMeta.UID, nil, nsSelectors); err != nil {
          return err
       }
       if err := ns.podSelectors.provision(obj.ObjectMeta.UID, nil, podSelectors); err != nil {
          return err
       }
       return ns.rules.provision(obj.ObjectMeta.UID, nil, rules)
    }
    
    
    
    // analysePolicy translates a NetworkPolicy into the iptables ACCEPT rules and
    // ipset selector specs needed to enforce it, without provisioning anything.
    //
    // Returned maps are keyed by each spec's key:
    //   - rules:        ACCEPT rules built by newRuleSpec;
    //   - nsSelectors:  list:set selector specs for namespace selectors in From peers;
    //   - podSelectors: hash:ip selector specs for the policy's own PodSelector
    //                   (the destination pods) and for pod selectors in From peers.
    //
    // Per ingress rule:
    //   - Ports or From present but empty: the rule matches no traffic (skipped);
    //   - From absent: any source; Ports absent: any port;
    //   - each From peer yields its own rules. A peer with neither a pod nor a
    //     namespace selector is skipped (fails closed) instead of becoming an
    //     "any source" rule.
    func (ns *ns) analysePolicy(policy *extnapi.NetworkPolicy) (
       rules map[string]*ruleSpec,
       nsSelectors, podSelectors map[string]*selectorSpec,
       err error) {
    
       nsSelectors = make(map[string]*selectorSpec)
       podSelectors = make(map[string]*selectorSpec)
       rules = make(map[string]*ruleSpec)
       // The policy's PodSelector selects the destination pods in this namespace.
       dstSelector, err := newSelectorSpec(&policy.Spec.PodSelector, ns.name, ipset.HashIP)
       if err != nil {
          return nil, nil, nil, err
       }
       podSelectors[dstSelector.key] = dstSelector
    
       for _, ingressRule := range policy.Spec.Ingress {
          // A Ports or From list that is present but empty matches nothing, e.g.
          // ingress:
          //   - ports: []
          if ingressRule.Ports != nil && len(ingressRule.Ports) == 0 {
             // Ports is empty, this rule matches no ports (no traffic matches).
             continue
          }
          // ingress:
          //   - from: []
          if ingressRule.From != nil && len(ingressRule.From) == 0 {
             // From is empty, this rule matches no sources (no traffic matches).
             continue
          }
    
          if ingressRule.From == nil {
             // From is not provided, this rule matches all sources (traffic not restricted by source).
             if ingressRule.Ports == nil {
                // Ports is not provided, this rule matches all ports (traffic not restricted by port).
                rule := newRuleSpec(nil, nil, dstSelector, nil)
                rules[rule.key] = rule
             } else {
                // Ports is present and contains at least one item, then this rule allows traffic
                // only if the traffic matches at least one port in the ports list.
                withNormalisedProtoAndPort(ingressRule.Ports, func(proto, port string) {
                   rule := newRuleSpec(&proto, nil, dstSelector, &port)
                   rules[rule.key] = rule
                })
             }
          } else {
             // From is present and contains at least one item, this rule allows traffic only if the
             // traffic matches at least one item in the from list. Example:
             // ingress:
             // - from:
             //   - namespaceSelector:
             //       matchLabels:
             //         project: myproject
             //   - podSelector:
             //       matchLabels:
             //         role: frontend
             //   ports:
             //   - protocol: TCP
             //     port: 6379
             for _, peer := range ingressRule.From {
                var srcSelector *selectorSpec
                if peer.PodSelector != nil {
                   srcSelector, err = newSelectorSpec(peer.PodSelector, ns.name, ipset.HashIP)
                   if err != nil {
                      return nil, nil, nil, err
                   }
                   podSelectors[srcSelector.key] = srcSelector
                }
                if peer.NamespaceSelector != nil {
                   srcSelector, err = newSelectorSpec(peer.NamespaceSelector, "", ipset.ListSet)
                   if err != nil {
                      return nil, nil, nil, err
                   }
                   nsSelectors[srcSelector.key] = srcSelector
                }
                if srcSelector == nil {
                   // Neither selector is set (for example an ipBlock peer, added to
                   // NetworkPolicyPeer in Kubernetes 1.8, which these types cannot
                   // represent). A nil source in newRuleSpec means "anywhere" and
                   // would open the destination pods to every source, so skip the
                   // peer instead: the policy fails closed.
                   continue
                }
                // NOTE(review): if a peer sets both PodSelector and NamespaceSelector,
                // srcSelector ends up as the namespace selector. The rule then admits
                // every pod in the matching namespaces, and the pod selector spec
                // recorded above goes unused. The v1.6/v1.7 API expects exactly one
                // of the two; later API versions define the combination as an
                // intersection, which would need two --match-set clauses.
                if ingressRule.Ports == nil {
                   // Ports is not provided, this rule matches all ports (traffic not restricted by port).
                   rule := newRuleSpec(nil, srcSelector, dstSelector, nil)
                   rules[rule.key] = rule
                } else {
                   // Ports is present and contains at least one item, then this rule allows traffic
                   // only if the traffic matches at least one port in the ports list.
                   withNormalisedProtoAndPort(ingressRule.Ports, func(proto, port string) {
                      rule := newRuleSpec(&proto, srcSelector, dstSelector, &port)
                      rules[rule.key] = rule
                   })
                }
             }
          }
       }
    
       return rules, nsSelectors, podSelectors, nil
    }
    

      

    根据network policy规则翻译为iptables规则。weave最后有一条默认drop规则,规则都是以白名单的形式创建的,也就是-j ACCEPT:
    -p $(proto) -m set --match-set $(src_ipset) src -m set --match-set $(dst_ipset) dst --dport $(dstPort) -j ACCEPT

    // newRuleSpec builds the argument list of an iptables ACCEPT rule (a
    // whitelist entry) together with a key identifying it.
    //
    //   proto   - optional protocol, emitted as "-p <proto>";
    //   srcHost - optional source selector, matched with --match-set ... src;
    //             described as a pod selector when nsName is set, otherwise as
    //             a namespace selector; nil means any source;
    //   dstHost - optional destination selector, matched with --match-set ... dst;
    //             nil means any destination;
    //   dstPort - optional destination port, emitted as "--dport <port>"
    //             (callers in analysePolicy always pass proto along with it).
    //
    // A "-m comment" describing source and destination is appended after the
    // target. The key is the space-joined argument list, comment included.
    func newRuleSpec(proto *string, srcHost *selectorSpec, dstHost *selectorSpec, dstPort *string) *ruleSpec {
       var rule []string
       if proto != nil {
          rule = append(rule, "-p", *proto)
       }
    
       from := "anywhere"
       if srcHost != nil {
          rule = append(rule, "-m", "set", "--match-set", string(srcHost.ipsetName), "src")
          switch srcHost.nsName {
          case "":
             from = fmt.Sprintf("namespaces: selector: %s", srcHost.key)
          default:
             from = fmt.Sprintf("pods: namespace: %s, selector: %s", srcHost.nsName, srcHost.key)
          }
       }
    
       to := "anywhere"
       if dstHost != nil {
          rule = append(rule, "-m", "set", "--match-set", string(dstHost.ipsetName), "dst")
          to = fmt.Sprintf("pods: namespace: %s, selector: %s", dstHost.nsName, dstHost.key)
       }
    
       if dstPort != nil {
          rule = append(rule, "--dport", *dstPort)
       }
       rule = append(rule, "-j", "ACCEPT", "-m", "comment", "--comment", from+" -> "+to)
    
       return &ruleSpec{strings.Join(rule, " "), rule}
    }

    参考:

    Implement Kubernetes 1.7 NetworkPolicy semantics https://github.com/weaveworks/weave/pull/3151

  • 相关阅读:
    第七次作业
    rfid工作原理
    实验九——基本数据类型存储及应用总结
    实验八——函数定义及调用总结
    实验七——函数定义及调用总结
    作业
    作业
    作业
    开始
    实验12——指针的基础应用2
  • 原文地址:https://www.cnblogs.com/haoqingchuan/p/7767543.html
Copyright © 2011-2022 走看看