version:1.9.6
Contents:
1. Finding the entry point
2. Building the command line
3. Creating the server chain: CreateServerChain
4. Starting the server: s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
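1. Command-line entry point
cmd/kube-apiserver/apiserver.go
The command-line parsing framework cobra is worth getting to know first:
https://o-my-chenjian.com/2017/09/20/Using-Cobra-With-Golang/
Entry function: cmd/kube-apiserver/apiserver.go
The server is started through the command's Execute method.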
func main() {
	...
	command := app.NewAPIServerCommand()
	...
	if err := command.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
}
2. Building the command line
The cmd struct:
cmd := &cobra.Command{
	Use: "kube-apiserver",
	Long: `The Kubernetes API server validates and configures data
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,
	RunE: func(cmd *cobra.Command, args []string) error {
		verflag.PrintAndExitIfRequested()
		utilflag.PrintFlags(cmd.Flags())

		// set default options
		completedOptions, err := Complete(s)
		if err != nil {
			return err
		}

		// validate options
		if errs := completedOptions.Validate(); len(errs) != 0 {
			return utilerrors.NewAggregate(errs)
		}

		return Run(completedOptions, stopCh)
	},
}
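Every Kubernetes component uses cobra to manage and parse its command-line arguments. Under the main package there is an app package, and the app package is where the cobra command is actually built. The main package itself is therefore very simple: it mostly just calls the execute function.
app.NewAPIServerCommand(server.SetupSignalHandler()) returns a *cobra.Command. Calling command.Execute() ultimately invokes the run function defined on the Command struct.
In the code above, RunE means "run and return an error".
We can see that RunE returns Run(completedOptions, stopCh).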
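The complete, validate, run sequence inside RunE can be sketched with the standard library alone. This is a minimal illustration, not the kube-apiserver code: the options and completedOptions types, their fields, the default values, and the runE helper are all hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"os"
)

// options is a hypothetical stand-in for the real server run options.
type options struct {
	port int
	name string
}

// completedOptions can only be obtained through complete, so run
// never sees options that skipped the defaulting step.
type completedOptions struct {
	options
}

// complete fills in defaults for unset fields (illustrative defaults).
func complete(o options) (completedOptions, error) {
	if o.port == 0 {
		o.port = 6443
	}
	if o.name == "" {
		o.name = "demo"
	}
	return completedOptions{o}, nil
}

// validate collects every problem instead of stopping at the first one.
func (c completedOptions) validate() []error {
	var errs []error
	if c.port < 1 || c.port > 65535 {
		errs = append(errs, fmt.Errorf("port %d out of range", c.port))
	}
	return errs
}

// runE mirrors the shape of the RunE callback: complete, validate, run.
func runE(o options, run func(completedOptions) error) error {
	c, err := complete(o)
	if err != nil {
		return err
	}
	if errs := c.validate(); len(errs) != 0 {
		return fmt.Errorf("invalid options: %v", errs)
	}
	return run(c)
}

func main() {
	err := runE(options{}, func(c completedOptions) error {
		fmt.Printf("running %s on port %d\n", c.name, c.port)
		return nil
	})
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
}
```

Because the run callback only accepts completedOptions, the defaulting step cannot be skipped by accident.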
3. Creating the server chain
4. Creating the server
// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {
This creates a server, which goes through PrepareRun() to return a preparedGenericAPIServer, whose Run() method is finally called.
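One way to read the PrepareRun() and Run() split is as a small type-level guard: Run exists only on the prepared wrapper, so it cannot be called on a server that skipped preparation. The sketch below illustrates that idea with hypothetical server and preparedServer types; it is not the upstream implementation.

```go
package main

import "fmt"

// server is a hypothetical stand-in for GenericAPIServer.
type server struct {
	name  string
	ready bool
}

// preparedServer wraps server. run is defined only on this type, so the
// compiler rejects calling run on a server that skipped prepareRun.
type preparedServer struct {
	*server
}

// prepareRun does the last-minute setup and returns the runnable wrapper.
func (s *server) prepareRun() preparedServer {
	s.ready = true
	return preparedServer{s}
}

// run reports the state it would start serving with.
func (p preparedServer) run() string {
	return fmt.Sprintf("%s running (ready=%v)", p.name, p.ready)
}

func main() {
	s := &server{name: "demo"}
	fmt.Println(s.prepareRun().run())
}
```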
The GenericAPIServer struct:
// GenericAPIServer contains state for a Kubernetes cluster api server.
type GenericAPIServer struct {
	...

	// admissionControl is used to build the RESTStorage that backs an API Group.
	admissionControl admission.Interface

	// "Outputs"
	// Handler holds the handlers being used by this API server
	Handler *APIServerHandler

	...

	// DiscoveryGroupManager serves /apis
	DiscoveryGroupManager discovery.GroupManager

	// Enable swagger and/or OpenAPI if these configs are non-nil.
	openAPIConfig *openapicommon.Config

	// PostStartHooks are each called after the server has started listening, in a separate go func for each
	// with no guarantee of ordering between them. The map key is a name used for error reporting.
	// It may kill the process with a panic if it wishes to by returning an error.
	postStartHookLock      sync.Mutex
	postStartHooks         map[string]postStartHookEntry
	postStartHooksCalled   bool
	disabledPostStartHooks sets.String

	preShutdownHookLock    sync.Mutex
	preShutdownHooks       map[string]preShutdownHookEntry
	preShutdownHooksCalled bool

	...

	// HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown.
	HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup
}
// Run spawns the secure http server. It only returns if stopCh is closed
// or the secure port cannot be listened on initially.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
	err := s.NonBlockingRun(stopCh)
	if err != nil {
		return err
	}

	<-stopCh

	err = s.RunPreShutdownHooks()
	if err != nil {
		return err
	}

	// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
	s.HandlerChainWaitGroup.Wait()

	return nil
}
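We can see that it in turn calls s.NonBlockingRun(). As the name suggests, this call does not block: it starts new goroutines that eventually run the HTTP server. That server exposes the HTTP API the other Kubernetes components call, and it is the core control mechanism of a Kubernetes cluster. Execution then blocks at <-stopCh. When that channel is closed, the block is released, the shutdown logic runs, and the function returns. s.NonBlockingRun() is also passed stopCh for the same reason: so that the program can shut down gracefully.
stopCh is originally created in NewAPIServerCommand():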
stopCh := server.SetupSignalHandler()
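It is easy to see that this channel is tied to OS signals. When ctrl+c or kill tells the program to shut down, the channel is closed, every place waiting on <-stopCh stops blocking, and the cleanup needed for a graceful shutdown runs.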
var onlyOneSignalHandler = make(chan struct{})

// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
func SetupSignalHandler() <-chan struct{} {
	close(onlyOneSignalHandler) // panics when called twice

	stop := make(chan struct{})
	c := make(chan os.Signal, 2)
	signal.Notify(c, shutdownSignals...)
	go func() {
		<-c
		close(stop)
		<-c
		os.Exit(1) // second signal. Exit directly.
	}()

	return stop
}
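The two-stage behavior (first signal closes the stop channel, second signal forces an exit) can be tried without sending real signals. In this sketch the signal channel is passed in so fake signals can be injected; stopOnSignal, its onSecond callback, and simulate are hypothetical names introduced only for illustration.

```go
package main

import (
	"fmt"
	"os"
)

// stopOnSignal returns a channel that is closed when the first value
// arrives on sigs. onSecond is called if a second value arrives; the
// handler quoted above calls os.Exit(1) at that point.
func stopOnSignal(sigs <-chan os.Signal, onSecond func()) <-chan struct{} {
	stop := make(chan struct{})
	go func() {
		<-sigs
		close(stop) // wakes every goroutine blocked on <-stop
		<-sigs
		onSecond()
	}()
	return stop
}

// simulate sends two fake signals and records what the handler did.
func simulate() []string {
	var events []string
	sigs := make(chan os.Signal, 2)
	second := make(chan struct{})
	stop := stopOnSignal(sigs, func() { close(second) })

	sigs <- os.Interrupt // first "ctrl+c"
	<-stop
	events = append(events, "stop closed")

	sigs <- os.Interrupt // second "ctrl+c"
	<-second
	events = append(events, "second signal")
	return events
}

func main() {
	fmt.Println(simulate())
}
```

In a real program the sigs channel would be registered with signal.Notify(sigs, os.Interrupt, syscall.SIGTERM), and onSecond would call os.Exit(1).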
Now let's look at how NonBlockingRun() is implemented:
// NonBlockingRun spawns the secure http server. An error is
// returned if the secure port cannot be listened on.
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
	...

	// Use an internal stop channel to allow cleanup of the listeners on error.
	internalStopCh := make(chan struct{})

	if s.SecureServingInfo != nil && s.Handler != nil {
		if err := s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh); err != nil {
			close(internalStopCh)
			return err
		}
	}

	...

	return nil
}
We can see that it calls s.SecureServingInfo.Serve() to start the HTTP server. Let's dig further:
// Serve runs the secure http server. It fails only if certificates cannot be loaded or the initial listen call fails.
// The actual server loop (stoppable by closing stopCh) runs in a go routine, i.e. Serve does not block.
// It returns a stoppedCh that is closed when all non-hijacked active requests have been processed.
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, error) {
	if s.Listener == nil {
		return nil, fmt.Errorf("listener must not be nil")
	}

	secureServer := &http.Server{
		Addr:           s.Listener.Addr().String(),
		Handler:        handler,
		MaxHeaderBytes: 1 << 20,
		TLSConfig: &tls.Config{
			NameToCertificate: s.SNICerts,
			// Can't use SSLv3 because of POODLE and BEAST
			// Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher
			// Can't use TLSv1.1 because of RC4 cipher usage
			MinVersion: tls.VersionTLS12,
			// enable HTTP2 for go's 1.7 HTTP Server
			NextProtos: []string{"h2", "http/1.1"},
		},
	}

	...

	return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}
This step creates the http.Server and calls RunServer:
// RunServer listens on the given port if listener is not given,
// then spawns a go-routine continuously serving until the stopCh is closed.
// It returns a stoppedCh that is closed when all non-hijacked active requests
// have been processed.
// This function does not block
// TODO: make private when insecure serving is gone from the kube-apiserver
func RunServer(
	server *http.Server,
	ln net.Listener,
	shutDownTimeout time.Duration,
	stopCh <-chan struct{},
) (<-chan struct{}, error) {
	if ln == nil {
		return nil, fmt.Errorf("listener must not be nil")
	}

	// Shutdown server gracefully.
	stoppedCh := make(chan struct{})
	go func() {
		defer close(stoppedCh)
		<-stopCh
		ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
		server.Shutdown(ctx)
		cancel()
	}()

	go func() {
		defer utilruntime.HandleCrash()

		var listener net.Listener
		listener = tcpKeepAliveListener{ln.(*net.TCPListener)}
		if server.TLSConfig != nil {
			listener = tls.NewListener(listener, server.TLSConfig)
		}

		err := server.Serve(listener)

		msg := fmt.Sprintf("Stopped listening on %s", ln.Addr().String())
		select {
		case <-stopCh:
			klog.Info(msg)
		default:
			panic(fmt.Sprintf("%s due to error: %v", msg, err))
		}
	}()

	return stoppedCh, nil
}
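Finally, in the second of the two new goroutines, server.Serve(listener) is called to start the HTTP server. When startup succeeds, the goroutine stays blocked there for as long as the server is running.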
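The same non-blocking start and graceful stop can be reproduced with only net/http. The sketch below is a simplified plain-HTTP version (no TLS, no keep-alive listener wrapper, no panic on unexpected errors); runServer and demo are hypothetical names, and the loopback address and timeouts are arbitrary choices.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"net/http"
	"time"
)

// runServer starts srv on ln in a goroutine and returns immediately.
// The returned channel is closed once Shutdown has returned, which
// happens after stopCh is closed.
func runServer(srv *http.Server, ln net.Listener, timeout time.Duration, stopCh <-chan struct{}) <-chan struct{} {
	stoppedCh := make(chan struct{})
	go func() {
		defer close(stoppedCh)
		<-stopCh
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()
		srv.Shutdown(ctx) // stops accepting and waits for in-flight requests
	}()
	go func() {
		// Serve blocks until Shutdown is called, then returns http.ErrServerClosed.
		if err := srv.Serve(ln); err != http.ErrServerClosed {
			fmt.Println("serve error:", err)
		}
	}()
	return stoppedCh
}

// demo serves one request on a loopback port, then shuts down.
// It returns the HTTP status code and whether shutdown finished in time.
func demo() (int, bool) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, false
	}
	srv := &http.Server{Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	})}
	stopCh := make(chan struct{})
	stoppedCh := runServer(srv, ln, 5*time.Second, stopCh)

	resp, err := http.Get("http://" + ln.Addr().String())
	if err != nil {
		return 0, false
	}
	resp.Body.Close()

	close(stopCh) // plays the role of the signal handler closing stopCh
	select {
	case <-stoppedCh:
		return resp.StatusCode, true
	case <-time.After(10 * time.Second):
		return resp.StatusCode, false
	}
}

func main() {
	code, ok := demo()
	fmt.Println("status:", code, "clean shutdown:", ok)
}
```

The caller gets control back right away and can wait on stoppedCh when it wants to know that shutdown has completed.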
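At this point we have a first outline of the main path through the kube-apiserver source. Many details remain, and we will dig into them later. To keep the thread clear, it helps to set details aside at first and follow only the logic path we want to understand.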
They say learning starts with imitation. Thanks to the original author: https://cloud.tencent.com/developer/article/1326541