zoukankan      html  css  js  c++  java
  • 2.深入Istio源码:Pilot服务发现

    转载请声明出处哦~,本篇文章发布于luozhiyun的博客:https://www.luozhiyun.com

    本文使用的Istio源码是 release 1.5。

    介绍

    pilot-discovery是在Pilot中的核心服务,在Pilot中名为pilot-discovery,主要功能是从注册中心(如 kubernetes 或者 consul)获取信息并汇集,从 Kubernetes API Server 中获取流量规则,并将服务信息和流量规则转化为数据面可以理解的格式,通过标准的数据面 API 下发到网格中的各个SideCar中。

    pilot-discovery包含了服务发现、配置规则发现、xDS配置下发。总体上打算分三篇来进行讲解,这一篇主要看看服务发现部分的实现。文章中有涉及xDS协议的一些东西,大家可以看看这篇文章:深入解读Service Mesh背后的技术细节

    istio

    Pilot服务发现指通过监听底层平台的服务注册中心来缓存Istio服务模型,并且监视服务模型的变化,再服务模型更新时触发相关事件回调处理函数的执行。

    服务发现工作机制

    Pilot初始化

    	discoveryCmd = &cobra.Command{
    		Use:   "discovery",
    		Short: "Start Istio proxy discovery service.",
    		Args:  cobra.ExactArgs(0),
    		RunE: func(c *cobra.Command, args []string) error {
    			...
    			//日志配置
    			if err := log.Configure(loggingOptions); err != nil {
    				return err
    			} 
    			... 
    			stop := make(chan struct{})
     
    			// 创建xDs服务器
    			discoveryServer, err := bootstrap.NewServer(&serverArgs)
    			if err != nil {
    				return fmt.Errorf("failed to create discovery service: %v", err)
    			} 
    			// 启动服务器
    			if err := discoveryServer.Start(stop); err != nil {
    				return fmt.Errorf("failed to start discovery service: %v", err)
    			}
    			//等待进程推出
    			cmd.WaitSignal(stop) 
    			discoveryServer.WaitUntilCompletion()
    			return nil
    		},
    	}
    

    Pilot服务在初始化的时候首先会初始化日志配置,然后创建xDs服务器,这里的xDs指的是x Discovery Service的意思,x代表了一系列的组件如:Cluster、Endpoint、Listener、Route 等。

    func NewServer(args *PilotArgs) (*Server, error) {
    	 
    	args.Default()
    	e := &model.Environment{
    		ServiceDiscovery: aggregate.NewController(),
    		PushContext:      model.NewPushContext(),
    	}
    
    	s := &Server{
    		basePort:       args.BasePort,
    		clusterID:      getClusterID(args),
    		environment:    e,
    		EnvoyXdsServer: envoyv2.NewDiscoveryServer(e, args.Plugins),
    		forceStop:      args.ForceStop,
    		mux:            http.NewServeMux(),
    	}
     
    	// 初始化处理Istio Config的控制器
    	if err := s.initConfigController(args); err != nil {
    		return nil, fmt.Errorf("config controller: %v", err)
    	}
    	// 初始化处理Service Discovery的控制器
    	if err := s.initServiceControllers(args); err != nil {
    		return nil, fmt.Errorf("service controllers: %v", err)
    	} 
    	... 
    	//初始化xDS服务端
    	if err := s.initDiscoveryService(args); err != nil {
    		return nil, fmt.Errorf("discovery service: %v", err)
    	}
    	... 
    	// Webhook 回调服务
    	if err := s.initHTTPSWebhookServer(args); err != nil {
    		return nil, fmt.Errorf("injectionWebhook server: %v", err)
    	}
        //sidecar注入相关
    	if err := s.initSidecarInjector(args); err != nil {
    		return nil, fmt.Errorf("sidecar injector: %v", err)
    	}
    
    	... 
    	return s, nil
    }
    

    NewServer方法里面初始化了很多模块,这里挑相关的看看initConfigController是和配置服务相关的,我们之后再看,这里我们主要看initServiceControllers。

    ServiceControllers

    服务发现的主要逻辑在Pilot中由ServiceController(服务控制器)实现,通过监听底层平台的服务注册中心来缓存Istio服务模型,并监视服务模型的变化,在服务模型更新时触发相关事件回调处理函数的执行。

    serviceController

    初始化

    Controller的初始化执行流程很简单,这里用一张图来描述,initServiceControllers方法最后会调用到NewController方法来进行初始化。

    Group 1

    func NewController(client kubernetes.Interface, options Options) *Controller {
    	log.Infof("Service controller watching namespace %q for services, endpoints, nodes and pods, refresh %s",
    		options.WatchedNamespace, options.ResyncPeriod)
    
    	// The queue requires a time duration for a retry delay after a handler error
    	// 初始化Controller
    	c := &Controller{
    		domainSuffix:               options.DomainSuffix,
    		client:                     client,
    		//控制器任务队列
    		queue:                      queue.NewQueue(1 * time.Second),
    		clusterID:                  options.ClusterID,
    		xdsUpdater:                 options.XDSUpdater,
    		servicesMap:                make(map[host.Name]*model.Service),
    		externalNameSvcInstanceMap: make(map[host.Name][]*model.ServiceInstance),
    		networksWatcher:            options.NetworksWatcher,
    		metrics:                    options.Metrics,
    	}
    	//获取informer
    	sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))
    	//注册 informer处理器
    	c.services = sharedInformers.Core().V1().Services().Informer()
    	//Services Handler
    	registerHandlers(c.services, c.queue, "Services", c.onServiceEvent)
    	//endpoints Handler
    	switch options.EndpointMode {
    	case EndpointsOnly:
    		c.endpoints = newEndpointsController(c, sharedInformers)
    	case EndpointSliceOnly:
    		c.endpoints = newEndpointSliceController(c, sharedInformers)
    	}
    	//Nodes Handler
    	c.nodes = sharedInformers.Core().V1().Nodes().Informer()
    	registerHandlers(c.nodes, c.queue, "Nodes", c.onNodeEvent)
    
    	podInformer := sharedInformers.Core().V1().Pods().Informer()
    	c.pods = newPodCache(podInformer, c)
    	//Pods Handler
    	registerHandlers(podInformer, c.queue, "Pods", c.pods.onEvent)
    
    	return c
    }
    

    NewController方法里面首先是初始化Controller,然后获取informer后分别注册Services Handler、endpoints Handler、Nodes Handler、Pods Handler。

    核心功能就是监听k8s相关资源(Service、Endpoint、Pod、Node)的更新事件,执行相应的事件处理回调函数。

    这里的Controller结构体实现了Controller接口:

    type Controller interface {
    	// AppendServiceHandler notifies about changes to the service catalog.
    	AppendServiceHandler(f func(*Service, Event)) error
    
    	// AppendInstanceHandler notifies about changes to the service instances
    	// for a service.
    	AppendInstanceHandler(f func(*ServiceInstance, Event)) error
    
    	// Run until a signal is received
    	Run(stop <-chan struct{})
    }
    

    再注册完毕后会调用其Run方法异步执行。

    //异步调用Run方法
    go serviceControllers.Run(stop)
    //run方法里面会遍历GetRegistries列表,并异步执行其Run方法
    func (c *Controller) Run(stop <-chan struct{}) {
    
    	for _, r := range c.GetRegistries() {
    		go r.Run(stop)
    	}
    
    	<-stop
    	log.Info("Registry Aggregator terminated")
    }
    

    到这里ServiceController为四种资源分别创建了一个监听器,用于监听K8s的资源更新,并注册EventHandler。

    Group 1

    Service处理器

    func (c *Controller) onServiceEvent(curr interface{}, event model.Event) error {
    	if err := c.checkReadyForEvents(); err != nil {
    		return err
    	}
    
    	svc, ok := curr.(*v1.Service)
    	if !ok {
    		tombstone, ok := curr.(cache.DeletedFinalStateUnknown)
    		if !ok {
    			log.Errorf("Couldn't get object from tombstone %#v", curr)
    			return nil
    		}
    		svc, ok = tombstone.Obj.(*v1.Service)
    		if !ok {
    			log.Errorf("Tombstone contained object that is not a service %#v", curr)
    			return nil
    		}
    	}
    
    	log.Debugf("Handle event %s for service %s in namespace %s", event, svc.Name, svc.Namespace)
    	//将k8s service 转换成 istio service
    	svcConv := kube.ConvertService(*svc, c.domainSuffix, c.clusterID)
    	//根据事件类型处理事件
    	switch event {
    	//删除事件
    	case model.EventDelete:
    		c.Lock()
    		delete(c.servicesMap, svcConv.Hostname)
    		delete(c.externalNameSvcInstanceMap, svcConv.Hostname)
    		c.Unlock()
    		// EDS needs to just know when service is deleted.
    		//更新服务缓存
    		c.xdsUpdater.SvcUpdate(c.clusterID, svc.Name, svc.Namespace, event)
    	default:
    		// instance conversion is only required when service is added/updated.
    		instances := kube.ExternalNameServiceInstances(*svc, svcConv)
    		c.Lock()
    		c.servicesMap[svcConv.Hostname] = svcConv
    		if instances == nil {
    			delete(c.externalNameSvcInstanceMap, svcConv.Hostname)
    		} else {
    			c.externalNameSvcInstanceMap[svcConv.Hostname] = instances
    		}
    		c.Unlock()
    		//更新服务缓存
    		c.xdsUpdater.SvcUpdate(c.clusterID, svc.Name, svc.Namespace, event)
    	}
    
    	// Notify service handlers.
    	// 触发XDS事件处理器
    	for _, f := range c.serviceHandlers {
    		f(svcConv, event)
    	}
    
    	return nil
    }
    

    Service事件处理器会将根据事件的类型更新缓存,然后调用serviceHandlers的事件处理器进行回调。serviceHandlers事件处理器是在初始化DiscoveryService的时候设置的。

    serviceHandler := func(svc *model.Service, _ model.Event) {
    		pushReq := &model.PushRequest{
    			Full:               true,
    			NamespacesUpdated:  map[string]struct{}{svc.Attributes.Namespace: {}},
    			ConfigTypesUpdated: map[resource.GroupVersionKind]struct{}{collections.IstioNetworkingV1Alpha3Serviceentries.Resource().GroupVersionKind(): {}},
    			Reason:             []model.TriggerReason{model.ServiceUpdate},
    		}
            //配置更新
    		s.EnvoyXdsServer.ConfigUpdate(pushReq)
    	}
    

    Endpoint处理器

    Endpoint处理器会在调用newEndpointsController创建endpointsController的时候进行注册

    func newEndpointsController(c *Controller, sharedInformers informers.SharedInformerFactory) *endpointsController {
    	informer := sharedInformers.Core().V1().Endpoints().Informer()
    	out := &endpointsController{
    		kubeEndpoints: kubeEndpoints{
    			c:        c,
    			informer: informer,
    		},
    	}
    	//注册处理器
    	out.registerEndpointsHandler()
    	return out
    }
    

    在回调的时候会调用到endpointsController的onEvent方法:

    func (e *endpointsController) onEvent(curr interface{}, event model.Event) error {
    	... 
    	return e.handleEvent(ep.Name, ep.Namespace, event, curr, func(obj interface{}, event model.Event) {
    		ep := obj.(*v1.Endpoints)
    		//EDS更新处理
    		e.c.updateEDS(ep, event)
    	})
    }
    

    这里会调用updateEDS进行EDS(Endpoint Discovery service)更新处理。

    func (c *Controller) updateEDS(ep *v1.Endpoints, event model.Event) {
    	hostname := kube.ServiceHostname(ep.Name, ep.Namespace, c.domainSuffix)
    
    	endpoints := make([]*model.IstioEndpoint, 0)
    	if event != model.EventDelete {
    		for _, ss := range ep.Subsets {
    			for _, ea := range ss.Addresses {
    				//获取Endpoint对应的Pod实例
    				pod := c.pods.getPodByIP(ea.IP)
    				...   
    				// 将Endpoint转换成Istio模型IstioEndpoint
    				for _, port := range ss.Ports {
    					endpoints = append(endpoints, &model.IstioEndpoint{
    						Address:         ea.IP,
    						EndpointPort:    uint32(port.Port),
    						ServicePortName: port.Name,
    						Labels:          labelMap,
    						UID:             uid,
    						ServiceAccount:  sa,
    						Network:         c.endpointNetwork(ea.IP),
    						Locality:        locality,
    						Attributes:      model.ServiceAttributes{Name: ep.Name, Namespace: ep.Namespace},
    						TLSMode:         tlsMode,
    					})
    				}
    			}
    		}
    	} 
    	//使用xdsUpdater更新EDS
    	_ = c.xdsUpdater.EDSUpdate(c.clusterID, string(hostname), ep.Namespace, endpoints)
    }
    

    在这里会重新封装endpoints然后调用EDSUpdate进行更新。

    func (s *DiscoveryServer) EDSUpdate(clusterID, serviceName string, namespace string,
    	istioEndpoints []*model.IstioEndpoint) error {
    	inboundEDSUpdates.Increment()
    	s.edsUpdate(clusterID, serviceName, namespace, istioEndpoints, false)
    	return nil
    }
    
    func (s *DiscoveryServer) edsUpdate(clusterID, serviceName string, namespace string,
    	istioEndpoints []*model.IstioEndpoint, internal bool) { 
    	s.mutex.Lock()
    	defer s.mutex.Unlock()
    	requireFull := false
     
    	...
    	//找到之前缓存的服务
    	if _, f := s.EndpointShardsByService[serviceName]; !f { 
    		s.EndpointShardsByService[serviceName] = map[string]*EndpointShards{}
    	}
    	ep, f := s.EndpointShardsByService[serviceName][namespace]
    	//不存在则初始化
    	if !f { 
    		ep = &EndpointShards{
    			Shards:          map[string][]*model.IstioEndpoint{},
    			ServiceAccounts: map[string]bool{},
    		}
    		s.EndpointShardsByService[serviceName][namespace] = ep
    		if !internal {
    			adsLog.Infof("Full push, new service %s", serviceName)
    			requireFull = true
    		}
    	} 
    	... 
    	ep.mutex.Lock()
    	ep.Shards[clusterID] = istioEndpoints
    	ep.ServiceAccounts = serviceAccounts
    	ep.mutex.Unlock() 
    	
    	if !internal {
    		var edsUpdates map[string]struct{}
    		if !requireFull {
    			edsUpdates = map[string]struct{}{serviceName: {}}
    		}
    		//配置更新
    		s.ConfigUpdate(&model.PushRequest{
    			Full:               requireFull,
    			NamespacesUpdated:  map[string]struct{}{namespace: {}},
    			ConfigTypesUpdated: map[resource.GroupVersionKind]struct{}{collections.IstioNetworkingV1Alpha3Serviceentries.Resource().GroupVersionKind(): {}},
    			EdsUpdates:         edsUpdates,
    			Reason:             []model.TriggerReason{model.EndpointUpdate},
    		})
    	}
    }
    

    edsUpdate方法里面实际上就是做了两件事,一是更新缓存,二是调用ConfigUpdate进行配置更新。

    ConfigUpdate资源更新实际上就是通过事件分发执行xDS分发,这块的细节我们稍后再讲。

    总结

    通过这篇我们掌握了服务发现是通过k8s的Informer来注册监听Service、EndPoint、nodes、pods等资源的更新事件,然后通过事件驱动模型执行回调函数,再调用xDS的ConfigUpdate来执行异步更新配置的操作。

    Reference

    https://www.servicemesher.com/blog/istio-analysis-4/

    https://www.cnblogs.com/163yun/p/8962278.html

    https://www.servicemesher.com/blog/envoy-proxy-config-deep-dive/

  • 相关阅读:
    (转载)什么时候需要用到try-catch
    直接打印Java的对象时输出的到底是什么
    关于图像语义分割的总结和感悟(转载)
    面经
    石家庄停车位在线预约平台16
    石家庄停车位在线预约平台15
    石家庄停车位在线预约平台14
    石家庄停车位在线预约平台13
    石家庄停车位在线预约平台12
    石家庄停车位在线预约平台11
  • 原文地址:https://www.cnblogs.com/luozhiyun/p/14021107.html
Copyright © 2011-2022 走看看