zoukankan      html  css  js  c++  java
  • nginx动态化~1 nginx-ingress

    从入口来看下

    cmd/nginx/main.go 初始化各种逻辑,开启监控数据收集,初始化web服务,初始化Nginx等

    func main() {
        ...
        kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)
        ...
        reg := prometheus.NewRegistry()
        mc := metric.NewDummyCollector()
        ...
        ngx := controller.NewNGINXController(conf, mc)
    
        mux := http.NewServeMux()
        registerHealthz(nginx.HealthPath, ngx, mux)
        registerMetrics(reg, mux)
    
        go startHTTPServer(conf.HealthCheckHost, conf.ListenPorts.Health, mux)
        go ngx.Start()
    ....

    ingress/controller/nginx.go

    // NewNGINXController creates a new NGINX Ingress controller.
    func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {
       //定义事件处理。初始化
       //webhoot处理
           if n.cfg.ValidationWebhook != "" {
            n.validationWebhookServer = &http.Server{
                Addr:      config.ValidationWebhook,
                Handler:   adm_controller.NewAdmissionControllerServer(&adm_controller.IngressAdmission{Checker: n}),
                TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(),
                // disable http/2
                // https://github.com/kubernetes/kubernetes/issues/80313
                // https://github.com/kubernetes/ingress-nginx/issues/6323#issuecomment-737239159
                TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
            }
        }
       // 每个元素入队都会调n.syncIngress
       n.syncQueue = task.NewTaskQueue(n.syncIngress).
       // 文件变更回调注册
       onTemplateChange := func() {
            template, err := ngx_template.NewTemplate(nginx.TemplatePath)
            if err != nil {
                // this error is different from the rest because it must be clear why nginx is not working
                klog.ErrorS(err, "Error loading new template")
                return
            }
    
            n.t = template
            klog.InfoS("New NGINX configuration template loaded")
            n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
        }
    
        ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath)
        if err != nil {
            klog.Fatalf("Invalid NGINX configuration template: %v", err)
        }
    
        n.t = ngxTpl
    
        _, err = watch.NewFileWatcher(nginx.TemplatePath, onTemplateChange)
    ...

    start

    func (n *NGINXController) Start() {
       n.store.Run(n.stopCh)
       ...
       go n.syncQueue.Run(time.Second, n.stopCh)
        // force initial sync
        n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))
       for {
            select {
            case err := <-n.ngxErrCh:
                if n.isShuttingDown {
                    return
                }
    
                // if the nginx master process dies, the workers continue to process requests
                // until the failure of the configured livenessProbe and restart of the pod.
                if process.IsRespawnIfRequired(err) {
                    return
                }
    
            case event := <-n.updateCh.Out():
                if n.isShuttingDown {
                    break
                }
    
                if evt, ok := event.(store.Event); ok {
                    klog.V(3).InfoS("Event received", "type", evt.Type, "object", evt.Obj)
                    if evt.Type == store.ConfigurationEvent {
                        // TODO: is this necessary? Consider removing this special case
                        n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
                        continue
                    }
    
                    n.syncQueue.EnqueueSkippableTask(evt.Obj)
                } else {
                    klog.Warningf("Unexpected event type received %T", event)
                }
            case <-n.stopCh:
                return
            }
        }
    ...

    syncIngress

    func (n *NGINXController) syncIngress(interface{}) error {
        n.syncRateLimiter.Accept()
    
        if n.syncQueue.IsShuttingDown() {
            return nil
        }
    
        ings := n.store.ListIngresses()
        hosts, servers, pcfg := n.getConfiguration(ings)
    
        n.metricCollector.SetSSLExpireTime(servers)
    
        if n.runningConfig.Equal(pcfg) {
            klog.V(3).Infof("No configuration change detected, skipping backend reload")
            return nil
        }
    
        n.metricCollector.SetHosts(hosts)
    
        if !n.IsDynamicConfigurationEnough(pcfg) {
            klog.InfoS("Configuration changes detected, backend reload required")
    
            hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
                TagName: "json",
            })
    
            pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)
    
            err := n.OnUpdate(*pcfg)
            if err != nil {
                n.metricCollector.IncReloadErrorCount()
                n.metricCollector.ConfigSuccess(hash, false)
                klog.Errorf("Unexpected failure reloading the backend:
    %v", err)
                n.recorder.Eventf(k8s.IngressPodDetails, apiv1.EventTypeWarning, "RELOAD", fmt.Sprintf("Error reloading NGINX: %v", err))
                return err
            }
    
            klog.InfoS("Backend successfully reloaded")
            n.metricCollector.ConfigSuccess(hash, true)
            n.metricCollector.IncReloadCount()
    
            n.recorder.Eventf(k8s.IngressPodDetails, apiv1.EventTypeNormal, "RELOAD", "NGINX reload triggered due to a change in configuration")
        }
    
        isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
        if isFirstSync {
            // For the initial sync it always takes some time for NGINX to start listening
            // For large configurations it might take a while so we loop and back off
            klog.InfoS("Initial sync, sleeping for 1 second")
            time.Sleep(1 * time.Second)
        }
    
        retry := wait.Backoff{
            Steps:    15,
            Duration: 1 * time.Second,
            Factor:   0.8,
            Jitter:   0.1,
        }
    
        err := wait.ExponentialBackoff(retry, func() (bool, error) {
            err := n.configureDynamically(pcfg)
            if err == nil {
                klog.V(2).Infof("Dynamic reconfiguration succeeded.")
                return true, nil
            }
    
            klog.Warningf("Dynamic reconfiguration failed: %v", err)
            return false, err
        })
        if err != nil {
            klog.Errorf("Unexpected failure reconfiguring NGINX:
    %v", err)
            return err
        }
    
        ri := getRemovedIngresses(n.runningConfig, pcfg)
        re := getRemovedHosts(n.runningConfig, pcfg)
        n.metricCollector.RemoveMetrics(ri, re)
    
        n.runningConfig = pcfg
    
        return nil
    }

    更新具体动作

    func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
        cfg := n.store.GetBackendConfiguration()
        cfg.Resolver = n.resolver
    
        content, err := n.generateTemplate(cfg, ingressCfg)
        if err != nil {
            return err
        }
    
        err = createOpentracingCfg(cfg)
        if err != nil {
            return err
        }
    
        err = n.testTemplate(content)
        if err != nil {
            return err
        }
    
        if klog.V(2).Enabled() {
            src, _ := os.ReadFile(cfgPath)
            if !bytes.Equal(src, content) {
                tmpfile, err := os.CreateTemp("", "new-nginx-cfg")
                if err != nil {
                    return err
                }
                defer tmpfile.Close()
                err = os.WriteFile(tmpfile.Name(), content, file.ReadWriteByUser)
                if err != nil {
                    return err
                }
    
                diffOutput, err := exec.Command("diff", "-I", "'# Configuration.*'", "-u", cfgPath, tmpfile.Name()).CombinedOutput()
                if err != nil {
                    if exitError, ok := err.(*exec.ExitError); ok {
                        ws := exitError.Sys().(syscall.WaitStatus)
                        if ws.ExitStatus() == 2 {
                            klog.Warningf("Failed to executing diff command: %v", err)
                        }
                    }
                }
    
                klog.InfoS("NGINX configuration change", "diff", string(diffOutput))
    
                // we do not defer the deletion of temp files in order
                // to keep them around for inspection in case of error
                os.Remove(tmpfile.Name())
            }
        }
    
        err = os.WriteFile(cfgPath, content, file.ReadWriteByUser)
        if err != nil {
            return err
        }
    
        o, err := n.command.ExecCommand("-s", "reload").CombinedOutput()
        if err != nil {
            return fmt.Errorf("%v
    %v", err, string(o))
        }
    
        return nil
    }
    // generateTemplate returns the nginx configuration file content
    func (n NGINXController) generateTemplate(cfg ngx_config.Configuration, ingressCfg ingress.Configuration) ([]byte, error) {
    
        if n.cfg.EnableSSLPassthrough {
            servers := []*TCPServer{}
            for _, pb := range ingressCfg.PassthroughBackends {
                svc := pb.Service
                if svc == nil {
                    klog.Warningf("Missing Service for SSL Passthrough backend %q", pb.Backend)
                    continue
                }
                port, err := strconv.Atoi(pb.Port.String()) // #nosec
                if err != nil {
                    for _, sp := range svc.Spec.Ports {
                        if sp.Name == pb.Port.String() {
                            port = int(sp.Port)
                            break
                        }
                    }
                } else {
                    for _, sp := range svc.Spec.Ports {
                        if sp.Port == int32(port) {
                            port = int(sp.Port)
                            break
                        }
                    }
                }
    
                // TODO: Allow PassthroughBackends to specify they support proxy-protocol
                servers = append(servers, &TCPServer{
                    Hostname:      pb.Hostname,
                    IP:            svc.Spec.ClusterIP,
                    Port:          port,
                    ProxyProtocol: false,
                })
            }
    
            n.Proxy.ServerList = servers
        }
    
        // NGINX cannot resize the hash tables used to store server names. For
        // this reason we check if the current size is correct for the host
        // names defined in the Ingress rules and adjust the value if
        // necessary.
        // https://trac.nginx.org/nginx/ticket/352
        // https://trac.nginx.org/nginx/ticket/631
        var longestName int
        var serverNameBytes int
    
        for _, srv := range ingressCfg.Servers {
            hostnameLength := len(srv.Hostname)
            if srv.RedirectFromToWWW {
                hostnameLength += 4
            }
            if longestName < hostnameLength {
                longestName = hostnameLength
            }
    
            for _, alias := range srv.Aliases {
                if longestName < len(alias) {
                    longestName = len(alias)
                }
            }
    
            serverNameBytes += hostnameLength
        }
    
        nameHashBucketSize := nginxHashBucketSize(longestName)
        if cfg.ServerNameHashBucketSize < nameHashBucketSize {
            klog.V(3).InfoS("Adjusting ServerNameHashBucketSize variable", "value", nameHashBucketSize)
            cfg.ServerNameHashBucketSize = nameHashBucketSize
        }
    
        serverNameHashMaxSize := nextPowerOf2(serverNameBytes)
        if cfg.ServerNameHashMaxSize < serverNameHashMaxSize {
            klog.V(3).InfoS("Adjusting ServerNameHashMaxSize variable", "value", serverNameHashMaxSize)
            cfg.ServerNameHashMaxSize = serverNameHashMaxSize
        }
    
        if cfg.MaxWorkerOpenFiles == 0 {
            // the limit of open files is per worker process
            // and we leave some room to avoid consuming all the FDs available
            maxOpenFiles := rlimitMaxNumFiles() - 1024
            klog.V(3).InfoS("Maximum number of open file descriptors", "value", maxOpenFiles)
            if maxOpenFiles < 1024 {
                // this means the value of RLIMIT_NOFILE is too low.
                maxOpenFiles = 1024
            }
            klog.V(3).InfoS("Adjusting MaxWorkerOpenFiles variable", "value", maxOpenFiles)
            cfg.MaxWorkerOpenFiles = maxOpenFiles
        }
    
        if cfg.MaxWorkerConnections == 0 {
            maxWorkerConnections := int(float64(cfg.MaxWorkerOpenFiles * 3.0 / 4))
            klog.V(3).InfoS("Adjusting MaxWorkerConnections variable", "value", maxWorkerConnections)
            cfg.MaxWorkerConnections = maxWorkerConnections
        }
    
        setHeaders := map[string]string{}
        if cfg.ProxySetHeaders != "" {
            cmap, err := n.store.GetConfigMap(cfg.ProxySetHeaders)
            if err != nil {
                klog.Warningf("Error reading ConfigMap %q from local store: %v", cfg.ProxySetHeaders, err)
            } else {
                setHeaders = cmap.Data
            }
        }
    
        addHeaders := map[string]string{}
        if cfg.AddHeaders != "" {
            cmap, err := n.store.GetConfigMap(cfg.AddHeaders)
            if err != nil {
                klog.Warningf("Error reading ConfigMap %q from local store: %v", cfg.AddHeaders, err)
            } else {
                addHeaders = cmap.Data
            }
        }
    
        sslDHParam := ""
        if cfg.SSLDHParam != "" {
            secretName := cfg.SSLDHParam
    
            secret, err := n.store.GetSecret(secretName)
            if err != nil {
                klog.Warningf("Error reading Secret %q from local store: %v", secretName, err)
            } else {
                nsSecName := strings.Replace(secretName, "/", "-", -1)
                dh, ok := secret.Data["dhparam.pem"]
                if ok {
                    pemFileName, err := ssl.AddOrUpdateDHParam(nsSecName, dh)
                    if err != nil {
                        klog.Warningf("Error adding or updating dhparam file %v: %v", nsSecName, err)
                    } else {
                        sslDHParam = pemFileName
                    }
                }
            }
        }
    
        cfg.SSLDHParam = sslDHParam
    
        cfg.DefaultSSLCertificate = n.getDefaultSSLCertificate()
    
        tc := ngx_config.TemplateConfig{
            ProxySetHeaders:          setHeaders,
            AddHeaders:               addHeaders,
            BacklogSize:              sysctlSomaxconn(),
            Backends:                 ingressCfg.Backends,
            PassthroughBackends:      ingressCfg.PassthroughBackends,
            Servers:                  ingressCfg.Servers,
            TCPBackends:              ingressCfg.TCPEndpoints,
            UDPBackends:              ingressCfg.UDPEndpoints,
            Cfg:                      cfg,
            IsIPV6Enabled:            n.isIPV6Enabled && !cfg.DisableIpv6,
            NginxStatusIpv4Whitelist: cfg.NginxStatusIpv4Whitelist,
            NginxStatusIpv6Whitelist: cfg.NginxStatusIpv6Whitelist,
            RedirectServers:          buildRedirects(ingressCfg.Servers),
            IsSSLPassthroughEnabled:  n.cfg.EnableSSLPassthrough,
            ListenPorts:              n.cfg.ListenPorts,
            PublishService:           n.GetPublishService(),
            EnableMetrics:            n.cfg.EnableMetrics,
            MaxmindEditionFiles:      n.cfg.MaxmindEditionFiles,
            HealthzURI:               nginx.HealthPath,
            MonitorMaxBatchSize:      n.cfg.MonitorMaxBatchSize,
            PID:                      nginx.PID,
            StatusPath:               nginx.StatusPath,
            StatusPort:               nginx.StatusPort,
            StreamPort:               nginx.StreamPort,
        }
    
        tc.Cfg.Checksum = ingressCfg.ConfigurationChecksum
    
        return n.t.Write(tc)
    }
  • 相关阅读:
    模型驱动架构探索之游戏引擎设计 (二)开始建模
    模型驱动架构探索之游戏引擎设计 (二)粒度统一
    模型驱动架构探索之游戏引擎设计 (一)
    模型驱动架构探索之游戏引擎设计 (序)
    时隔几年,再写传统的简单问题算法,又有何不同?
    【自学笔记】0基础自学机器学习 (第三天)
    【自学笔记】0基础自学机器学习 (第二天)
    JavaSE-知识点总结
    Java框架之SpringBoot 09-Web构建-yml-模块-注解
    Java框架之SpringSecurity 08-权限系统
  • 原文地址:https://www.cnblogs.com/it-worker365/p/15417082.html
Copyright © 2011-2022 走看看