  Flink Source Code Reading: Deploying a Session-Mode Cluster on K8s (Internal Details)

    Flink source branch: release-1.13

    deploySessionCluster: the deployment entry point
    // org.apache.flink.kubernetes.KubernetesClusterDescriptor#deploySessionCluster
    
    @Override
    public ClusterClientProvider<String> deploySessionCluster(ClusterSpecification clusterSpecification)
      throws ClusterDeploymentException {
        final ClusterClientProvider<String> clusterClientProvider =
                // internal cluster deployment, the main step
                deployClusterInternal(
                        KubernetesSessionClusterEntrypoint.class.getName(),
                        clusterSpecification,
                        false);
    
        try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {
            LOG.info(
                    "Create flink session cluster {} successfully, JobManager Web Interface: {}",
                    clusterId,
                    clusterClient.getWebInterfaceURL());
        }
        return clusterClientProvider;
    }
    deployClusterInternal: the core process of deploying a Flink cluster on K8s
    Execution mode -> ports -> high availability -> K8s JobManager parameters -> main container -> specification -> component creation -> ClientProvider
    
    private ClusterClientProvider<String> deployClusterInternal(
            String entryPoint, ClusterSpecification clusterSpecification, boolean detached)
            throws ClusterDeploymentException {
        // Called at two places in this class (lines 158 and 208: session mode and application mode), both pass detached=false,
        // so a Flink cluster deployed on K8s waits for the job result to be served instead of shutting down as soon as the job finishes
        final ClusterEntrypoint.ExecutionMode executionMode = detached
                 ? ClusterEntrypoint.ExecutionMode.DETACHED
                 : ClusterEntrypoint.ExecutionMode.NORMAL;
        // with detached=false this sets internal.cluster.execution-mode = NORMAL
        flinkConfig.setString(
                ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());
        flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint);
        // checkAndUpdatePortConfigOption(Configuration flinkConfig, ConfigOption<String> port, int fallbackPort) 
    // if the port configured under the given key is 0, set port = fallbackPort
        // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values.
        // blob.server.port = 6124
        KubernetesUtils.checkAndUpdatePortConfigOption(
                flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);
        // taskmanager.rpc.port = 6122
        KubernetesUtils.checkAndUpdatePortConfigOption(
                flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);
        // rest.bind-port = 8081
        KubernetesUtils.checkAndUpdatePortConfigOption(
                flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);
        // HA check: high-availability (or the deprecated recovery.mode) = NONE (inactive), ZOOKEEPER (active), FACTORY_CLASS (active)
        if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) {
            // set high-availability.cluster-id = {clusterId}; the descriptor is constructed with a mandatory clusterId
            flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
            // high-availability.jobmanager.port falls back to {jobmanager.rpc.port} (default 6123)
            KubernetesUtils.checkAndUpdatePortConfigOption(
                    flinkConfig,
                    // high-availability.jobmanager.port(DeprecatedKeys: recovery.jobmanager.port)
                    HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE,
                    flinkConfig.get(JobManagerOptions.PORT));
        }
        try {
            final KubernetesJobManagerParameters kubernetesJobManagerParameters =
                    new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
            final FlinkPod podTemplate = kubernetesJobManagerParameters
                    // kubernetes.pod-template-file.jobmanager
                    .getPodTemplateFilePath()
                    // load the template YAML and pick out the main container flink-main-container (the JobManager)
                    .map(file -> KubernetesUtils.loadPodFromTemplateFile(client,
                            file, Constants.MAIN_CONTAINER_NAME))
                    // when no template yaml is configured, build a fresh empty FlinkPod
                    .orElse(new FlinkPod.Builder().build());
        // build the JobManager specification (i.e. apply decorators that add the various resource and environment descriptions)
            final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
                    KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
                            podTemplate, kubernetesJobManagerParameters);
        // create the JobManager components from the specification
            client.createJobManagerComponent(kubernetesJobManagerSpec);
        // return a provider that can create a RestClusterClient for accessing the Flink cluster on K8s
            return createClusterClientProvider(clusterId);
        } catch (Exception e) {
            try {
                LOG.warn(
                        "Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.",
                        clusterId);
                
            // deployment failed, clean up the residual resources
                client.stopAndCleanupCluster(clusterId);
            } catch (Exception e1) {
                LOG.info(
                        "Failed to stop and clean up the Kubernetes cluster \"{}\".",
                        clusterId,
                        e1);
            }
            throw new ClusterDeploymentException(
                    "Could not create Kubernetes cluster \"" + clusterId + "\".", e);
        }
    }
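    The port fix-up above can be sketched as a standalone snippet. This is not the Flink API: a plain Map stands in for Flink's Configuration, and PortFixup / checkAndUpdatePort are hypothetical names mirroring KubernetesUtils.checkAndUpdatePortConfigOption.

```java
import java.util.HashMap;
import java.util.Map;

// A port configured as 0 means "pick any free port", which cannot be exposed
// through a K8s service, so it is pinned to a fixed fallback value.
public class PortFixup {

    static void checkAndUpdatePort(Map<String, String> config, String key, int fallbackPort) {
        // missing or "0" -> pin to the fallback; any explicit port is kept
        if ("0".equals(config.getOrDefault(key, "0"))) {
            config.put(key, String.valueOf(fallbackPort));
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("blob.server.port", "0");         // auto-assigned -> pinned
        config.put("taskmanager.rpc.port", "16122"); // user-defined -> kept

        checkAndUpdatePort(config, "blob.server.port", 6124);
        checkAndUpdatePort(config, "taskmanager.rpc.port", 6122);
        checkAndUpdatePort(config, "rest.bind-port", 8081);  // absent -> fallback

        if (!"6124".equals(config.get("blob.server.port"))) throw new AssertionError();
        if (!"16122".equals(config.get("taskmanager.rpc.port"))) throw new AssertionError();
        if (!"8081".equals(config.get("rest.bind-port"))) throw new AssertionError();
    }
}
```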
    ExecutionMode
    // only used by the MiniDispatcher
    /** Execution mode of the {@link MiniDispatcher}. */
    public enum ExecutionMode {
        
        // MiniDispatcher#requestJobResult (line 96) decides whether to wait for the job result
        /** Waits until the job result has been served. */
        NORMAL,
        // MiniDispatcher#jobReachedTerminalState (line 125) decides whether to shut down immediately
        /** Directly stops after the job has finished. */
        DETACHED
    }
    isHighAvailabilityModeActivated: the HA check
    /**
     * High availability mode for Flink's cluster execution. Currently supported modes are:
     *
     *
     *  - NONE: No high availability.
     *  - ZOOKEEPER: JobManager high availability via ZooKeeper. ZooKeeper is used to elect a leader
     *      among a group of JobManagers. This JobManager is responsible for the job execution. Upon
     *      failure of the leader a new leader is elected which will take over the responsibilities
     *      of the old leader.
     *  - FACTORY_CLASS: Use the implementation of {@link
     *      org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory} specified in
     *      the configuration property high-availability.
     */
    public enum HighAvailabilityMode {
        NONE(false),
        ZOOKEEPER(true),
        FACTORY_CLASS(true);
    
        private final boolean haActive;
    
        HighAvailabilityMode(boolean haActive) {
            this.haActive = haActive;
        }
        
        public static HighAvailabilityMode fromConfig(Configuration config) {
    
            //high-availability (deprecatedKeys: recovery.mode)
            String haMode = config.getValue(HighAvailabilityOptions.HA_MODE);
    
            if (haMode == null) {
                return HighAvailabilityMode.NONE;
    
            //@Deprecated public static final String DEFAULT_RECOVERY_MODE = "standalone";
            } else if (haMode.equalsIgnoreCase(ConfigConstants.DEFAULT_RECOVERY_MODE)) {
                // Map old default to new default
                return HighAvailabilityMode.NONE;
            } else {
                try {
                    return HighAvailabilityMode.valueOf(haMode.toUpperCase());
                } catch (IllegalArgumentException e) {
                    return FACTORY_CLASS;
                }
            }
        }
    
        public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
            HighAvailabilityMode mode = fromConfig(configuration);
            return mode.haActive;
        }
    }
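    The mapping performed by fromConfig can be reproduced in a self-contained sketch. The Configuration lookup is replaced by a plain String argument; HaModeDemo and fromValue are hypothetical names, while the enum constants and mapping rules mirror the source above.

```java
// Standalone sketch of HighAvailabilityMode.fromConfig's mapping rules.
public class HaModeDemo {
    enum HaMode { NONE, ZOOKEEPER, FACTORY_CLASS }

    static HaMode fromValue(String haMode) {
        if (haMode == null) return HaMode.NONE;                        // unset -> no HA
        if (haMode.equalsIgnoreCase("standalone")) return HaMode.NONE; // legacy default maps to NONE
        try {
            return HaMode.valueOf(haMode.toUpperCase());
        } catch (IllegalArgumentException e) {
            return HaMode.FACTORY_CLASS;  // any other value is treated as a factory class name
        }
    }

    public static void main(String[] args) {
        if (fromValue(null) != HaMode.NONE) throw new AssertionError();
        if (fromValue("standalone") != HaMode.NONE) throw new AssertionError();
        if (fromValue("zookeeper") != HaMode.ZOOKEEPER) throw new AssertionError();
        if (fromValue("org.example.MyHaFactory") != HaMode.FACTORY_CLASS) throw new AssertionError();
    }
}
```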
    KubernetesJobManagerParameters
    // exposes getters for JobManager properties and configuration
    public class KubernetesJobManagerParameters extends AbstractKubernetesParameters
    	ClusterSpecification clusterSpecification
        
        getAnnotations
    	getBlobServerPort
    	getEntrypointClass
    	getEnvironments
    	getJobManagerCPU
    	getJobManagerMemoryMB
    	getLabels
    	getNodeSelector
    	getOwnerReference
    	getPodTemplateFilePath
    	getRestBindPort
    	getRestPort
    	getRestServiceAnnotations
    	getRestServiceExposedType
    	getRPCPort
    	getServiceAccount
    	getTolerations
    	isInternalServiceEnabled
    
    // configuration getters shared by the JM and TM when deploying Flink on K8s
    public abstract class AbstractKubernetesParameters implements KubernetesParameters
    	Configuration flinkConfig
        
        getClusterId
    	getCommonLabels
    	getConfigDirectory
    	getContainerEntrypoint
    	getEnvironmentsFromSecrets
    	getExistingHadoopConfigurationConfigMap
    	getFlinkConfDirInPod
    	getFlinkConfiguration
    	getFlinkLogDirInPod
    	getImage
    	getImagePullPolicy
    	getImagePullSecrets
    	getLocalHadoopConfigurationDirectory
    	getNamespace
    	getSecretNamesToMountPaths
    	hasLog4j
    	hasLogback
    
    // common K8s configuration interface; its methods are implemented in AbstractKubernetesParameters
    public interface KubernetesParameters
    
    
    loadPodFromTemplateFile: loading the template file
    // after loading, the container named flink-main-container is selected as the main container
    
    // org.apache.flink.kubernetes.utils.KubernetesUtils#loadPodFromTemplateFile
    public static FlinkPod loadPodFromTemplateFile(
            FlinkKubeClient kubeClient, File podTemplateFile, String mainContainerName) {
        final KubernetesPod pod = kubeClient.loadPodFromTemplateFile(podTemplateFile);
        final List<Container> otherContainers = new ArrayList<>();
        Container mainContainer = null;
    
        for (Container container : pod.getInternalResource().getSpec().getContainers()) {
            if (mainContainerName.equals(container.getName())) {
                mainContainer = container;
            } else {
                otherContainers.add(container);
            }
        }
    
        if (mainContainer == null) {
            LOG.info(
                    "Could not find main container {} in pod template, using empty one to initialize.",
                    mainContainerName);
            mainContainer = new ContainerBuilder().build();
        }
    
        pod.getInternalResource().getSpec().setContainers(otherContainers);
        return new FlinkPod(pod.getInternalResource(), mainContainer);
    }
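    For reference, a minimal (hypothetical) pod template that this method could consume; the container named flink-main-container is taken as the JobManager main container, and any other container is kept as a sidecar. The sidecar name, image, and volume are illustrative, not from the Flink source.

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: jobmanager-pod-template
spec:
  containers:
    - name: flink-main-container     # picked out as the main container
      volumeMounts:
        - name: flink-logs
          mountPath: /opt/flink/log
    - name: sidecar-log-forwarder    # stays in otherContainers
      image: fluentd:v1.14           # hypothetical sidecar image
  volumes:
    - name: flink-logs
      emptyDir: {}
```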
    
    // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient#loadPodFromTemplateFile: the interface method on Flink's K8s client
    
    // org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient#loadPodFromTemplateFile: the only client implementation
    @Override
    public KubernetesPod loadPodFromTemplateFile(File file) {
        if (!file.exists()) {
            throw new FlinkRuntimeException(
                    String.format("Pod template file %s does not exist.", file));
        }
        return new KubernetesPod(this.internalClient.pods().load(file).get());
    }
    
    // internalClient here is the NamespacedKubernetesClient provided by the fabric8 K8s client;
    // how that client creates pods and loads template files is a topic of its own, the focus here is the Flink source
    
    The K8s resources wrapped by Flink
    // follows on from pod.getInternalResource().getSpec().getContainers() at org/apache/flink/kubernetes/utils/KubernetesUtils.java:378 in the block above

    // the top-level resource abstraction; T is a concrete K8s resource type (Pod, Service, etc.)
    public abstract class KubernetesResource<T> {
    
        private T internalResource;
    
        public KubernetesResource(T internalResource) {
            this.internalResource = internalResource;
        }
    
        public T getInternalResource() {
            return internalResource;
        }
    
        public void setInternalResource(T resource) {
            this.internalResource = resource;
        }
    }
    
    // the Pod resource implementation used here
    public class KubernetesPod extends KubernetesResource<Pod>
    
    // the K8s resources involved in pod.getInternalResource().getSpec().getContainers() include
    @JsonPropertyOrder({"apiVersion", "kind", "metadata", "spec", "status"})
    public class Pod implements HasMetadata
    
    @JsonPropertyOrder({"apiVersion", "kind", "metadata", "activeDeadlineSeconds", "affinity", 
    "automountServiceAccountToken", "containers", "dnsConfig", "dnsPolicy", "enableServiceLinks", 
    "ephemeralContainers", "hostAliases", "hostIPC", "hostNetwork", "hostPID", "hostname", 
    "imagePullSecrets", "initContainers", "nodeName", "nodeSelector", "overhead", "preemptionPolicy", 
    "priority", "priorityClassName", "readinessGates", "restartPolicy", "runtimeClassName", 
    "schedulerName", "securityContext", "serviceAccount", "serviceAccountName", "shareProcessNamespace", 
    "subdomain", "terminationGracePeriodSeconds", "tolerations", "topologySpreadConstraints", "volumes"})
    public class PodSpec implements KubernetesResource
    
    @JsonPropertyOrder({"apiVersion", "kind", "metadata", "args", "command", "env", "envFrom", "image", 
    "imagePullPolicy", "lifecycle", "livenessProbe", "name", "ports", "readinessProbe", "resources", 
    "securityContext", "startupProbe", "stdin", "stdinOnce", "terminationMessagePath", 
    "terminationMessagePolicy", "tty", "volumeDevices", "volumeMounts", "workingDir"})
    public class Container implements KubernetesResource
    
    buildKubernetesJobManagerSpecification: building the JobManager
    //org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory#buildKubernetesJobManagerSpecification
    
    public static KubernetesJobManagerSpecification buildKubernetesJobManagerSpecification(
            FlinkPod podTemplate, KubernetesJobManagerParameters kubernetesJobManagerParameters)
         throws IOException {
        FlinkPod flinkPod = Preconditions.checkNotNull(podTemplate).copy();
        // HasMetadata is the resource abstraction interface in K8s: every resource must implement getters for Kind and ApiVersion
        // and wrap its basic attributes in an ObjectMeta; see io.fabric8.kubernetes.api.model.HasMetadata in the K8s client source
        List<HasMetadata> accompanyingResources = new ArrayList<>();
        // register all K8s decorators; their order is the order of the pod configuration steps
        final KubernetesStepDecorator[] stepDecorators =
                new KubernetesStepDecorator[] {
                    new InitJobManagerDecorator(kubernetesJobManagerParameters),
                    new EnvSecretsDecorator(kubernetesJobManagerParameters),
                    new MountSecretsDecorator(kubernetesJobManagerParameters),
                    new CmdJobManagerDecorator(kubernetesJobManagerParameters),
                    new InternalServiceDecorator(kubernetesJobManagerParameters),
                    new ExternalServiceDecorator(kubernetesJobManagerParameters),
                    new HadoopConfMountDecorator(kubernetesJobManagerParameters),
                    new KerberosMountDecorator(kubernetesJobManagerParameters),
                    new FlinkConfMountDecorator(kubernetesJobManagerParameters),
                    new PodTemplateMountDecorator(kubernetesJobManagerParameters)
                };
        for (KubernetesStepDecorator stepDecorator : stepDecorators) {
            // decorate the pod
            flinkPod = stepDecorator.decorateFlinkPod(flinkPod);
            // build the accompanying resources
            accompanyingResources.addAll(stepDecorator.buildAccompanyingKubernetesResources());
        }
        // create the Deployment
        final Deployment deployment = createJobManagerDeployment(flinkPod,
                kubernetesJobManagerParameters);
        // return the specification of the K8s JobManager
        return new KubernetesJobManagerSpecification(deployment, accompanyingResources);
    }
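    The decorator loop above follows a simple pipeline pattern, which can be sketched in isolation. The "pod" is just a String here, the lambdas stand in for the real decorators, and DecoratorChainDemo / applyAll are hypothetical names; real decorators also emit accompanying resources.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Minimal sketch of the step-decorator chain: each decorator takes the pod
// description built so far and returns an enriched copy; order matters.
public class DecoratorChainDemo {

    static String applyAll(String pod, List<UnaryOperator<String>> decorators) {
        for (UnaryOperator<String> d : decorators) {
            pod = d.apply(pod);  // same shape as stepDecorator.decorateFlinkPod(flinkPod)
        }
        return pod;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> decorators = List.of(
                pod -> pod + "+init",        // stands in for InitJobManagerDecorator
                pod -> pod + "+env-secrets", // stands in for EnvSecretsDecorator
                pod -> pod + "+cmd");        // stands in for CmdJobManagerDecorator

        String pod = applyAll("template", decorators);
        if (!pod.equals("template+init+env-secrets+cmd")) throw new AssertionError();
    }
}
```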
    InitJobManagerDecorator
    It mainly performs the following configuration:
    podWithoutMainContainer
        apiVersion
        Spec(serviceAccount, serviceAccountName)
        Metadata(labels, annotations)
        Spec(imagePullSecrets, nodeSelector, tolerations)

    mainContainer
        memory, CPU
        name, image, pullPolicy, resources
        ports(rest, rpc, blob), env, status.podIP
        
    apiVersion: v1
    metadata:
      labels:
        {kubernetes.jobmanager.labels}
        type: flink-native-kubernetes
        app: {kubernetes.cluster-id}
        component: jobmanager
      annotations:
        {kubernetes.jobmanager.annotations}
    spec:
      serviceAccount: {kubernetes.jobmanager.service-account}       # default: default
      serviceAccountName: {kubernetes.jobmanager.service-account}   # default: default
      imagePullSecrets:
        {kubernetes.container.image.pull-secrets}
      nodeSelector:
        {kubernetes.jobmanager.node-selector}
      tolerations:
        {kubernetes.jobmanager.tolerations}
      containers:
        - name: flink-main-container
          image: {kubernetes.container.image}                       # default: apache/flink:{tag}
          imagePullPolicy: {kubernetes.container.image.pull-policy} # default: IfNotPresent
          resources:
            requests:
              memory: {jobmanager.memory.process.size}
              cpu: {kubernetes.jobmanager.cpu}                      # default: 1.0
            limits:
              memory: {jobmanager.memory.process.size}
              cpu: {kubernetes.jobmanager.cpu}                      # default: 1.0
          ports:
            - name: rest
              containerPort: {rest.port}                            # default: 8081
            - name: jobmanager-rpc
              containerPort: {jobmanager.rpc.port}                  # default: 6123
            - name: blobserver
              containerPort: {blob.server.port}                     # 6124 after the earlier port fix-up
          env:
            - name: {containerized.master.env.* key}
              value: {containerized.master.env.* value}
            - name: _POD_IP_ADDRESS
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
    
    // org.apache.flink.kubernetes.kubeclient.decorators.InitJobManagerDecorator#decorateFlinkPod
    @Override
    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
        final PodBuilder basicPodBuilder = new PodBuilder(flinkPod.getPodWithoutMainContainer());
        // Overwrite fields
        final String serviceAccountName =
                KubernetesUtils.resolveUserDefinedValue(
                        flinkConfig,
                        // kubernetes.jobmanager.service-account
                        KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT,
                        kubernetesJobManagerParameters.getServiceAccount(),
                        KubernetesUtils.getServiceAccount(flinkPod),
                        "service account");
        if (flinkPod.getPodWithoutMainContainer().getSpec().getRestartPolicy() != null) {
            logger.info(
                    "The restart policy of JobManager pod will be overwritten to 'always' "
                            + "since it is controlled by the Kubernetes deployment.");
        }
        // Add apiVersion, Spec(serviceAccount,serviceAccountName)
        basicPodBuilder
                .withApiVersion(API_VERSION)
                .editOrNewSpec()
                .withServiceAccount(serviceAccountName)
                .withServiceAccountName(serviceAccountName)
                .endSpec();
        // Add Metadata(label, annotation), Spec(imagePullSecrets, nodeSelector, tolerations)
        basicPodBuilder
                .editOrNewMetadata()
                .addToLabels(kubernetesJobManagerParameters.getLabels())
                .addToAnnotations(kubernetesJobManagerParameters.getAnnotations())
                .endMetadata()
                .editOrNewSpec()
                .addToImagePullSecrets(kubernetesJobManagerParameters.getImagePullSecrets())
                .addToNodeSelector(kubernetesJobManagerParameters.getNodeSelector())
                .addAllToTolerations(
                        // kubernetes.jobmanager.tolerations -> List<Map<String, String>>
                        kubernetesJobManagerParameters.getTolerations().stream()
                                .map(e -> KubernetesToleration.fromMap(e).getInternalResource())
                                .collect(Collectors.toList()))
                .endSpec();
        final Container basicMainContainer = decorateMainContainer(flinkPod.getMainContainer());
        return new FlinkPod.Builder(flinkPod)
                .withPod(basicPodBuilder.build())
                .withMainContainer(basicMainContainer)
                .build();
    }
    
    private Container decorateMainContainer(Container container) {
        final ContainerBuilder mainContainerBuilder = new ContainerBuilder(container);
        // Overwrite fields
        final String image =
                KubernetesUtils.resolveUserDefinedValue(
                        flinkConfig,
                        // kubernetes.container.image (default: apache/flink:{tag})
                        KubernetesConfigOptions.CONTAINER_IMAGE,
                        kubernetesJobManagerParameters.getImage(),
                        container.getImage(),
                        "main container image");
        final String imagePullPolicy =
                KubernetesUtils.resolveUserDefinedValue(
                        flinkConfig,
                        // kubernetes.container.image.pull-policy (default: IfNotPresent)
                        KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY,
                        kubernetesJobManagerParameters.getImagePullPolicy().name(),
                        container.getImagePullPolicy(),
                        "main container image pull policy");
        final ResourceRequirements requirementsInPodTemplate =
                container.getResources() == null
                        ? new ResourceRequirements()
                        : container.getResources();
        
        // JM memory and CPU configuration
        final ResourceRequirements requirements =
                KubernetesUtils.getResourceRequirements(
                        requirementsInPodTemplate,
                        kubernetesJobManagerParameters.getJobManagerMemoryMB(),
                        kubernetesJobManagerParameters.getJobManagerCPU(),
                        Collections.emptyMap(),
                        Collections.emptyMap());
        // name, image, pullPolicy, resource
        mainContainerBuilder
                .withName(Constants.MAIN_CONTAINER_NAME)
                .withImage(image)
                .withImagePullPolicy(imagePullPolicy)
                .withResources(requirements);
        // ports(rest, rpc, blob), env, status.podIP
        mainContainerBuilder
                .addAllToPorts(getContainerPorts())
                .addAllToEnv(getCustomizedEnvs())
                .addNewEnv()
                .withName(ENV_FLINK_POD_IP_ADDRESS)
                .withValueFrom(
                        new EnvVarSourceBuilder()
                                .withNewFieldRef(API_VERSION, POD_IP_FIELD_PATH)
                                .build())
                .endEnv();
        return mainContainerBuilder.build();
    }
    
    
    // If both the pod template and the user config set a value, the user config wins;
    // if only the pod template sets it, the pod template value is used;
    // if the pod template does not set it, the user config (which has a default) is used.
    // org.apache.flink.kubernetes.utils.KubernetesUtils#resolveUserDefinedValue
    public static <T> String resolveUserDefinedValue(
            Configuration flinkConfig,
            ConfigOption<T> configOption,
            String valueOfConfigOptionOrDefault,
            @Nullable String valueOfPodTemplate,
            String fieldDescription) {
        final String resolvedValue;
        if (valueOfPodTemplate != null) {
            // The config option is explicitly set.
            if (flinkConfig.contains(configOption)) {
                resolvedValue = valueOfConfigOptionOrDefault;
                LOG.info(
                        "The {} configured in pod template will be overwritten to '{}' "
                                + "because of explicitly configured options.",
                        fieldDescription,
                        resolvedValue);
            } else {
                resolvedValue = valueOfPodTemplate;
            }
        } else {
            resolvedValue = valueOfConfigOptionOrDefault;
        }
        return resolvedValue;
    }
    EnvSecretsDecorator
    @Override
    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
        final Container basicMainContainer =
                new ContainerBuilder(flinkPod.getMainContainer())
                        .addAllToEnv(getSecretEnvs())
                        .build();
        return new FlinkPod.Builder(flinkPod).withMainContainer(basicMainContainer).build();
    }
    private List<EnvVar> getSecretEnvs() {
        // kubernetes.env.secretKeyRef -> List<Map<String, String>>
        return kubernetesComponentConf.getEnvironmentsFromSecrets().stream()
                .map(e -> KubernetesSecretEnvVar.fromMap(e).getInternalResource())
                .collect(Collectors.toList());
    }
    MountSecretsDecorator
    Via kubernetes.secrets = foo:/opt/secrets-foo,bar:/opt/secrets-bar the pod declares volumes foo-volume and bar-volume, backed by the secrets foo and bar;
    the main container then mounts those volumes at /opt/secrets-foo and /opt/secrets-bar.
    
    // org.apache.flink.kubernetes.kubeclient.decorators.MountSecretsDecorator#decorateFlinkPod
    @Override
    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
        final Pod podWithMount = decoratePod(flinkPod.getPodWithoutMainContainer());
        final Container containerWithMount = decorateMainContainer(flinkPod.getMainContainer());
        return new FlinkPod.Builder(flinkPod)
                .withPod(podWithMount)
                .withMainContainer(containerWithMount)
                .build();
    }
    
    /*
    containers:
      volumeMounts:
        - name: {kubernetes.secrets key}-volume
          mountPath: {kubernetes.secrets value}
    */
    private Container decorateMainContainer(Container container) {
        final VolumeMount[] volumeMounts =
                // kubernetes.secrets -> Map<String, String> like: foo:/opt/secrets-foo,bar:/opt/secrets-bar
                kubernetesComponentConf.getSecretNamesToMountPaths().entrySet().stream()
                        .map(
                                secretNameToPath ->
                                        new VolumeMountBuilder()
                                                .withName(secretVolumeName(secretNameToPath.getKey()))
                                                .withMountPath(secretNameToPath.getValue())
                                                .build())
                        .toArray(VolumeMount[]::new);
        return new ContainerBuilder(container).addToVolumeMounts(volumeMounts).build();
    }
    
    /*
    volumes:
      - name: {kubernetes.secrets key}-volume
        secret:
          secretName: {kubernetes.secrets key}
    */
    private Pod decoratePod(Pod pod) {
        final Volume[] volumes =
                kubernetesComponentConf.getSecretNamesToMountPaths().keySet().stream()
                        .map(
                                secretName ->
                                        new VolumeBuilder()
                                                .withName(secretVolumeName(secretName))
                                                .withNewSecret()
                                                .withSecretName(secretName)
                                                .endSecret()
                                                .build())
                        .toArray(Volume[]::new);
        return new PodBuilder(pod).editOrNewSpec().addToVolumes(volumes).endSpec().build();
    }
    private String secretVolumeName(String secretName) {
        return secretName + "-volume";
    }
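    The mapping from a kubernetes.secrets value to volume names and mount paths can be sketched on its own. SecretMounts and volumeToMountPath are hypothetical names; the "{secret}-volume" naming follows secretVolumeName above, and the option parsing is a simplified assumption (Flink parses it into a Map before the decorator runs).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: "foo:/opt/secrets-foo,bar:/opt/secrets-bar" -> volume name -> mountPath.
public class SecretMounts {

    static Map<String, String> volumeToMountPath(String secretsOption) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : secretsOption.split(",")) {
            String[] kv = entry.trim().split(":", 2);  // secret name : mount path
            result.put(kv[0] + "-volume", kv[1]);      // volume name per secretVolumeName
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> mounts = volumeToMountPath("foo:/opt/secrets-foo,bar:/opt/secrets-bar");
        if (!mounts.get("foo-volume").equals("/opt/secrets-foo")) throw new AssertionError();
        if (!mounts.get("bar-volume").equals("/opt/secrets-bar")) throw new AssertionError();
    }
}
```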
    CmdJobManagerDecorator
    Assembles the start command of the container's main process: bash -c kubernetes-jobmanager.sh [kubernetes-session|kubernetes-application]
    
    // org.apache.flink.kubernetes.kubeclient.decorators.CmdJobManagerDecorator#decorateFlinkPod
    @Override
    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
        final Container mainContainerWithStartCmd =
                new ContainerBuilder(flinkPod.getMainContainer())
                        // kubernetes.entry.path = /docker-entrypoint.sh
                        .withCommand(kubernetesJobManagerParameters.getContainerEntrypoint())
                        .withArgs(getJobManagerStartCommand())
                        .build();
        return new FlinkPod.Builder(flinkPod).withMainContainer(mainContainerWithStartCmd).build();
    }
    private List<String> getJobManagerStartCommand() {
        final KubernetesDeploymentTarget deploymentTarget =
                KubernetesDeploymentTarget.fromConfig(
                        kubernetesJobManagerParameters.getFlinkConfiguration());
        // bash -c kubernetes-jobmanager.sh [kubernetes-session|kubernetes-application]
        return KubernetesUtils.getStartCommandWithBashWrapper(
                Constants.KUBERNETES_JOB_MANAGER_SCRIPT_PATH + " " + deploymentTarget.getName());
    }
    
    
    @Internal
    public enum KubernetesDeploymentTarget {
        SESSION("kubernetes-session"),
        APPLICATION("kubernetes-application");
        private final String name;
        KubernetesDeploymentTarget(final String name) {
            this.name = checkNotNull(name);
        }
        public static KubernetesDeploymentTarget fromConfig(final Configuration configuration) {
            checkNotNull(configuration);
            // execution.target
            // bin/flink run [remote|local|yarn-per-job|yarn-session|kubernetes-session]
            // bin/flink run-application [yarn-application|kubernetes-application]
            final String deploymentTargetStr = configuration.get(DeploymentOptions.TARGET);
            final KubernetesDeploymentTarget deploymentTarget = getFromName(deploymentTargetStr);
            if (deploymentTarget == null) {
                throw new IllegalArgumentException(
                        "Unknown Kubernetes deployment target \""
                                + deploymentTargetStr
                                + "\"."
                                + " The available options are: "
                                + options());
            }
            return deploymentTarget;
        }
        public String getName() {
            return name;
        }
        public static boolean isValidKubernetesTarget(final String configValue) {
            return configValue != null
                    && Arrays.stream(KubernetesDeploymentTarget.values())
                            .anyMatch(
                                    kubernetesDeploymentTarget ->
                                            kubernetesDeploymentTarget.name.equalsIgnoreCase(
                                                    configValue));
        }
        private static KubernetesDeploymentTarget getFromName(final String deploymentTarget) {
            if (deploymentTarget == null) {
                return null;
            }
            if (SESSION.name.equalsIgnoreCase(deploymentTarget)) {
                return SESSION;
            } else if (APPLICATION.name.equalsIgnoreCase(deploymentTarget)) {
                return APPLICATION;
            }
            return null;
        }
        private static String options() {
            return Arrays.stream(KubernetesDeploymentTarget.values())
                    .map(KubernetesDeploymentTarget::getName)
                    .collect(Collectors.joining(","));
        }
    }
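    The resulting start command can be sketched as follows. StartCommandDemo and startCommand are hypothetical names; the script path and target names are taken from the source above, but this is simplified: the real bash wrapper may add more (e.g. log redirection) around the command.

```java
import java.util.List;

// Sketch of the JobManager start command assembled by CmdJobManagerDecorator:
// the deployment target name is appended to the script path, and the whole
// command is run through a bash wrapper.
public class StartCommandDemo {

    static List<String> startCommand(String deploymentTarget) {
        String script = "kubernetes-jobmanager.sh " + deploymentTarget;
        return List.of("bash", "-c", script);  // simplified bash wrapper
    }

    public static void main(String[] args) {
        List<String> cmd = startCommand("kubernetes-session");
        if (!cmd.equals(List.of("bash", "-c", "kubernetes-jobmanager.sh kubernetes-session"))) {
            throw new AssertionError();
        }
    }
}
```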
    InternalServiceDecorator
    Builds the internal (headless) Service and configures its ports.
    
    // org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator#buildAccompanyingKubernetesResources
    @Override
    public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
        if (!kubernetesJobManagerParameters.isInternalServiceEnabled()) {
            return Collections.emptyList();
        }
        final String serviceName =
                getInternalServiceName(kubernetesJobManagerParameters.getClusterId());
        /*
        apiVersion: v1
        metadata:
          name: serviceName
          labels:
            type: flink-native-kubernetes
            app: {kubernetes.cluster-id}
        spec:
          clusterIP: None
          selector:
          	{kubernetes.jobmanager.labels}
            type: flink-native-kubernetes
            app: {kubernetes.cluster-id}
            component: jobmanager
          ports:
            - port: {jobmanager.rpc.port} default 6123
              name: jobmanager-rpc
            - port: {blob.server.port} 必须指定非0端口
              name: blobserver
              
        */
        final Service headlessService =
                new ServiceBuilder()
                        .withApiVersion(Constants.API_VERSION)
                        .withNewMetadata()
                        .withName(serviceName)
                        .withLabels(kubernetesJobManagerParameters.getCommonLabels())
                        .endMetadata()
                        .withNewSpec()
                        .withClusterIP(Constants.HEADLESS_SERVICE_CLUSTER_IP)
                        .withSelector(kubernetesJobManagerParameters.getLabels())
                        .addNewPort()
                        .withName(Constants.JOB_MANAGER_RPC_PORT_NAME)
                        .withPort(kubernetesJobManagerParameters.getRPCPort())
                        .endPort()
                        .addNewPort()
                        .withName(Constants.BLOB_SERVER_PORT_NAME)
                        .withPort(kubernetesJobManagerParameters.getBlobServerPort())
                        .endPort()
                        .endSpec()
                        .build();
        // Set job manager address to namespaced service name
        final String namespace = kubernetesJobManagerParameters.getNamespace();
        kubernetesJobManagerParameters
                .getFlinkConfiguration()
                // jobmanager.rpc.address = {kubernetes.cluster-id}.{kubernetes.namespace}
                // kubernetes.namespace defaults to "default"
                .setString(JobManagerOptions.ADDRESS,
                        getNamespacedInternalServiceName(serviceName, namespace));
        return Collections.singletonList(headlessService);
    }
    /** Generate name of the internal Service. */
    public static String getInternalServiceName(String clusterId) {
        return clusterId;
    }
    /** Generate namespaced name of the internal Service. */
    public static String getNamespacedInternalServiceName(String clusterId, String namespace) {
        return getInternalServiceName(clusterId) + "." + namespace;
    }
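The two helpers above are what end up in `jobmanager.rpc.address`: the headless Service is named after the cluster-id, so TaskManagers reach the JobManager through Kubernetes DNS at `{cluster-id}.{namespace}`. Isolated as a sketch:

```java
public class InternalServiceNaming {
    // Sketch of getInternalServiceName / getNamespacedInternalServiceName:
    // the internal Service name is the cluster-id itself, and the namespaced
    // form is written into jobmanager.rpc.address.
    static String internalServiceName(String clusterId) {
        return clusterId;
    }

    static String namespacedInternalServiceName(String clusterId, String namespace) {
        return internalServiceName(clusterId) + "." + namespace;
    }
}
```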
    ExternalServiceDecorator
    Configures the rest Service for external access
    
    // org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator#buildAccompanyingKubernetesResources
    @Override
    public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
        final String serviceName =
                getExternalServiceName(kubernetesJobManagerParameters.getClusterId());
        /*
        apiVersion: v1
        metadata:
          name: serviceName
          labels:
            type: flink-native-kubernetes
            app: {kubernetes.cluster-id}
          annotations:
            {kubernetes.rest-service.annotations}
        spec:
          type: {kubernetes.rest-service.exposed.type} default LoadBalancer
          selector:
              {kubernetes.jobmanager.labels}
              type: flink-native-kubernetes
              app: {kubernetes.cluster-id}
              component: jobmanager
          ports:
            - name: rest
              port: {rest.port} default 8081
              targetPort: {rest.bind-port}
        */
        final Service externalService =
                new ServiceBuilder()
                        .withApiVersion(Constants.API_VERSION)
                        .withNewMetadata()
                        .withName(serviceName)
                        .withLabels(kubernetesJobManagerParameters.getCommonLabels())
                        .withAnnotations(kubernetesJobManagerParameters.getRestServiceAnnotations())
                        .endMetadata()
                        .withNewSpec()
                        .withType(kubernetesJobManagerParameters.getRestServiceExposedType().name())
                        .withSelector(kubernetesJobManagerParameters.getLabels())
                        .addNewPort()
                        .withName(Constants.REST_PORT_NAME)
                        .withPort(kubernetesJobManagerParameters.getRestPort())
                        .withNewTargetPort(kubernetesJobManagerParameters.getRestBindPort())
                        .endPort()
                        .endSpec()
                        .build();
        return Collections.singletonList(externalService);
    }
    HadoopConfMountDecorator
    When the environment has a Hadoop configuration, loads its core-site.xml and hdfs-site.xml into a ConfigMap
    
    // org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator#decorateFlinkPod
    @Override
    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
        Volume hadoopConfVolume;
        // kubernetes.hadoop.conf.config-map.name
        final Optional<String> existingConfigMap =
                kubernetesParameters.getExistingHadoopConfigurationConfigMap();
        if (existingConfigMap.isPresent()) {
            /*
            volumes:
              - name: hadoop-config-volume
                configMap:
                  name: {kubernetes.hadoop.conf.config-map.name}
            */
            hadoopConfVolume =
                    new VolumeBuilder()
                            .withName(Constants.HADOOP_CONF_VOLUME) // hadoop-config-volume
                            .withNewConfigMap()
                            .withName(existingConfigMap.get())
                            .endConfigMap()
                            .build();
        } else {
            // Hadoop conf directory resolution order: HADOOP_CONF_DIR, then (Hadoop 2.x) HADOOP_HOME/etc/hadoop, then (Hadoop 1.x) HADOOP_HOME/conf
            final Optional<String> localHadoopConfigurationDirectory =
                    kubernetesParameters.getLocalHadoopConfigurationDirectory();
            if (!localHadoopConfigurationDirectory.isPresent()) {
                return flinkPod;
            }
            // look for core-site.xml and hdfs-site.xml
            final List<File> hadoopConfigurationFileItems =
                    getHadoopConfigurationFileItems(localHadoopConfigurationDirectory.get());
            if (hadoopConfigurationFileItems.isEmpty()) {
                LOG.warn(
                        "Found 0 files in directory {}, skip to mount the Hadoop Configuration ConfigMap.",
                        localHadoopConfigurationDirectory.get());
                return flinkPod;
            }
            final List<KeyToPath> keyToPaths =
                    hadoopConfigurationFileItems.stream()
                            .map(
                                    file ->
                                            new KeyToPathBuilder()
                                                    .withKey(file.getName())
                                                    .withPath(file.getName())
                                                    .build())
                            .collect(Collectors.toList());
            /*
            volumes:
              - name: hadoop-config-volume
                configMap:
                  name: hadoop-config-{kubernetes.cluster-id}
                  items:
                    - key: core-site.xml
                      path: core-site.xml
                    - key: hdfs-site.xml
                      path: hdfs-site.xml
            */
            hadoopConfVolume =
                    new VolumeBuilder()
                            .withName(Constants.HADOOP_CONF_VOLUME)
                            .withNewConfigMap()
                            .withName(
                                    getHadoopConfConfigMapName(kubernetesParameters.getClusterId()))
                            .withItems(keyToPaths)
                            .endConfigMap()
                            .build();
        }
        /*
        pod:
          spec:
            volumes: {the volume assembled above}
        */
        final Pod podWithHadoopConf =
                new PodBuilder(flinkPod.getPodWithoutMainContainer())
                        .editOrNewSpec()
                        .addNewVolumeLike(hadoopConfVolume)
                        .endVolume()
                        .endSpec()
                        .build();
        /*
        containers:
          volumeMounts:
            - name: hadoop-config-volume
              mountPath: /opt/hadoop/conf
          env:
            name: HADOOP_CONF_DIR
            value: /opt/hadoop/conf
        */
        final Container containerWithHadoopConf =
                new ContainerBuilder(flinkPod.getMainContainer())
                        .addNewVolumeMount()
                        .withName(Constants.HADOOP_CONF_VOLUME)
                        .withMountPath(Constants.HADOOP_CONF_DIR_IN_POD)
                        .endVolumeMount()
                        .addNewEnv()
                        .withName(Constants.ENV_HADOOP_CONF_DIR)
                        .withValue(Constants.HADOOP_CONF_DIR_IN_POD)
                        .endEnv()
                        .build();
        return new FlinkPod.Builder(flinkPod)
                .withPod(podWithHadoopConf)
                .withMainContainer(containerWithHadoopConf)
                .build();
    }
    @Override
    public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
        if (kubernetesParameters.getExistingHadoopConfigurationConfigMap().isPresent()) {
            return Collections.emptyList();
        }
        final Optional<String> localHadoopConfigurationDirectory =
                kubernetesParameters.getLocalHadoopConfigurationDirectory();
        if (!localHadoopConfigurationDirectory.isPresent()) {
            return Collections.emptyList();
        }
        final List<File> hadoopConfigurationFileItems =
                getHadoopConfigurationFileItems(localHadoopConfigurationDirectory.get());
        if (hadoopConfigurationFileItems.isEmpty()) {
            LOG.warn(
                    "Found 0 files in directory {}, skip to create the Hadoop Configuration ConfigMap.",
                    localHadoopConfigurationDirectory.get());
            return Collections.emptyList();
        }
        final Map<String, String> data = new HashMap<>();
        for (File file : hadoopConfigurationFileItems) {
            data.put(file.getName(), FileUtils.readFileUtf8(file));
        }
        /*
        ConfigMap:
          apiVersion: v1
          metadata:
            name: hadoop-config-{kubernetes.cluster-id}
            labels:
              type: flink-native-kubernetes
              app: {kubernetes.cluster-id}
          data: {data -> core-site.xml,hdfs-site.xml}
        */
        final ConfigMap hadoopConfigMap =
                new ConfigMapBuilder()
                        .withApiVersion(Constants.API_VERSION)
                        .withNewMetadata()
                        .withName(getHadoopConfConfigMapName(kubernetesParameters.getClusterId()))
                        .withLabels(kubernetesParameters.getCommonLabels())
                        .endMetadata()
                        .addToData(data)
                        .build();
        return Collections.singletonList(hadoopConfigMap);
    }
    private List<File> getHadoopConfigurationFileItems(String localHadoopConfigurationDirectory) {
        final List<String> expectedFileNames = new ArrayList<>();
        expectedFileNames.add("core-site.xml");
        expectedFileNames.add("hdfs-site.xml");
        final File directory = new File(localHadoopConfigurationDirectory);
        if (directory.exists() && directory.isDirectory()) {
            return Arrays.stream(directory.listFiles())
                    .filter(
                            file ->
                                    file.isFile()
                                            && expectedFileNames.stream()
                                                    .anyMatch(name -> file.getName().equals(name)))
                    .collect(Collectors.toList());
        } else {
            return Collections.emptyList();
        }
    }
    public static String getHadoopConfConfigMapName(String clusterId) {
        return Constants.HADOOP_CONF_CONFIG_MAP_PREFIX + clusterId;
    }
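`getHadoopConfigurationFileItems` only keeps the two expected files out of the conf directory listing; the filter logic can be isolated as follows (the directory listing is replaced by a plain name list for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class HadoopConfFileFilter {
    // Sketch of the filter in getHadoopConfigurationFileItems: of all files
    // found in the Hadoop conf directory, only core-site.xml and
    // hdfs-site.xml are mounted into the pod.
    static List<String> filter(List<String> fileNames) {
        List<String> expected = Arrays.asList("core-site.xml", "hdfs-site.xml");
        return fileNames.stream()
                .filter(expected::contains)
                .collect(Collectors.toList());
    }
}
```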
    KerberosMountDecorator
    Mounts configuration files:
    	keytab -> /opt/kerberos/kerberos-keytab/{keytab fileName}
    	krb5.conf -> /etc/krb5.conf
    Adds accompanying resources:
    	Secret -> keytab
    	ConfigMap -> krb5.conf
    
    // org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator#decorateFlinkPod
    @Override
    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
        PodBuilder podBuilder = new PodBuilder(flinkPod.getPodWithoutMainContainer());
        ContainerBuilder containerBuilder = new ContainerBuilder(flinkPod.getMainContainer());
        // security.kerberos.login.keytab(deprecatedKey: security.keytab)
        // security.kerberos.login.principal(deprecatedKey: security.principal)
        if (!StringUtils.isNullOrWhitespaceOnly(securityConfig.getKeytab())
                && !StringUtils.isNullOrWhitespaceOnly(securityConfig.getPrincipal())) {
            /*
            volumes:
              - name: kerberos-keytab-volume
                secret:
                  secretName: kerberos-keytab-{kubernetes.cluster-id}
            */
            podBuilder =
                    podBuilder
                            .editOrNewSpec()
                            .addNewVolume()
                            .withName(Constants.KERBEROS_KEYTAB_VOLUME)
                            .withNewSecret()
                            .withSecretName(
                                    getKerberosKeytabSecretName(
                                            kubernetesParameters.getClusterId()))
                            .endSecret()
                            .endVolume()
                            .endSpec();
            /*
            container:
              spec:
                volumeMounts:
                  - name: kerberos-keytab-volume
                    mountPath: /opt/kerberos/kerberos-keytab
            */
            containerBuilder =
                    containerBuilder
                            .addNewVolumeMount()
                            .withName(Constants.KERBEROS_KEYTAB_VOLUME)
                            .withMountPath(Constants.KERBEROS_KEYTAB_MOUNT_POINT)
                            .endVolumeMount();
        }
        // security.kerberos.krb5-conf.path
        if (!StringUtils.isNullOrWhitespaceOnly(
                kubernetesParameters
                        .getFlinkConfiguration()
                        .get(SecurityOptions.KERBEROS_KRB5_PATH))) {
            final File krb5Conf =
                    new File(
                            kubernetesParameters
                                    .getFlinkConfiguration()
                                    .get(SecurityOptions.KERBEROS_KRB5_PATH));
            /*
            volumes:
              - name: kerberos-krb5conf-volume
                configMap:
                  - name: kerberos-krb5conf-{kubernetes.cluster-id}
                    items:
                      - key: {krb5Conf fileName}
                        path: {krb5Conf fileName}
            */
            podBuilder =
                    podBuilder
                            .editOrNewSpec()
                            .addNewVolume()
                            .withName(Constants.KERBEROS_KRB5CONF_VOLUME)
                            .withNewConfigMap()
                            .withName(
                                    getKerberosKrb5confConfigMapName(
                                            kubernetesParameters.getClusterId()))
                            .withItems(
                                    new KeyToPathBuilder()
                                            .withKey(krb5Conf.getName())
                                            .withPath(krb5Conf.getName())
                                            .build())
                            .endConfigMap()
                            .endVolume()
                            .endSpec();
            /*
            containers:
                volumeMounts:
                  - name: kerberos-krb5conf-volume
                    mountPath: /etc/krb5.conf
                    subPath: krb5.conf
            */
            containerBuilder =
                    containerBuilder
                            .addNewVolumeMount()
                            .withName(Constants.KERBEROS_KRB5CONF_VOLUME)
                            .withMountPath(Constants.KERBEROS_KRB5CONF_MOUNT_DIR + "/krb5.conf")
                            .withSubPath("krb5.conf")
                            .endVolumeMount();
        }
        return new FlinkPod(podBuilder.build(), containerBuilder.build());
    }
    @Override
    public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
        final List<HasMetadata> resources = new ArrayList<>();
        if (!StringUtils.isNullOrWhitespaceOnly(securityConfig.getKeytab())
                && !StringUtils.isNullOrWhitespaceOnly(securityConfig.getPrincipal())) {
            final File keytab = new File(securityConfig.getKeytab());
            if (!keytab.exists()) {
                LOG.warn(
                        "Could not found the kerberos keytab file in {}.",
                        keytab.getAbsolutePath());
            } else {
                /*
                Secret:
                  metadata:
                    name: kerberos-keytab-{kubernetes.cluster-id}
                  data:
                    {keytab fileName}: {file}
                */
                resources.add(
                        new SecretBuilder()
                                .withNewMetadata()
                                .withName(
                                        getKerberosKeytabSecretName(
                                                kubernetesParameters.getClusterId()))
                                .endMetadata()
                                .addToData(
                                        keytab.getName(),
                                        Base64.getEncoder()
                                                .encodeToString(Files.toByteArray(keytab)))
                                .build());
                // Set keytab path in the container. One should make sure this decorator is
                // triggered before FlinkConfMountDecorator.
                // security.kerberos.login.keytab = /opt/kerberos/kerberos-keytab/{keytab fileName}
                kubernetesParameters
                        .getFlinkConfiguration()
                        .set(
                                SecurityOptions.KERBEROS_LOGIN_KEYTAB,
                                String.format(
                                        "%s/%s",
                                        Constants.KERBEROS_KEYTAB_MOUNT_POINT, keytab.getName()));
            }
        }
        // security.kerberos.krb5-conf.path
        if (!StringUtils.isNullOrWhitespaceOnly(
                kubernetesParameters
                        .getFlinkConfiguration()
                        .get(SecurityOptions.KERBEROS_KRB5_PATH))) {
            final File krb5Conf =
                    new File(
                            kubernetesParameters
                                    .getFlinkConfiguration()
                                    .get(SecurityOptions.KERBEROS_KRB5_PATH));
            if (!krb5Conf.exists()) {
                LOG.warn(
                        "Could not found the kerberos config file in {}.",
                        krb5Conf.getAbsolutePath());
            } else {
                resources.add(
                        /*
                        configMap:
                          metadata
                            name: kerberos-krb5conf-{kubernetes.cluster-id}
                          data:
                            {krb5Conf fileName}: {file}
                        */
                        new ConfigMapBuilder()
                                .withNewMetadata()
                                .withName(
                                        getKerberosKrb5confConfigMapName(
                                                kubernetesParameters.getClusterId()))
                                .endMetadata()
                                .addToData(
                                        krb5Conf.getName(),
                                        Files.toString(krb5Conf, StandardCharsets.UTF_8))
                                .build());
            }
        }
        return resources;
    }
    public static String getKerberosKeytabSecretName(String clusterId) {
        return Constants.KERBEROS_KEYTAB_SECRET_PREFIX + clusterId;
    }
    public static String getKerberosKrb5confConfigMapName(String clusterID) {
        return Constants.KERBEROS_KRB5CONF_CONFIG_MAP_PREFIX + clusterID;
    }
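The keytab rewrite above (pointing `security.kerberos.login.keytab` at the in-pod mount point instead of the client-local path) boils down to keeping only the file name:

```java
import java.io.File;

public class KeytabPathRewrite {
    // Sketch of the rewrite in buildAccompanyingKubernetesResources: only the
    // file name of the client-local keytab is kept, prefixed with the in-pod
    // mount point /opt/kerberos/kerberos-keytab.
    static final String KERBEROS_KEYTAB_MOUNT_POINT = "/opt/kerberos/kerberos-keytab";

    static String containerKeytabPath(String localKeytabPath) {
        return String.format(
                "%s/%s", KERBEROS_KEYTAB_MOUNT_POINT, new File(localKeytabPath).getName());
    }
}
```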
    FlinkConfMountDecorator
    Adds the configuration files logback-console.xml, log4j-console.properties and flink-conf.yaml
    
    // org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator#decorateFlinkPod
    @Override
    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
        final Pod mountedPod = decoratePod(flinkPod.getPodWithoutMainContainer());
        /*
        containers:
          volumeMounts:
            - name: flink-config-volume
              mountPath: {kubernetes.flink.conf.dir} default: /opt/flink/conf
        */
        final Container mountedMainContainer =
                new ContainerBuilder(flinkPod.getMainContainer())
                        .addNewVolumeMount()
                        .withName(FLINK_CONF_VOLUME)
                        .withMountPath(kubernetesComponentConf.getFlinkConfDirInPod())
                        .endVolumeMount()
                        .build();
        return new FlinkPod.Builder(flinkPod)
                .withPod(mountedPod)
                .withMainContainer(mountedMainContainer)
                .build();
    }
    private Pod decoratePod(Pod pod) {
        final List<KeyToPath> keyToPaths =
                getLocalLogConfFiles().stream()
                        .map(
                                file ->
                                        new KeyToPathBuilder()
                                                .withKey(file.getName())
                                                .withPath(file.getName())
                                                .build())
                        .collect(Collectors.toList());
        keyToPaths.add(
                new KeyToPathBuilder()
                        .withKey(FLINK_CONF_FILENAME) // flink-conf.yaml
                        .withPath(FLINK_CONF_FILENAME)
                        .build());
        final Volume flinkConfVolume =
                new VolumeBuilder()
                        .withName(FLINK_CONF_VOLUME) // flink-config-volume
                        .withNewConfigMap()
                        .withName(getFlinkConfConfigMapName(kubernetesComponentConf.getClusterId()))
                        .withItems(keyToPaths)
                        .endConfigMap()
                        .build();
        /*
        volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config-{kubernetes.cluster-id}
              items:
                - key: logback-console.xml
                  path: logback-console.xml
                - key: log4j-console.properties
                  path: log4j-console.properties
                - key: flink-conf.yaml
                  path: flink-conf.yaml
        */
        return new PodBuilder(pod)
                .editSpec()
                .addNewVolumeLike(flinkConfVolume)
                .endVolume()
                .endSpec()
                .build();
    }
    @Override
    public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
        final String clusterId = kubernetesComponentConf.getClusterId();
        final Map<String, String> data = new HashMap<>();
        final List<File> localLogFiles = getLocalLogConfFiles();
        for (File file : localLogFiles) {
            data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8));
        }
        final Map<String, String> propertiesMap =
                getClusterSidePropertiesMap(kubernetesComponentConf.getFlinkConfiguration());
        // flink-conf.yaml
        data.put(FLINK_CONF_FILENAME, getFlinkConfData(propertiesMap));
        /*
        configMap:
          apiVersion: v1
          metadata:
            name: flink-config-{kubernetes.cluster-id}
            labels:
              type: flink-native-kubernetes
              app: {kubernetes.cluster-id}
          data:
            logback-console.xml {file}
            log4j-console.properties {file}
            flink-conf.yaml {file}
        */
        final ConfigMap flinkConfConfigMap =
                new ConfigMapBuilder()
                        .withApiVersion(Constants.API_VERSION)
                        .withNewMetadata()
                        .withName(getFlinkConfConfigMapName(clusterId))
                        .withLabels(kubernetesComponentConf.getCommonLabels())
                        .endMetadata()
                        .addToData(data)
                        .build();
        return Collections.singletonList(flinkConfConfigMap);
    }
    /** Get properties map for the cluster-side after removal of some keys. */
    private Map<String, String> getClusterSidePropertiesMap(Configuration flinkConfig) {
        final Configuration clusterSideConfig = flinkConfig.clone();
        // Remove some configuration options that should not be taken to cluster side.
        // kubernetes.config.file
        clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE);
        // $internal.deployment.config-dir
        clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR);
        return clusterSideConfig.toMap();
    }
    @VisibleForTesting
    String getFlinkConfData(Map<String, String> propertiesMap) throws IOException {
        try (StringWriter sw = new StringWriter();
                PrintWriter out = new PrintWriter(sw)) {
            propertiesMap.forEach(
                    (k, v) -> {
                        out.print(k);
                        out.print(": ");
                        out.println(v);
                    });
            return sw.toString();
        }
    }
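`getFlinkConfData` renders the generated flink-conf.yaml as plain `key: value` lines. The same behavior in isolation (a `'\n'` is used here instead of the platform-dependent `println`):

```java
import java.util.Map;

public class FlinkConfRenderer {
    // Sketch of getFlinkConfData: every configuration entry becomes one
    // "key: value" line of the generated flink-conf.yaml.
    static String render(Map<String, String> properties) {
        StringBuilder sb = new StringBuilder();
        properties.forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
        return sb.toString();
    }
}
```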
    private List<File> getLocalLogConfFiles() {
        // $internal.deployment.config-dir
        // or kubernetes.flink.conf.dir(default: /opt/flink/conf)
        final String confDir = kubernetesComponentConf.getConfigDirectory();
        // logback-console.xml
        final File logbackFile = new File(confDir, CONFIG_FILE_LOGBACK_NAME);
        // log4j-console.properties
        final File log4jFile = new File(confDir, CONFIG_FILE_LOG4J_NAME);
        List<File> localLogConfFiles = new ArrayList<>();
        if (logbackFile.exists()) {
            localLogConfFiles.add(logbackFile);
        }
        if (log4jFile.exists()) {
            localLogConfFiles.add(log4jFile);
        }
        return localLogConfFiles;
    }
    @VisibleForTesting
    public static String getFlinkConfConfigMapName(String clusterId) {
        return CONFIG_MAP_PREFIX + clusterId;
    }
    PodTemplateMountDecorator
    When the template file taskmanager-pod-template.yaml exists, mounts it into the pod
    
    // org.apache.flink.kubernetes.kubeclient.decorators.PodTemplateMountDecorator#decorateFlinkPod
    @Override
    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
        if (!getTaskManagerPodTemplateFile().isPresent()) {
            return flinkPod;
        }
        final Pod mountedPod = decoratePod(flinkPod.getPodWithoutMainContainer());
        /*
        containers:
          volumeMounts:
            - name: pod-template-volume
              mountPath: /opt/flink/pod-template
        */
        final Container mountedMainContainer =
                new ContainerBuilder(flinkPod.getMainContainer())
                        .addNewVolumeMount()
                        .withName(POD_TEMPLATE_VOLUME)
                        .withMountPath(POD_TEMPLATE_DIR_IN_POD)
                        .endVolumeMount()
                        .build();
        return new FlinkPod.Builder(flinkPod)
                .withPod(mountedPod)
                .withMainContainer(mountedMainContainer)
                .build();
    }
    private Pod decoratePod(Pod pod) {
        final List<KeyToPath> keyToPaths = new ArrayList<>();
        keyToPaths.add(
                new KeyToPathBuilder()
                        .withKey(TASK_MANAGER_POD_TEMPLATE_FILE_NAME) // taskmanager-pod-template.yaml
                        .withPath(TASK_MANAGER_POD_TEMPLATE_FILE_NAME)
                        .build());
        final Volume podTemplateVolume =
                new VolumeBuilder()
                        .withName(POD_TEMPLATE_VOLUME)
                        .withNewConfigMap()
                        .withName(podTemplateConfigMapName) // pod-template-{kubernetes.cluster-id}
                        .withItems(keyToPaths)
                        .endConfigMap()
                        .build();
        /*
        spec:
          volumes:
            - name: pod-template-volume
              configMap:
                name: pod-template-{kubernetes.cluster-id}
                items:
                  - key: taskmanager-pod-template.yaml
                    path: taskmanager-pod-template.yaml
        */
        return new PodBuilder(pod)
                .editSpec()
                .addNewVolumeLike(podTemplateVolume)
                .endVolume()
                .endSpec()
                .build();
    }
    @Override
    public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
        /*
        configMap:
          apiVersion: v1
          metadata:
            name: pod-template-{kubernetes.cluster-id}
            labels:
              type: flink-native-kubernetes
              app: {kubernetes.cluster-id}
          data:
            taskmanager-pod-template.yaml {file}
        */
        return getTaskManagerPodTemplateFile()
                .map(
                        FunctionUtils.uncheckedFunction(
                                file -> {
                                    final Map<String, String> data = new HashMap<>();
                                    data.put(
                                            TASK_MANAGER_POD_TEMPLATE_FILE_NAME,
                                            Files.toString(file, StandardCharsets.UTF_8));
                                    final HasMetadata flinkConfConfigMap =
                                            new ConfigMapBuilder()
                                                    .withApiVersion(Constants.API_VERSION)
                                                    .withNewMetadata()
                                                    .withName(podTemplateConfigMapName)
                                                    .withLabels(
                                                            kubernetesComponentConf
                                                                    .getCommonLabels())
                                                    .endMetadata()
                                                    .addToData(data)
                                                    .build();
                                    return Collections.singletonList(flinkConfConfigMap);
                                }))
                .orElse(Collections.emptyList());
    }
    private Optional<File> getTaskManagerPodTemplateFile() {
        return kubernetesComponentConf
                .getFlinkConfiguration()
                // kubernetes.pod-template-file.taskmanager
                .getOptional(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE)
                .map(
                        file -> {
                            final File podTemplateFile = new File(file);
                            if (!podTemplateFile.exists()) {
                                throw new FlinkRuntimeException(
                                        String.format(
                                                "Pod template file %s does not exist.", file));
                            }
                            return podTemplateFile;
                        });
    }
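The two methods above can be condensed into one flow: resolve the optional `kubernetes.pod-template-file.taskmanager` path, fail fast if the file is missing, and load its content into the ConfigMap data map under the fixed file name. A minimal sketch with plain JDK types (no Flink or fabric8 classes; the helper names are hypothetical):

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class PodTemplateConfigMapSketch {
    static final String TASK_MANAGER_POD_TEMPLATE_FILE_NAME = "taskmanager-pod-template.yaml";

    // Mirrors getTaskManagerPodTemplateFile + the ConfigMap data construction.
    static Optional<Map<String, String>> buildConfigMapData(Map<String, String> flinkConf)
            throws IOException {
        Optional<File> templateFile = Optional
                .ofNullable(flinkConf.get("kubernetes.pod-template-file.taskmanager"))
                .map(path -> {
                    File f = new File(path);
                    if (!f.exists()) {
                        // Fail fast, like the FlinkRuntimeException in the source.
                        throw new RuntimeException("Pod template file " + path + " does not exist.");
                    }
                    return f;
                });
        if (!templateFile.isPresent()) {
            return Optional.empty(); // no template configured -> no ConfigMap is created
        }
        Map<String, String> data = new HashMap<>();
        data.put(TASK_MANAGER_POD_TEMPLATE_FILE_NAME,
                new String(Files.readAllBytes(templateFile.get().toPath()), StandardCharsets.UTF_8));
        return Optional.of(data);
    }

    public static void main(String[] args) throws IOException {
        // With the option unset, the whole pipeline short-circuits to empty.
        System.out.println(buildConfigMapData(new HashMap<>()).isPresent()); // false
    }
}
```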
    KubernetesJobManagerFactory
    Creates the JobManager Deployment
    
    // org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory#createJobManagerDeployment
    private static Deployment createJobManagerDeployment(
            FlinkPod flinkPod, KubernetesJobManagerParameters kubernetesJobManagerParameters) {
        final Container resolvedMainContainer = flinkPod.getMainContainer();
        final Pod resolvedPod =
                new PodBuilder(flinkPod.getPodWithoutMainContainer())
                        .editOrNewSpec()
                        .addToContainers(resolvedMainContainer)
                        .endSpec()
                        .build();
        final Map<String, String> labels = resolvedPod.getMetadata().getLabels();
        return new DeploymentBuilder()
                .withApiVersion(Constants.APPS_API_VERSION)
                .editOrNewMetadata()
                .withName(
                        KubernetesUtils.getDeploymentName(
                                kubernetesJobManagerParameters.getClusterId()))
                .withLabels(kubernetesJobManagerParameters.getLabels())
                .withOwnerReferences(
                        kubernetesJobManagerParameters.getOwnerReference().stream()
                                .map(e -> KubernetesOwnerReference.fromMap(e).getInternalResource())
                                .collect(Collectors.toList()))
                .endMetadata()
                .editOrNewSpec()
                .withReplicas(1)
                .editOrNewTemplate()
                .withMetadata(resolvedPod.getMetadata())
                .withSpec(resolvedPod.getSpec())
                .endTemplate()
                .editOrNewSelector()
                .addToMatchLabels(labels)
                .endSelector()
                .endSpec()
                .build();
    }
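Note that `addToMatchLabels(labels)` reuses the resolved pod's own labels, so the Deployment's selector is guaranteed to match its pod template (the API server rejects a Deployment whose selector does not match the template). A tiny sketch of that invariant, using hypothetical label values:

```java
import java.util.Map;

public class SelectorInvariant {
    public static void main(String[] args) {
        // Labels taken from the resolved pod metadata ("my-flink-cluster" is a
        // hypothetical kubernetes.cluster-id).
        Map<String, String> podLabels = Map.of(
                "type", "flink-native-kubernetes",
                "app", "my-flink-cluster",
                "component", "jobmanager");
        // addToMatchLabels(labels): selector is the same map, verbatim.
        Map<String, String> matchLabels = podLabels;
        // The selector matches iff every matchLabel is present on the pod.
        boolean selectorMatches = podLabels.entrySet().containsAll(matchLabels.entrySet());
        System.out.println(selectorMatches); // true
    }
}
```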
    deployment
    The final Deployment produced after all Flink K8s decorators have been applied
    
    ## org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory#buildKubernetesJobManagerSpecification
    
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: {kubernetes.cluster-id}
      labels:
        {kubernetes.jobmanager.labels}
        type: flink-native-kubernetes
        app: {kubernetes.cluster-id}
        component: jobmanager
      ownerReferences:
        {kubernetes.jobmanager.owner.reference}
    spec:
      replicas: 1
      selector:
        matchLabels:
          {kubernetes.jobmanager.labels}
          type: flink-native-kubernetes
          app: {kubernetes.cluster-id}
          component: jobmanager
      template:
        metadata:
          labels:
            {kubernetes.jobmanager.labels}
            type: flink-native-kubernetes
            app: {kubernetes.cluster-id}
            component: jobmanager
          annotations:
            {kubernetes.jobmanager.annotations}
        spec:
          serviceAccount: {kubernetes.jobmanager.service-account} default
          serviceAccountName: {kubernetes.jobmanager.service-account} default
          imagePullSecrets:
            {kubernetes.container.image.pull-secrets}
          nodeSelector:
            {kubernetes.jobmanager.node-selector}
          tolerations:
            {kubernetes.jobmanager.tolerations}
          containers:
            - name: flink-main-container
              image: {kubernetes.container.image} apache/flink:{tag}
              imagePullPolicy: {kubernetes.container.image.pull-policy} IfNotPresent
              command:
                - {kubernetes.entry.path} /docker-entrypoint.sh
              args:
                - bash -c kubernetes-jobmanager.sh [kubernetes-session|kubernetes-application]
              resources:
                requests:
                  memory: {jobmanager.memory.process.size}
                  cpu: {kubernetes.jobmanager.cpu} 1.0
                limits:
                  memory: {jobmanager.memory.process.size}
                  cpu: {kubernetes.jobmanager.cpu} 1.0
              ports:
                - name: rest
                  containerPort: {rest.port} 8081
                - name: jobmanager-rpc
                  containerPort: {jobmanager.rpc.port} 6123
                - name: blobserver
                  containerPort: {blob.server.port} 6124
              env:
                - name: containerized.master.env.*
                  value: {containerized.master.env.*}
                - name: kubernetes.env.secretKeyRef
                  value: {kubernetes.env.secretKeyRef}
                - name: _POD_IP_ADDRESS
                  valueFrom:
                    fieldRef:
                      apiVersion: v1
                      fieldPath: status.podIP
                - name: HADOOP_CONF_DIR
                  value: /opt/hadoop/conf
              volumeMounts:
                - name: kubernetes.secrets-volume
                  mountPath: {kubernetes.secrets}
                - name: hadoop-config-volume
                  mountPath: /opt/hadoop/conf
                - name: kerberos-keytab-volume
                  mountPath: /opt/kerberos/kerberos-keytab
                - name: kerberos-krb5conf-volume
                  mountPath: /etc/krb5.conf
                  subPath: krb5.conf
                - name: flink-config-volume
                  mountPath: {kubernetes.flink.conf.dir} /opt/flink/conf
                - name: pod-template-volume
                  mountPath: /opt/flink/pod-template
          volumes:
            - name: kubernetes.secrets-volume
              secret:
                secretName: {kubernetes.secrets}
            - name: hadoop-config-volume
              configMap:
                name: {kubernetes.hadoop.conf.config-map.name} hadoop-config-{kubernetes.cluster-id}
                items:
                  - key: core-site.xml
                    path: core-site.xml
                  - key: hdfs-site.xml
                    path: hdfs-site.xml
            - name: kerberos-keytab-volume
              secret:
                secretName: kerberos-keytab-{kubernetes.cluster-id}
            - name: kerberos-krb5conf-volume
              configMap:
                name: kerberos-krb5conf-{kubernetes.cluster-id}
                items:
                  - key: {krb5Conf fileName}
                    path: {krb5Conf fileName}
            - name: flink-config-volume
              configMap:
                name: flink-config-{kubernetes.cluster-id}
                items:
                  - key: logback-console.xml
                    path: logback-console.xml
                  - key: log4j-console.properties
                    path: log4j-console.properties
                  - key: flink-conf.yaml
                    path: flink-conf.yaml
            - name: pod-template-volume
              configMap:
                name: pod-template-{kubernetes.cluster-id}
                items:
                  - key: taskmanager-pod-template.yaml
                    path: taskmanager-pod-template.yaml
    createJobManagerComponent
    // org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java:104 set a breakpoint here to inspect the final Deployment submitted to K8s
    
    // org/apache/flink/kubernetes/KubernetesClusterDescriptor.java:274
    // org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient#createJobManagerComponent
    
    @Override
    public void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) {
        final Deployment deployment = kubernetesJMSpec.getDeployment();
        final List<HasMetadata> accompanyingResources = kubernetesJMSpec.getAccompanyingResources();
    
        // create Deployment
        LOG.debug(
                "Start to create deployment with spec {}{}",
                System.lineSeparator(),
                KubernetesUtils.tryToGetPrettyPrintYaml(deployment));
        final Deployment createdDeployment =
                this.internalClient.apps().deployments().create(deployment);
    
        // Note that we should use the uid of the created Deployment for the OwnerReference.
        setOwnerReference(createdDeployment, accompanyingResources);
    
        this.internalClient.resourceList(accompanyingResources).createOrReplace();
    }
    
    /*
    metadata:
      ownerReferences:
        name: {kubernetes.cluster-id}
        apiVersion: apps/v1
        uid: {deployment.getMetadata().getUid()}
        kind: Deployment
        controller: true
        blockOwnerDeletion: true
    */
    private void setOwnerReference(Deployment deployment, List<HasMetadata> resources) {
        final OwnerReference deploymentOwnerReference =
                new OwnerReferenceBuilder()
                        .withName(deployment.getMetadata().getName())
                        .withApiVersion(deployment.getApiVersion())
                        .withUid(deployment.getMetadata().getUid())
                        .withKind(deployment.getKind())
                        .withController(true)
                        .withBlockOwnerDeletion(true)
                        .build();
        resources.forEach(
                resource ->
                        resource.getMetadata()
                                .setOwnerReferences(
                                        Collections.singletonList(deploymentOwnerReference)));
    }
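The ordering in `createJobManagerComponent` matters: the Deployment must be created first so that the API server assigns it a uid, and only then is that uid stamped onto every accompanying resource, which makes K8s garbage-collect them when the Deployment is deleted. A self-contained sketch of that create-then-own pattern (plain JDK, with a fake in-memory "API server"; no fabric8 types):

```java
import java.util.List;
import java.util.UUID;

public class OwnerReferenceSketch {
    static class Resource {
        final String name;
        String uid;      // assigned by the "API server" on create
        String ownerUid; // ownerReferences[0].uid
        Resource(String name) { this.name = name; }
    }

    // Fake API server: creating a resource assigns it a uid.
    static Resource create(Resource r) {
        r.uid = UUID.randomUUID().toString();
        return r;
    }

    public static void main(String[] args) {
        // Step 1: create the Deployment first, so its uid exists.
        Resource deployment = create(new Resource("my-flink-cluster"));
        List<Resource> accompanying =
                List.of(new Resource("flink-config-map"), new Resource("rest-service"));
        // Step 2: equivalent of setOwnerReference — every child points at the
        // created Deployment's uid, so deleting it cascades to the children.
        accompanying.forEach(r -> r.ownerUid = deployment.uid);
        // Step 3: createOrReplace the accompanying resources.
        accompanying.forEach(OwnerReferenceSketch::create);
        System.out.println(accompanying.stream()
                .allMatch(r -> deployment.uid.equals(r.ownerUid))); // true
    }
}
```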

    K8s creates the Deployment and its accompanying resources, completing the deployment.

    A concise K8s deployment reference

    The two K8s deployment modes

  • Original article: https://www.cnblogs.com/tyxuanCX/p/15670340.html