  • Flink Source Code Reading: Deploying a Session-Mode Cluster on K8s (Entry Configuration)

    The startup main class: KubernetesSessionCli

    KubernetesSessionCli#main: the main-class entry point

    // org.apache.flink.kubernetes.cli.KubernetesSessionCli#main
    public static void main(String[] args) {
        // Load the configuration from ${FLINK_CONF_DIR}/flink-conf.yaml via the environment variable
        final Configuration configuration = GlobalConfiguration.loadConfiguration();
        // Resolve the configuration directory; the first existing candidate wins, in priority order: FLINK_CONF_DIR, ../conf, conf
        final String configDir = CliFrontend.getConfigurationDirectoryFromEnv();
        int retCode;
        try {
            final KubernetesSessionCli cli = new KubernetesSessionCli(configuration, configDir);
            retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
        } catch (CliArgsException e) {
            retCode = AbstractCustomCommandLine.handleCliArgsException(e, LOG);
        } catch (Exception e) {
            retCode = AbstractCustomCommandLine.handleError(e, LOG);
        }
        System.exit(retCode);
    }
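
    The comment above compresses the directory lookup; here is a minimal sketch (plain JDK, not the actual Flink source) of the order CliFrontend.getConfigurationDirectoryFromEnv applies, taking the first candidate that exists:

    // Hedged sketch of the config-directory resolution order; the method name is illustrative.
    static String resolveConfigDir() {
        final String envDir = System.getenv("FLINK_CONF_DIR");
        if (envDir != null && new java.io.File(envDir).exists()) {
            return envDir;                        // 1. explicit FLINK_CONF_DIR
        }
        if (new java.io.File("../conf").exists()) {
            return "../conf";                     // 2. ../conf relative to the working dir
        }
        if (new java.io.File("conf").exists()) {
            return "conf";                        // 3. conf relative to the working dir
        }
        throw new IllegalStateException("Could not find a Flink configuration directory.");
    }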

    KubernetesSessionCli#run: the core flow

    Build the effective configuration -> load and match a ClusterClientFactory -> construct the Descriptor -> retrieve or create the Service -> the Descriptor handles the cluster-internal deployment details -> in attached mode, watch stdin for stop/quit commands
    
    //org.apache.flink.kubernetes.cli.KubernetesSessionCli#run
    private int run(String[] args) throws FlinkException, CliArgsException {
        /*
        Build the effective configuration from three parts:
          ${FLINK_CONF_DIR}/flink-conf.yaml
          -Dk=v dynamic properties (a bare -Dk is treated as true)
          execution.target = kubernetes-session
        */
        final Configuration configuration = getEffectiveConfiguration(args);
        // The service loader loads all ClusterClientFactory implementations and matches the single factory for execution.target
        final ClusterClientFactory<String> kubernetesClusterClientFactory =
                clusterClientServiceLoader.getClusterClientFactory(configuration);
        // Load the K8s config to create a NamespacedKubernetesClient, which backs the Descriptor
        final ClusterDescriptor<String> kubernetesClusterDescriptor =
                kubernetesClusterClientFactory.createClusterDescriptor(configuration);
        try {
            final ClusterClient<String> clusterClient;
            // kubernetes.cluster-id
            String clusterId = kubernetesClusterClientFactory.getClusterId(configuration);
            // execution.attached defaults to false, so detached defaults to true
            final boolean detached = !configuration.get(DeploymentOptions.ATTACHED);
            // kubernetesClusterDescriptor already holds a client connected to the cluster; a fresh client is created here to query the Service
            final FlinkKubeClient kubeClient =
                    FlinkKubeClientFactory.getInstance().fromConfiguration(configuration, "client");
            // Retrieve or create a session cluster.
            if (clusterId != null && kubeClient.getRestService(clusterId).isPresent()) {
                clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();
            } else {
                clusterClient =
                        kubernetesClusterDescriptor
                                // Deployment details are covered in: Flink Source Code Reading - Deploying a Session-Mode Cluster on K8s (Internal Details)
                                .deploySessionCluster(
                                        kubernetesClusterClientFactory.getClusterSpecification(
                                                configuration))
                                .getClusterClient();
                clusterId = clusterClient.getClusterId();
            }
            // In attached mode, read stop/quit commands from stdin to shut down the cluster and close the clients
            try {
                if (!detached) {
                    Tuple2<Boolean, Boolean> continueRepl = new Tuple2<>(true, false);
                    try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
                        // f0 = true: keep reading and parsing interactive user input
                        while (continueRepl.f0) {
                            continueRepl = repStep(in);
                        }
                    } catch (Exception e) {
                        LOG.warn(
                                "Exception while running the interactive command line interface.",
                                e);
                    }
                    // On quit/stop the loop exits; stop additionally shuts the cluster down
                    if (continueRepl.f1) {
                        kubernetesClusterDescriptor.killCluster(clusterId);
                    }
                }
                // Close the clients
                clusterClient.close();
                kubeClient.close();
            } catch (Exception e) {
                LOG.info("Could not properly shutdown cluster client.", e);
            }
        } finally {
            try {
                kubernetesClusterDescriptor.close();
            } catch (Exception e) {
                LOG.info("Could not properly close the kubernetes cluster descriptor.", e);
            }
        }
        return 0;
    }

    getEffectiveConfiguration: building the effective configuration

    // org.apache.flink.kubernetes.cli.KubernetesSessionCli#getEffectiveConfiguration
    Configuration getEffectiveConfiguration(String[] args) throws CliArgsException {
        // Parse all arguments received by the main class; cli is a GenericCLI
        final CommandLine commandLine = cli.parseCommandLineOptions(args, true);
        // Tracing the call chain shows baseConfiguration holds the contents of ${FLINK_CONF_DIR}/flink-conf.yaml
        final Configuration effectiveConfiguration = new Configuration(baseConfiguration);
        
        // Convert the CommandLine into a Configuration
        effectiveConfiguration.addAll(cli.toConfiguration(commandLine));
        // execution.target = kubernetes-session
        effectiveConfiguration.set(DeploymentOptions.TARGET, KubernetesSessionClusterExecutor.NAME);
        return effectiveConfiguration;
    }
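
    The override order matters here: the flink-conf.yaml values form the base, the -D dynamic properties are layered on top via addAll, and execution.target is forced last. A short sketch using Flink's Configuration API (the cluster-id value is illustrative):

    // Later writes win, so even a user-supplied -Dexecution.target ends up
    // overwritten with kubernetes-session.
    Configuration base = GlobalConfiguration.loadConfiguration();   // flink-conf.yaml
    Configuration effective = new Configuration(base);
    effective.setString("kubernetes.cluster-id", "my-session");     // from a -D argument
    effective.setString("execution.target", "kubernetes-session");  // forced last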

    parseCommandLineOptions: parsing args into a CommandLine

    // org.apache.flink.client.cli.CustomCommandLine#parseCommandLineOptions
    default CommandLine parseCommandLineOptions(String[] args, boolean stopAtNonOptions)
            throws CliArgsException {
        final Options options = new Options();
        // Add the general options
        addGeneralOptions(options);
        // Add the run options; the GenericCLI implementation is a no-op
        addRunOptions(options);
        // Delegate to commons-cli to parse the argument array into a CommandLine
        return CliFrontendParser.parse(options, args, stopAtNonOptions);
    }
    
    // org.apache.flink.client.cli.GenericCLI#addGeneralOptions
    @Override
    public void addGeneralOptions(Options baseOptions) {
        // executorOption = -e/--executor, hasArg; example match: -e kubernetes-session
        baseOptions.addOption(executorOption);
        // targetOption = -t/--target, hasArg; example match: -t kubernetes-session
        baseOptions.addOption(targetOption);
        // Matches -Dk=v
        baseOptions.addOption(DynamicPropertiesUtil.DYNAMIC_PROPERTIES);
    }
    
    // org.apache.flink.client.cli.CliFrontendParser#parse
    public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions)
            throws CliArgsException {
        final DefaultParser parser = new DefaultParser();
        try {
            return parser.parse(options, args, stopAtNonOptions);
        } catch (ParseException e) {
            throw new CliArgsException(e.getMessage());
        }
    }
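
    A self-contained commons-cli sketch of the parse call above (the option definitions are modeled on addGeneralOptions, not copied from Flink). With stopAtNonOptions = true the parser stops quietly at the first unrecognized token instead of throwing:

    import org.apache.commons.cli.*;

    public class ParseSketch {
        public static void main(String[] args) throws ParseException {
            Options options = new Options();
            options.addOption(new Option("e", "executor", true, "executor name"));
            options.addOption(new Option("t", "target", true, "deployment target"));
            CommandLine cmd = new DefaultParser().parse(
                    options,
                    new String[] {"-t", "kubernetes-session", "someArg"},
                    true);                            // stopAtNonOptions
            System.out.println(cmd.getOptionValue("t")); // kubernetes-session
            System.out.println(cmd.getArgList());        // [someArg] - left for later stages
        }
    }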

    toConfiguration: converting into a Flink Configuration

    // org.apache.flink.client.cli.GenericCLI#toConfiguration
    @Override
    public Configuration toConfiguration(final CommandLine commandLine) {
        final Configuration resultConfiguration = new Configuration();
    
        final String executorName = commandLine.getOptionValue(executorOption.getOpt());
        if (executorName != null) {
            resultConfiguration.setString(DeploymentOptions.TARGET, executorName);
        }
    
        final String targetName = commandLine.getOptionValue(targetOption.getOpt());
        if (targetName != null) {
            resultConfiguration.setString(DeploymentOptions.TARGET, targetName);
        }
        // After KubernetesSessionCli calls this method, the value above is overwritten with execution.target = kubernetes-session
    
        // CommandLine -> Properties -> Configuration
        DynamicPropertiesUtil.encodeDynamicProperties(commandLine, resultConfiguration);
    
        // $internal.deployment.config-dir = FLINK_CONF_DIR [or ../conf, conf]
        resultConfiguration.set(DeploymentOptionsInternal.CONF_DIR, configurationDir);
    
        return resultConfiguration;
    }
    
    // org.apache.flink.client.cli.DynamicPropertiesUtil#encodeDynamicProperties
    static void encodeDynamicProperties(
            final CommandLine commandLine, final Configuration effectiveConfiguration) {
        final Properties properties = commandLine.getOptionProperties(DYNAMIC_PROPERTIES.getOpt());
        properties
                .stringPropertyNames()
                .forEach(
                        key -> {
                            final String value = properties.getProperty(key);
                            if (value != null) {
                                // Matched -Dk=v
                                effectiveConfiguration.setString(key, value);
                            } else {
                                // Matched a bare -Dk
                                effectiveConfiguration.setString(key, "true");
                            }
                        });
    }
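
    End to end, the two -D shapes behave as described in the branches above. A hedged sketch, where the "D" option definition is modeled on DYNAMIC_PROPERTIES (unbounded args split on '='), not copied from it:

    Options options = new Options();
    options.addOption(Option.builder("D")
            .argName("property=value")
            .hasArgs()
            .valueSeparator('=')
            .desc("dynamic property")
            .build());
    CommandLine cmd = new DefaultParser().parse(
            options, new String[] {"-Dstate.backend=rocksdb", "-Dsome.flag"}, true);
    // getOptionProperties pairs the parsed values per occurrence; a key without
    // a value maps to "true", matching the else-branch above.
    System.out.println(cmd.getOptionProperties("D"));
    // {state.backend=rocksdb, some.flag=true}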

    getClusterClientFactory: obtaining the cluster client factory

    // org.apache.flink.client.deployment.DefaultClusterClientServiceLoader#getClusterClientFactory
    @Override
    public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(
            final Configuration configuration) {
        checkNotNull(configuration);
        // Plain Java ServiceLoader: implementations are discovered and instantiated lazily (see the sketch after this method)
        final ServiceLoader<ClusterClientFactory> loader =
                ServiceLoader.load(ClusterClientFactory.class);
        final List<ClusterClientFactory> compatibleFactories = new ArrayList<>();
        /*
        Iterates the services listed in META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory:
           org.apache.flink.client.deployment.StandaloneClientFactory
           org.apache.flink.kubernetes.KubernetesClusterClientFactory
           org.apache.flink.yarn.YarnClusterClientFactory
        This matches the Deployment / Resource Providers section of the official docs,
        which offers exactly three resource providers: Standalone, Native Kubernetes, and YARN.
        */
        final Iterator<ClusterClientFactory> factories = loader.iterator();
        while (factories.hasNext()) {
            try {
                final ClusterClientFactory factory = factories.next();
                // Check whether the factory is compatible with the given configuration
                if (factory != null && factory.isCompatibleWith(configuration)) {
                    compatibleFactories.add(factory);
                }
            } catch (Throwable e) {
                if (e.getCause() instanceof NoClassDefFoundError) {
                    LOG.info("Could not load factory due to missing dependencies.");
                } else {
                    throw e;
                }
            }
        }
        if (compatibleFactories.size() > 1) {
            final List<String> configStr =
                    configuration.toMap().entrySet().stream()
                            .map(e -> e.getKey() + "=" + e.getValue())
                            .collect(Collectors.toList());
            throw new IllegalStateException(
                    "Multiple compatible client factories found for:\n"
                            + String.join("\n", configStr)
                            + ".");
        }
        if (compatibleFactories.isEmpty()) {
            throw new IllegalStateException(
                    "No ClusterClientFactory found. If you were targeting a Yarn cluster, "
                            + "please make sure to export the HADOOP_CLASSPATH environment variable or have hadoop in your "
                            + "classpath. For more information refer to the \"Deployment\" section of the official "
                            + "Apache Flink documentation.");
        }
        // Exactly one compatible factory is allowed
        return (ClusterClientFactory<ClusterID>) compatibleFactories.get(0);
    }
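
    For reference, a minimal ServiceLoader sketch with a hypothetical Greeter SPI (not Flink code). Implementations are listed one per line in META-INF/services/<interface FQN> and are instantiated lazily during iteration, which is why the loop above wraps each next() in a try block:

    import java.util.ServiceLoader;

    public interface Greeter {
        String greet();
    }

    // META-INF/services/com.example.Greeter would list e.g. com.example.EnglishGreeter
    ServiceLoader<Greeter> loader = ServiceLoader.load(Greeter.class);
    for (Greeter greeter : loader) {
        // iteration can throw ServiceConfigurationError (e.g. wrapping a
        // NoClassDefFoundError), which is the case handled above for the YARN
        // factory when Hadoop is missing from the classpath
        System.out.println(greeter.greet());
    }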
    
    // org.apache.flink.kubernetes.KubernetesClusterClientFactory#isCompatibleWith
    @Override
    public boolean isCompatibleWith(Configuration configuration) {
        checkNotNull(configuration);
        // At this point execution.target = kubernetes-session
        final String deploymentTarget = configuration.getString(DeploymentOptions.TARGET);
        return KubernetesDeploymentTarget.isValidKubernetesTarget(deploymentTarget);
    }
    
    // org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget#isValidKubernetesTarget
    public static boolean isValidKubernetesTarget(final String configValue) {
        return configValue != null
                && Arrays.stream(KubernetesDeploymentTarget.values()) // values = [kubernetes-session, kubernetes-application]
                        .anyMatch(
                                kubernetesDeploymentTarget ->
                                        kubernetesDeploymentTarget.name.equalsIgnoreCase(
                                                configValue));
    }

    createClusterDescriptor: creating the cluster Descriptor

    // org.apache.flink.kubernetes.KubernetesClusterClientFactory#createClusterDescriptor
    @Override
    public KubernetesClusterDescriptor createClusterDescriptor(Configuration configuration) {
        checkNotNull(configuration);
        if (!configuration.contains(KubernetesConfigOptions.CLUSTER_ID)) {
            final String clusterId = generateClusterId();
            configuration.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
        }
        return new KubernetesClusterDescriptor(
                configuration,
                // A FlinkKubeClient wrapping the native K8s client
                FlinkKubeClientFactory.getInstance().fromConfiguration(configuration, "client"));
    }
    
    // org.apache.flink.kubernetes.KubernetesClusterClientFactory#generateClusterId
    // Generates a 45-character clusterId of the form flink-cluster-xxx
    private String generateClusterId() {
        final String randomID = new AbstractID().toString();
        return (CLUSTER_ID_PREFIX + randomID)
                .substring(0, Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID);
    }
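
    A worked example of the truncation (lengths taken from the code above): AbstractID renders as 32 hex characters, the "flink-cluster-" prefix adds 14, so the 46-character concatenation is cut back to the 45-character limit:

    String randomID = "a1b2c3d4e5f60718293a4b5c6d7e8f90";             // 32 chars, illustrative
    String clusterId = ("flink-cluster-" + randomID).substring(0, 45);
    System.out.println(clusterId);          // flink-cluster-a1b2c3d4e5f60718293a4b5c6d7e8f9
    System.out.println(clusterId.length()); // 45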

    fromConfiguration: creating the FlinkKubeClient

    Flink config -> K8s config + namespace -> NamespacedKubernetesClient -> Fabric8FlinkKubeClient(Flink config, K8s client, IO thread pool)
    
    // org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory#fromConfiguration
    public FlinkKubeClient fromConfiguration(Configuration flinkConfig, String useCase) {
        final Config config;
        // kubernetes.context: different contexts in one config can manage different Flink clusters
        final String kubeContext = flinkConfig.getString(KubernetesConfigOptions.CONTEXT);
        if (kubeContext != null) {
            LOG.info("Configuring kubernetes client to use context {}.", kubeContext);
        }
        // kubernetes.config.file
        final String kubeConfigFile =
                flinkConfig.getString(KubernetesConfigOptions.KUBE_CONFIG_FILE);
        if (kubeConfigFile != null) {
            LOG.debug("Trying to load kubernetes config from file: {}.", kubeConfigFile);
            try {
                /*
                Config is built via Config.fromKubeconfig(String context, String kubeconfigContents, String kubeconfigPath).
                If kubeContext is null, the default context of kubeConfigFile is used. Note that the third
                parameter, kubeconfigPath, is optional and passed as null here: it is only used to rewrite
                relative TLS asset paths inside the kubeconfig when a file is passed and the kubeconfig
                references some assets by relative path.
                */
                config = Config.fromKubeconfig(kubeContext,
                        FileUtils.readFileUtf8(new File(kubeConfigFile)), null);
            } catch (IOException e) {
                throw new KubernetesClientException("Load kubernetes config failed.", e);
            }
        } else {
            LOG.debug("Trying to load default kubernetes config.");
            config = Config.autoConfigure(kubeContext);
        }
        // kubernetes.namespace
        final String namespace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE);
        LOG.debug("Setting namespace of Kubernetes client to {}", namespace);
        config.setNamespace(namespace);
        // This could be removed after we bump the fabric8 Kubernetes client version to 4.13.0+ or
        // use a shared connection for all ConfigMap watches. See FLINK-22006 for more information.
        trySetMaxConcurrentRequest(config);
        // The low-level client that talks to K8s
        final NamespacedKubernetesClient client = new DefaultKubernetesClient(config);
        
        // kubernetes.client.io-pool.size = 4
        final int poolSize =
                flinkConfig.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);
        return new Fabric8FlinkKubeClient(
                flinkConfig, client, createThreadPoolForAsyncIO(poolSize, useCase));
    }
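
    The same construction path can be reproduced against fabric8 directly; a hedged standalone sketch (the "flink" namespace is illustrative) that assumes a kubeconfig in the default location or in-cluster service-account credentials:

    import io.fabric8.kubernetes.client.Config;
    import io.fabric8.kubernetes.client.DefaultKubernetesClient;
    import io.fabric8.kubernetes.client.NamespacedKubernetesClient;

    // null context = whatever the kubeconfig marks as current-context
    Config config = Config.autoConfigure(null);
    config.setNamespace("flink");
    NamespacedKubernetesClient client = new DefaultKubernetesClient(config);
    System.out.println("Client namespace: " + client.getNamespace());
    client.close();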
    
    @VisibleForTesting
    static void trySetMaxConcurrentRequest(Config config) {
        // kubernetes.max.concurrent.requests = 64
        final String configuredMaxConcurrentRequests =
                Utils.getSystemPropertyOrEnvVar(
                        Config.KUBERNETES_MAX_CONCURRENT_REQUESTS,
                        String.valueOf(Config.DEFAULT_MAX_CONCURRENT_REQUESTS));
        if (configuredMaxConcurrentRequests != null) {
            LOG.debug(
                    "Setting max concurrent requests of Kubernetes client to {}",
                    configuredMaxConcurrentRequests);
            config.setMaxConcurrentRequests(Integer.parseInt(configuredMaxConcurrentRequests));
        }
    }
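
    The lookup above goes through fabric8's Utils.getSystemPropertyOrEnvVar, which checks a JVM system property first and then the environment-variable form derived from it (upper-cased, dots replaced by underscores); a hedged example of raising the limit before the client is created:

    // Either form is picked up; the value shown is illustrative.
    System.setProperty("kubernetes.max.concurrent.requests", "128");
    // or: export KUBERNETES_MAX_CONCURRENT_REQUESTS=128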

    repStep: polling stdin for commands in attached mode

    // org.apache.flink.kubernetes.cli.KubernetesSessionCli#repStep
    
    // Decides whether to keep reading input and whether to shut down the cluster:
    // f0 = true: keep reading interactive user input
    // f1 = true: kill the cluster
    private Tuple2<Boolean, Boolean> repStep(BufferedReader in)
            throws IOException, InterruptedException {
        final long startTime = System.currentTimeMillis();
        // While the input stream is empty, keep polling for up to 3s (CLIENT_POLLING_INTERVAL_MS)
        while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVAL_MS
                && (!in.ready())) {
            Thread.sleep(200L);
        }
        // ------------- handle interactive command by user. ----------------------
        if (in.ready()) {
            final String command = in.readLine();
            switch (command) {
                case "quit":
                    return new Tuple2<>(false, false);
                case "stop":
                    return new Tuple2<>(false, true);
                case "help":
                    System.err.println(KUBERNETES_CLUSTER_HELP);
                    break;
                default:
                    System.err.println("Unknown command '" + command + "'. Showing help:");
                    System.err.println(KUBERNETES_CLUSTER_HELP);
                    break;
            }
        }
        return new Tuple2<>(true, false);
    }

    Further source-code reading notes have moved to Gitee.

  • Original post: https://www.cnblogs.com/tyxuanCX/p/15765666.html