  • Kubernetes Source Code Reading Notes: API Server (Part 1)

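    The API Server is one of the core components of Kubernetes. It exposes a unified RESTful interface through which all clients create, update, delete, and query the resources in a cluster, and it stores the state of those resources in etcd.

    The entry point of the API Server is in cmd/kube-apiserver/apiserver.go, which also uses cobra to register the kube-apiserver command.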

    cmd/kube-apiserver/apiserver.go
    
    func main() {
        rand.Seed(time.Now().UnixNano())
    
        command := app.NewAPIServerCommand(server.SetupSignalHandler())
    
        // TODO: once we switch everything over to Cobra commands, we can go back to calling
        // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
        // normalize func and add the go flag set by hand.
        // utilflag.InitFlags()
        logs.InitLogs()
        defer logs.FlushLogs()
    
        if err := command.Execute(); err != nil {
            fmt.Fprintf(os.Stderr, "error: %v\n", err)
            os.Exit(1)
        }
    }

    NewAPIServerCommand registers the kube-apiserver command; the core of that command is still the Run function.

    Step into the Run function in cmd/kube-apiserver/app/server.go:

    cmd/kube-apiserver/app/server.go
    
    func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
    	// To help debugging, immediately log version
    	klog.Infof("Version: %+v", version.Get())
    
    	server, err := CreateServerChain(completeOptions, stopCh)
    	if err != nil {
    		return err
    	}
    
    	return server.PrepareRun().Run(stopCh)
    }

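    The function is simple and consists of two main statements: the first creates the API Server, and the second runs it. The Run method is left for later; the focus here is on creation.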
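
    The create-then-run shape of Run can be sketched with the standard library alone. This is a minimal sketch, not the real implementation: toyServer, preparedToyServer, createToyServer, and run are hypothetical names standing in for the real server types, and the only behavior modeled is that Run blocks until the stop channel is closed.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// toyServer is a hypothetical stand-in for the server built by CreateServerChain.
type toyServer struct {
	prepared bool
}

// preparedToyServer can only be obtained through PrepareRun, so Run is
// unreachable on a server that has not been prepared.
type preparedToyServer struct {
	*toyServer
}

// createToyServer plays the role of the "create" statement and may fail.
func createToyServer(name string) (*toyServer, error) {
	if name == "" {
		return nil, errors.New("server name must not be empty")
	}
	return &toyServer{}, nil
}

// PrepareRun marks the server as prepared and returns the runnable wrapper.
func (s *toyServer) PrepareRun() preparedToyServer {
	s.prepared = true
	return preparedToyServer{s}
}

// Run blocks until stopCh is closed, then returns.
func (s preparedToyServer) Run(stopCh <-chan struct{}) error {
	<-stopCh
	return nil
}

// run mirrors the two statements of Run: create the server, then run it.
func run(name string, stopCh <-chan struct{}) error {
	server, err := createToyServer(name)
	if err != nil {
		return err
	}
	return server.PrepareRun().Run(stopCh)
}

func main() {
	stopCh := make(chan struct{})
	go func() {
		// Simulate a shutdown signal arriving shortly after startup.
		time.Sleep(10 * time.Millisecond)
		close(stopCh)
	}()
	fmt.Println("run returned:", run("toy", stopCh))
}
```

    Returning a separate prepared type from PrepareRun is one way to make "prepare before run" a compile-time requirement rather than a convention.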

    1. CreateServerChain

    Step into CreateServerChain:

    cmd/kube-apiserver/app/server.go
    
    func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {
        nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
        if err != nil {
            return nil, err
        }
    
        kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
        if err != nil {
            return nil, err
        }
    
        // If additional API servers are added, they should be gated.
        apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
            serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
        if err != nil {
            return nil, err
        }
        apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
        if err != nil {
            return nil, err
        }
    
        kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook)
        if err != nil {
            return nil, err
        }
    
        // otherwise go down the normal path of standing the aggregator up in front of the API server
        // this wires up openapi
        kubeAPIServer.GenericAPIServer.PrepareRun()
    
        // This will wire up openapi for extension api server
        apiExtensionsServer.GenericAPIServer.PrepareRun()
    
        // aggregator comes last in the chain
        aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
        if err != nil {
            return nil, err
        }
        aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
        if err != nil {
            // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
            return nil, err
        }
    
        if insecureServingInfo != nil {
            insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
            if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
                return nil, err
            }
        }
    
        return aggregatorServer.GenericAPIServer, nil
    }

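    It does the following:

    (1) Calls CreateNodeDialer to create the tools used to communicate with nodes (the node tunneler and the proxy transport).

    (2) Builds the Config for the API Server. The Config for the Extension API Server is built here as well; that server serves the API extensions users define themselves (CustomResourceDefinitions).

    (3) Creates the API Server and the Extension API Server from their Configs.

    (4) Prepares the API Server to run by calling PrepareRun. As the code comments note, this wires up OpenAPI; the server does not actually start serving until Run is called later.

    (5) Creates the aggregator, which combines the API Server and the Extension API Server and is the server that gets returned (not covered for now).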
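
    The chain built by these steps can be illustrated with plain net/http handlers. The sketch below is an assumption-laden toy, not the real wiring: newDelegatingHandler, buildChain, and get are hypothetical helpers, and the paths each layer claims are made up for illustration. It only shows the idea that each server handles what it knows and passes everything else to the next server in the chain.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newDelegatingHandler returns a handler that serves the given paths itself
// and forwards every other request to next.
func newDelegatingHandler(name string, paths []string, next http.Handler) http.Handler {
	own := make(map[string]bool, len(paths))
	for _, p := range paths {
		own[p] = true
	}
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if own[r.URL.Path] {
			fmt.Fprintf(w, "served by %s", name)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// buildChain wires the toy servers in the same order as CreateServerChain:
// the extension server is created first with an empty delegate, the main
// server delegates to it, and the aggregator sits in front of both.
func buildChain() http.Handler {
	empty := http.NotFoundHandler()
	extensions := newDelegatingHandler("apiextensions", []string{"/apis/example.com/v1/widgets"}, empty)
	kube := newDelegatingHandler("kube-apiserver", []string{"/api/v1/pods"}, extensions)
	return newDelegatingHandler("aggregator", []string{"/apis"}, kube)
}

// get issues an in-memory GET request and returns the status code and body.
func get(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	chain := buildChain()
	for _, p := range []string{"/apis", "/api/v1/pods", "/apis/example.com/v1/widgets", "/nope"} {
		code, body := get(chain, p)
		fmt.Printf("%s -> %d %q\n", p, code, body)
	}
}
```

    A request nobody claims falls through every layer and ends at the empty delegate, which answers 404.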

    Step 3 is implemented by calling CreateKubeAPIServer, which is analyzed in detail below.

    cmd/kube-apiserver/app/server.go

    func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, admissionPostStartHook genericapiserver.PostStartHookFunc) (*master.Master, error) {
        kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
        if err != nil {
            return nil, err
        }
    
        kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook)
    
        return kubeAPIServer, nil
    }

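    It first calls kubeAPIServerConfig.Complete().New to produce a kubeAPIServer instance, then adds a post-start hook to that instance. Note that this API Server takes the Extension API Server created earlier as its delegation target, that is, the server it hands a request to when it cannot serve the request itself.

    New is defined in pkg/master/master.go. Step into it: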
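
    The hook mechanism can be sketched as a small named registry. This is a minimal sketch under stated assumptions: hookFunc, hookRegistry, Add, AddOrDie, and RunAll are hypothetical names, the hooks run sequentially in name order purely to keep the toy deterministic, and nothing here claims to match how the real server schedules its hooks.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// hookFunc is a hypothetical stand-in for a post-start hook.
type hookFunc func(stopCh <-chan struct{}) error

// hookRegistry stores hooks by name so duplicates can be detected.
type hookRegistry struct {
	hooks map[string]hookFunc
}

func newHookRegistry() *hookRegistry {
	return &hookRegistry{hooks: map[string]hookFunc{}}
}

// Add registers fn under name and rejects empty names, nil hooks, and duplicates.
func (r *hookRegistry) Add(name string, fn hookFunc) error {
	switch {
	case name == "":
		return errors.New("hook name must not be empty")
	case fn == nil:
		return errors.New("hook function must not be nil")
	}
	if _, exists := r.hooks[name]; exists {
		return fmt.Errorf("hook %q is already registered", name)
	}
	r.hooks[name] = fn
	return nil
}

// AddOrDie is the panic-on-error variant, in the spirit of the *OrDie helpers.
func (r *hookRegistry) AddOrDie(name string, fn hookFunc) {
	if err := r.Add(name, fn); err != nil {
		panic(err)
	}
}

// RunAll runs every hook in name order and returns the names it ran.
func (r *hookRegistry) RunAll(stopCh <-chan struct{}) ([]string, error) {
	names := make([]string, 0, len(r.hooks))
	for name := range r.hooks {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		if err := r.hooks[name](stopCh); err != nil {
			return nil, fmt.Errorf("hook %q failed: %w", name, err)
		}
	}
	return names, nil
}

func main() {
	registry := newHookRegistry()
	registry.AddOrDie("start-admission-initializer", func(<-chan struct{}) error {
		fmt.Println("admission initializer started")
		return nil
	})
	fmt.Println("duplicate:", registry.Add("start-admission-initializer", func(<-chan struct{}) error { return nil }))

	names, err := registry.RunAll(make(chan struct{}))
	fmt.Println(names, err)
}
```

    Keying hooks by name is what makes an accidental double registration detectable at startup instead of silently running a hook twice.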

    pkg/master/master.go
    
    func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
        
        ...
    
        s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
        if err != nil {
            return nil, err
        }
    
        ...
    
        m := &Master{
            GenericAPIServer: s,
        }
    
        // install legacy rest storage
        if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
            legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
                StorageFactory:              c.ExtraConfig.StorageFactory,
                ProxyTransport:              c.ExtraConfig.ProxyTransport,
                KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
                EventTTL:                    c.ExtraConfig.EventTTL,
                ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
                ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
                LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
                ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
                ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
                APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
            }
            m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider)
        }
    
        // The order here is preserved in discovery.
        // If resources with identical names exist in more than one of these groups (e.g. "deployments.apps"" and "deployments.extensions"),
        // the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
        // This priority order is used for local discovery, but it ends up aggregated in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go
        // with specific priorities.
        // TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
        // handlers that we have.
        restStorageProviders := []RESTStorageProvider{
            auditregistrationrest.RESTStorageProvider{},
            authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
            authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
            autoscalingrest.RESTStorageProvider{},
            batchrest.RESTStorageProvider{},
            certificatesrest.RESTStorageProvider{},
            coordinationrest.RESTStorageProvider{},
            extensionsrest.RESTStorageProvider{},
            networkingrest.RESTStorageProvider{},
            policyrest.RESTStorageProvider{},
            rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
            schedulingrest.RESTStorageProvider{},
            settingsrest.RESTStorageProvider{},
            storagerest.RESTStorageProvider{},
            // keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
            // See https://github.com/kubernetes/kubernetes/issues/42392
            appsrest.RESTStorageProvider{},
            admissionregistrationrest.RESTStorageProvider{},
            eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
        }
        m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...)
    
        if c.ExtraConfig.Tunneler != nil {
            m.installTunneler(c.ExtraConfig.Tunneler, corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes())
        }
    
        m.GenericAPIServer.AddPostStartHookOrDie("ca-registration", c.ExtraConfig.ClientCARegistrationHook.PostStartHook)
    
        return m, nil
    }

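    First, GenericConfig.New creates an API Server object from the Config built earlier (and registers default paths on it, such as /version and /metrics); a Master object is then created around that API Server object.

    Next, it creates the RESTStorageProviders for this Master and registers the APIs by calling InstallLegacyAPI and InstallAPIs. The former registers the original core Kubernetes API (under the /api path); the latter registers the API groups added later (under the /apis path).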
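
    The split between /api and /apis comes down to whether the group name is empty. A minimal sketch, assuming a hypothetical helper groupVersionPath (the two prefix constants are named here for illustration only):

```go
package main

import "fmt"

const (
	legacyPrefix = "/api"  // prefix for the core (legacy) group, whose name is ""
	groupPrefix  = "/apis" // prefix for every named API group
)

// groupVersionPath returns the URL prefix under which a group/version is served.
// The core group has an empty name and therefore no group segment in its path.
func groupVersionPath(group, version string) string {
	if group == "" {
		return legacyPrefix + "/" + version
	}
	return groupPrefix + "/" + group + "/" + version
}

func main() {
	fmt.Println(groupVersionPath("", "v1"))           // core group
	fmt.Println(groupVersionPath("apps", "v1"))       // named group
	fmt.Println(groupVersionPath("batch", "v1beta1")) // named group, beta version
}
```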

    Finally, it adds a hook to the Master and returns it.

    The New and InstallLegacyAPI methods are analyzed below. The call logic of InstallAPIs is almost identical to that of InstallLegacyAPI.

    2. New

    Step into GenericConfig.New, which creates a GenericAPIServer:

    k8s.io/apiserver/pkg/server/config.go
    
    // New creates a new server which logically combines the handling chain with the passed server.
    // name is used to differentiate for logging. The handler chain in particular can be difficult as it starts delgating.
    // delegationTarget may not be nil.
    func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
        ...
    
        handlerChainBuilder := func(handler http.Handler) http.Handler {
            return c.BuildHandlerChainFunc(handler, c.Config)
        }
        apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
    
        s := &GenericAPIServer{
            discoveryAddresses:     c.DiscoveryAddresses,
            LoopbackClientConfig:   c.LoopbackClientConfig,
            legacyAPIGroupPrefixes: c.LegacyAPIGroupPrefixes,
            admissionControl:       c.AdmissionControl,
            Serializer:             c.Serializer,
            AuditBackend:           c.AuditBackend,
            Authorizer:             c.Authorization.Authorizer,
            delegationTarget:       delegationTarget,
            HandlerChainWaitGroup:  c.HandlerChainWaitGroup,
    
            minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
            ShutdownTimeout:   c.RequestTimeout,
    
            SecureServingInfo: c.SecureServing,
            ExternalAddress:   c.ExternalAddress,
    
            Handler: apiServerHandler,
    
            listedPathProvider: apiServerHandler,
    
            openAPIConfig: c.OpenAPIConfig,
    
            postStartHooks:         map[string]postStartHookEntry{},
            preShutdownHooks:       map[string]preShutdownHookEntry{},
            disabledPostStartHooks: c.DisabledPostStartHooks,
    
            healthzChecks: c.HealthzChecks,
    
            DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer),
    
            enableAPIResponseCompression: c.EnableAPIResponseCompression,
            maxRequestBodyBytes:          c.MaxRequestBodyBytes,
        }
    
        ...
    
        for k, v := range delegationTarget.PostStartHooks() {
            s.postStartHooks[k] = v
        }
    
        for k, v := range delegationTarget.PreShutdownHooks() {
            s.preShutdownHooks[k] = v
        }
    
        genericApiServerHookName := "generic-apiserver-start-informers"
        if c.SharedInformerFactory != nil && !s.isPostStartHookRegistered(genericApiServerHookName) {
            err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error {
                c.SharedInformerFactory.Start(context.StopCh)
                return nil
            })
            if err != nil {
                return nil, err
            }
        }
    
        ...
    
        s.listedPathProvider = routes.ListedPathProviders{s.listedPathProvider, delegationTarget}
    
        installAPI(s, c.Config)
    
        ...
    
        return s, nil
    }

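    The core of this method is NewAPIServerHandler, which initializes a Container. This Container is not a container in the Kubernetes sense but a go-restful concept; see https://www.cnblogs.com/ldaniel/p/5868384.html. Roughly speaking, a Container is a collection of WebServices, and different Containers can listen on different ports; a WebService is in turn a collection of Routes, for which it defines shared settings such as a common root path. The go-restful project lives at https://github.com/emicklei/go-restful.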
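
    The Container / WebService / Route hierarchy can be modeled in a few lines of standard-library Go. To be clear, this is a toy model of the concept and not go-restful's API: toyRoute, toyWebService, toyContainer, and the helpers below are all hypothetical names, and matching is by exact path only.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// toyRoute binds an HTTP method and a sub-path to a handler.
type toyRoute struct {
	method  string
	subPath string
	handler http.HandlerFunc
}

// toyWebService groups routes under one shared root path.
type toyWebService struct {
	rootPath string
	routes   []toyRoute
}

// toyContainer is a collection of web services and acts as the HTTP entry point.
type toyContainer struct {
	services []*toyWebService
}

// Add registers a web service with the container.
func (c *toyContainer) Add(ws *toyWebService) {
	c.services = append(c.services, ws)
}

// ServeHTTP dispatches to the first route whose method and full path
// (root path + sub-path) match the request exactly.
func (c *toyContainer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	for _, ws := range c.services {
		for _, rt := range ws.routes {
			if r.Method == rt.method && r.URL.Path == ws.rootPath+rt.subPath {
				rt.handler(w, r)
				return
			}
		}
	}
	http.NotFound(w, r)
}

// newToyContainer builds a container with one web service and one route.
func newToyContainer() *toyContainer {
	ws := &toyWebService{rootPath: "/api/v1"}
	ws.routes = append(ws.routes, toyRoute{
		method:  http.MethodGet,
		subPath: "/pods",
		handler: func(w http.ResponseWriter, _ *http.Request) { fmt.Fprint(w, "list pods") },
	})
	c := &toyContainer{}
	c.Add(ws)
	return c
}

// serve issues an in-memory request and returns the status code and body.
func serve(h http.Handler, method, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(method, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	c := newToyContainer()
	fmt.Println(serve(c, http.MethodGet, "/api/v1/pods"))
	fmt.Println(serve(c, http.MethodPost, "/api/v1/pods"))
}
```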

    Step into NewAPIServerHandler:

    k8s.io/apiserver/pkg/server/handler.go
    
    func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
        nonGoRestfulMux := mux.NewPathRecorderMux(name)
        if notFoundHandler != nil {
            nonGoRestfulMux.NotFoundHandler(notFoundHandler)
        }
    
        gorestfulContainer := restful.NewContainer()
        gorestfulContainer.ServeMux = http.NewServeMux()
        gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
        gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
            logStackOnRecover(s, panicReason, httpWriter)
        })
        gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
            serviceErrorHandler(s, serviceErr, request, response)
        })
    
        director := director{
            name:               name,
            goRestfulContainer: gorestfulContainer,
            nonGoRestfulMux:    nonGoRestfulMux,
        }
    
        return &APIServerHandler{
            FullHandlerChain:   handlerChainBuilder(director),
            GoRestfulContainer: gorestfulContainer,
            NonGoRestfulMux:    nonGoRestfulMux,
            Director:           director,
        }
    }

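    As shown, the method calls NewContainer to initialize a gorestfulContainer and calls Router to select CurlyRouter as the router type. Both steps resemble the official example at https://github.com/emicklei/go-restful/blob/master/examples/restful-curly-router.go. In addition, the method sets up the request multiplexers (the container's ServeMux and nonGoRestfulMux) and registers two callbacks, RecoverHandler and ServiceErrorHandler.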
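
    The excerpt does not show how director chooses between goRestfulContainer and nonGoRestfulMux, so the following is only one plausible dispatch strategy, sketched with the standard library. toyDirector, newToyHandler, newDelegate, and fetch are hypothetical names, and the prefix-matching rule is an assumption rather than the real logic. What it does reflect from the code above is the fallback order: REST routes, then the plain mux, then the delegate's not-found handler.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// toyDirector sends requests under a known REST prefix to the restful handler
// and everything else to the plain mux.
type toyDirector struct {
	restPrefixes []string
	restful      http.Handler
	nonRestful   http.Handler
}

func (d toyDirector) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	for _, p := range d.restPrefixes {
		if r.URL.Path == p || strings.HasPrefix(r.URL.Path, p+"/") {
			d.restful.ServeHTTP(w, r)
			return
		}
	}
	d.nonRestful.ServeHTTP(w, r)
}

// newToyHandler builds the two muxes and installs notFound (if any) as the
// catch-all of the plain mux, mirroring the nil check in NewAPIServerHandler.
func newToyHandler(notFound http.Handler) http.Handler {
	restful := http.NewServeMux()
	restful.HandleFunc("/api/v1/pods", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "pods")
	})

	nonRestful := http.NewServeMux()
	nonRestful.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "ok")
	})
	if notFound != nil {
		nonRestful.Handle("/", notFound)
	}

	return toyDirector{
		restPrefixes: []string{"/api"},
		restful:      restful,
		nonRestful:   nonRestful,
	}
}

// newDelegate returns a handler that answers every request with msg.
func newDelegate(msg string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, msg)
	})
}

// fetch issues an in-memory GET request and returns the response body.
func fetch(h http.Handler, path string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Body.String()
}

func main() {
	h := newToyHandler(newDelegate("handled by delegate"))
	for _, p := range []string{"/api/v1/pods", "/healthz", "/something-else"} {
		fmt.Printf("%s -> %s\n", p, fetch(h, p))
	}
}
```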

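    Back in New, the call to installAPI pre-registers some default paths on the API Server.

    3. InstallLegacyAPI

    With the API Server instance created and the Container initialized, the next and most important part is registering routes.

    Back in master.go, step into InstallLegacyAPI: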

    pkg/master/master.go

    func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) {
        legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
        if err != nil {
            klog.Fatalf("Error building core storage: %v", err)
        }
    
        controllerName := "bootstrap-controller"
        coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
        bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient)
        m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
        m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
    
        if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
            klog.Fatalf("Error in registering group versions: %v", err)
        }
    }

    It consists of the following steps:

    (1) Call NewLegacyRESTStorage to produce the RESTStorage and the APIGroupInfo.

    Step into NewLegacyRESTStorage:

    pkg/registry/core/rest/storage_core.go
    
    func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
        apiGroupInfo := genericapiserver.APIGroupInfo{
            PrioritizedVersions:          legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
            VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
            Scheme:                       legacyscheme.Scheme,
            ParameterCodec:               legacyscheme.ParameterCodec,
            NegotiatedSerializer:         legacyscheme.Codecs,
        }
    
        var podDisruptionClient policyclient.PodDisruptionBudgetsGetter
        if policyGroupVersion := (schema.GroupVersion{Group: "policy", Version: "v1beta1"}); legacyscheme.Scheme.IsVersionRegistered(policyGroupVersion) {
            var err error
            podDisruptionClient, err = policyclient.NewForConfig(c.LoopbackClientConfig)
            if err != nil {
                return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
            }
        }
        restStorage := LegacyRESTStorage{}
    
        podTemplateStorage := podtemplatestore.NewREST(restOptionsGetter)
    
        eventStorage := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
        limitRangeStorage := limitrangestore.NewREST(restOptionsGetter)
    
        resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotastore.NewREST(restOptionsGetter)
        secretStorage := secretstore.NewREST(restOptionsGetter)
        persistentVolumeStorage, persistentVolumeStatusStorage := pvstore.NewREST(restOptionsGetter)
        persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcstore.NewREST(restOptionsGetter)
        configMapStorage := configmapstore.NewREST(restOptionsGetter)
    
        namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespacestore.NewREST(restOptionsGetter)
    
        endpointsStorage := endpointsstore.NewREST(restOptionsGetter)
    
        nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
    
        podStorage := podstore.NewStorage(
            restOptionsGetter,
            nodeStorage.KubeletConnectionInfo,
            c.ProxyTransport,
            podDisruptionClient,
        )
    
        ...
    
        restStorageMap := map[string]rest.Storage{
            "pods":             podStorage.Pod,
            "pods/attach":      podStorage.Attach,
            "pods/status":      podStorage.Status,
            "pods/log":         podStorage.Log,
            "pods/exec":        podStorage.Exec,
            "pods/portforward": podStorage.PortForward,
            "pods/proxy":       podStorage.Proxy,
            "pods/binding":     podStorage.Binding,
            "bindings":         podStorage.Binding,
    
            "podTemplates": podTemplateStorage,
    
            "replicationControllers":        controllerStorage.Controller,
            "replicationControllers/status": controllerStorage.Status,
    
            "services":        serviceRest,
            "services/proxy":  serviceRestProxy,
            "services/status": serviceStatusStorage,
    
            "endpoints": endpointsStorage,
    
            "nodes":        nodeStorage.Node,
            "nodes/status": nodeStorage.Status,
            "nodes/proxy":  nodeStorage.Proxy,
    
            "events": eventStorage,
    
            "limitRanges":                   limitRangeStorage,
            "resourceQuotas":                resourceQuotaStorage,
            "resourceQuotas/status":         resourceQuotaStatusStorage,
            "namespaces":                    namespaceStorage,
            "namespaces/status":             namespaceStatusStorage,
            "namespaces/finalize":           namespaceFinalizeStorage,
            "secrets":                       secretStorage,
            "serviceAccounts":               serviceAccountStorage,
            "persistentVolumes":             persistentVolumeStorage,
            "persistentVolumes/status":      persistentVolumeStatusStorage,
            "persistentVolumeClaims":        persistentVolumeClaimStorage,
            "persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
            "configMaps":                    configMapStorage,
    
            "componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
        }
        if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
            restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
        }
        if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "policy", Version: "v1beta1"}) {
            restStorageMap["pods/eviction"] = podStorage.Eviction
        }
        if serviceAccountStorage.Token != nil {
            restStorageMap["serviceaccounts/token"] = serviceAccountStorage.Token
        }
        apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
    
        return restStorage, apiGroupInfo, nil
    }

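    This method creates a large number of Storage objects, for pods, nodes, PVs, and so on. These Storages are the storage structures the API Server uses to interface with etcd. As the package name suggests, everything registered here belongs to the core API group.

    Once created, the Storages are all added to restStorageMap, which maps each API resource to its Storage. The keys are not yet complete API paths; a prefix such as /api/v1 still has to be prepended.

    Finally, restStorageMap is placed into apiGroupInfo, which is returned.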
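
    The relationship between the map keys and the final paths can be sketched as follows. This is a simplified illustration with hypothetical names (toyStorage, toyGroupInfo, splitResource, resourcePaths); in particular it ignores the namespace segment that real paths for namespaced resources contain, and it only joins prefix, version, and key.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// toyStorage is a hypothetical stand-in for a rest.Storage implementation.
type toyStorage struct{ kind string }

// toyGroupInfo mirrors the shape of VersionedResourcesStorageMap:
// version -> "resource[/subresource]" -> storage.
type toyGroupInfo struct {
	versionedStorage map[string]map[string]toyStorage
}

// splitResource separates a map key into its resource and optional subresource.
func splitResource(key string) (resource, subresource string) {
	parts := strings.SplitN(key, "/", 2)
	if len(parts) == 2 {
		return parts[0], parts[1]
	}
	return parts[0], ""
}

// newToyGroupInfo builds a small storage map and files it under "v1".
func newToyGroupInfo() toyGroupInfo {
	storageMap := map[string]toyStorage{
		"pods":        {kind: "Pod"},
		"pods/status": {kind: "Pod"},
		"nodes":       {kind: "Node"},
	}
	info := toyGroupInfo{versionedStorage: map[string]map[string]toyStorage{}}
	info.versionedStorage["v1"] = storageMap
	return info
}

// resourcePaths prepends prefix and version to every key and returns the
// resulting paths in sorted order.
func resourcePaths(prefix string, info toyGroupInfo) []string {
	var paths []string
	for version, storageMap := range info.versionedStorage {
		for key := range storageMap {
			paths = append(paths, prefix+"/"+version+"/"+key)
		}
	}
	sort.Strings(paths)
	return paths
}

func main() {
	info := newToyGroupInfo()
	for _, p := range resourcePaths("/api", info) {
		fmt.Println(p)
	}
	resource, subresource := splitResource("pods/status")
	fmt.Println(resource, subresource)
}
```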

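    (2) Register the hook functions (the bootstrap controller's post-start and pre-shutdown hooks).

    (3) Call InstallLegacyAPIGroup to register the "v1" routes on the API Server.

    InstallLegacyAPIGroup adds handlers for the apiGroupInfo created above. This part is important and is analyzed in detail in the next post: https://www.cnblogs.com/00986014w/p/10348489.html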

  • Original article: https://www.cnblogs.com/00986014w/p/10333795.html