zoukankan      html  css  js  c++  java
  • apiserver源码分析——处理请求

    前言

    上一篇说道k8s-apiserver如何启动,本篇则介绍apiserver启动后,接收到客户端请求的处理流程。如下图所示
    avatar
    认证与授权一般系统都会使用到,认证是鉴别访问apiserver的请求方是谁,一般情况下服务端是需要知晓客户端是谁方可接受请求,除了允许匿名访问这种场景,同时认证也为后续的授权提供基础。授权是为了判断当前请求的客户端是否具备请求当前资源的权限,具备则放行让其继续往后走,否则拒绝本次请求。准入控制器为请求处理流程提供了一个扩展的口,它提供了两个回调的钩子,能让用户在资源持久化前再额外对资源的值作改动或者验证,如果验证出错同样可以终止整个处理流程。最后对资源的变更会持久化到Etcd。

    本篇以创建pod为例,探索apiserver如何处理。

    Authentication

    请求到达apiserver后第一个是需要进行认证,辨别请求来源的身份。认证方式的配置在上一篇讲述构建genericConfig的时候有提及,在执行buildGenericConfig函数时调用s.Authentication.ApplyTo配置

    代码位于/pkg/kubeapiserver/options/authentication.go

    func (o *BuiltInAuthenticationOptions) ApplyTo(authInfo *genericapiserver.AuthenticationInfo,.....) error {
    	//创建出authenticatorConfig
    	authenticatorConfig, err := o.ToAuthenticationConfig()
    	//对authenticatorConfig字段进行设置
    	...
    	//创建出Authenticator
    	authInfo.Authenticator, openAPIConfig.SecurityDefinitions, err = authenticatorConfig.New()
    }
    

    ApplyTo先创建出认证相关配置authenticatorConfig,然后初始化部分认证方式的Provider,最终调用authenticatorConfig.New方法将按照认证的配置信息构造出一个Authenticator,传递给authInfo.Authenticator

    Authenticator.New方法如下所示,定义了两个数组用于存放启用的authenticators和token类的authenticators,根据Config的配置信息按需启用认证方式,再将token类的authenticators转换成普通的authenticators。最终将这个authenticator传递给一个Wrapper类型UnionAuthenticator返回

    代码位于/pkg/kubeapiserver/authenticator/config.go

    func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, error) {
    	var authenticators []authenticator.Request
    	var tokenAuthenticators []authenticator.Token
    	//各种认证方式的初始化操作
    	...
    
    	if len(tokenAuthenticators) > 0 {
    		// Union the token authenticators
    		tokenAuth := tokenunion.New(tokenAuthenticators...)
    		// Optionally cache authentication results
    		if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 {
    			tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL)
    		}
    		authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth))
    		securityDefinitions["BearerToken"] = &spec.SecurityScheme{
    			SecuritySchemeProps: spec.SecuritySchemeProps{
    				Type:        "apiKey",
    				Name:        "authorization",
    				In:          "header",
    				Description: "Bearer Token authentication",
    			},
    		}
    	}
    
    	if len(authenticators) == 0 {
    		if config.Anonymous {
    			return anonymous.NewAuthenticator(), &securityDefinitions, nil
    		}
    		return nil, &securityDefinitions, nil
    	}
    
    	authenticator := union.New(authenticators...)
    
    	authenticator = group.NewAuthenticatedGroupAdder(authenticator)
    
    }
    

    在这里简单列举一下上述提到的多种认证类型,包括9种,分别是:BasicAuth,TokenAuth,BootstrapToken,OIDC,RequesHeader,WebhookTokenAuth,Anonymous,ClientCA,ServiceAccountAuth。鄙人为了方便记忆分别将他们归为3类

    • token类:TokenAuth,BootstrapToken,WebhookTokenAuth,OIDC
    • 证书类:ClientCA,ServiceAccountAuth
    • 其他类:BasicAuth,RequesHeader,Anonymous

    由于篇幅原因各种认证类型的特点则不展开介绍

    特别地提及一下,pod里面访问apiserver一般用的是ServiceAccountAuth;在进行apiserver-aggregrate双向认证的时候会用到clientCA;往k8s添加新节点时kubelet会用到BootstrapToken

    认证在请求过程是一个HandlerChain串起来的,每个handler函数的构建时都会里层的handler函数,待本层handler处理完毕后才会执行里层的handler,这样一层层执行最后才执行到真正的请求响应逻辑,如Pod创建

    回归到上篇介绍的buildGenericConfig函数,一开始调用了 genericapiserver.NewConfig,NewConfig创建Config结构时给BuildHandlerChainFunc字段传入DefaultBuildHandlerChain这个函数

    代码位于 /vendor/k8s.io/apiserver/pkg/server/config.go

    func NewConfig(codecs serializer.CodecFactory) *Config {
    	return &Config{
    		Serializer:                  codecs,
    		BuildHandlerChainFunc:       DefaultBuildHandlerChain,
    		...
    	}
    }
    
    func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
    	handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
    	if c.FlowControl != nil {
    		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl)
    	} else {
    		handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
    	}
    	handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
    	handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
    	failedHandler := genericapifilters.Unauthorized(c.Serializer)
    	failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
    	handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
    	handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
    	handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
    	handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
    	handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
    	if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
    		handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
    	}
    	handler = genericapifilters.WithAuditAnnotations(handler, c.AuditBackend, c.AuditPolicyChecker)
    	handler = genericapifilters.WithWarningRecorder(handler)
    	handler = genericapifilters.WithCacheControl(handler)
    	handler = genericapifilters.WithRequestReceivedTimestamp(handler)
    	handler = genericfilters.WithPanicRecovery(handler)
    	return handler
    }
    

    DefaultBuildHandlerChain函数就是上面构建HandlerChain串的地方,查看genericapifilters.WithAuthentication定义,代码位于/vendor/k8s.io/apiserver/pkg/endpoints/filters/authentication.go

    func WithAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler, apiAuds authenticator.Audiences) http.Handler {
    	if auth == nil {
    		klog.Warningf("Authentication is disabled")
    		return handler
    	}
    	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
    		authenticationStart := time.Now()
    
    		if len(apiAuds) > 0 {
    			req = req.WithContext(authenticator.WithAudiences(req.Context(), apiAuds))
    		}
    		resp, ok, err := auth.AuthenticateRequest(req)
    		defer recordAuthMetrics(resp, ok, err, apiAuds, authenticationStart)
    		if err != nil || !ok {
    			if err != nil {
    				klog.Errorf("Unable to authenticate the request due to an error: %v", err)
    			}
    			failed.ServeHTTP(w, req)
    			return
    		}
    
    		if !audiencesAreAcceptable(apiAuds, resp.Audiences) {
    			err = fmt.Errorf("unable to match the audience: %v , accepted: %v", resp.Audiences, apiAuds)
    			klog.Error(err)
    			failed.ServeHTTP(w, req)
    			return
    		}
    
    		// authorization header is not required anymore in case of a successful authentication.
    		req.Header.Del("Authorization")
    
    		req = req.WithContext(genericapirequest.WithUser(req.Context(), resp.User))
    		handler.ServeHTTP(w, req)
    	})
    }
    

    代码中auth.AuthenticateRequest就是执行认证逻辑的地方,如果认证失败则会返回返回失败。认证成功会把请求头中Authorization去掉,再调用里层的handler函数handler.ServeHTTP(w, req)

    func (authHandler *unionAuthRequestHandler) AuthenticateRequest(req *http.Request) (*authenticator.Response, bool, error) {
    	var errlist []error
    	for _, currAuthRequestHandler := range authHandler.Handlers {
    		resp, ok, err := currAuthRequestHandler.AuthenticateRequest(req)
    		if err != nil {
    			if authHandler.FailOnError {
    				return resp, ok, err
    			}
    			errlist = append(errlist, err)
    			continue
    		}
    
    		if ok {
    			return resp, ok, err
    		}
    	}
    
    	return nil, false, utilerrors.NewAggregate(errlist)
    }
    

    它就是遍历了所有启用的认证方式,只有一个成功了就可以了。

    Authorization

    与认证的类似,授权方式的配置也是在buildGenericConfig函数中,调用BuildAuthorizer函数创建,返回时将authorizer.Authorizer赋予给genericConfig.Authorization.Authorizer

    buildGenericConfig最终调用authorizationConfig.New完成Authorizer的创建,代码位于 /pkg/kubeapiserver/authorizer/config.go

    func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, error) {
    	if len(config.AuthorizationModes) == 0 {
    		return nil, nil, fmt.Errorf("at least one authorization mode must be passed")
    	}
    
    	var (
    		authorizers   []authorizer.Authorizer
    		ruleResolvers []authorizer.RuleResolver
    	)
    
    	for _, authorizationMode := range config.AuthorizationModes {
    		// Keep cases in sync with constant list in k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes/modes.go.
    		switch authorizationMode {
    		case modes.ModeNode:
    			graph := node.NewGraph()
    			node.AddGraphEventHandlers(
    				graph,
    				config.VersionedInformerFactory.Core().V1().Nodes(),
    				config.VersionedInformerFactory.Core().V1().Pods(),
    				config.VersionedInformerFactory.Core().V1().PersistentVolumes(),
    				config.VersionedInformerFactory.Storage().V1().VolumeAttachments(),
    			)
    			nodeAuthorizer := node.NewAuthorizer(graph, nodeidentifier.NewDefaultNodeIdentifier(), bootstrappolicy.NodeRules())
    			authorizers = append(authorizers, nodeAuthorizer)
    			ruleResolvers = append(ruleResolvers, nodeAuthorizer)
    
    		case modes.ModeAlwaysAllow:
    			alwaysAllowAuthorizer := authorizerfactory.NewAlwaysAllowAuthorizer()
    			authorizers = append(authorizers, alwaysAllowAuthorizer)
    			ruleResolvers = append(ruleResolvers, alwaysAllowAuthorizer)
    		case modes.ModeAlwaysDeny:
    			alwaysDenyAuthorizer := authorizerfactory.NewAlwaysDenyAuthorizer()
    			authorizers = append(authorizers, alwaysDenyAuthorizer)
    			ruleResolvers = append(ruleResolvers, alwaysDenyAuthorizer)
    		case modes.ModeABAC:
    			abacAuthorizer, err := abac.NewFromFile(config.PolicyFile)
    			if err != nil {
    				return nil, nil, err
    			}
    			authorizers = append(authorizers, abacAuthorizer)
    			ruleResolvers = append(ruleResolvers, abacAuthorizer)
    		case modes.ModeWebhook:
    			webhookAuthorizer, err := webhook.New(config.WebhookConfigFile,
    				config.WebhookVersion,
    				config.WebhookCacheAuthorizedTTL,
    				config.WebhookCacheUnauthorizedTTL,
    				config.CustomDial)
    			if err != nil {
    				return nil, nil, err
    			}
    			authorizers = append(authorizers, webhookAuthorizer)
    			ruleResolvers = append(ruleResolvers, webhookAuthorizer)
    		case modes.ModeRBAC:
    			rbacAuthorizer := rbac.New(
    				&rbac.RoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().Roles().Lister()},
    				&rbac.RoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().RoleBindings().Lister()},
    				&rbac.ClusterRoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoles().Lister()},
    				&rbac.ClusterRoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoleBindings().Lister()},
    			)
    			authorizers = append(authorizers, rbacAuthorizer)
    			ruleResolvers = append(ruleResolvers, rbacAuthorizer)
    		default:
    			return nil, nil, fmt.Errorf("unknown authorization mode %s specified", authorizationMode)
    		}
    	}
    
    	return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil
    }
    

    函数一开始也是创建了一个authorizers的数组,用于存放启用的授权方式。遍历config.AuthorizationModes,对对应的授权方式进行实例化。最后调用union.New(authorizers...),以一个unionAuthzHandler作为支持的所有授权方式的wrapper返回回去。

    授权方式有6种,分别是AlwaysAllow,AlwaysDeny,RBAC,ABAC,Node,Webhook。其中最常用的就是RBAC,k8s里面给sa绑定role和clusterrole进行授权的就是这个RBAC。

    授权逻辑跟前文介绍认证一样通过HandlerChain串起来,同样在DefaultBuildHandlerChain函数中被加到HandlerChain中,调用了genericapifilters.WithAuthorization函数,代码位于/vendor/k8s.io/apiserver/pkg/endpoints/filters/authorization.go

    func WithAuthorization(handler http.Handler, a authorizer.Authorizer, s runtime.NegotiatedSerializer) http.Handler {
    	if a == nil {
    		klog.Warningf("Authorization is disabled")
    		return handler
    	}
    	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
    		ctx := req.Context()
    		ae := request.AuditEventFrom(ctx)
    
    		attributes, err := GetAuthorizerAttributes(ctx)
    		if err != nil {
    			responsewriters.InternalError(w, req, err)
    			return
    		}
    		authorized, reason, err := a.Authorize(ctx, attributes)
    		// an authorizer like RBAC could encounter evaluation errors and still allow the request, so authorizer decision is checked before error here.
    		if authorized == authorizer.DecisionAllow {
    			audit.LogAnnotation(ae, decisionAnnotationKey, decisionAllow)
    			audit.LogAnnotation(ae, reasonAnnotationKey, reason)
    			handler.ServeHTTP(w, req)
    			return
    		}
    		if err != nil {
    			audit.LogAnnotation(ae, reasonAnnotationKey, reasonError)
    			responsewriters.InternalError(w, req, err)
    			return
    		}
    
    		klog.V(4).Infof("Forbidden: %#v, Reason: %q", req.RequestURI, reason)
    		audit.LogAnnotation(ae, decisionAnnotationKey, decisionForbid)
    		audit.LogAnnotation(ae, reasonAnnotationKey, reason)
    		responsewriters.Forbidden(ctx, attributes, w, req, reason, s)
    	})
    }
    

    处理函数中,先调用GetAuthorizerAttributes获取认证后得到的user信息以及请求资源的相关信息requestInfo,统一放到attributes,再调用授权的方法 a.Authorize。同样它也是一个接口,它与认证时类似,先调用一个unionAuthzHandler的wrapper,在这个wrapper里遍历各个启用的authorizer。只要里面有一个allow或deny的结果就立马返回,代码位于/vendor/k8s.io/apiserver/pkg/authorization/union/union.go

    func (authzHandler unionAuthzHandler) Authorize(ctx context.Context, a authorizer.Attributes) (authorizer.Decision, string, error) {
    	var (
    		errlist    []error
    		reasonlist []string
    	)
    
    	for _, currAuthzHandler := range authzHandler {
    		decision, reason, err := currAuthzHandler.Authorize(ctx, a)
    
    		if err != nil {
    			errlist = append(errlist, err)
    		}
    		if len(reason) != 0 {
    			reasonlist = append(reasonlist, reason)
    		}
    		switch decision {
    		case authorizer.DecisionAllow, authorizer.DecisionDeny:
    			return decision, reason, err
    		case authorizer.DecisionNoOpinion:
    			// continue to the next authorizer
    		}
    	}
    
    	return authorizer.DecisionNoOpinion, strings.Join(reasonlist, "
    "), utilerrors.NewAggregate(errlist)
    }
    

    AdmissionWebhook

    AdmissionWebhook是准入控制器,它作为k8s-apiserver对外暴露的一种扩展方式,主要针对增删改资源时对暴露两个hook点。一个是Mutate,可修改提交上来的资源;另一个是Validate,是对提交上来的资源进行验证。当然Mutate里面也可以包含验证操作。但是本篇不对这两种准入控制器的使用实例作介绍。

    准入控制器的配置在buildGenericConfig函数中,通过调用s.Admission.ApplyTo方法进行配置。经过两层调用后到达AdmissionOptions.ApplyTo执行实际的创建逻辑,即: s.Admission.ApplyTo->a.GenericAdmission.ApplyTo。代码位于 /vendor/k8s.io/apiserver/pkg/server/options/admission.go

    func (a *AdmissionOptions) ApplyTo(
    	c *server.Config,
    	informers informers.SharedInformerFactory,
    	kubeAPIServerClientConfig *rest.Config,
    	features featuregate.FeatureGate,
    	pluginInitializers ...admission.PluginInitializer,
    ) error {
    	if a == nil {
    		return nil
    	}
    
    	// Admission depends on CoreAPI to set SharedInformerFactory and ClientConfig.
    	if informers == nil {
    		return fmt.Errorf("admission depends on a Kubernetes core API shared informer, it cannot be nil")
    	}
    
    	pluginNames := a.enabledPluginNames()
    	//获取各个准入控制器的provider
    	pluginsConfigProvider, err := admission.ReadAdmissionConfiguration(pluginNames, a.ConfigFile, configScheme)
    	if err != nil {
    		return fmt.Errorf("failed to read plugin config: %v", err)
    	}
    
    	clientset, err := kubernetes.NewForConfig(kubeAPIServerClientConfig)
    	if err != nil {
    		return err
    	}
    	genericInitializer := initializer.New(clientset, informers, c.Authorization.Authorizer, features)
    	initializersChain := admission.PluginInitializers{}
    	pluginInitializers = append(pluginInitializers, genericInitializer)
    	initializersChain = append(initializersChain, pluginInitializers...)
    	//将准入控制器集合串成一个admissionChain,再外面包一个Wrapper,类似于之前处理认证与授权一样的方式
    	admissionChain, err := a.Plugins.NewFromPlugins(pluginNames, pluginsConfigProvider, initializersChain, a.Decorators)
    	if err != nil {
    		return err
    	}
    	//又在外面套一个可统计指标的wrapper
    	c.AdmissionControl = admissionmetrics.WithStepMetrics(admissionChain)
    	return nil
    }
    //代码位于 /vendor/k8s.io/apiserver/pkg/admission/plugins.go
    func (ps *Plugins) NewFromPlugins(pluginNames []string, configProvider ConfigProvider, pluginInitializer PluginInitializer, decorator Decorator) (Interface, error) {
    	handlers := []Interface{}
    	mutationPlugins := []string{}
    	validationPlugins := []string{}
    	for _, pluginName := range pluginNames {
    		pluginConfig, err := configProvider.ConfigFor(pluginName)
    		if err != nil {
    			return nil, err
    		}
    
    		plugin, err := ps.InitPlugin(pluginName, pluginConfig, pluginInitializer)
    		if err != nil {
    			return nil, err
    		}
    		if plugin != nil {
    			if decorator != nil {
    				handlers = append(handlers, decorator.Decorate(plugin, pluginName))
    			} else {
    				handlers = append(handlers, plugin)
    			}
    
    			if _, ok := plugin.(MutationInterface); ok {
    				mutationPlugins = append(mutationPlugins, pluginName)
    			}
    			if _, ok := plugin.(ValidationInterface); ok {
    				validationPlugins = append(validationPlugins, pluginName)
    			}
    		}
    	}
    	if len(mutationPlugins) != 0 {
    		klog.Infof("Loaded %d mutating admission controller(s) successfully in the following order: %s.", len(mutationPlugins), strings.Join(mutationPlugins, ","))
    	}
    	if len(validationPlugins) != 0 {
    		klog.Infof("Loaded %d validating admission controller(s) successfully in the following order: %s.", len(validationPlugins), strings.Join(validationPlugins, ","))
    	}
    	return newReinvocationHandler(chainAdmissionHandler(handlers)), nil
    }
    

    准入控制器除了自定义的,从上述代码中也可以观察到也有内置的,内置的准入控制器大概有30+种。

    但是准入控制器的调用却不像认证与授权那样在调用DefaultBuildHandlerChain时加入到handler调用链中,它是每个增删改的实际处理函数中被调用,GenericConfig的AdmissionControl字段也是在初始化GenericServer的时候传递给后者的同名字段

    registerResourceHandlers方法

    延续上篇介绍apiserver启动流程时,调用installer.Install方法,创建了webservice,api中各个URL的路由注册,实现了对应地址的handler,这个handler是通过registerResourceHandlers,方法篇幅即长(约900行),包含了对一个资源的增删改查各种请求的处理,对其只能分段介绍。代码位于/vendor/k8s.io/apiserver/pkg/endpoints/installer.go

    这个方法有三个入参

    • 代表URL的path
    • 资源存储相关的类storage
    • 用于存放路由的go-rest对象webservice

    先从path以及APIInstaller对象中获取group,version,kind,分辨这种资源是cluster scope还是namespace scope的

    	admit := a.group.Admit
    
    	optionsExternalVersion := a.group.GroupVersion
    	if a.group.OptionsExternalVersion != nil {
    		optionsExternalVersion = *a.group.OptionsExternalVersion
    	}
    
    	resource, subresource, err := splitSubresource(path)
    	if err != nil {
    		return nil, err
    	}
    
    	group, version := a.group.GroupVersion.Group, a.group.GroupVersion.Version
    
    	fqKindToRegister, err := GetResourceKind(a.group.GroupVersion, storage, a.group.Typer)
    	if err != nil {
    		return nil, err
    	}
    
    	versionedPtr, err := a.group.Creater.New(fqKindToRegister)
    	if err != nil {
    		return nil, err
    	}
    	defaultVersionedObject := indirectArbitraryPointer(versionedPtr)
    	kind := fqKindToRegister.Kind
    	isSubresource := len(subresource) > 0
    
    	// If there is a subresource, namespace scoping is defined by the parent resource
    	namespaceScoped := true
    	if isSubresource {
    		parentStorage, ok := a.group.Storage[resource]
    		if !ok {
    			return nil, fmt.Errorf("missing parent storage: %q", resource)
    		}
    		scoper, ok := parentStorage.(rest.Scoper)
    		if !ok {
    			return nil, fmt.Errorf("%q must implement scoper", resource)
    		}
    		namespaceScoped = scoper.NamespaceScoped()
    
    	} else {
    		scoper, ok := storage.(rest.Scoper)
    		if !ok {
    			return nil, fmt.Errorf("%q must implement scoper", resource)
    		}
    		namespaceScoped = scoper.NamespaceScoped()
    	}
    

    接着是一系列的判定操作,根据当前这个storage是否有实现对应接口来判定能否提供对应服务,如 创建操作。这个结果会影响后面是否添加对应操作请求的路由

    	creater, isCreater := storage.(rest.Creater)
    

    然后就创建对应请求的Options,如CreateOptions。这个用于在后面创建路由时作为参数,平时使用client-go时也要传入metav1包的CreateOption,ListOption,DeleteOption等,就是这个参数了。

    	versionedCreateOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("CreateOptions"))
    	if err != nil {
    		return nil, err
    	}
    

    下一步按照资源类型是cluster scope还是namespace scope来将支持的操作类型组成action集合,这个action集合的动作则是对应http的请求方法,如创建的

    		actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
    

    往后就是遍历action集合,为各个操作绑定路由,将其添加到路由集合中,如创建的

    		case "POST": // Create a resource.
    			var handler restful.RouteFunction
    			if isNamedCreater {
    				handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
    			} else {
    				handler = restfulCreateResource(creater, reqScope, admit)
    			}
    			handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
    			if enableWarningHeaders {
    				handler = utilwarning.AddWarningsHandler(handler, warnings)
    			}
    			article := GetArticleForNoun(kind, " ")
    			doc := "create" + article + kind
    			if isSubresource {
    				doc = "create " + subresource + " of" + article + kind
    			}
    			route := ws.POST(action.Path).To(handler).
    				Doc(doc).
    				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
    				Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
    				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
    				Returns(http.StatusOK, "OK", producedObject).
    				// TODO: in some cases, the API may return a v1.Status instead of the versioned object
    				// but currently go-restful can't handle multiple different objects being returned.
    				Returns(http.StatusCreated, "Created", producedObject).
    				Returns(http.StatusAccepted, "Accepted", producedObject).
    				Reads(defaultVersionedObject).
    				Writes(producedObject)
    			if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
    				return nil, err
    			}
    			addParams(route, action.Params)
    			routes = append(routes, route)
    

    最后才把这些路由添加到webservice中

    	for kubeVerb := range kubeVerbs {
    		apiResource.Verbs = append(apiResource.Verbs, kubeVerb)
    	}
    

    回头看创建POST路由时,同样按照资源是否命名空间级别的创建赌赢的handler,后面则是go-restful创建路由的代码

    pod是属于命名空间级别的资源,进入restfulCreateNamedResource函数,经过三层调用到达createHandler函数,调用链如下

    restfulCreateNamedResource->handlers.CreateNamedResource->createHandler
    

    createHandler大概逻辑如下

    • 从请求中获取资源的namespace,name,GVK等信息
    • 从RequestScope中获取资源的反序列化器,将body的数据反序列化为runtimeObject
    • 执行mutating准入控制器
    • 调用storage的create,同时传入Validate准入控制器,准备持久化到Etcd
    • 将处理结果写到响应

    代码位于/vendor/k8s.io/apiserver/pkg/endpoints/handlers/create.go

    func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
    	return func(w http.ResponseWriter, req *http.Request) {
    		timeout := parseTimeout(req.URL.Query().Get("timeout"))
    		//从请求中获取资源的namespace,name,GVK等信息
    		namespace, name, err := scope.Namer.Name(req)
    		gv := scope.Kind.GroupVersion()
    
    		//从RequestScope中获取资源的反序列化器,将body的数据反序列化为runtimeObject
    		decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion)
    
    		body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
    
    		obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
    		
    		//调用storage的create,同时传入Validate准入控制器,准备持久化到Etcd
    		requestFunc := func() (runtime.Object, error) {
    			return r.Create(
    				ctx,
    				name,
    				obj,
    				rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
    				options,
    			)
    		}
    		result, err := finishRequest(timeout, func() (runtime.Object, error) {
    			if scope.FieldManager != nil {
    				liveObj, err := scope.Creater.New(scope.Kind)
    				if err != nil {
    					return nil, fmt.Errorf("failed to create new object (Create for %v): %v", scope.Kind, err)
    				}
    				obj = scope.FieldManager.UpdateNoErrors(liveObj, obj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
    			}
    			//执行mutating准入控制器
    			if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
    				if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
    					return nil, err
    				}
    			}
    			result, err := requestFunc()
    			// If the object wasn't committed to storage because it's serialized size was too large,
    			// it is safe to remove managedFields (which can be large) and try again.
    			if isTooLargeError(err) {
    				if accessor, accessorErr := meta.Accessor(obj); accessorErr == nil {
    					accessor.SetManagedFields(nil)
    					result, err = requestFunc()
    				}
    			}
    			return result, err
    		})
    
    		//将处理结果写到响应
    		//如果创建成功的结果按照请求来源时的格式序列化,写到响应体里面
    		transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result)
    	}
    }
    

    由此段代码可得,Mutate 准入控制器要比Validate 准入控制器先执行

    继续追r.Create方法调用,r.Create==>namedCreaterAdapter.Create-->c.Creater.Create

    到Creater.Create是一个接口的调用,这里实现太多,无法单纯通过goland去找到实现。但这个Creater已经是storage的一个接口,在目录中找pod的storage相关定义在 /pkg/registry/core/pod/storage/storage.go中
    对应的结构定义如下

    type REST struct {
    	*genericregistry.Store
    	proxyTransport http.RoundTripper
    }
    

    它继承于genericregistry.Store,自身并没有再去实现Creater接口了

    genericregistry.Store的定义在/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go

    所实现的Create方法大概包含下面步骤

    • 调用了validate准入控制器验证资源
    • 生成name,key等信息用于后续持久化到Etcd
    • 创建一个新的空的资源用于成功时返回结果
    • 调用storage的Create,准备持久化到Etcd
    func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
    	if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
    		return nil, err
    	}
    	// at this point we have a fully formed object.  It is time to call the validators that the apiserver
    	// handling chain wants to enforce.
    	//调用了validate准入控制器验证资源
    	if createValidation != nil {
    		if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
    			return nil, err
    		}
    	}
    
    	//生成name,key等信息用于后续持久化到Etcd
    	name, err := e.ObjectNameFunc(obj)
    	if err != nil {
    		return nil, err
    	}
    	key, err := e.KeyFunc(ctx, name)
    	if err != nil {
    		return nil, err
    	}
    	qualifiedResource := e.qualifiedResourceFromContext(ctx)
    	ttl, err := e.calculateTTL(obj, 0, false)
    	if err != nil {
    		return nil, err
    	}
    	//创建一个新的空的资源用于成功时返回结果
    	out := e.NewFunc()
    	//调用storage的Create,准备持久化到Etcd
    	//如果持久化成功,out里面就会填上持久化后的所有信息到里面
    	if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
    		err = storeerr.InterpretCreateError(err, qualifiedResource, name)
    		err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
    		if !apierrors.IsAlreadyExists(err) {
    			return nil, err
    		}
    		if errGet := e.Storage.Get(ctx, key, storage.GetOptions{}, out); errGet != nil {
    			return nil, err
    		}
    		accessor, errGetAcc := meta.Accessor(out)
    		if errGetAcc != nil {
    			return nil, err
    		}
    		if accessor.GetDeletionTimestamp() != nil {
    			msg := &err.(*apierrors.StatusError).ErrStatus.Message
    			*msg = fmt.Sprintf("object is being deleted: %s", *msg)
    		}
    		return nil, err
    	}
    	if e.AfterCreate != nil {
    		if err := e.AfterCreate(out); err != nil {
    			return nil, err
    		}
    	}
    	if e.Decorator != nil {
    		if err := e.Decorator(out); err != nil {
    			return nil, err
    		}
    	}
    	return out, nil
    }
    

    持久化到Etcd

    从e.Storage.Create经过两层调用到达store.Create方法,因为有可能包含dryRun,如果dryRun就不需要持久化到Etcd,在这里将看到

    • 将资源转换成无版本类型,即__internal版本
    • 再将资源转换成适合存储的格式
    • 调用Etcd检查资源是否已经存在了
    • 不存在才调用Put把资源存进去
    • 成功了才从etcd的响应中把存储结果反序列化成传进来时的格式

    代码位于 /vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go

    func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
    	if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
    		return errors.New("resourceVersion should not be set on objects to be created")
    	}
    	if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
    		return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
    	}
    	//将资源转换成无版本类型
    	data, err := runtime.Encode(s.codec, obj)
    	if err != nil {
    		return err
    	}
    	key = path.Join(s.pathPrefix, key)
    
    	opts, err := s.ttlOpts(ctx, int64(ttl))
    	if err != nil {
    		return err
    	}
    
    	//再将资源转换成适合存储的格式
    	newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
    	if err != nil {
    		return storage.NewInternalError(err.Error())
    	}
    
    	startTime := time.Now()
    	//检查资源是否已经存在了
    	txnResp, err := s.client.KV.Txn(ctx).If(
    		notFound(key),
    	).Then(
    	//不存在才调用Put把资源存进去
    		clientv3.OpPut(key, string(newData), opts...),
    	).Commit()
    	metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
    	if err != nil {
    		return err
    	}
    	if !txnResp.Succeeded {
    		return storage.NewKeyExistsError(key, 0)
    	}
    
    	//转换响应结果
    	if out != nil {
    		putResp := txnResp.Responses[0].GetResponsePut()
    		return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
    	}
    	return nil
    }
    

    至此,资源已落库,创建请求已完毕,apiserver也将结果响应给客户端。

    小结

    本篇衔接前一篇apiserver的启动流程,讲述了认证器,授权器,准入控制器如何被配置的,如果根据APIGroupInfo映射好的storage创建处理请求的handler。当一个请求来的时候如何执行认证操作,授权操作,接着经过Mutate准入控制器和Validate准入控制器等一系列校验,最终转换资源的版本,调用Etcd客户端将资源持久化,也将结果响应回给客户端。

    如有兴趣,可阅读鄙人“k8s源码之旅”系列的其他文章
    kubelet源码分析——kubelet简介与启动
    kubelet源码分析——启动Pod
    kubelet源码分析——关闭Pod
    kubelet源码分析——监控Pod变更
    scheduler源码分析——调度流程
    apiserver源码分析——启动流程
    apiserver源码分析——处理请求

  • 相关阅读:
    UVa 12174 (滑动窗口) Shuffle
    UVa 1607 (二分) Gates
    CodeForces ZeptoLab Code Rush 2015
    HDU 1525 (博弈) Euclid's Game
    HDU 2147 (博弈) kiki's game
    UVa 11093 Just Finish it up
    UVa 10954 (Huffman 优先队列) Add All
    CodeForces Round #298 Div.2
    UVa 12627 (递归 计数 找规律) Erratic Expansion
    UVa 714 (二分) Copying Books
  • 原文地址:https://www.cnblogs.com/HopeGi/p/15370176.html
Copyright © 2011-2022 走看看