  • Understanding the SuperEdge Topology Algorithm in One Article

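    Preface

    SuperEdge service group uses application-grid-wrapper to implement topology awareness, so that access to a service stays closed within a single nodeunit.

    Before analyzing application-grid-wrapper in depth, here is a brief introduction to the topology-aware feature natively supported by community Kubernetes.

    The Kubernetes service topology awareness feature was released as alpha in v1.17. It implements topology-based routing and nearest-endpoint access. Users add a topologyKeys field to a service to indicate the topology key types, and only endpoints in the same topology domain are accessed. Three topologyKeys are currently available:

    • "kubernetes.io/hostname": access endpoints on the local node (same kubernetes.io/hostname label value); if there are none, access to the service fails
    • "topology.kubernetes.io/zone": access endpoints in the same zone (same topology.kubernetes.io/zone label value); if there are none, access to the service fails
    • "topology.kubernetes.io/region": access endpoints in the same region (same topology.kubernetes.io/region label value); if there are none, access to the service fails

    Besides specifying a single topology key, these keys can also be combined into a list, for example ["kubernetes.io/hostname", "topology.kubernetes.io/zone", "topology.kubernetes.io/region"]. This means: prefer endpoints on the local node; if none exist, access endpoints in the same zone; if still none exist, access endpoints in the same region; if none exist at all, access fails.

    In addition, "*" can be appended as the last item of the list (and only as the last item). It means: if all preceding topology domains fail, access any valid endpoint, i.e. with no topology restriction. For example: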

    # A Service that prefers node local, zonal, then regional endpoints but falls back to cluster wide endpoints.
    apiVersion: v1
    kind: Service
    metadata:
      name: my-service
    spec:
      selector:
        app: my-app
      ports:
        - protocol: TCP
          port: 80
          targetPort: 9376
      topologyKeys:
        - "kubernetes.io/hostname"
        - "topology.kubernetes.io/zone"
        - "topology.kubernetes.io/region"
        - "*"
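
    The fallback order described above can be sketched in Go. This is a minimal illustration of the documented semantics, not kube-proxy's actual code; the endpoint type and the selectByTopologyKeys function are hypothetical names introduced here.

```go
package main

import "fmt"

// endpoint is a hypothetical, simplified backend: an address plus the
// labels of the node it runs on.
type endpoint struct {
	addr       string
	nodeLabels map[string]string
}

// selectByTopologyKeys walks keys in order and returns the endpoints matched
// by the first key that matches anything. "*" matches every endpoint.
func selectByTopologyKeys(keys []string, client map[string]string, eps []endpoint) []endpoint {
	for _, key := range keys {
		if key == "*" {
			return eps
		}
		want, ok := client[key]
		if !ok {
			continue // the client node has no value for this key
		}
		var matched []endpoint
		for _, ep := range eps {
			if v, found := ep.nodeLabels[key]; found && v == want {
				matched = append(matched, ep)
			}
		}
		if len(matched) > 0 {
			return matched
		}
	}
	return nil // no key matched and there was no "*" fallback
}

func main() {
	client := map[string]string{
		"kubernetes.io/hostname":      "node0",
		"topology.kubernetes.io/zone": "z1",
	}
	eps := []endpoint{
		{"10.0.0.1", map[string]string{"kubernetes.io/hostname": "node1", "topology.kubernetes.io/zone": "z1"}},
		{"10.0.0.2", map[string]string{"kubernetes.io/hostname": "node2", "topology.kubernetes.io/zone": "z2"}},
	}
	keys := []string{"kubernetes.io/hostname", "topology.kubernetes.io/zone", "*"}
	for _, ep := range selectByTopologyKeys(keys, client, eps) {
		fmt.Println(ep.addr) // only the same-zone endpoint is selected
	}
}
```

    With no endpoint on node0, the hostname key matches nothing, so the zone key decides and "*" is never reached.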
    

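    Compared with the community implementation, the topology awareness implemented by service group differs as follows:

    • The service group topology key is customizable, namely gridUniqKey, which is more flexible to use; the community implementation currently offers only three choices: "kubernetes.io/hostname", "topology.kubernetes.io/zone", and "topology.kubernetes.io/region"
    • A service group accepts only one topology key, so only valid endpoints within the local topology domain can be accessed and endpoints in other topology domains cannot; the community implementation can fall back to endpoints in other topology domains through a topologyKeys list and "*"

    For the topology awareness implemented by service group, the service is configured as follows: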

    # A Service that only uses endpoints within the same zone1 topology domain.
    apiVersion: v1
    kind: Service
    metadata:
      annotations:
        topologyKeys: '["zone1"]'
      labels:
        superedge.io/grid-selector: servicegrid-demo
      name: servicegrid-demo-svc
    spec:
      ports:
      - port: 80
        protocol: TCP
        targetPort: 8080
      selector:
        appGrid: echo
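
    Note that the topologyKeys annotation value above is a JSON-encoded list of strings. A minimal sketch of decoding it follows; parseTopologyKeys is a hypothetical helper written for illustration, using only the Go standard library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// topologyAnnotation is the annotation name shown in the Service above.
const topologyAnnotation = "topologyKeys"

// parseTopologyKeys decodes the JSON string list stored in the annotation.
// A missing annotation yields nil keys and no error (an ordinary service).
func parseTopologyKeys(annotations map[string]string) ([]string, error) {
	raw, ok := annotations[topologyAnnotation]
	if !ok {
		return nil, nil
	}
	var keys []string
	if err := json.Unmarshal([]byte(raw), &keys); err != nil {
		return nil, fmt.Errorf("parse %q: %w", raw, err)
	}
	return keys, nil
}

func main() {
	keys, err := parseTopologyKeys(map[string]string{"topologyKeys": `["zone1"]`})
	fmt.Println(keys, err)
}
```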
    

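    Having introduced the topology awareness implemented by service group, we now dive into the source code to analyze the implementation details. As before, the analysis starts from a usage example: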

    # step1: labels edge nodes
    $ kubectl  get nodes
    NAME    STATUS   ROLES    AGE   VERSION
    node0   Ready    <none>   16d   v1.16.7
    node1   Ready    <none>   16d   v1.16.7
    node2   Ready    <none>   16d   v1.16.7
    # nodeunit1(nodegroup and servicegroup zone1)
    $ kubectl --kubeconfig config label nodes node0 zone1=nodeunit1  
    # nodeunit2(nodegroup and servicegroup zone1)
    $ kubectl --kubeconfig config label nodes node1 zone1=nodeunit2
    $ kubectl --kubeconfig config label nodes node2 zone1=nodeunit2
    ...
    # step3: deploy echo ServiceGrid
    $ cat <<EOF | kubectl --kubeconfig config apply -f -
    apiVersion: superedge.io/v1
    kind: ServiceGrid
    metadata:
      name: servicegrid-demo
      namespace: default
    spec:
      gridUniqKey: zone1
      template:
        selector:
          appGrid: echo
        ports:
        - protocol: TCP
          port: 80
          targetPort: 8080
    EOF
    servicegrid.superedge.io/servicegrid-demo created
    # note that there is only one relevant service generated
    $ kubectl  get svc
    NAME                   TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)   AGE
    kubernetes             ClusterIP   192.168.0.1       <none>        443/TCP   16d
    servicegrid-demo-svc   ClusterIP   192.168.6.139     <none>        80/TCP    10m
    # step4: access servicegrid-demo-svc(service topology and closed-looped)
    # execute on node0
    $ curl 192.168.6.139|grep "node name"
            node name:      node0
    # execute on node1 and node2
    $ curl 192.168.6.139|grep "node name"
            node name:      node2
    $ curl 192.168.6.139|grep "node name"
            node name:      node1
    

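    After the ServiceGrid CR is created, the ServiceGrid Controller generates the corresponding service from the ServiceGrid (including the topologyKeys annotation built from serviceGrid.Spec.GridUniqKey), while application-grid-wrapper implements topology awareness based on that service. The two are analyzed in turn below.

    ServiceGrid Controller analysis

    The overall logic of the ServiceGrid Controller is consistent with that of the DeploymentGrid Controller: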

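    • 1. Create and maintain the CRDs needed by service group (including ServiceGrid)
    • 2. Watch ServiceGrid events and add the ServiceGrid to the work queue; in a loop, take ServiceGrids off the queue, parse them, and create and maintain the corresponding service
    • 3. Watch service events and push the related ServiceGrid onto the work queue for the processing above, which helps the overall logic reach reconciliation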
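
    The queue-driven loop in steps 2 and 3 can be sketched as follows. This is a simplified illustration using only the standard library, not the controller's actual queue; keyQueue and drain are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// keyQueue is a hypothetical, much-simplified work queue that de-duplicates
// keys which are already pending.
type keyQueue struct {
	mu      sync.Mutex
	pending map[string]bool
	order   []string
}

func newKeyQueue() *keyQueue {
	return &keyQueue{pending: make(map[string]bool)}
}

// Add enqueues a "namespace/name" key unless it is already pending.
func (q *keyQueue) Add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.pending[key] {
		return
	}
	q.pending[key] = true
	q.order = append(q.order, key)
}

// Get pops the oldest key; ok is false when the queue is empty.
func (q *keyQueue) Get() (key string, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.order) == 0 {
		return "", false
	}
	key = q.order[0]
	q.order = q.order[1:]
	delete(q.pending, key)
	return key, true
}

// drain calls syncFn once per queued key and returns how many keys it handled.
func drain(q *keyQueue, syncFn func(key string) error) int {
	n := 0
	for {
		key, ok := q.Get()
		if !ok {
			return n
		}
		if err := syncFn(key); err != nil {
			fmt.Printf("sync %s failed: %v\n", key, err)
		}
		n++
	}
}

func main() {
	q := newKeyQueue()
	q.Add("default/servicegrid-demo") // enqueued by a ServiceGrid event
	q.Add("default/servicegrid-demo") // enqueued by an event on the owned service
	n := drain(q, func(key string) error {
		fmt.Println("reconciling", key)
		return nil
	})
	fmt.Println("processed", n)
}
```

    Both event sources enqueue the same ServiceGrid key, so a burst of events collapses into one reconcile pass.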

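    Note the differences from the DeploymentGrid Controller:

    • One ServiceGrid object produces only one service
    • Only service events need to be watched in addition; node events do not, because node CRUD has nothing to do with ServiceGrid
    • The service generated for a ServiceGrid is named {ServiceGrid}-svc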
    func (sgc *ServiceGridController) syncServiceGrid(key string) error {
        startTime := time.Now()
        klog.V(4).Infof("Started syncing service grid %q (%v)", key, startTime)
        defer func() {
            klog.V(4).Infof("Finished syncing service grid %q (%v)", key, time.Since(startTime))
        }()
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            return err
        }
        sg, err := sgc.svcGridLister.ServiceGrids(namespace).Get(name)
        if errors.IsNotFound(err) {
            klog.V(2).Infof("service grid %v has been deleted", key)
            return nil
        }
        if err != nil {
            return err
        }
        if sg.Spec.GridUniqKey == "" {
            sgc.eventRecorder.Eventf(sg, corev1.EventTypeWarning, "Empty", "This service grid has an empty grid key")
            return nil
        }
        // get service workload list of this grid
        svcList, err := sgc.getServiceForGrid(sg)
        if err != nil {
            return err
        }
        if sg.DeletionTimestamp != nil {
            return nil
        }
        // sync service grid relevant services workload
        return sgc.reconcile(sg, svcList)
    }
    func (sgc *ServiceGridController) getServiceForGrid(sg *crdv1.ServiceGrid) ([]*corev1.Service, error) {
        svcList, err := sgc.svcLister.Services(sg.Namespace).List(labels.Everything())
        if err != nil {
            return nil, err
        }
        labelSelector, err := common.GetDefaultSelector(sg.Name)
        if err != nil {
            return nil, err
        }
        canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
            fresh, err := sgc.crdClient.SuperedgeV1().ServiceGrids(sg.Namespace).Get(context.TODO(), sg.Name, metav1.GetOptions{})
            if err != nil {
                return nil, err
            }
            if fresh.UID != sg.UID {
                return nil, fmt.Errorf("orignal service grid %v/%v is gone: got uid %v, wanted %v", sg.Namespace,
                    sg.Name, fresh.UID, sg.UID)
            }
            return fresh, nil
        })
        cm := controller.NewServiceControllerRefManager(sgc.svcClient, sg, labelSelector, util.ControllerKind, canAdoptFunc)
        return cm.ClaimService(svcList)
    }
    func (sgc *ServiceGridController) reconcile(g *crdv1.ServiceGrid, svcList []*corev1.Service) error {
        var (
            adds    []*corev1.Service
            updates []*corev1.Service
            deletes []*corev1.Service
        )
        sgTargetSvcName := util.GetServiceName(g)
        isExistingSvc := false
        for _, svc := range svcList {
            if svc.Name == sgTargetSvcName {
                isExistingSvc = true
                template := util.KeepConsistence(g, svc)
                if !apiequality.Semantic.DeepEqual(template, svc) {
                    updates = append(updates, template)
                }
            } else {
                deletes = append(deletes, svc)
            }
        }
        if !isExistingSvc {
            adds = append(adds, util.CreateService(g))
        }
        return sgc.syncService(adds, updates, deletes)
    }
    func CreateService(sg *crdv1.ServiceGrid) *corev1.Service {
        svc := &corev1.Service{
            ObjectMeta: metav1.ObjectMeta{
                Name:      GetServiceName(sg),
                Namespace: sg.Namespace,
                // Append existed ServiceGrid labels to service to be created
                Labels: func() map[string]string {
                    if sg.Labels != nil {
                        newLabels := sg.Labels
                        newLabels[common.GridSelectorName] = sg.Name
                        newLabels[common.GridSelectorUniqKeyName] = sg.Spec.GridUniqKey
                        return newLabels
                    } else {
                        return map[string]string{
                            common.GridSelectorName:        sg.Name,
                            common.GridSelectorUniqKeyName: sg.Spec.GridUniqKey,
                        }
                    }
                }(),
                Annotations: make(map[string]string),
            },
            Spec: sg.Spec.Template,
        }
        keys := make([]string, 1)
        keys[0] = sg.Spec.GridUniqKey
        keyData, _ := json.Marshal(keys)
        svc.Annotations[common.TopologyAnnotationsKey] = string(keyData)
        return svc
    }
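
    The classification performed by reconcile can be illustrated with a small standalone sketch. The service struct below is a hypothetical stand-in that keeps only a name and the annotation value, and plan is an illustrative function, not part of SuperEdge.

```go
package main

import "fmt"

// service is a hypothetical, simplified Service: its name plus the value of
// the topologyKeys annotation.
type service struct {
	name string
	keys string
}

// plan sorts the services owned by one grid into create/update/delete sets.
func plan(gridName, gridKey string, owned []service) (adds, updates, deletes []service) {
	want := service{name: gridName + "-svc", keys: `["` + gridKey + `"]`}
	found := false
	for _, svc := range owned {
		switch {
		case svc.name != want.name:
			// Owned by the grid but not the expected name: remove it.
			deletes = append(deletes, svc)
		case svc != want:
			// Right name, drifted content: bring it back to the template.
			found = true
			updates = append(updates, want)
		default:
			found = true
		}
	}
	if !found {
		adds = append(adds, want)
	}
	return adds, updates, deletes
}

func main() {
	owned := []service{
		{name: "servicegrid-demo-svc", keys: `["zone0"]`}, // drifted annotation
		{name: "stale-svc", keys: `["zone1"]`},            // unexpected name
	}
	adds, updates, deletes := plan("servicegrid-demo", "zone1", owned)
	fmt.Println(len(adds), len(updates), len(deletes))
}
```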
    

    Since the logic is similar to DeploymentGrid, the details are not expanded here; the focus is on application-grid-wrapper.

    application-grid-wrapper analysis

    After the ServiceGrid Controller has created the service, application-grid-wrapper comes into play:

    apiVersion: v1
    kind: Service
    metadata:
      annotations:
        topologyKeys: '["zone1"]'
      creationTimestamp: "2021-03-03T07:33:30Z"
      labels:
        superedge.io/grid-selector: servicegrid-demo
      name: servicegrid-demo-svc
      namespace: default
      ownerReferences:
      - apiVersion: superedge.io/v1
        blockOwnerDeletion: true
        controller: true
        kind: ServiceGrid
        name: servicegrid-demo
        uid: 78c74d3c-72ac-4e68-8c79-f1396af5a581
      resourceVersion: "127987090"
      selfLink: /api/v1/namespaces/default/services/servicegrid-demo-svc
      uid: 8130ba7b-c27e-4c3a-8ceb-4f6dd0178dfc
    spec:
      clusterIP: 192.168.161.1
      ports:
      - port: 80
        protocol: TCP
        targetPort: 8080
      selector:
        appGrid: echo
      sessionAffinity: None
      type: ClusterIP
    status:
      loadBalancer: {}
    

    To achieve zero intrusion into Kubernetes, a wrapper layer is added between kube-proxy and the apiserver. The architecture is as follows:

    [Figure: application-grid-wrapper architecture]

    The call chain is as follows:

    kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver
    

    Therefore application-grid-wrapper starts a server that accepts requests from kube-proxy, as follows:

    func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error {
        ...
        klog.Infof("Start to run interceptor server")
        /* filter
         */
        server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)}
        if insecure {
            return server.ListenAndServe()
        }
        ...
        server.TLSConfig = tlsConfig
        return server.ListenAndServeTLS("", "")
    }
    func (s *interceptorServer) buildFilterChains(debug bool) http.Handler {
        handler := http.Handler(http.NewServeMux())
        handler = s.interceptEndpointsRequest(handler)
        handler = s.interceptServiceRequest(handler)
        handler = s.interceptEventRequest(handler)
        handler = s.interceptNodeRequest(handler)
        handler = s.logger(handler)
        if debug {
            handler = s.debugger(handler)
        }
        return handler
    }
    

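    An interceptorServer is created first, and then the handlers are registered. From outermost to innermost they are:

    • debug: accepts debug requests and returns the wrapper's pprof runtime information

    • logger: prints request logs

    • node: accepts kube-proxy node GET (/api/v1/nodes/{node}) requests and returns the node information

    • event: accepts kube-proxy events POST (/events) requests and forwards them to lite-apiserver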

      func (s *interceptorServer) interceptEventRequest(handler http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            if r.Method != http.MethodPost || !strings.HasSuffix(r.URL.Path, "/events") {
                handler.ServeHTTP(w, r)
                return
            }
            targetURL, _ := url.Parse(s.restConfig.Host)
            reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)
            reverseProxy.Transport, _ = rest.TransportFor(s.restConfig)
            reverseProxy.ServeHTTP(w, r)
        })
      }
      
    • service: accepts kube-proxy service List&Watch (/api/v1/services) requests and answers them from the storageCache contents (GetServices)

    • endpoint: accepts kube-proxy endpoint List&Watch (/api/v1/endpoints) requests and answers them from the storageCache contents (GetEndpoints)
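
    The outer-to-inner order follows from how buildFilterChains wraps handlers: the wrapper applied last sees the request first. The sketch below demonstrates this with the standard library only; tag, buildChain, and run are hypothetical helpers that record the order instead of doing real interception, and the debug wrapper is left out.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// tag returns a middleware that records its name and then delegates to next.
func tag(name string, trace *[]string) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			*trace = append(*trace, name)
			next.ServeHTTP(w, r)
		})
	}
}

// buildChain wraps in the same order as buildFilterChains (without debug):
// the wrapper applied last ends up outermost.
func buildChain(trace *[]string) http.Handler {
	handler := http.Handler(http.NewServeMux())
	for _, name := range []string{"endpoints", "service", "event", "node", "logger"} {
		handler = tag(name, trace)(handler)
	}
	return handler
}

// run sends one GET request through the chain and reports the visiting order
// and the final status code.
func run(path string) ([]string, int) {
	var trace []string
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, path, nil)
	buildChain(&trace).ServeHTTP(rec, req)
	return trace, rec.Code
}

func main() {
	trace, code := run("/api/v1/pods")
	// A request that no interceptor claims falls through to the empty mux.
	fmt.Println(trace, code)
}
```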

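    The cache logic is analyzed first; we then return to the List&Watch handling in the HTTP handlers.

    To implement topology awareness, the wrapper maintains its own cache covering node, service, and endpoint. As shown below, setupInformers registers handlers for these three resource types: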

    type storageCache struct {
        // hostName is the nodeName of node which application-grid-wrapper deploys on
        hostName         string
        wrapperInCluster bool
        // mu lock protect the following map structure
        mu           sync.RWMutex
        servicesMap  map[types.NamespacedName]*serviceContainer
        endpointsMap map[types.NamespacedName]*endpointsContainer
        nodesMap     map[types.NamespacedName]*nodeContainer
        // service watch channel
        serviceChan chan<- watch.Event
        // endpoints watch channel
        endpointsChan chan<- watch.Event
    }
    ...
    func NewStorageCache(hostName string, wrapperInCluster bool, serviceNotifier, endpointsNotifier chan watch.Event) *storageCache {
        msc := &storageCache{
            hostName:         hostName,
            wrapperInCluster: wrapperInCluster,
            servicesMap:      make(map[types.NamespacedName]*serviceContainer),
            endpointsMap:     make(map[types.NamespacedName]*endpointsContainer),
            nodesMap:         make(map[types.NamespacedName]*nodeContainer),
            serviceChan:      serviceNotifier,
            endpointsChan:    endpointsNotifier,
        }
        return msc
    }
    ...
    func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error {
        ...
        if err := s.setupInformers(ctx.Done()); err != nil {
            return err
        }
        klog.Infof("Start to run interceptor server")
        /* filter
         */
        server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)}
        ...
        return server.ListenAndServeTLS("", "")
    }
    func (s *interceptorServer) setupInformers(stop <-chan struct{}) error {
        klog.Infof("Start to run service and endpoints informers")
        noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
        if err != nil {
            klog.Errorf("can't parse proxy label, %v", err)
            return err
        }
        noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
        if err != nil {
            klog.Errorf("can't parse headless label, %v", err)
            return err
        }
        labelSelector := labels.NewSelector()
        labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)
        resyncPeriod := time.Minute * 5
        client := kubernetes.NewForConfigOrDie(s.restConfig)
        nodeInformerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)
        informerFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,
            informers.WithTweakListOptions(func(options *metav1.ListOptions) {
                options.LabelSelector = labelSelector.String()
            }))
        nodeInformer := nodeInformerFactory.Core().V1().Nodes().Informer()
        serviceInformer := informerFactory.Core().V1().Services().Informer()
        endpointsInformer := informerFactory.Core().V1().Endpoints().Informer()
        /*
         */
        nodeInformer.AddEventHandlerWithResyncPeriod(s.cache.NodeEventHandler(), resyncPeriod)
        serviceInformer.AddEventHandlerWithResyncPeriod(s.cache.ServiceEventHandler(), resyncPeriod)
        endpointsInformer.AddEventHandlerWithResyncPeriod(s.cache.EndpointsEventHandler(), resyncPeriod)
        go nodeInformer.Run(stop)
        go serviceInformer.Run(stop)
        go endpointsInformer.Run(stop)
        if !cache.WaitForNamedCacheSync("node", stop,
            nodeInformer.HasSynced,
            serviceInformer.HasSynced,
            endpointsInformer.HasSynced) {
            return fmt.Errorf("can't sync informers")
        }
        return nil
    }
    func (sc *storageCache) NodeEventHandler() cache.ResourceEventHandler {
        return &nodeHandler{cache: sc}
    }
    func (sc *storageCache) ServiceEventHandler() cache.ResourceEventHandler {
        return &serviceHandler{cache: sc}
    }
    func (sc *storageCache) EndpointsEventHandler() cache.ResourceEventHandler {
        return &endpointsHandler{cache: sc}
    }
    

    NodeEventHandler, ServiceEventHandler, and EndpointsEventHandler are analyzed in turn:

    1. NodeEventHandler

    NodeEventHandler watches events for node resources and adds the node and its labels to storageCache.nodesMap (the key is the nodeName; the value is the node and its labels).

    func (nh *nodeHandler) add(node *v1.Node) {
        sc := nh.cache
        sc.mu.Lock()
        nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}
        klog.Infof("Adding node %v", nodeKey)
        sc.nodesMap[nodeKey] = &nodeContainer{
            node:   node,
            labels: node.Labels,
        }
        // update endpoints
        changedEps := sc.rebuildEndpointsMap()
        sc.mu.Unlock()
        for _, eps := range changedEps {
            sc.endpointsChan <- eps
        }
    }
    func (nh *nodeHandler) update(node *v1.Node) {
        sc := nh.cache
        sc.mu.Lock()
        nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}
        klog.Infof("Updating node %v", nodeKey)
        nodeContainer, found := sc.nodesMap[nodeKey]
        if !found {
            sc.mu.Unlock()
            klog.Errorf("Updating non-existed node %v", nodeKey)
            return
        }
        nodeContainer.node = node
        // return directly when labels of node stay unchanged
        if reflect.DeepEqual(node.Labels, nodeContainer.labels) {
            sc.mu.Unlock()
            return
        }
        nodeContainer.labels = node.Labels
        // update endpoints
        changedEps := sc.rebuildEndpointsMap()
        sc.mu.Unlock()
        for _, eps := range changedEps {
            sc.endpointsChan <- eps
        }
    }
    ...
    

    Because a node change affects endpoints, rebuildEndpointsMap is also called to refresh storageCache.endpointsMap.

    // rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events
    func (sc *storageCache) rebuildEndpointsMap() []watch.Event {
        evts := make([]watch.Event, 0)
        for name, endpointsContainer := range sc.endpointsMap {
            newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)
            if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {
                continue
            }
            sc.endpointsMap[name].modified = newEps
            evts = append(evts, watch.Event{
                Type:   watch.Modified,
                Object: newEps,
            })
        }
        return evts
    }
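
    The pattern used by the node handlers (mutate and rebuild under the lock, send events only after unlocking) can be sketched as below. All types here are hypothetical simplifications: endpoints are reduced to an address-to-node map and the topology key to a single node unit value per node.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

type event struct {
	name  string
	addrs []string
}

type entry struct {
	raw      map[string]string // address -> node name (unfiltered view)
	modified []string          // pruned view last handed to watchers
}

type cache struct {
	mu       sync.Mutex
	host     string
	nodeUnit map[string]string // node name -> node unit label value
	entries  map[string]*entry
	out      chan event // buffered here so the demo never blocks
}

func newCache(host string) *cache {
	return &cache{host: host, nodeUnit: make(map[string]string),
		entries: make(map[string]*entry), out: make(chan event, 16)}
}

// prune keeps the addresses whose node shares the host's node unit.
func (c *cache) prune(e *entry) []string {
	var kept []string
	unit, ok := c.nodeUnit[c.host]
	if !ok {
		return kept
	}
	for addr, node := range e.raw {
		if u, found := c.nodeUnit[node]; found && u == unit {
			kept = append(kept, addr)
		}
	}
	sort.Strings(kept)
	return kept
}

func equal(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

// setNodeUnit updates a node label, rebuilds every pruned view under the
// lock, and emits events for the changed ones after releasing it.
func (c *cache) setNodeUnit(node, unit string) {
	c.mu.Lock()
	c.nodeUnit[node] = unit
	var evts []event
	for name, e := range c.entries {
		if pruned := c.prune(e); !equal(pruned, e.modified) {
			e.modified = pruned
			evts = append(evts, event{name, pruned})
		}
	}
	c.mu.Unlock()
	for _, ev := range evts {
		c.out <- ev
	}
}

func main() {
	c := newCache("node1")
	c.entries["default/echo"] = &entry{raw: map[string]string{
		"10.0.0.1": "node0", "10.0.0.2": "node1", "10.0.0.3": "node2"}}
	c.setNodeUnit("node0", "nodeunit1") // host not labelled yet: no event
	c.setNodeUnit("node1", "nodeunit2") // host joins nodeunit2: one event
	c.setNodeUnit("node2", "nodeunit2") // node2 joins as well: one event
	c.setNodeUnit("node2", "nodeunit2") // nothing changed: no event
	close(c.out)
	for ev := range c.out {
		fmt.Println(ev.name, ev.addrs)
	}
}
```

    Sending after the unlock keeps a slow watcher from holding up other handlers that need the same lock.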
    

    rebuildEndpointsMap is the core function of the cache; the topology-aware algorithm itself is implemented in pruneEndpoints, which it calls:

    // pruneEndpoints filters endpoints using serviceTopology rules combined by services topologyKeys and node labels
    func pruneEndpoints(hostName string,
        nodes map[types.NamespacedName]*nodeContainer,
        services map[types.NamespacedName]*serviceContainer,
        eps *v1.Endpoints, wrapperInCluster bool) *v1.Endpoints {
        epsKey := types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name}
        if wrapperInCluster {
            eps = genLocalEndpoints(eps)
        }
        // dangling endpoints
        svc, ok := services[epsKey]
        if !ok {
            klog.V(4).Infof("Dangling endpoints %s, %+#v", eps.Name, eps.Subsets)
            return eps
        }
        // normal service
        if len(svc.keys) == 0 {
            klog.V(4).Infof("Normal endpoints %s, %+#v", eps.Name, eps.Subsets)
            return eps
        }
        // topology endpoints
        newEps := eps.DeepCopy()
        for si := range newEps.Subsets {
            subnet := &newEps.Subsets[si]
            subnet.Addresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.Addresses)
            subnet.NotReadyAddresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.NotReadyAddresses)
        }
        klog.V(4).Infof("Topology endpoints %s: subnets from %+#v to %+#v", eps.Name, eps.Subsets, newEps.Subsets)
        return newEps
    }
    // filterConcernedAddresses aims to filter out endpoints addresses within the same node unit
    func filterConcernedAddresses(topologyKeys []string, hostName string, nodes map[types.NamespacedName]*nodeContainer,
        addresses []v1.EndpointAddress) []v1.EndpointAddress {
        hostNode, found := nodes[types.NamespacedName{Name: hostName}]
        if !found {
            return nil
        }
        filteredEndpointAddresses := make([]v1.EndpointAddress, 0)
        for i := range addresses {
            addr := addresses[i]
            if nodeName := addr.NodeName; nodeName != nil {
                epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]
                if !found {
                    continue
                }
                if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {
                    filteredEndpointAddresses = append(filteredEndpointAddresses, addr)
                }
            }
        }
        return filteredEndpointAddresses
    }
    func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool {
        if n1 == nil || n2 == nil {
            return false
        }
        for _, key := range keys {
            val1, v1found := n1[key]
            val2, v2found := n2[key]
            if v1found && v2found && val1 == val2 {
                return true
            }
        }
        return false
    }
    

    The algorithm works as follows:

    • Check whether the endpoint belongs to the default kubernetes service; if so, convert it to the lite-apiserver address (127.0.0.1) and port (51003) on the edge node where the wrapper runs
    apiVersion: v1
    kind: Endpoints
    metadata:
      annotations:
        superedge.io/local-endpoint: 127.0.0.1
        superedge.io/local-port: "51003"
      name: kubernetes
      namespace: default
    subsets:
    - addresses:
      - ip: 172.31.0.60
      ports:
      - name: https
        port: xxx
        protocol: TCP
    
    func genLocalEndpoints(eps *v1.Endpoints) *v1.Endpoints {
        if eps.Namespace != metav1.NamespaceDefault || eps.Name != MasterEndpointName {
            return eps
        }
        klog.V(4).Infof("begin to gen local ep %v", eps)
        ipAddress, e := eps.Annotations[EdgeLocalEndpoint]
        if !e {
            return eps
        }
        portStr, e := eps.Annotations[EdgeLocalPort]
        if !e {
            return eps
        }
        klog.V(4).Infof("get local endpoint %s:%s", ipAddress, portStr)
        port, err := strconv.ParseInt(portStr, 10, 32)
        if err != nil {
            klog.Errorf("parse int %s err %v", portStr, err)
            return eps
        }
        ip := net.ParseIP(ipAddress)
        if ip == nil {
            klog.Warningf("parse ip %s nil", ipAddress)
            return eps
        }
        nep := eps.DeepCopy()
        nep.Subsets = []v1.EndpointSubset{
            {
                Addresses: []v1.EndpointAddress{
                    {
                        IP: ipAddress,
                    },
                },
                Ports: []v1.EndpointPort{
                    {
                        Protocol: v1.ProtocolTCP,
                        Port:     int32(port),
                        Name:     "https",
                    },
                },
            },
        }
        klog.V(4).Infof("gen new endpoint complete %v", nep)
        return nep
    }
    

    The purpose is to make services on the edge node that use in-cluster (InCluster) access reach the local lite-apiserver rather than the apiserver in the cloud.
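
    The annotation handling in genLocalEndpoints boils down to validating an IP and a port before rewriting the endpoint. A minimal sketch follows; localAPIServer is a hypothetical helper, and unlike the original (which parses a 32-bit integer) it parses the port as a 16-bit unsigned value so out-of-range ports are rejected.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// Annotation names as shown on the kubernetes Endpoints object above.
const (
	localEndpointKey = "superedge.io/local-endpoint"
	localPortKey     = "superedge.io/local-port"
)

// localAPIServer returns "ip:port" for the local lite-apiserver when both
// annotations are present and valid; otherwise ok is false and the caller
// should keep the original endpoints.
func localAPIServer(annotations map[string]string) (addr string, ok bool) {
	ip, found := annotations[localEndpointKey]
	if !found || net.ParseIP(ip) == nil {
		return "", false
	}
	portStr, found := annotations[localPortKey]
	if !found {
		return "", false
	}
	// Assumption of this sketch: restrict the port to 1..65535.
	port, err := strconv.ParseUint(portStr, 10, 16)
	if err != nil || port == 0 {
		return "", false
	}
	return net.JoinHostPort(ip, strconv.FormatUint(port, 10)), true
}

func main() {
	addr, ok := localAPIServer(map[string]string{
		localEndpointKey: "127.0.0.1",
		localPortKey:     "51003",
	})
	fmt.Println(addr, ok)
}
```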

    • Look up the corresponding service in the storageCache.servicesMap cache by the endpoint name (namespace/name); if the service has no topologyKeys, no topology conversion is needed (it is not a service group)
    func getTopologyKeys(objectMeta *metav1.ObjectMeta) []string {
        if !hasTopologyKey(objectMeta) {
            return nil
        }
        var keys []string
        keyData := objectMeta.Annotations[TopologyAnnotationsKey]
        if err := json.Unmarshal([]byte(keyData), &keys); err != nil {
            klog.Errorf("can't parse topology keys %s, %v", keyData, err)
            return nil
        }
        return keys
    }
    
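    • Call filterConcernedAddresses to filter the Addresses and NotReadyAddresses of endpoint.Subsets, keeping only the endpoints whose node is in the same topology domain (as defined by the service topologyKeys) as the wrapper's node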
    // filterConcernedAddresses aims to filter out endpoints addresses within the same node unit
    func filterConcernedAddresses(topologyKeys []string, hostName string, nodes map[types.NamespacedName]*nodeContainer,
        addresses []v1.EndpointAddress) []v1.EndpointAddress {
        hostNode, found := nodes[types.NamespacedName{Name: hostName}]
        if !found {
            return nil
        }
        filteredEndpointAddresses := make([]v1.EndpointAddress, 0)
        for i := range addresses {
            addr := addresses[i]
            if nodeName := addr.NodeName; nodeName != nil {
                epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]
                if !found {
                    continue
                }
                if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {
                    filteredEndpointAddresses = append(filteredEndpointAddresses, addr)
                }
            }
        }
        return filteredEndpointAddresses
    }
    func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool {
        if n1 == nil || n2 == nil {
            return false
        }
        for _, key := range keys {
            val1, v1found := n1[key]
            val2, v2found := n2[key]
            if v1found && v2found && val1 == val2 {
                return true
            }
        }
        return false
    }
    

    Note: if the edge node where the wrapper runs does not carry the label named by the service topologyKeys, it cannot access the service either.
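
    To make the filtering and the note above concrete, the sketch below replays the earlier example (node0 in nodeunit1, node1 and node2 in nodeunit2) with plain maps. sameDomain mirrors the comparison done by hasIntersectionLabel; visible and node3 are hypothetical additions for illustration.

```go
package main

import "fmt"

// sameDomain reports whether both label sets share a value for any of the
// topology keys.
func sameDomain(keys []string, a, b map[string]string) bool {
	for _, k := range keys {
		va, okA := a[k]
		vb, okB := b[k]
		if okA && okB && va == vb {
			return true
		}
	}
	return false
}

// visible returns the backend nodes that the wrapper on host would keep.
func visible(keys []string, host string, nodes map[string]map[string]string, backends []string) []string {
	hostLabels, ok := nodes[host]
	if !ok {
		return nil
	}
	var out []string
	for _, b := range backends {
		if labels, found := nodes[b]; found && sameDomain(keys, hostLabels, labels) {
			out = append(out, b)
		}
	}
	return out
}

func main() {
	nodes := map[string]map[string]string{
		"node0": {"zone1": "nodeunit1"},
		"node1": {"zone1": "nodeunit2"},
		"node2": {"zone1": "nodeunit2"},
		"node3": {}, // hypothetical node without the zone1 label
	}
	backends := []string{"node0", "node1", "node2"}
	for _, host := range []string{"node0", "node1", "node3"} {
		fmt.Println(host, visible([]string{"zone1"}, host, nodes, backends))
	}
}
```

    In this sketch node3 ends up with no backends at all, which is the situation the note describes.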

    Back in rebuildEndpointsMap: after pruneEndpoints has filtered the endpoints down to the same topology domain, the result is assigned to storageCache.endpointsMap[endpoint].modified (this field records the endpoints as modified by topology awareness).

    func (nh *nodeHandler) add(node *v1.Node) {
        sc := nh.cache
        sc.mu.Lock()
        nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}
        klog.Infof("Adding node %v", nodeKey)
        sc.nodesMap[nodeKey] = &nodeContainer{
            node:   node,
            labels: node.Labels,
        }
        // update endpoints
        changedEps := sc.rebuildEndpointsMap()
        sc.mu.Unlock()
        for _, eps := range changedEps {
            sc.endpointsChan <- eps
        }
    }
    // rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events
    func (sc *storageCache) rebuildEndpointsMap() []watch.Event {
        evts := make([]watch.Event, 0)
        for name, endpointsContainer := range sc.endpointsMap {
            newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)
            if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {
                continue
            }
            sc.endpointsMap[name].modified = newEps
            evts = append(evts, watch.Event{
                Type:   watch.Modified,
                Object: newEps,
            })
        }
        return evts
    }
    

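    In addition, if the endpoints (as modified by topology awareness) change, a watch event is built and passed to the endpoints handler (interceptEndpointsRequest) for processing.

    2. ServiceEventHandler

    The key of storageCache.servicesMap is the service name (namespace/name); the value is a serviceContainer holding the following data:

    • svc: the service object
    • keys: the service topologyKeys

    For changes to service resources, the Update event is used as the example: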

    func (sh *serviceHandler) update(service *v1.Service) {
        sc := sh.cache
        sc.mu.Lock()
        serviceKey := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
        klog.Infof("Updating service %v", serviceKey)
        newTopologyKeys := getTopologyKeys(&service.ObjectMeta)
        serviceContainer, found := sc.servicesMap[serviceKey]
        if !found {
            sc.mu.Unlock()
            klog.Errorf("update non-existed service, %v", serviceKey)
            return
        }
        sc.serviceChan <- watch.Event{
            Type:   watch.Modified,
            Object: service,
        }
        serviceContainer.svc = service
        // return directly when topologyKeys of service stay unchanged
        if reflect.DeepEqual(serviceContainer.keys, newTopologyKeys) {
            sc.mu.Unlock()
            return
        }
        serviceContainer.keys = newTopologyKeys
        // update endpoints
        changedEps := sc.rebuildEndpointsMap()
        sc.mu.Unlock()
        for _, eps := range changedEps {
            sc.endpointsChan <- eps
        }
    }
    

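    The logic is as follows:

    • Get the service topologyKeys
    • Build a watch.Modified event for the service
    • Compare the service topologyKeys with the existing ones
    • If they differ, update topologyKeys and call rebuildEndpointsMap to refresh the cached endpoints, including those of this service; if any endpoints change, build endpoints watch events and pass them to the endpoints handler (interceptEndpointsRequest) for processing

    3. EndpointsEventHandler

    The key of storageCache.endpointsMap is the endpoints name (namespace/name); the value is an endpointsContainer holding the following data:

    • endpoints: the endpoints before topology modification
    • modified: the endpoints after topology modification

    For changes to endpoints resources, the Update event is used as the example: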

    func (eh *endpointsHandler) update(endpoints *v1.Endpoints) {
        sc := eh.cache
        sc.mu.Lock()
        endpointsKey := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
        klog.Infof("Updating endpoints %v", endpointsKey)
        endpointsContainer, found := sc.endpointsMap[endpointsKey]
        if !found {
            sc.mu.Unlock()
            klog.Errorf("Updating non-existed endpoints %v", endpointsKey)
            return
        }
        endpointsContainer.endpoints = endpoints
        newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpoints, sc.wrapperInCluster)
        changed := !apiequality.Semantic.DeepEqual(endpointsContainer.modified, newEps)
        if changed {
            endpointsContainer.modified = newEps
        }
        sc.mu.Unlock()
        if changed {
            sc.endpointsChan <- watch.Event{
                Type:   watch.Modified,
                Object: newEps,
            }
        }
    }
    

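    The logic is as follows:

    • Update endpointsContainer.endpoints to the new endpoints object
    • Call pruneEndpoints to get the endpoints after topology filtering
    • Compare endpointsContainer.modified with the newly filtered endpoints
    • If they differ, update endpointsContainer.modified, then build an endpoints watch event and pass it to the endpoints handler (interceptEndpointsRequest) for processing

    Having analyzed NodeEventHandler, ServiceEventHandler, and EndpointsEventHandler, we return to the List&Watch handling in the HTTP handlers, taking endpoints as the example: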

    func (s *interceptorServer) interceptEndpointsRequest(handler http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            if r.Method != http.MethodGet || !strings.HasPrefix(r.URL.Path, "/api/v1/endpoints") {
                handler.ServeHTTP(w, r)
                return
            }
            queries := r.URL.Query()
            acceptType := r.Header.Get("Accept")
            info, found := s.parseAccept(acceptType, s.mediaSerializer)
            if !found {
                klog.Errorf("can't find %s serializer", acceptType)
                w.WriteHeader(http.StatusBadRequest)
                return
            }
            encoder := scheme.Codecs.EncoderForVersion(info.Serializer, v1.SchemeGroupVersion)
            // list request
            if queries.Get("watch") == "" {
                w.Header().Set("Content-Type", info.MediaType)
                allEndpoints := s.cache.GetEndpoints()
                epsItems := make([]v1.Endpoints, 0, len(allEndpoints))
                for _, eps := range allEndpoints {
                    epsItems = append(epsItems, *eps)
                }
                epsList := &v1.EndpointsList{
                    Items: epsItems,
                }
                err := encoder.Encode(epsList, w)
                if err != nil {
                    klog.Errorf("can't marshal endpoints list, %v", err)
                    w.WriteHeader(http.StatusInternalServerError)
                    return
                }
                return
            }
            // watch request
            timeoutSecondsStr := r.URL.Query().Get("timeoutSeconds")
            timeout := time.Minute
            if timeoutSecondsStr != "" {
                timeout, _ = time.ParseDuration(fmt.Sprintf("%ss", timeoutSecondsStr))
            }
            timer := time.NewTimer(timeout)
            defer timer.Stop()
            flusher, ok := w.(http.Flusher)
            if !ok {
                klog.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
                w.WriteHeader(http.StatusMethodNotAllowed)
                return
            }
            e := restclientwatch.NewEncoder(
                streaming.NewEncoder(info.StreamSerializer.Framer.NewFrameWriter(w),
                    scheme.Codecs.EncoderForVersion(info.StreamSerializer, v1.SchemeGroupVersion)),
                encoder)
            if info.MediaType == runtime.ContentTypeProtobuf {
                w.Header().Set("Content-Type", runtime.ContentTypeProtobuf+";stream=watch")
            } else {
                w.Header().Set("Content-Type", runtime.ContentTypeJSON)
            }
            w.Header().Set("Transfer-Encoding", "chunked")
            w.WriteHeader(http.StatusOK)
            flusher.Flush()
            for {
                select {
                case <-r.Context().Done():
                    return
                case <-timer.C:
                    return
                case evt := <-s.endpointsWatchCh:
                    klog.V(4).Infof("Send endpoint watch event: %+#v", evt)
                    err := e.Encode(&evt)
                    if err != nil {
                        klog.Errorf("can't encode watch event, %v", err)
                        return
                    }
                    if len(s.endpointsWatchCh) == 0 {
                        flusher.Flush()
                    }
                }
            }
        })
    }
    

    The logic is as follows:

    • For a List request, call GetEndpoints to obtain the list of topology-modified endpoints and return it
    func (sc *storageCache) GetEndpoints() []*v1.Endpoints {
        sc.mu.RLock()
        defer sc.mu.RUnlock()
        epList := make([]*v1.Endpoints, 0, len(sc.endpointsMap))
        for _, v := range sc.endpointsMap {
            epList = append(epList, v.modified)
        }
        return epList
    }
    
    • For a Watch request, keep receiving watch events from the endpointsWatchCh channel and return them to the client

    The logic of interceptServiceRequest is the same as that of interceptEndpointsRequest, so it is not repeated here.

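    Summary

    • SuperEdge service group uses application-grid-wrapper to implement topology awareness, so that service access stays within the same nodeunit (closed-loop access)
    • Compared with the native Kubernetes community implementation, the topology awareness of service group differs as follows:
      • The service group topology key is customizable (it is the gridUniqKey), which makes it more flexible; the community implementation currently offers only three choices: "kubernetes.io/hostname", "topology.kubernetes.io/zone", and "topology.kubernetes.io/region"
      • service group accepts only one topology key, so only valid endpoints within the local topology domain can be accessed and endpoints in other topology domains cannot; the community implementation can fall back to endpoints in other topology domains through the topologyKeys list and "*"
    • ServiceGrid Controller generates the corresponding service from a ServiceGrid (including the topologyKeys annotation built from serviceGrid.Spec.GridUniqKey). Its logic is broadly the same as that of DeploymentGrid Controller:
      • Create and maintain the CRDs required by service group (including ServiceGrid)
      • Watch ServiceGrid events and enqueue the ServiceGrid into the work queue; repeatedly take a ServiceGrid from the queue, parse it, and create and maintain the corresponding service
      • Watch service events and enqueue the related ServiceGrid into the work queue for the processing above, which completes the overall reconcile logic
    • To achieve zero intrusion into Kubernetes, a wrapper layer is added between kube-proxy and the apiserver. The call chain is: kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver
    • application-grid-wrapper is an HTTP server that accepts requests from kube-proxy and maintains a resource cache. Its handlers, from outermost to innermost, are:
      • debug: accepts debug requests and returns the wrapper's pprof runtime information
      • logger: prints request logs
      • node: accepts kube-proxy node GET (/api/v1/nodes/{node}) requests and returns the node information
      • event: accepts kube-proxy events POST (/events) requests and forwards them to lite-apiserver
      • service: accepts kube-proxy service List&Watch (/api/v1/services) requests and responds from the storageCache content (GetServices)
      • endpoint: accepts kube-proxy endpoint List&Watch (/api/v1/endpoints) requests and responds from the storageCache content (GetEndpoints)
    • To implement topology awareness, the wrapper maintains a resource cache covering node, service, and endpoint, and registers the related event handlers. The core topology algorithm calls filterConcernedAddresses to filter the Addresses and NotReadyAddresses of endpoint.Subsets, keeping only the endpoints that fall in the same topology domain as defined by the service topologyKeys. In addition, if the edge node where the wrapper runs does not carry the service topologyKeys label, it cannot access that service either.
    • The wrapper accepts List&Watch requests for endpoints and services from kube-proxy. Taking endpoints as the example: for a List request, it calls GetEndpoints to obtain the topology-modified endpoints list and returns it; for a Watch request, it keeps receiving watch events from the endpointsWatchCh channel and returns them. The service logic is the same as for endpoints.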
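
    The core filtering rule summarized above can be illustrated as follows. This is a sketch of the idea, not the body of filterConcernedAddresses: address and filterByTopology are hypothetical names, node labels are passed in as a plain map, and the key "zone1" with its values is an example gridUniqKey chosen for illustration.

```go
package main

import "fmt"

// address is a hypothetical, simplified endpoint address: an IP plus its node name.
type address struct {
	IP   string
	Node string
}

// filterByTopology keeps the addresses whose node carries the same value for
// topologyKey as the local node. If the local node lacks the label, nothing is kept.
func filterByTopology(localNode, topologyKey string, nodeLabels map[string]map[string]string, addrs []address) []address {
	localValue, ok := nodeLabels[localNode][topologyKey]
	if !ok {
		return nil
	}
	var kept []address
	for _, a := range addrs {
		if v, found := nodeLabels[a.Node][topologyKey]; found && v == localValue {
			kept = append(kept, a)
		}
	}
	return kept
}

func main() {
	labels := map[string]map[string]string{
		"node-a": {"zone1": "nodeunit1"},
		"node-b": {"zone1": "nodeunit1"},
		"node-c": {"zone1": "nodeunit2"},
		"node-d": {}, // no topology label
	}
	addrs := []address{
		{IP: "10.0.0.1", Node: "node-a"},
		{IP: "10.0.0.2", Node: "node-b"},
		{IP: "10.0.0.3", Node: "node-c"},
	}
	// Seen from node-a, only backends in nodeunit1 remain.
	fmt.Println(filterByTopology("node-a", "zone1", labels, addrs))
	// Seen from node-d, which lacks the label, no backend remains.
	fmt.Println(filterByTopology("node-d", "zone1", labels, addrs))
}
```

    In the wrapper, the same rule would apply to both Addresses and NotReadyAddresses of each subset.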

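    Outlook

    At present, the topology algorithm implemented by SuperEdge service group is more flexible and convenient. How to handle its relationship with the Kubernetes community's service topology awareness is worth exploring, and it is suggested that the SuperEdge topology algorithm be proposed upstream to the community.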

  • Original article: https://www.cnblogs.com/tencent-cloud-native/p/14554124.html