Weave-npc 通过 iptables 来实施 network policy。由于 network policy 中的 namespace selector、pod selector 往往会匹配多个 pod,因此借助 Linux ipset,让一条 iptables 规则批量作用于一组 pod,如下所示:
-A WEAVE-NPC-INGRESS -p tcp -m set --match-set weave-?!9~5$SRrA]EcONWS|ZaRex{v src -m set --match-set weave-*J?Cn/g0cU{zEyOeo;P6+_DD2 dst -m tcp --dport 80 -j ACCEPT
Weave 按如下方式初始化 iptables 规则。Network policy 中的 ingress 规则会以白名单的形式添加到 WEAVE-NPC-INGRESS 链中。因此,发往 weave 网桥的新连接如果既没有匹配 WEAVE-NPC-DEFAULT 中的默认规则,也没有匹配白名单规则,就会被 FORWARD 链末尾的 DROP 规则丢弃。
-A FORWARD -o weave -j WEAVE-NPC
-A FORWARD -o weave -m state --state NEW -j NFLOG --nflog-group 86
-A FORWARD -o weave -j DROP
… …
-A WEAVE-NPC -m state --state NEW -j WEAVE-NPC-DEFAULT
-A WEAVE-NPC -m state --state NEW -j WEAVE-NPC-INGRESS
-A WEAVE-NPC -m set ! --match-set weave-local-pods dst -j ACCEPT
func root(cmd *cobra.Command, args []string) { common.SetLogLevel(logLevel) //用于标识主机,产生本地的规则 if nodeName == "" { // HOSTNAME is set by Kubernetes for pods in the host network namespace nodeName = os.Getenv("HOSTNAME") } if nodeName == "" { common.Log.Fatalf("Must set node name via --node-name or $HOSTNAME") } common.Log.Infof("Starting Weaveworks NPC %s; node name %q", version, nodeName) if err := metrics.Start(metricsAddr); err != nil { common.Log.Fatalf("Failed to start metrics: %v", err) } if err := ulogd.Start(); err != nil { common.Log.Fatalf("Failed to start ulogd: %v", err) } config, err := rest.InClusterConfig() handleError(err) client, err := kubernetes.NewForConfig(config) handleError(err) // 创建iptables 对象,用于管理iptables规则及生效 ipt, err := iptables.New() handleError(err) // 创建ipeset对象,用于管理ipset资源 ips := ipset.New(common.LogLogger()) // resetIPTables在filter表添加WEAVE-NPC-INGRESS链、WEAVE-NPC-DEFAULT链、 // WEAVE-NPC链,若已经存在则置空(FLUSH) handleError(resetIPTables(ipt)) // resetIPSets 将weave-npc创建的ipset,即名字为”weave-“开头的ipset的成员删除 handleError(resetIPSets(ips)) // createBaseRules初始化iptables规则 // weave-npc链中添加iptables规则 // -A WEAVE-NPC -m state --state RELATED,ESTABLISHED -j ACCEPT // -A WEAVE-NPC -d -j ACCEPT // -A WEAVE-NPC -m state --state NEW -j WEAVE-NPC-DEFAULT // -A WEAVE-NPC -m state --state NEW -j WEAVE-NPC-INGRESS // -A WEAVE-NPC -m set ! 
--match-set weave-local-pods dst -j ACCEPT handleError(createBaseRules(ipt, ips)) npc := npc.New(nodeName, ipt, ips) // 下面起了三个controller,利用client-go的informer来分别负责namespaces、pods和 // networkpolicies的创改删处理 nsController := makeController(client.Core().RESTClient(), "namespaces", &coreapi.Namespace{}, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { handleError(npc.AddNamespace(obj.(*coreapi.Namespace))) }, DeleteFunc: func(obj interface{}) { switch obj := obj.(type) { case *coreapi.Namespace: handleError(npc.DeleteNamespace(obj)) case cache.DeletedFinalStateUnknown: // We know this object has gone away, but its final state is no longer // available from the API server. Instead we use the last copy of it // that we have, which is good enough for our cleanup. handleError(npc.DeleteNamespace(obj.Obj.(*coreapi.Namespace))) } }, UpdateFunc: func(old, new interface{}) { handleError(npc.UpdateNamespace(old.(*coreapi.Namespace), new.(*coreapi.Namespace))) }}) podController := makeController(client.Core().RESTClient(), "pods", &coreapi.Pod{}, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { handleError(npc.AddPod(obj.(*coreapi.Pod))) }, DeleteFunc: func(obj interface{}) { switch obj := obj.(type) { case *coreapi.Pod: handleError(npc.DeletePod(obj)) case cache.DeletedFinalStateUnknown: // We know this object has gone away, but its final state is no longer // available from the API server. Instead we use the last copy of it // that we have, which is good enough for our cleanup. 
handleError(npc.DeletePod(obj.Obj.(*coreapi.Pod))) } }, UpdateFunc: func(old, new interface{}) { handleError(npc.UpdatePod(old.(*coreapi.Pod), new.(*coreapi.Pod))) }}) npController := makeController(client.Extensions().RESTClient(), "networkpolicies", &extnapi.NetworkPolicy{}, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { handleError(npc.AddNetworkPolicy(obj.(*extnapi.NetworkPolicy))) }, DeleteFunc: func(obj interface{}) { switch obj := obj.(type) { case *extnapi.NetworkPolicy: handleError(npc.DeleteNetworkPolicy(obj)) case cache.DeletedFinalStateUnknown: // We know this object has gone away, but its final state is no longer // available from the API server. Instead we use the last copy of it // that we have, which is good enough for our cleanup. handleError(npc.DeleteNetworkPolicy(obj.Obj.(*extnapi.NetworkPolicy))) } }, UpdateFunc: func(old, new interface{}) { handleError(npc.UpdateNetworkPolicy(old.(*extnapi.NetworkPolicy), new.(*extnapi.NetworkPolicy))) }}) go nsController.Run(wait.NeverStop) go podController.Run(wait.NeverStop) go npController.Run(wait.NeverStop) signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) common.Log.Fatalf("Exiting: %v", <-signals) }
network policy 是 namespace 级别(namespace-scoped)的资源,因此各类资源的处理都以 namespace 为单位来实现。
func (npc *controller) AddNamespace(obj *coreapi.Namespace) error { npc.Lock() defer npc.Unlock() common.Log.Infof("EVENT AddNamespace %s", js(obj)) return npc.withNS(obj.ObjectMeta.Name, func(ns *ns) error { return errors.Wrap(ns.addNamespace(obj), "add namespace") }) }
每个 namespace 都有一个 podSelectors(selectorSet),其 entries 中包含多个 selector。每次对某个 pod selector spec 执行 provision 时,如果该 spec 之前没有处理过,就会新建一个 selector,并通过 onNewPodSelector 把匹配的 pod IP 加入该 selector 对应的 ipset。
func newNS(name, nodeName string, ipt iptables.Interface, ips ipset.Interface, nsSelectors *selectorSet) (*ns, error) { allPods, err := newSelectorSpec(&metav1.LabelSelector{}, name, ipset.HashIP) if err != nil { return nil, err } ns := &ns{ ipt: ipt, ips: ips, name: name, nodeName: nodeName, pods: make(map[types.UID]*coreapi.Pod), policies: make(map[types.UID]*extnapi.NetworkPolicy), uid: uuid.NewUUID(), allPods: allPods, nsSelectors: nsSelectors, rules: newRuleSet(ipt)} ns.podSelectors = newSelectorSet(ips, ns.onNewPodSelector) // 对allpods创建selector,因为allpods的spec是{},即选择所有的pod,因此allpods对应的selector的ipset中包含了该namespace下所有的pod。 if err := ns.podSelectors.provision(ns.uid, nil, map[string]*selectorSpec{ns.allPods.key: ns.allPods}); err != nil { return nil, err } return ns, nil }
在 k8s v1.7 之前,NetworkPolicy 对象本身无法表达 default deny,需要通过在 namespace 上添加 annotation 来实现,yaml 如下:
kind: Namespace
apiVersion: v1
metadata:
  name: myns
  annotations:
    net.beta.kubernetes.io/network-policy: |
      {
        "ingress": {
          "isolation": "DefaultDeny"
        }
      }
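对于没有设置 DefaultDeny 的 namespace,weave-npc 会在 WEAVE-NPC-DEFAULT 链中添加如下 bypass 规则,放行发往该 namespace 下所有 pod 的新连接;其余未被放行的流量最终会落到 FORWARD 链末尾的 DROP 规则:
-A WEAVE-NPC-DEFAULT -m set --match-set weave-$(sha1_on_namespacename) dst -j ACCEPT -m comment --comment "DefaultAllow isolation for namespace: $(namespace_name)"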
-A FORWARD -o weave -j WEAVE-NPC
-A FORWARD -o weave -m state --state NEW -j NFLOG --nflog-group 86
-A FORWARD -o weave -j DROP
func (ns *ns) addNamespace(obj *coreapi.Namespace) error { ns.namespace = obj if !isDefaultDeny(obj) { if err := ns.ensureBypassRule(ns.allPods.ipsetName); err != nil { return err } } // Add namespace ipset to matching namespace selectors // 遍历所有的namespace selector,若某个namespace selector与刚添加的namespace label匹配,则将此namespace添加到该selector对应的ipset下。 return ns.nsSelectors.addToMatching(obj.ObjectMeta.Labels, string(ns.allPods.ipsetName), namespaceComment(ns))
func (ns *ns) updateNamespace(oldObj, newObj *coreapi.Namespace) error { ns.namespace = newObj // Update bypass rule if ingress default has changed oldDefaultDeny := isDefaultDeny(oldObj) newDefaultDeny := isDefaultDeny(newObj) // 对比新旧使用deaultdeny的用法,选择删除或者加上by pass的规则 if oldDefaultDeny != newDefaultDeny { common.Log.Infof("namespace DefaultDeny changed from %t to %t", oldDefaultDeny, newDefaultDeny) if oldDefaultDeny { if err := ns.ensureBypassRule(ns.allPods.ipsetName); err != nil { return err } } if newDefaultDeny { if err := ns.deleteBypassRule(ns.allPods.ipsetName); err != nil { return err } } } // Re-evaluate namespace selector membership if labels have changed if !equals(oldObj.ObjectMeta.Labels, newObj.ObjectMeta.Labels) { for _, selector := range ns.nsSelectors.entries { oldMatch := selector.matches(oldObj.ObjectMeta.Labels) newMatch := selector.matches(newObj.ObjectMeta.Labels) if oldMatch == newMatch { continue } if oldMatch { if err := selector.delEntry(string(ns.allPods.ipsetName)); err != nil { return err } } if newMatch { if err := selector.addEntry(string(ns.allPods.ipsetName), namespaceComment(ns)); err != nil { return err } } } } return nil }
删除 namespace 的操作与添加相反:若该 namespace 不是 DefaultDeny,则删除其 bypass 规则,并将该 namespace 从所有匹配的 namespace selector 的 ipset 中移除。
func (ns *ns) deleteNamespace(obj *coreapi.Namespace) error { ns.namespace = nil // Remove bypass rule if !isDefaultDeny(obj) { if err := ns.deleteBypassRule(ns.allPods.ipsetName); err != nil { return err } } // Remove namespace ipset from any matching namespace selectors return ns.nsSelectors.delFromMatching(obj.ObjectMeta.Labels, string(ns.allPods.ipsetName)) }
func (ns *ns) addPod(obj *coreapi.Pod) error { ns.pods[obj.ObjectMeta.UID] = obj // 若pod不是运行态或者使用的HostNetwork,则返回 if !hasIP(obj) { return nil } //若pod属于本机,则添加到weave-local-pods ipset中 if ns.checkLocalPod(obj) { ns.ips.AddEntry(LocalIpset, obj.Status.PodIP, podComment(obj)) } // 若pod label与pod selector匹配,则将此pod ip添加pod selector对应的ipset // allPodspec由于每个pod都会匹配,所以所有新增的pod都会加入allpod ipset中。 return ns.podSelectors.addToMatching(obj.ObjectMeta.Labels, obj.Status.PodIP, podComment(obj)) } // 当pod有无状态切换时,从local ipset中添加或者删除;label切换时,根据匹配关系从pod selector ipset中添加或者删除 func (ns *ns) updatePod(oldObj, newObj *coreapi.Pod) error { delete(ns.pods, oldObj.ObjectMeta.UID) ns.pods[newObj.ObjectMeta.UID] = newObj if !hasIP(oldObj) && !hasIP(newObj) { return nil } if hasIP(oldObj) && !hasIP(newObj) { if ns.checkLocalPod(oldObj) { ns.ips.DelEntry(LocalIpset, oldObj.Status.PodIP) } return ns.podSelectors.delFromMatching(oldObj.ObjectMeta.Labels, oldObj.Status.PodIP) } if !hasIP(oldObj) && hasIP(newObj) { if ns.checkLocalPod(newObj) { ns.ips.AddEntry(LocalIpset, newObj.Status.PodIP, podComment(newObj)) } return ns.podSelectors.addToMatching(newObj.ObjectMeta.Labels, newObj.Status.PodIP, podComment(newObj)) } if !equals(oldObj.ObjectMeta.Labels, newObj.ObjectMeta.Labels) || oldObj.Status.PodIP != newObj.Status.PodIP { for _, ps := range ns.podSelectors.entries { oldMatch := ps.matches(oldObj.ObjectMeta.Labels) newMatch := ps.matches(newObj.ObjectMeta.Labels) if oldMatch == newMatch && oldObj.Status.PodIP == newObj.Status.PodIP { continue } if oldMatch { if err := ps.delEntry(oldObj.Status.PodIP); err != nil { return err } } if newMatch { if err := ps.addEntry(newObj.Status.PodIP, podComment(newObj)); err != nil { return err } } } } return nil }
删除 pod 时:若该 pod 位于本节点,则从 ipset weave-local-pods 中删除其 IP;同时将其 IP 从所有匹配的 pod selector ipset 中删除。
func (ns *ns) deletePod(obj *coreapi.Pod) error { delete(ns.pods, obj.ObjectMeta.UID) if !hasIP(obj) { return nil } if ns.checkLocalPod(obj) { ns.ips.DelEntry(LocalIpset, obj.Status.PodIP) } return ns.podSelectors.delFromMatching(obj.ObjectMeta.Labels, obj.Status.PodIP) } func (ns *ns) addNetworkPolicy(obj *extnapi.NetworkPolicy) error { ns.policies[obj.ObjectMeta.UID] = obj // Analyse policy, determine which rules and ipsets are required rules, nsSelectors, podSelectors, err := ns.analysePolicy(obj) if err != nil { return err } // Provision required resources in dependency order if err := ns.nsSelectors.provision(obj.ObjectMeta.UID, nil, nsSelectors); err != nil { return err } if err := ns.podSelectors.provision(obj.ObjectMeta.UID, nil, podSelectors); err != nil { return err } return ns.rules.provision(obj.ObjectMeta.UID, nil, rules) }
func (ns *ns) addNetworkPolicy(obj *extnapi.NetworkPolicy) error { ns.policies[obj.ObjectMeta.UID] = obj // Analyse policy, determine which rules and ipsets are required rules, nsSelectors, podSelectors, err := ns.analysePolicy(obj) if err != nil { return err } // Provision required resources in dependency order if err := ns.nsSelectors.provision(obj.ObjectMeta.UID, nil, nsSelectors); err != nil { return err } if err := ns.podSelectors.provision(obj.ObjectMeta.UID, nil, podSelectors); err != nil { return err } return ns.rules.provision(obj.ObjectMeta.UID, nil, rules) } func (ns *ns) analysePolicy(policy *extnapi.NetworkPolicy) ( rules map[string]*ruleSpec, nsSelectors, podSelectors map[string]*selectorSpec, err error) { nsSelectors = make(map[string]*selectorSpec) podSelectors = make(map[string]*selectorSpec) rules = make(map[string]*ruleSpec) // pod selector 匹配目的pod dstSelector, err := newSelectorSpec(&policy.Spec.PodSelector, ns.name, ipset.HashIP) if err != nil { return nil, nil, nil, err } podSelectors[dstSelector.key] = dstSelector for _, ingressRule := range policy.Spec.Ingress { // 如果ports或者from 不为空,但是没有内容,表示无报文可以匹配 // ingress: // - ports:[] if ingressRule.Ports != nil && len(ingressRule.Ports) == 0 { // Ports is empty, this rule matches no ports (no traffic matches). continue } // ingress: // - from: [] // 如果ports或者from 不为空,但是没有内容,表示无报文可以匹配 if ingressRule.From != nil && len(ingressRule.From) == 0 { // From is empty, this rule matches no sources (no traffic matches). continue } if ingressRule.From == nil { // From is not provided, this rule matches all sources (traffic not restricted by source). if ingressRule.Ports == nil { // Ports is not provided, this rule matches all ports (traffic not restricted by port). rule := newRuleSpec(nil, nil, dstSelector, nil) rules[rule.key] = rule } else { // Ports is present and contains at least one item, then this rule allows traffic // only if the traffic matches at least one port in the ports list. 
withNormalisedProtoAndPort(ingressRule.Ports, func(proto, port string) { rule := newRuleSpec(&proto, nil, dstSelector, &port) rules[rule.key] = rule }) } } else { // From is present and contains at least on item, this rule allows traffic only if the // traffic matches at least one item in the from list. // (ingress.From)networkpolicyPeer在1.8版本中支持ipblock // ingress: // - from: // - namespaceSelector: // matchLabels: // project: myproject // - podSelector: // matchLabels: // role: frontend // ports: // - protocol: TCP // port: 6379 for _, peer := range ingressRule.From { var srcSelector *selectorSpec if peer.PodSelector != nil { srcSelector, err = newSelectorSpec(peer.PodSelector, ns.name, ipset.HashIP) if err != nil { return nil, nil, nil, err } podSelectors[srcSelector.key] = srcSelector } if peer.NamespaceSelector != nil { srcSelector, err = newSelectorSpec(peer.NamespaceSelector, "", ipset.ListSet) if err != nil { return nil, nil, nil, err } nsSelectors[srcSelector.key] = srcSelector } // 在前面两步选择了selector之后选择受限访问的port // 这里有个问题,如果namespace selector和pod selector都有的话, // 应该是有两个match set,而不是像下面这样只使用最后一个。 if ingressRule.Ports == nil { // Ports is not provided, this rule matches all ports (traffic not restricted by port). rule := newRuleSpec(nil, srcSelector, dstSelector, nil) rules[rule.key] = rule } else { // Ports is present and contains at least one item, then this rule allows traffic // only if the traffic matches at least one port in the ports list. withNormalisedProtoAndPort(ingressRule.Ports, func(proto, port string) { rule := newRuleSpec(&proto, srcSelector, dstSelector, &port) rules[rule.key] = rule }) } } } } return rules, nsSelectors, podSelectors, nil }
newRuleSpec 把 network policy 规则翻译为 iptables 规则。由于 FORWARD 链最后有一条默认的 DROP 规则,这些规则都以白名单的形式创建,也就是 -j ACCEPT,形如:
-p $(proto) -m set --match-set $(src_ipset) src -m set --match-set $(dst_ipset) dst --dport $(dstPort) -j ACCEPT
func newRuleSpec(proto *string, srcHost *selectorSpec, dstHost *selectorSpec, dstPort *string) *ruleSpec { args := []string{} if proto != nil { args = append(args, "-p", *proto) } srcComment := "anywhere" if srcHost != nil { args = append(args, "-m", "set", "--match-set", string(srcHost.ipsetName), "src") if srcHost.nsName != "" { srcComment = fmt.Sprintf("pods: namespace: %s, selector: %s", srcHost.nsName, srcHost.key) } else { srcComment = fmt.Sprintf("namespaces: selector: %s", srcHost.key) } } dstComment := "anywhere" if dstHost != nil { args = append(args, "-m", "set", "--match-set", string(dstHost.ipsetName), "dst") dstComment = fmt.Sprintf("pods: namespace: %s, selector: %s", dstHost.nsName, dstHost.key) } if dstPort != nil { args = append(args, "--dport", *dstPort) } args = append(args, "-j", "ACCEPT") args = append(args, "-m", "comment", "--comment", fmt.Sprintf("%s -> %s", srcComment, dstComment)) key := strings.Join(args, " ") return &ruleSpec{key, args} }
Implement Kubernetes 1.7 NetworkPolicy semantics https://github.com/weaveworks/weave/pull/3151