zoukankan      html  css  js  c++  java
  • apiserver源码分析——启动流程

    前言

    apiserver是k8s控制面的一个组件,在众多组件中唯一一个对接etcd,对外暴露http服务的形式为k8s中各种资源提供增删改查等服务。它是RESTful风格,每个资源的URI都会形如
    /apis/{apiGroup}/{version}/namsspaces/{ns-name}/{resource-kind}/{resource-name}

    /apis/{apiGroup}/{version}/{resource-kind}/{resource-name}
    apiserver中包含3个server组件,apiserver依靠这3个组件来对不同类型的请求提供处理

    • APIExtensionServer: 主要负责处理CustomResourceDefination(CRD)方面的请求
    • KubeAPIServer: 主要负责处理k8s内置资源的请求,此外还会包括通用处理,认证、鉴权等
    • AggregratorServer: 主要负责aggregrate方面的处理,它充当一个代理服务器,将请求转发到聚合进来的k8s service中。

    启动流程

    本篇阅读源码版本1.19

    apiserver同样使用了corbra命令行框架处理启动命令,它从命令行的RunE回调函数来到了Run函数,开始执行启动流程。Run函数做3件事

    1. 启动apiserver的3个server组件的路由
    2. 注册健康检查,就绪探针,存活探针的地址
    3. 启动http服务

    代码位于 /cmd/kube-apiserver/app/server.go

    func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
    
    	//注册三个server的路由
    	server, err := CreateServerChain(completeOptions, stopCh)
    	if err != nil {
    		return err
    	}
    
    	//注册健康检查,就绪,存活探针的地址
    	prepared, err := server.PrepareRun()
    	if err != nil {
    		return err
    	}
    
    	//运行http server
    	return prepared.Run(stopCh)
    }
    

    三个server的创建流程

    CreateServerChain函数的调用如下

    func CreateServerChain(...)(...){
    
    	kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
    	if err != nil {
    		return nil, err
    	}
    
    	// If additional API servers are added, they should be gated.
    	apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
    		serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
    	if err != nil {
    		return nil, err
    	}
    	//创建APIExtensionsServer并注册路由
    	apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
    	if err != nil {
    		return nil, err
    	}
    
    	//创建KubeAPIServer并注册路由
    	kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
    	if err != nil {
    		return nil, err
    	}
    
    	// aggregator comes last in the chain
    	aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
    	if err != nil {
    		return nil, err
    	}
    	//创建aggregatorServer并注册路由
    	aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
    	if err != nil {
    		// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
    		return nil, err
    	}
    }
    

    创建每个server都要有对应它的config。apiExtensionServer和aggregatorServer的Config需要依赖kubeAPIServerConfig,而这几个ServerConfig都需要依赖GenericConfig,CreateKubeAPIServerConfig创建kubeAPIServerConfig,而CreateKubeAPIServerConfig调用buildGenericConfig创建GenericConfig。

    func buildGenericConfig(
    	s *options.ServerRunOptions,
    	proxyTransport *http.Transport,
    )(...){
    	//创建一个genericConfig对象
    	genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
    	//设置genericConfig的字段,代码不展示
    	//创建认证实例
    	if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
    		return
    	}
    	//创建鉴权实例
    	genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
    	//准入控制器
    	err = s.Admission.ApplyTo(
    		genericConfig,
    		versionedInformers,
    		kubeClientConfig,
    		feature.DefaultFeatureGate,
    		pluginInitializers...)
    
    }
    

    APIExtensionServer

    APIExtensionServer的创建流程大致包含以下几个步骤

    • 创建GeneriAPIServer
    • 实例化CustomResourceDefinitions
    • 实例化APIGroupInfo
    • InstallAPIGroup

    三种类型的Server底层都需要依赖GeneriAPIServer。第二步创建的CustomResourceDefinitions是本类型Server的对象,用于后续进行路由注册。APIGroupInfo是用于每个版本、每个资源类型对应的存储对象。最后调用InstallAPIGroup进行路由注册,把每一个资源的版本,类型映射到一个URI地址中。代码如下所示

    func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
    	return apiextensionsConfig.Complete().New(delegateAPIServer)
    }
    
    //代码位于 /vendor/k8s.io/apiextensions-apiserver/apiserver.go
    func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
    	//创建Generic
    	genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
    	//实例化 CustomResourceDefinitions
    	s := &CustomResourceDefinitions{
    		GenericAPIServer: genericServer,
    	}
    	//实例化APIGroupInfo
    	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
    	if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
    		storage := map[string]rest.Storage{}
    		// customresourcedefinitions
    		customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
    		if err != nil {
    			return nil, err
    		}
    		storage["customresourcedefinitions"] = customResourceDefinitionStorage
    		storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
    
    		apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage
    	}
    	//另一个版本的类似,不作展示
    	//InstallAPIGroup注册
    	if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
    		return nil, err
    	}
    }
    

    KubeAPIServer

    KubeAPIServer处理k8s内置资源请求,它的创建流程与APIExtensionServer类似,包含下面几个步骤

    • 创建GeneriAPIServer
    • 实例化Instance
    • installLegacyAPI
    • installAPI

    其中Instance是KubeAPIServer的Server对象。KubeAPIServer创建和Install的APIGroup需要调用两个方法,一个是installLegacyAPI,另一个是installAPI,原因在于k8s的apiGroup分了/api和/apis两种。初期的资源其实没有apiGroup这个概念,而后期引入了groupVersion为了兼容原有的才把旧的资源类型的URI地址都归属于/api这个路径下的,新的全部在/apis这个路径下,因此在创建注册APIGroup时都分了两类。代码如下

    func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
    	kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
    	if err != nil {
    		return nil, err
    	}
    
    	return kubeAPIServer, nil
    }
    
    //代码位于 /vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
    func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
    	//创建Generic
    	s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
    	//1.14版本的是Master,当前版本是Instance
    	m := &Instance{
    		GenericAPIServer:          s,
    		ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
    	}
    	//实例化核心API
    	if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
    		legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
    			StorageFactory:              c.ExtraConfig.StorageFactory,
    			ProxyTransport:              c.ExtraConfig.ProxyTransport,
    			KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
    			EventTTL:                    c.ExtraConfig.EventTTL,
    			ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
    			SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange,
    			ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
    			LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
    			ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
    			ExtendExpiration:            c.ExtraConfig.ExtendExpiration,
    			ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
    			APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
    		}
    		if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
    			return nil, err
    		}
    	}
    
    	restStorageProviders := []RESTStorageProvider{...}
    	//InstallAPIs,内部包含InstallAPIGroup
    	if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
    		return nil, err
    	}
    }
    
    注册核心apiGroups

    进入m.InstallLegacyAPI,这个方法包含了实例化ApiGroupInfo和InstalAPIGroup两个操作,这部分资源是k8s的核心资源

    func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
    	//实例化ApiGroupInfo
    	legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
    	if err != nil {
    		return fmt.Errorf("error building core storage: %v", err)
    	}
    
    	controllerName := "bootstrap-controller"
    	coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
    	bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
    	m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
    	m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
    
    	//相当于调用InstallAPIGroup
    	if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
    		return fmt.Errorf("error in registering group versions: %v", err)
    	}
    	return nil
    }
    

    实例化APIGroupInfo的代码局部如下,代码篇幅较长,只摘取pod一部分的源码展示,代码位于/pkg/registry/core/rest/storage_core.go

    func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
    	apiGroupInfo := genericapiserver.APIGroupInfo{
    		PrioritizedVersions:          legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
    		VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
    		Scheme:                       legacyscheme.Scheme,
    		ParameterCodec:               legacyscheme.ParameterCodec,
    		NegotiatedSerializer:         legacyscheme.Codecs,
    	}
    	podStorage, err := podstore.NewStorage(
    		restOptionsGetter,
    		nodeStorage.KubeletConnectionInfo,
    		c.ProxyTransport,
    		podDisruptionClient,
    	)
    	restStorageMap := map[string]rest.Storage{
    		"pods":             podStorage.Pod,
    		"pods/attach":      podStorage.Attach,
    		"pods/status":      podStorage.Status,
    		"pods/log":         podStorage.Log,
    		"pods/exec":        podStorage.Exec,
    		"pods/portforward": podStorage.PortForward,
    		"pods/proxy":       podStorage.Proxy,
    		"pods/binding":     podStorage.Binding,
    		"bindings":         podStorage.LegacyBinding,
    		.....
    	}
    }
    

    m.GenericAPIServer.InstallLegacyAPIGroup的第一个参数是apiPrefix,值是/api;第二个参数是上面创建好的,包含资源存储方式的apiGroupInfo
    与InstallAPIGroup类似地,InstallLegacyAPIGroup需要经过两层调用才会到达InstallREST,调用链如下

    m.GenericAPIServer.InstallLegacyAPIGroup
    |--s.installAPIResources
       |--apiGroupVersion.InstallREST
    

    InstallREST的入参是restful.Container,他是golang http框架go-restful里面的一个重要对象,在InstallREST里面构造出installer,installer包含资源的存储方法和资源对应api的前缀,利用installer.Install()来创建出go-restful的webservice,webservice加入到传入得container,即完成api的注册。
    代码位于/vendor/k8s.io/apiserver/pkg/endpoints/groupversion.go

    func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
    	prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
    	installer := &APIInstaller{
    		group:             g,
    		prefix:            prefix,
    		minRequestTimeout: g.MinRequestTimeout,
    	}
    
    	apiResources, ws, registrationErrors := installer.Install()
    	versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
    	versionDiscoveryHandler.AddToWebService(ws)
    	container.Add(ws)
    	return utilerrors.NewAggregate(registrationErrors)
    }
    

    installer.Install()创建了webservice,api中各个URL的路由注册,handler的绑定也会在里面实现。由于这部分代码也涉及到apiserver如何响应处理一个http请求,本篇先不探讨

    go-restful框架

    不过上面提及到go-restful框架的几个概念,在这里进行一个简单的科普

    • container:在http的角度就是一个Server,里面就包含若干个webservice
    • webservice:webservice从结构来说是承上启下的一个角色,它包含了一组route,而且这组route都会有一个共同的basePath或者说他们的URL的prefix是相同的
    • route:route对应具体的一个URL,它需要指定具体的路径Path,请求方法Method和处理函数Handler,以及一些参数Parameter等等。

    他们的层次结构如下

    container
    |--webservice
       |--Route
    

    AggregratorServer

    用于处理聚合进来的api请求,实际是做七层转发,它的创建流程与APIExtensionServer的最为相似

    • 创建GeneriAPIServer
    • 实例化Aggregrator
    • 实例化APIGroupInfo
    • InstallAPIGroup

    实际创建AggregratorServer的代码位于/vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go

    func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
    	//创建GeneriAPIServer
    	genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
    	//实例化Aggregrator
    	s := &APIAggregator{...}
    	//实例化APIGroupInfo
    	apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
    	//InstallAPIGroup
    	if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
    		return nil, err
    	}
    }
    

    运行http server

    api的路由绑定完毕,最后就是要把http server跑起来,prepared.Run调用的是由preparedGenericAPIServer实现的Run方法,经过多层调用最终把server跑起来,调用链如下

    prepared.Run		/vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
    |--s.runnable.Run(stopCh)	
    |==preparedGenericAPIServer.Run	/vendor/k8s.io/apiserver/pkg/server/genericapiserver.go
       |--s.NonBlockingRun
          |--s.SecureServingInfo.Serve	/vendor/k8s.io/kube-aggregrator/pkg/server/secure_serving.go
             |--&http.Server{}
             |--RunServer
    

    http server的对象是在SecureServingInfo.Serve创建的,即调用链的s.SecureServingInfo.Serve,最终让server开始监听是在RunServer处。它接收了http.Server作为参数。至此apiserver运行起来,接收来自各个组件或客户端的请求。

    小结

    本篇讲述了k8s-apiserver的启动流程,介绍了apiserver包含了3个server组件,apiserver的服务实际上由这三个组件提供,讲述了他们创建流程,实例化底层的GenericServer,实例化各自的Server类,实例化ApiGroupInfo来建立资源与存储操作间的映射关系,最后InstallAPI。还专门挑了k8s核心资源类型的ApiGroup注册过程介绍。整个启动过程的调用链如下

    Run		/cmd/kube-apiserver/app/server.go
    |--CreateServerChain
    |  |--CreateKubeAPIServerConfig
    |  |  |--buildGenericConfig
    |  |     |--genericapiserver.NewConfig		
    |  |     |--s.Authentication.ApplyTo
    |  |     |--BuildAuthorizer
    |  |     |--s.Admission.ApplyTo
    |  |--createAPIExtensionsConfig
    |  |--createAPIExtensionsServer
    |  |  |--apiextensionsConfig.Complete().New	/vendor/k8s.io/apiextensions-apiserver/apiserver.go
    |  |     |--c.GenericConfig.New
    |  |     |--&CustomResourceDefinitions{}
    |  |     |--genericapiserver.NewDefaultAPIGroupInfo
    |  |     |--s.GenericAPIServer.InstallAPIGroup
    |  |--CreateKubeAPIServer
    |  |  |--kubeAPIServerConfig.Complete().New	/pkg/controlplane/instance.go
    |  |  |  |--c.GenericConfig.New
    |  |  |  |--&Instance{}
    |  |  |  |--m.InstallLegacyAPI					/pkg/controlplane/instance.go
    |  |  |  |  |--legacyRESTStorageProvider.NewLegacyRESTStorage	/pkg/registry/core/rest/storage_core.go
    |  |  |  |  |--m.GenericAPIServer.InstallLegacyAPIGroup  ##相当于新版本的InstallAPIGroup
    |  |  |  |  |  |--s.installAPIResources
    |  |  |  |  |  |  |--apiGroupVersion.InstallREST
    |  |  |  |--m.InstallAPIs
    |  |--createAggregatorConfig
    |  |--createAggregatorServer
    |     |--aggregatorConfig.Complete().NewWithDelegate	/vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
    |        |--c.GenericConfig.New				
    |        |--&APIAggregator{}
    |        |--apiservicerest.NewRESTStorage		
    |        |--s.GenericAPIServer.InstallAPIGroup
    |--server.PrepareRun			/vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
    |  |--s.GenericAPIServer.PrepareRun	/vendor/k8s.io/kube-aggregrator/pkg/server/genericapiserver.go
    |     |--s.installHealthz()		
    |     |--s.installLivez()
    |     |--s.installReadyz()
    |--prepared.Run		/vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
       |--s.runnable.Run(stopCh)	
       |==preparedGenericAPIServer.Run	/vendor/k8s.io/apiserver/pkg/server/genericapiserver.go
          |--s.NonBlockingRun
             |--s.SecureServingInfo.Serve	/vendor/k8s.io/kube-aggregrator/pkg/server/secure_serving.go
                |--&http.Server{}
                |--RunServer
    

    如有兴趣,可阅读鄙人“k8s源码之旅”系列的其他文章
    kubelet源码分析——kubelet简介与启动
    kubelet源码分析——启动Pod
    kubelet源码分析——关闭Pod
    kubelet源码分析——监控Pod变更
    scheduler源码分析——调度流程
    apiserver源码分析——启动流程
    apiserver源码分析——处理请求

  • 相关阅读:
    PHP编程基础学习(一)——数据类型
    6-6 带头结点的链式表操作集(20 分)
    6-5 链式表操作集(20 分)
    6-4 链式表的按序号查找(10 分)
    6-3 求链式表的表长(10 分)
    6-2 顺序表操作集(20 分)
    6-1 单链表逆转(20 分)
    学生成绩管理系统(六):项目总结
    学生成绩管理系统(五):系统的完善与数据库的链接
    学生成绩管理系统(四)
  • 原文地址:https://www.cnblogs.com/HopeGi/p/15366532.html
Copyright © 2011-2022 走看看