zoukankan      html  css  js  c++  java
  • Flink源码阅读 K8s上部署session模式的集群(入口配置)

    启动主类 KubernetesSessionCli

    KubernetesSessionCli#main 主类入口

    // org.apache.flink.kubernetes.cli.KubernetesSessionCli#main
    /**
     * Command-line entry point for starting (or attaching to) a Kubernetes session cluster.
     *
     * <p>Loads the base Flink configuration, runs the CLI inside the installed security
     * context, maps any failure to an exit code and terminates the JVM with that code.
     *
     * @param args raw command-line arguments
     */
    public static void main(String[] args) {
        // Base configuration; per the original note this is ${FLINK_CONF_DIR}/flink-conf.yaml.
        final Configuration baseConfig = GlobalConfiguration.loadConfiguration();
        // Configuration directory; per the original note the first existing one of
        // FLINK_CONF_DIR, ../conf, conf is used.
        final String confDir = CliFrontend.getConfigurationDirectoryFromEnv();

        int exitCode;
        try {
            final KubernetesSessionCli sessionCli = new KubernetesSessionCli(baseConfig, confDir);
            exitCode = SecurityUtils.getInstalledContext().runSecured(() -> sessionCli.run(args));
        } catch (CliArgsException argsError) {
            // Invalid command-line arguments get their own handler.
            exitCode = AbstractCustomCommandLine.handleCliArgsException(argsError, LOG);
        } catch (Exception failure) {
            exitCode = AbstractCustomCommandLine.handleError(failure, LOG);
        }
        System.exit(exitCode);
    }

    KubernetesSessionCli#run 核心流程

    获取有效配置 -> 加载和匹配ClusterClientFactory -> 构造Descriptor -> 查询或创建服务 -> Descriptor配置集群内部细节 -> 附属管道检测stop,quit命令
    
    //org.apache.flink.kubernetes.cli.KubernetesSessionCli#run
    /**
     * Core flow of the session CLI: build the effective configuration, pick the matching
     * {@code ClusterClientFactory}, create the cluster descriptor, retrieve or deploy the
     * session cluster and, in attached mode, process interactive commands until the user
     * quits or stops the cluster.
     *
     * @param args raw command-line arguments
     * @return always {@code 0} when the method completes normally
     * @throws FlinkException declared by the method; presumably raised by retrieve/deploy —
     *     TODO confirm
     * @throws CliArgsException if the command-line arguments cannot be parsed
     */
    private int run(String[] args) throws FlinkException, CliArgsException {
        /*
        Build the effective configuration, which consists of three parts:
          ${FLINK_CONF_DIR}/flink-conf.yaml
          dynamic properties passed as -Dk=v (a missing value is treated as "true")
          execution.target = kubernetes-session
        */
        final Configuration configuration = getEffectiveConfiguration(args);
        // The loader loads all ClusterClientFactory implementations and picks the single
        // one that matches execution.target.
        final ClusterClientFactory<String> kubernetesClusterClientFactory =
                clusterClientServiceLoader.getClusterClientFactory(configuration);
        // Loads the Kubernetes config to create a NamespacedKubernetesClient and builds the
        // descriptor on top of it.
        final ClusterDescriptor<String> kubernetesClusterDescriptor =
                kubernetesClusterClientFactory.createClusterDescriptor(configuration);
        try {
            final ClusterClient<String> clusterClient;
            // kubernetes.cluster-id
            String clusterId = kubernetesClusterClientFactory.getClusterId(configuration);
            // execution.attached = false (per the original note), i.e. detached unless set
            final boolean detached = !configuration.get(DeploymentOptions.ATTACHED);
            // kubernetesClusterDescriptor already owns a client connected to the cluster; a
            // separate client is created here to look up the REST Service.
            // NOTE(review): kubeClient is only closed on the normal path below; if retrieve()
            // or deploySessionCluster() throws, it looks like this client is never closed —
            // confirm.
            final FlinkKubeClient kubeClient =
                    FlinkKubeClientFactory.getInstance().fromConfiguration(configuration, "client");
            // Retrieve or create a session cluster.
            if (clusterId != null && kubeClient.getRestService(clusterId).isPresent()) {
                clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();
            } else {
                clusterClient =
                        kubernetesClusterDescriptor
                                // Deployment details are covered in the companion article on
                                // the internals of deploying a session cluster on K8s.
                                .deploySessionCluster(
                                        kubernetesClusterClientFactory.getClusterSpecification(
                                                configuration))
                                .getClusterClient();
                clusterId = clusterClient.getClusterId();
            }
            // In attached mode, read interactive commands: "stop" shuts the cluster down,
            // "quit" only ends the client.
            try {
                if (!detached) {
                    Tuple2<Boolean, Boolean> continueRepl = new Tuple2<>(true, false);
                    try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
                        // f0 == true: keep reading and parsing interactive user input
                        while (continueRepl.f0) {
                            continueRepl = repStep(in);
                        }
                    } catch (Exception e) {
                        LOG.warn(
                                "Exception while running the interactive command line interface.",
                                e);
                    }
                    // After quit/stop no more input is read; f1 == true (stop) kills the cluster.
                    if (continueRepl.f1) {
                        kubernetesClusterDescriptor.killCluster(clusterId);
                    }
                }
                // Shut the clients down.
                clusterClient.close();
                kubeClient.close();
            } catch (Exception e) {
                LOG.info("Could not properly shutdown cluster client.", e);
            }
        } finally {
            try {
                kubernetesClusterDescriptor.close();
            } catch (Exception e) {
                LOG.info("Could not properly close the kubernetes cluster descriptor.", e);
            }
        }
        return 0;
    }

    getEffectiveConfiguration 获取有效配置

    // org.apache.flink.kubernetes.cli.KubernetesSessionCli#getEffectiveConfiguration
    /**
     * Builds the configuration actually used for the session: the base configuration,
     * overlaid with everything derived from the command line, with the execution target
     * forced to the Kubernetes session executor.
     *
     * @param args raw command-line arguments
     * @return a new configuration; {@code baseConfiguration} itself is not modified
     * @throws CliArgsException if the arguments cannot be parsed
     */
    Configuration getEffectiveConfiguration(String[] args) throws CliArgsException {
        // Parse all arguments received by the main class (cli is the GenericCLI).
        final CommandLine parsedArgs = cli.parseCommandLineOptions(args, true);
        // Translate the parsed command line into Flink configuration entries.
        final Configuration fromCommandLine = cli.toConfiguration(parsedArgs);

        // Start from a copy of the base configuration (per the original note, the content
        // of ${FLINK_CONF_DIR}/flink-conf.yaml).
        final Configuration merged = new Configuration(baseConfiguration);
        merged.addAll(fromCommandLine);
        // execution.target is always overwritten with the session executor name.
        merged.set(DeploymentOptions.TARGET, KubernetesSessionClusterExecutor.NAME);
        return merged;
    }

    parseCommandLineOptions 解析为命令行

    // org.apache.flink.client.cli.CustomCommandLine#parseCommandLineOptions
    /**
     * Parses the given arguments against the general and run options of this command line.
     *
     * @param args raw command-line arguments
     * @param stopAtNonOptions passed through to the parser
     * @return the parsed command line
     * @throws CliArgsException if parsing fails
     */
    default CommandLine parseCommandLineOptions(String[] args, boolean stopAtNonOptions)
            throws CliArgsException {
        final Options supported = new Options();
        // General options first, then run-time options (per the original note, the
        // GenericCLI implementation of addRunOptions is a no-op).
        addGeneralOptions(supported);
        addRunOptions(supported);
        // Delegate the actual parsing of the argument array to commons-cli.
        return CliFrontendParser.parse(supported, args, stopAtNonOptions);
    }
    
    // org.apache.flink.client.cli.GenericCLI#addGeneralOptions
    /**
     * Registers the options understood by the generic CLI on the given option set.
     *
     * @param baseOptions option set to extend in place
     */
    @Override
    public void addGeneralOptions(Options baseOptions) {
        // executorOption: -e / executor, takes an argument (e.g. -e kubernetes-session)
        // targetOption:   -t / target, takes an argument (e.g. -t kubernetes-session)
        // DYNAMIC_PROPERTIES: matches -Dk=v
        baseOptions
                .addOption(executorOption)
                .addOption(targetOption)
                .addOption(DynamicPropertiesUtil.DYNAMIC_PROPERTIES);
    }
    
    // org.apache.flink.client.cli.CliFrontendParser#parse
    /**
     * Parses {@code args} against {@code options} with a commons-cli {@code DefaultParser}.
     *
     * @param options options the parser should recognise
     * @param args raw command-line arguments
     * @param stopAtNonOptions passed through to {@code DefaultParser#parse}
     * @return the parsed command line
     * @throws CliArgsException if commons-cli rejects the arguments; the original
     *     {@code ParseException} is kept as the cause
     */
    public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions)
            throws CliArgsException {
        final DefaultParser parser = new DefaultParser();
        try {
            return parser.parse(options, args, stopAtNonOptions);
        } catch (ParseException e) {
            // Keep the ParseException as the cause so its type and stack trace are not lost.
            throw new CliArgsException(e.getMessage(), e);
        }
    }

    toConfiguration 解析为Flink配置

    // org.apache.flink.client.cli.GenericCLI#toConfiguration
    /**
     * Converts a parsed command line into a Flink configuration.
     *
     * <p>The result contains the execution target (if given via the executor or target
     * option), all dynamic properties, and the configuration directory.
     *
     * @param commandLine parsed command line
     * @return a fresh configuration holding the command-line derived settings
     */
    @Override
    public Configuration toConfiguration(final CommandLine commandLine) {
        final Configuration config = new Configuration();

        final String executor = commandLine.getOptionValue(executorOption.getOpt());
        final String target = commandLine.getOptionValue(targetOption.getOpt());
        // The target option takes precedence over the executor option when both are given.
        final String effectiveTarget = target != null ? target : executor;
        if (effectiveTarget != null) {
            config.setString(DeploymentOptions.TARGET, effectiveTarget);
        }
        // Note: KubernetesSessionCli overwrites this value with
        // execution.target = kubernetes-session after calling this method.

        // CommandLine -> Properties -> Configuration
        DynamicPropertiesUtil.encodeDynamicProperties(commandLine, config);

        // $internal.deployment.config-dir = FLINK_CONF_DIR (or ../conf, conf)
        config.set(DeploymentOptionsInternal.CONF_DIR, configurationDir);

        return config;
    }
    
    // org.apache.flink.client.cli.DynamicPropertiesUtil#encodeDynamicProperties
    /**
     * Copies every dynamic property found on the command line into the given configuration.
     *
     * @param commandLine parsed command line to read the dynamic properties from
     * @param effectiveConfiguration configuration that receives the key/value pairs
     */
    static void encodeDynamicProperties(
            final CommandLine commandLine, final Configuration effectiveConfiguration) {
        final Properties dynamicProps =
                commandLine.getOptionProperties(DYNAMIC_PROPERTIES.getOpt());
        for (final String name : dynamicProps.stringPropertyNames()) {
            final String raw = dynamicProps.getProperty(name);
            // -Dk=v stores v; a key without a value (-Dk) is stored as "true".
            effectiveConfiguration.setString(name, raw != null ? raw : "true");
        }
    }

    getClusterClientFactory 获取配置工厂

    // org.apache.flink.client.deployment.DefaultClusterClientServiceLoader#getClusterClientFactory
    /**
     * Finds the single {@code ClusterClientFactory} that is compatible with the given
     * configuration.
     *
     * <p>Factories are discovered through {@link ServiceLoader}. Per the original note, the
     * service file lists the Standalone, Kubernetes and Yarn factories, which matches the
     * three resource providers described in the official deployment documentation.
     *
     * @param configuration configuration used to test each factory for compatibility
     * @return the only compatible factory
     * @throws IllegalStateException if no factory or more than one factory is compatible
     */
    @Override
    public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(
            final Configuration configuration) {
        checkNotNull(configuration);

        // ServiceLoader instantiates providers lazily while iterating.
        final ServiceLoader<ClusterClientFactory> serviceLoader =
                ServiceLoader.load(ClusterClientFactory.class);
        final List<ClusterClientFactory> matches = new ArrayList<>();

        final Iterator<ClusterClientFactory> candidates = serviceLoader.iterator();
        while (candidates.hasNext()) {
            try {
                final ClusterClientFactory candidate = candidates.next();
                // Keep only factories that accept the provided configuration.
                if (candidate == null || !candidate.isCompatibleWith(configuration)) {
                    continue;
                }
                matches.add(candidate);
            } catch (Throwable t) {
                // A factory whose dependencies are missing is skipped; anything else is
                // rethrown unchanged.
                if (!(t.getCause() instanceof NoClassDefFoundError)) {
                    throw t;
                }
                LOG.info("Could not load factory due to missing dependencies.");
            }
        }

        if (matches.isEmpty()) {
            throw new IllegalStateException(
                    "No ClusterClientFactory found. If you were targeting a Yarn cluster, "
                            + "please make sure to export the HADOOP_CLASSPATH environment variable or have hadoop in your "
                            + "classpath. For more information refer to the \"Deployment\" section of the official "
                            + "Apache Flink documentation.");
        }
        if (matches.size() > 1) {
            final String renderedConfig =
                    configuration.toMap().entrySet().stream()
                            .map(entry -> entry.getKey() + "=" + entry.getValue())
                            .collect(Collectors.joining("\n"));
            throw new IllegalStateException(
                    "Multiple compatible client factories found for:\n" + renderedConfig + ".");
        }

        // Exactly one compatible factory is allowed.
        return (ClusterClientFactory<ClusterID>) matches.get(0);
    }
    
    // org.apache.flink.kubernetes.KubernetesClusterClientFactory#isCompatibleWith
    /**
     * Tells whether this factory can handle the given configuration, i.e. whether its
     * execution target is a valid Kubernetes deployment target.
     *
     * @param configuration configuration to inspect; must not be null
     * @return {@code true} if the configured target is a Kubernetes target
     */
    @Override
    public boolean isCompatibleWith(Configuration configuration) {
        checkNotNull(configuration);
        // When reached from KubernetesSessionCli, execution.target is kubernetes-session.
        return KubernetesDeploymentTarget.isValidKubernetesTarget(
                configuration.getString(DeploymentOptions.TARGET));
    }
    
    // org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget#isValidKubernetesTarget
    /**
     * Checks whether the given value names one of the Kubernetes deployment targets,
     * ignoring case.
     *
     * @param configValue configured execution target, may be null
     * @return {@code true} if a target with that name exists; {@code false} for null
     */
    public static boolean isValidKubernetesTarget(final String configValue) {
        if (configValue == null) {
            return false;
        }
        // Per the original note, values() is [kubernetes-session, kubernetes-application].
        for (final KubernetesDeploymentTarget candidate : KubernetesDeploymentTarget.values()) {
            if (candidate.name.equalsIgnoreCase(configValue)) {
                return true;
            }
        }
        return false;
    }

    createClusterDescriptor 创建集群Descriptor

    // org.apache.flink.kubernetes.KubernetesClusterClientFactory#createClusterDescriptor
    /**
     * Creates the Kubernetes cluster descriptor for the given configuration.
     *
     * <p>If the configuration does not contain a cluster id yet, a generated one is written
     * into it before the descriptor is built.
     *
     * @param configuration configuration to use; may be modified as described above
     * @return a descriptor backed by a newly created Flink kube client
     */
    @Override
    public KubernetesClusterDescriptor createClusterDescriptor(Configuration configuration) {
        checkNotNull(configuration);

        final boolean hasClusterId = configuration.contains(KubernetesConfigOptions.CLUSTER_ID);
        if (!hasClusterId) {
            configuration.setString(KubernetesConfigOptions.CLUSTER_ID, generateClusterId());
        }

        return new KubernetesClusterDescriptor(
                configuration,
                // FlinkKubeClient wrapping the native Kubernetes client
                FlinkKubeClientFactory.getInstance().fromConfiguration(configuration, "client"));
    }
    
    // org.apache.flink.kubernetes.KubernetesClusterClientFactory#generateClusterId
    // 生成 flink-cluster-xxx 的共45位的clusterId
    /**
     * Generates a random cluster id: the cluster id prefix followed by a fresh
     * {@code AbstractID}, truncated to the maximum allowed cluster id length.
     *
     * @return the generated cluster id
     */
    private String generateClusterId() {
        final String candidate = CLUSTER_ID_PREFIX + new AbstractID().toString();
        return candidate.substring(0, Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID);
    }

    fromConfiguration 创建FlinkKubeClient

    Flink配置 -> K8s配置, namespace -> NamespacedKubernetesClient -> Fabric8FlinkKubeClient(Flink配置, K8sCli, IO线程池)
    
    // org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory#fromConfiguration
    /**
     * Creates a {@code FlinkKubeClient} from the Flink configuration.
     *
     * <p>Flow: Flink configuration -> Kubernetes {@code Config} (context, kubeconfig file,
     * namespace) -> {@code NamespacedKubernetesClient} -> {@code Fabric8FlinkKubeClient}
     * with an IO thread pool.
     *
     * @param flinkConfig Flink configuration to read the Kubernetes settings from
     * @param useCase passed to {@code createThreadPoolForAsyncIO} together with the pool size
     * @return the Flink kube client
     * @throws KubernetesClientException if the configured kubeconfig file cannot be read
     */
    public FlinkKubeClient fromConfiguration(Configuration flinkConfig, String useCase) {
        // kubernetes.context: different contexts can be used to manage different clusters.
        final String contextName = flinkConfig.getString(KubernetesConfigOptions.CONTEXT);
        if (contextName != null) {
            LOG.info("Configuring kubernetes client to use context {}.", contextName);
        }

        // kubernetes.config.file
        final String kubeConfigPath =
                flinkConfig.getString(KubernetesConfigOptions.KUBE_CONFIG_FILE);
        final Config k8sConfig;
        if (kubeConfigPath == null) {
            LOG.debug("Trying to load default kubernetes config.");
            k8sConfig = Config.autoConfigure(contextName);
        } else {
            LOG.debug("Trying to load kubernetes config from file: {}.", kubeConfigPath);
            try {
                final String kubeConfigContents =
                        FileUtils.readFileUtf8(new File(kubeConfigPath));
                // fromKubeconfig(context, kubeconfigContents, kubeconfigPath): per the
                // original note, a null context selects the default context of the file, and
                // the optional third argument (left null here) is only used to resolve
                // relative TLS asset paths referenced inside the kubeconfig.
                k8sConfig = Config.fromKubeconfig(contextName, kubeConfigContents, null);
            } catch (IOException e) {
                throw new KubernetesClientException("Load kubernetes config failed.", e);
            }
        }

        // kubernetes.namespace
        final String targetNamespace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE);
        LOG.debug("Setting namespace of Kubernetes client to {}", targetNamespace);
        k8sConfig.setNamespace(targetNamespace);

        // This could be removed after we bump the fabric8 Kubernetes client version to
        // 4.13.0+ or use a shared connection for all ConfigMap watches. See FLINK-22006 for
        // more information.
        trySetMaxConcurrentRequest(k8sConfig);

        // Low-level client that talks to Kubernetes.
        final NamespacedKubernetesClient k8sClient = new DefaultKubernetesClient(k8sConfig);

        // kubernetes.client.io-pool.size (4 per the original note)
        final int ioPoolSize =
                flinkConfig.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);
        return new Fabric8FlinkKubeClient(
                flinkConfig, k8sClient, createThreadPoolForAsyncIO(ioPoolSize, useCase));
    }
    
    /**
     * Applies the maximum number of concurrent requests to the given Kubernetes config.
     *
     * <p>The value is looked up as a system property or environment variable under
     * {@code Config.KUBERNETES_MAX_CONCURRENT_REQUESTS}, with
     * {@code Config.DEFAULT_MAX_CONCURRENT_REQUESTS} passed as the fallback (64 per the
     * original note).
     *
     * @param config Kubernetes client config to update in place
     */
    @VisibleForTesting
    static void trySetMaxConcurrentRequest(Config config) {
        final String maxRequests =
                Utils.getSystemPropertyOrEnvVar(
                        Config.KUBERNETES_MAX_CONCURRENT_REQUESTS,
                        String.valueOf(Config.DEFAULT_MAX_CONCURRENT_REQUESTS));
        if (maxRequests == null) {
            return;
        }
        LOG.debug("Setting max concurrent requests of Kubernetes client to {}", maxRequests);
        config.setMaxConcurrentRequests(Integer.parseInt(maxRequests));
    }

    repStep 附属管道对命令的重复检测

    // org.apache.flink.kubernetes.cli.KubernetesSessionCli#repStep
    
    // 检查是否继续读取输入和关闭集群
    // f0=true 继续读取用户交互输入
    // f1=true 关闭集群
    /**
     * Performs one step of the interactive loop: waits for user input for at most
     * {@code CLIENT_POLLING_INTERVAL_MS} and, if a line is available, interprets it.
     *
     * @param in reader over the interactive input
     * @return a tuple where {@code f0 == true} means "keep reading input" and
     *     {@code f1 == true} means "shut the cluster down"; {@code quit} yields
     *     {@code (false, false)}, {@code stop} yields {@code (false, true)}, end of input
     *     yields {@code (false, false)}, anything else yields {@code (true, false)}
     * @throws IOException if reading from {@code in} fails
     * @throws InterruptedException if the thread is interrupted while waiting for input
     */
    private Tuple2<Boolean, Boolean> repStep(BufferedReader in)
            throws IOException, InterruptedException {
        final long startTime = System.currentTimeMillis();
        // Poll until input is available or the polling interval has elapsed.
        while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVAL_MS
                && (!in.ready())) {
            Thread.sleep(200L);
        }
        // ------------- handle interactive command by user. ----------------------
        if (in.ready()) {
            final String command = in.readLine();
            if (command == null) {
                // End of stream: readLine() returns null, and switching on null would throw
                // a NullPointerException. Stop reading but leave the cluster running.
                return new Tuple2<>(false, false);
            }
            switch (command) {
                case "quit":
                    return new Tuple2<>(false, false);
                case "stop":
                    return new Tuple2<>(false, true);
                case "help":
                    System.err.println(KUBERNETES_CLUSTER_HELP);
                    break;
                default:
                    System.err.println("Unknown command '" + command + "'. Showing help:");
                    System.err.println(KUBERNETES_CLUSTER_HELP);
                    break;
            }
        }
        return new Tuple2<>(true, false);
    }

    后续源码研读转移到 Gitee

  • 相关阅读:
    ‘Host’ is not allowed to connect to this mysql server
    centos7安装mysql
    further configuration avilable 不见了
    Dynamic Web Module 3.0 requires Java 1.6 or newer
    hadoop启动 datanode的live node为0
    ssh远程访问失败 Centos7
    Linux 下的各种环境安装
    Centos7 安装 python2.7
    安装scala
    Centos7 安装 jdk 1.8
  • 原文地址:https://www.cnblogs.com/tyxuanCX/p/15765666.html
Copyright © 2011-2022 走看看