Flink on YARN: a partial source-code walkthrough

    Please credit the original source when reposting: https://www.cnblogs.com/dongxiao-yang/p/9403427.html

    A Flink job can be deployed in several ways; common options include standalone, on YARN, Mesos, and Kubernetes. Our company currently standardizes on Flink on YARN in single-job mode (each Flink job declares its own Flink cluster on YARN). This post analyzes the parts of the Flink 1.5.1 source involved in submitting a YARN single job to a YARN cluster in legacy mode.

    A typical single-job submission command looks like this: ./flink run -m yarn-cluster -d -yst -yqu flinkqu -yn 4 -ys 2 -c flinkdemoclass flinkdemo.jar args1 args2 ... (here -d runs detached, -yst starts Flink in streaming mode, -yqu selects the YARN queue, -yn sets the number of TaskManagers, -ys the slots per TaskManager, and -c the entry class).

    The entry point of the flink script is org.apache.flink.client.cli.CliFrontend.

    In CliFrontend's main function, loadCustomCommandLines first loads an important utility class used to initialize YARN job submission, org.apache.flink.yarn.cli.FlinkYarnSessionCli:
        public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
            List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2);
    
            //    Command line interface of the YARN session, with a special initialization here
            //    to prefix all options with y/yarn.
            //    Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
            //          active CustomCommandLine in order and DefaultCLI isActive always return true.
            final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
            try {
                customCommandLines.add(
                    loadCustomCommandLine(flinkYarnSessionCLI,
                        configuration,
                        configurationDirectory,
                        "y",
                        "yarn"));
            } catch (NoClassDefFoundError | Exception e) {
                LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
            }
    
            if (configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE)) {
                customCommandLines.add(new DefaultCLI(configuration));
            } else {
                customCommandLines.add(new LegacyCLI(configuration));
            }
    
            return customCommandLines;
        }
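
    Note the ordering tip in the comment above: getActiveCustomCommandLine simply scans the registered command lines in order and returns the first active one, which is why DefaultCLI/LegacyCLI must be added last. A paraphrased sketch of that method (condensed from the 1.5.x source, details may differ slightly):

        public CustomCommandLine<?> getActiveCustomCommandLine(CommandLine commandLine) {
            for (CustomCommandLine<?> cli : customCommandLines) {
                // FlinkYarnSessionCli is active when "-m yarn-cluster" (or an existing
                // YARN properties file) is present; DefaultCLI/LegacyCLI always match
                if (cli.isActive(commandLine)) {
                    return cli;
                }
            }
            throw new IllegalStateException("No command-line ran.");
        }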

    Based on the startup arguments, CliFrontend proceeds through run() -> runProgram(). Inside runProgram, the key YARN-related call is

    client = clusterDescriptor.deploySessionCluster(clusterSpecification);

    The clusterDescriptor above is the cluster descriptor object produced by calling createClusterDescriptor() on the FlinkYarnSessionCli loaded earlier; in this mode the concrete class is org.apache.flink.yarn.LegacyYarnClusterDescriptor, whose parent class is AbstractYarnClusterDescriptor.
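
    For context, a paraphrased sketch of how runProgram wires these pieces together (simplified; variable names are illustrative rather than verbatim 1.5.1 source):

        final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
        final ClusterDescriptor<?> clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
        final ClusterSpecification clusterSpecification = activeCommandLine.getClusterSpecification(commandLine);

        // deploy a per-job Flink cluster on YARN and get a client handle back
        client = clusterDescriptor.deploySessionCluster(clusterSpecification);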

    Internally, deploySessionCluster calls deployInternal to submit a Flink cluster to YARN:

    protected ClusterClient<ApplicationId> deployInternal(
                ClusterSpecification clusterSpecification,
                String applicationName,
                String yarnClusterEntrypoint,
                @Nullable JobGraph jobGraph,
                boolean detached) throws Exception {
    
            // ------------------ Check if configuration is valid --------------------
            validateClusterSpecification(clusterSpecification);
    
            if (UserGroupInformation.isSecurityEnabled()) {
                // note: UGI::hasKerberosCredentials inaccurately reports false
                // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
                // so we check only in ticket cache scenario.
                boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
    
                UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
                if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS
                    && useTicketCache && !loginUser.hasKerberosCredentials()) {
                    LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
                    throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
                        "does not have Kerberos credentials");
                }
            }
    
            isReadyForDeployment(clusterSpecification);
    
            // ------------------ Check if the specified queue exists --------------------
    
            checkYarnQueues(yarnClient);
    
            // ------------------ Add dynamic properties to local flinkConfiguraton ------
            Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
            for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
                flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
            }
    
            // ------------------ Check if the YARN ClusterClient has the requested resources --------------
    
            // Create application via yarnClient
            final YarnClientApplication yarnApplication = yarnClient.createApplication();
            final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
    
            Resource maxRes = appResponse.getMaximumResourceCapability();
    
            final ClusterResourceDescription freeClusterMem;
            try {
                freeClusterMem = getCurrentFreeClusterResources(yarnClient);
            } catch (YarnException | IOException e) {
                failSessionDuringDeployment(yarnClient, yarnApplication);
                throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
            }
    
            final int yarnMinAllocationMB = yarnConfiguration.getInt(yarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
    
            final ClusterSpecification validClusterSpecification;
            try {
                validClusterSpecification = validateClusterResources(
                    clusterSpecification,
                    yarnMinAllocationMB,
                    maxRes,
                    freeClusterMem);
            } catch (YarnDeploymentException yde) {
                failSessionDuringDeployment(yarnClient, yarnApplication);
                throw yde;
            }
    
            LOG.info("Cluster specification: {}", validClusterSpecification);
    
            final ClusterEntrypoint.ExecutionMode executionMode = detached ?
                ClusterEntrypoint.ExecutionMode.DETACHED
                : ClusterEntrypoint.ExecutionMode.NORMAL;
    
            flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());
    
            ApplicationReport report = startAppMaster(
                flinkConfiguration,
                applicationName,
                yarnClusterEntrypoint,
                jobGraph,
                yarnClient,
                yarnApplication,
                clusterSpecification);
    
            String host = report.getHost();
            int port = report.getRpcPort();
    
            // Correctly initialize the Flink config
            flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
            flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
    
            flinkConfiguration.setString(RestOptions.ADDRESS, host);
            flinkConfiguration.setInteger(RestOptions.PORT, port);
    
            // the Flink cluster is deployed in YARN. Represent cluster
            return createYarnClusterClient(
                this,
                clusterSpecification.getNumberTaskManagers(),
                clusterSpecification.getSlotsPerTaskManager(),
                report,
                flinkConfiguration,
                true);
        }

    At the start of deployInternal, the YARN cluster's available memory, queues, and other prerequisites are checked; the method then requests an application and calls startAppMaster, which declares the AM entry class: YarnApplicationMasterRunner.

    public ApplicationReport startAppMaster(
                Configuration configuration,
                String applicationName,
                String yarnClusterEntrypoint,
                JobGraph jobGraph,
                YarnClient yarnClient,
                YarnClientApplication yarnApplication,
                ClusterSpecification clusterSpecification) throws Exception {
    .....
    
            setApplicationTags(appContext);
    
            // add a hook to clean up in case deployment fails
            Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
            Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
            LOG.info("Submitting application master " + appId);
            yarnClient.submitApplication(appContext);
    
            LOG.info("Waiting for the cluster to be allocated");
            final long startTime = System.currentTimeMillis();
        ApplicationReport report;
    .....

    }
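
    The elided tail of startAppMaster waits for YARN to allocate the cluster by polling the application report. A condensed, paraphrased sketch of that loop (based on the 1.5.x source, not a verbatim quote):

        loop: while (true) {
            report = yarnClient.getApplicationReport(appId);
            YarnApplicationState appState = report.getYarnApplicationState();
            switch (appState) {
                case FAILED:
                case FINISHED:
                case KILLED:
                    // submission failed: surface YARN's diagnostics to the caller
                    throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
                        + appState + " during deployment. Diagnostics: " + report.getDiagnostics());
                case RUNNING:
                    LOG.info("YARN application has been deployed successfully.");
                    break loop;
                default:
                    // still SUBMITTED/ACCEPTED: keep polling
                    Thread.sleep(250);
            }
        }
        // deployment succeeded, so the failure cleanup hook is no longer needed
        Runtime.getRuntime().removeShutdownHook(deploymentFailureHook);
        return report;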

    YarnApplicationMasterRunner runs on the YARN cluster as the ApplicationMaster: it communicates with the ResourceManager to request the TaskManager containers, and it starts the JobManager, the web UI, and related services.

        protected int runApplicationMaster(Configuration config) {
    ......
    ......
            webMonitor = BootstrapTools.startWebMonitorIfConfigured(
                    config,
                    highAvailabilityServices,
                    new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout, 10, Time.milliseconds(50L)),
                    new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
                    webMonitorTimeout,
                    new ScheduledExecutorServiceAdapter(futureExecutor),
                    LOG);
    
                metricRegistry = new MetricRegistryImpl(
                    MetricRegistryConfiguration.fromConfiguration(config));
    
                metricRegistry.startQueryService(actorSystem, null);
    
                // 2: the JobManager
                LOG.debug("Starting JobManager actor");
    
                // we start the JobManager with its standard name
                ActorRef jobManager = JobManager.startJobManagerActors(
                    config,
                    actorSystem,
                    futureExecutor,
                    ioExecutor,
                    highAvailabilityServices,
                    metricRegistry,
                    webMonitor == null ? Option.empty() : Option.apply(webMonitor.getRestAddress()),
                    new Some<>(JobMaster.JOB_MANAGER_NAME),
                    Option.<String>empty(),
                    getJobManagerClass(),
                    getArchivistClass())._1();
    
                final String webMonitorURL = webMonitor == null ? null : webMonitor.getRestAddress();
    
                // 3: Flink's Yarn ResourceManager
                LOG.debug("Starting YARN Flink Resource Manager");
    
                Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
                    getResourceManagerClass(),
                    config,
                    yarnConfig,
                    highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                    appMasterHostname,
                    webMonitorURL,
                    taskManagerParameters,
                    taskManagerContext,
                    numInitialTaskManagers,
                    LOG);
    
                ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);
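
    The YarnFlinkResourceManager actor started above is the component that actually negotiates with the YARN ResourceManager for TaskManager containers. A heavily simplified, illustrative sketch of that request path (the real logic lives in FlinkResourceManager/YarnFlinkResourceManager; the field names below are assumptions, not the actual source):

        // illustrative only: the real actor also tracks pending and registered workers
        private void requestNewWorkers(int numWorkers) {
            for (int i = 0; i < numWorkers; i++) {
                Resource capability = Resource.newInstance(taskManagerMemoryMB, taskManagerVcores);
                AMRMClient.ContainerRequest request =
                    new AMRMClient.ContainerRequest(capability, null, null, Priority.newInstance(0));
                // hand the request to the async AM-RM client; allocated containers
                // come back through the client's callback handler
                resourceManagerClient.addContainerRequest(request);
            }
        }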

    On the other side, after submitting the cluster the flink client moves from runProgram() into executeProgram():

        protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
            logAndSysout("Starting execution of program");
    
            final JobSubmissionResult result = client.run(program, parallelism);
    
            if (null == result) {
                throw new ProgramMissingJobException("No JobSubmissionResult returned, please make sure you called " +
                    "ExecutionEnvironment.execute()");
            }
    
            if (result.isJobExecutionResult()) {
                logAndSysout("Program execution finished");
                JobExecutionResult execResult = result.getJobExecutionResult();
                System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
                System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
                Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
                if (accumulatorsResult.size() > 0) {
                    System.out.println("Accumulator Results: ");
                    System.out.println(AccumulatorHelper.getResultsFormatted(accumulatorsResult));
                }
            } else {
                logAndSysout("Job has been submitted with JobID " + result.getJobID());
            }
        }

    From ClusterClient.run() -> prog.invokeInteractiveModeForExecution(), execution finally enters the main method of the user's flink job.

    In the main method, the env.execute() call at the end generates the job's execution plan, which in this mode is captured by the corresponding DetachedEnvironment object.
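
    For reference, a minimal user job whose main method would be entered at this point (a hypothetical demo class, not taken from the original post):

        import org.apache.flink.api.common.functions.MapFunction;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        public class FlinkDemo {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.fromElements("a", "b", "c")
                    .map(new MapFunction<String, String>() {
                        @Override
                        public String map(String value) {
                            return value.toUpperCase();
                        }
                    })
                    .print();
                // in detached mode this call hands the generated plan to the
                // DetachedEnvironment; the actual submission happens later in finalizeExecute()
                env.execute("flink-demo");
            }
        }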

    The call chain is DetachedEnvironment.finalizeExecute() -> ClusterClient.run() -> YarnClusterClient.submitJob() -> ClusterClient.runDetached():

        /**
         * Submits a JobGraph detached.
         * @param jobGraph The JobGraph
         * @param classLoader User code class loader to deserialize the results and errors (may contain custom classes).
         * @return JobSubmissionResult
         * @throws ProgramInvocationException
         */
        public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
    
            waitForClusterToBeReady();
    
            final ActorGateway jobManagerGateway;
            try {
                jobManagerGateway = getJobManagerGateway();
            } catch (Exception e) {
                throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
            }
    
            try {
                logAndSysout("Submitting Job with JobID: " + jobGraph.getJobID() + ". Returning after job submission.");
                JobClient.submitJobDetached(
                    new AkkaJobManagerGateway(jobManagerGateway),
                    flinkConfig,
                    jobGraph,
                    Time.milliseconds(timeout.toMillis()),
                    classLoader);
                return new JobSubmissionResult(jobGraph.getJobID());
            } catch (JobExecutionException e) {
                throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
            }
        }
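
    // Below: YarnClusterClient.submitJob, which overrides ClusterClient.submitJob and,
    // when running detached, dispatches to the runDetached() shown above.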
        @Override
        public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
            if (isDetached()) {
                if (newlyCreatedCluster) {
                    stopAfterJob(jobGraph.getJobID());
                }
                LOG.info("super.runDetached");
                return super.runDetached(jobGraph, classLoader);
            } else {
                LOG.info("super.run");
                return super.run(jobGraph, classLoader);
            }
        }

    Finally, the client connects to the JobManager described above and submits the flink JobGraph to the Flink cluster already allocated on YARN.

    Conclusion: in flink on yarn single-job mode, the flink client first requests a YARN application, waits for the cluster to be deployed successfully, and then contacts the JobManager to submit the job to that cluster. The advantage of this mode is that each flink job gets an independent cluster, which makes resource planning and management easier. The drawback, verified in practice, is that when the AM dies, YARN can only restart the original cluster and cannot restore the flink JobGraph, so additional HA configuration is required.
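
    A typical ZooKeeper-based HA setup for this scenario looks roughly like the following flink-conf.yaml fragment (these are the standard 1.5.x option keys; the hosts, paths, and attempt count are placeholder values):

        # ZooKeeper-based high availability so the JobGraph can be recovered
        high-availability: zookeeper
        high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
        high-availability.storageDir: hdfs:///flink/ha/
        # let YARN restart the ApplicationMaster instead of giving up after one attempt
        yarn.application-attempts: 10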
