zoukankan      html  css  js  c++  java
  • Flink on Yarn模式启动流程源代码分析

    此文已由作者岳猛授权网易云社区发布。

    欢迎访问网易云社区,了解更多网易技术产品运营经验。


    Flink on yarn的启动流程可以参见前面的文章 Flink on Yarn启动流程,下面主要是从源码角度看下这个实现,可能有的地方理解有误,请给予指正,多谢。

    --> 1.命令行启动yarn session

    bin/yarn-session.sh -n 3 -jm 1024 -nm 1024 -st
    我们去看下启动脚本

      $JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR" $log_setting  org.apache.flink.yarn.cli.FlinkYarnSessionCli  -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"

    主要是用java -cp的方式启动主类** *org.apache.flink.yarn.cli.FlinkYarnSessionCli * , $@ 就是我们传入的哪些参数 " -n 3 -jm 1024 -nm 1024 -st" **。

    1. FlinkYarnSessionCli 的启动流程分析

    首先看下Main函数

    public static void main(String[] args) {   
    FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
       System.exit(cli.run(args));
    }

    主要是构造FlinkYarnSessionCli,然后执行其run方法,这里主要介绍主要流程的代码。

    public int run(String[] args)

    • 1.解析命令行参数

      cmd = parser.parse(options, args)
    • 2.根据命令行参数决定执行那种模式。

       # 第一种,判断命令是否包含 -q



    ** 如: **


    # 第二种,判断是否有-id参数


    这里我们看下交互模式是啥样的,共有两个可选项,help和stop,如果我们敲入stop,则应用对应的所有进程会退出。


    # 第三种,为正常模式




    ** 这里主要为构造YarnClusterDescriptor,然后调用其deploy方法启动集群 ,接着将Jobmanager和web ui地址写入到out文件中去,如果采用分离模式,则等待集群启动之后yarn session自动退出,如果不是则进入交互模式,我们可以通过交互控制这个Applitcation **


    接着看下是如何构造YarnClusterDescriptor的

    ----------------- **1   creat  YarnClusterDescriptor  ** ----------------------

    直接new YarnClusterDescriptor对象,然后将依赖jar地址,配置参数如taskmanager个数,jar地址,配置文件地址,配置参数等设置到YarnClusterDescriptor对象中去,然后返回这个对象。

    ------------** 2 YarnClusterDescriptor deploy  ** -------------------------

    由于YarnClusterDescriptor没有重写depoy方法则直接调用其父类AbstractYarnClusterDescriptor的deploy方法,但是最终调用的是其deployInternal方法.

    接着看下deployInternal方法,简单的描述下流程,后续代码分析下面的github地址

    • 检查是否具备Deploy的条件,如配置文件,jar路径是否为空

    • 获取yarn的client,用户和RM进行通信

    • 增加动态的配置属性到配置conf对象中去,解析配置conf对象为kv对

    • 获取HDFS FileSyetem,这里用于将本地jar及配置文件上传到HDFS,

    • 判断JobManager和TaskManager申请的资源是否满足yarn分配单个container的最小分配,如果小于则将container最小分配用来初始化jobMananger和TaskMananer

    • 通过yarn client创建Application,返回GetNewApplicationResponse对象用于跟RM进行RPC通信。

    • 通过GetNewApplicationResponse对象获取RM能够为这个应用分配的最大资源,如果最大资源不能够满足jobManagerMemoryMb和taskManagerMemoryMb则报错,计算总的jobmanager和所有taskmanager总共需要的资源(jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount),计算RM中总共的空闲资源,判断空闲资源是否满足前面计算需要的需求,如果不满足,则可能先启动yarn session,task manager等到有资源再进行启动;先为jobManager分配一个nm,然后再在其他的nm上启动taskmanager

    • 设置启动ApplicationMaster的 lanchcontext,这里主要是设置java home,主类,jvm参数数,log文件配置。ApplicationMaster的主类 YarnApplicationMasterRunner ** YarnApplicationMasterRunner **。

    protected Class<?> getApplicationMasterClass() {   
    return YarnApplicationMasterRunner.class;
    }```
    - 设置ApplicationSubmissionContext,获取ApplicationId
    - 设置session需要的hdfs路径,然后将本地jar包及配置文件,配置文件上传到HDFS
    - 设置AM启动的token信息,设置AM启动的过程中需要从hdfs下载那些依赖的jar和配置文件,设置ApplicationMaster及Flink及其他进程的classpath,不多说
    - 设置钩子函数在deploy的时候清理上传到hdfs的文件及本地下载的依赖文件
    - *** 重点,提交Applicaiton到RM;设置这个Application的状态为NEW,然后监控这个应用,如果不是之前的NEW状态,则打印当前状态,如果Running状态则跳出这个循环,如果是其他状态,则抛出YarnDeploymentException异常,上层调用捕获处理吧,不然250ms判断一次 ***
    - depoly成功,钩子函数删除临时文件,如依赖的jar包和配置文件等,返回YarnClusterClient对象,包含了这YarnClusterDescriptor,ApplicationReport等重要的属性。
    ***
    ***deploy 成功以后进入交互模式,在runInteractiveCli里面最重要的一步是构造ApplicationClient Actor用于和JobManager Actor进行通信,但是如果发送 RegisterInfoMessageListener、UnRegisterInfoMessageListener等消息,将会由jobmanager actor将forward方法路由到flink resource manager actor去处理,此时jobmanager作为flink resource manager的代理,此时收到这两个消息的时候,由于是forward的方法,sender仍然是application client actor,所以flink manager resource actor可以直接给application client返回消息***
    ***
    > ------------ ** 3 代码展示主要流程**------
    ![](//upload-images.jianshu.io/upload_images/3249301-d22456f0939a8365.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-9c80aa18467d4e10.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-85a2f462ff96e5fd.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-a3c81e3dc9b23db0.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-bf190e6a72366f0d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-57eb01f090d38dd3.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-d548d544dbd1b713.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-2013feca33032c46.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-d0d8c8c1a56f28ff.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-be3a228edeffad9d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)*** ---- ApplicationClient 和JobManager Actor通信代码 --***
    
    ![](//upload-images.jianshu.io/upload_images/3249301-56371ec18930ba4f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-ed28091d44dc3906.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-1a0df11e1a57941d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-913d82bf6d5825b8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-6309a49886d0cc4e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-47f86bcfdee4f967.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-6a483d4af26931cc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-4de478f435cb1356.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-27986c3659bd96cc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-a27f89acbe3406de.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-f04a3da4f97a08dc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)## 2. YarnApplicationMasterRunner 启动流程分析
    *** RM首先分配一个NM的container去启动YarnApplicationMasterRunner ,接着下来我们看下是怎么做的***
    首先是进入main函数里面,构造一个YarnApplicationMasterRunner对象,直接调用其Run方法。
    > run方法主要步骤
    - 获取当前用户的UGI及远端UGI
    - 将当前用户ugi里面的token传递到远端的UGI中,用于数据和服务访问
    - 在远端的UGI里面执行runApplicationMaster启动ApplicationMaster
    ![](//upload-images.jianshu.io/upload_images/3249301-7d3ac2af1bf091f6.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)> runApplicationMaster主要过程,这里注释很清楚,我只捡重要的提示下
    - 1) load and parse / validate all configurations
    - 2) start the actor system,try to start the actor system, JobManager and JobManager actor system
    - 3) Generate the configuration for the TaskManagers,这里主要是JobManager的地址,taskManager注册的超时时间,slot个数,这里还有最重要的一步是构造TaskManager的ContainerLaunchContext,这个context里面包含了启动TaskManager的启动命令,***主类是YarnTaskManager***。
    -  start the actors and components in this order:  1) JobManager & Archive (in non-HA case, the leader service takes this),启动JobManagerActor,这里主类是***YarnJobManager***  2) Web Monitor (we need its port to register) 启动WEB监控页面,创建LeaderRetrievalService对象,这个主要用于启动TaskManager的时候,告诉TaskManager JobManager得akka url,用于TaskManager向JobManager进行注册。  3) Resource Master for YARN   启动YarnFlinkResourceManager Actor,这里主要用于Flink container资源的管理包括申请与释放等。  4) Process reapers for the JobManager and Resource Master
    ***这里主要介绍YarnApplicationMasterRunner 是如何通过YarnFlinkResourceManager去完成container的申请与启动TaskManager的,这里相对来说,比较复杂,我跟到Yarn的代码里才算整明白***
    
    ![YarnFlinkResourceManager的继承关系](//upload-images.jianshu.io/upload_images/3249301-cbc8215f8c356913.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)说明YarnFlinkResourceManager其实是一个actor,在runApplicationMaster方法中,通过下面的代码启动这个Actor

    Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
    getResourceManagerClass(),//YarnFlinkResourceManager
    config,
    yarnConfig,
    leaderRetriever,
    appMasterHostname,
    webMonitorURL,
    taskManagerParameters,
    taskManagerContext,
    numInitialTaskManagers,
    LOG);
    ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);//启动YarnFlinkResourceManager actor

    接着看下YarnFlinkResourceManager 的构造方法,这里主要有三个成员变量比较重要

    //在yarn 的rm端会调用该回对象的回调函数进行container申请,resourceManagerCallbackHandler里面只有该actor的actor ref,所以回调的过程中能够与该actor进行通信
    /** Callback handler for the asynchronous resourceManagerClient /
    private YarnResourceManagerCallbackHandler resourceManagerCallbackHandler;
    //AM与RM通信的client,resourceManagerClient对象持有resourceManagerCallbackHandler
    /* Client to communicate with the Resource Manager (YARN's master) /
    private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
    //AM与NM的通信client
    /* Client to communicate with the Node manager and launch TaskManager processes */
    private NMClient nodeManagerClient;

    YarnFlinkResourceManager 启动的过程先执行preStart方法,自己没有实现则执行其父类FlinkResourceManager的preStart方法。接着调用YarnFlinkResourceManager 的initialize方法。
    > ***在initialize方法里面***
    *** resourceManagerClient.start() ----> 
      AMRMClientAsyncImpl.serviceStart()--->
      CallbackHandlerThread.start()(守护线程)--->
    YarnResourceManagerCallbackHandler.onContainersAllocated(allocated)---> yarnFrameworkMaster.tell(new ContainersAllocated(containers),ActorRef.noSender())(yarnFrameworkMaster为YarnFlinkResourceManager ActorRef) -->
    YarnFlinkResourceManager .containersAllocated -->
    NMClient.startContainer(container, taskManagerLaunchContext)
    至此通知各个NM启动container。***
    ![](//upload-images.jianshu.io/upload_images/3249301-3a5b6377e93742b9.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-2c532578dc7a508d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-6a2cd3067970fe39.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-e14cfee9289c57bf.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-2e3b73f252030ea8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-a3239382a6bde777.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-0a52a463c263bbd7.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-52cecffb2670cc67.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)***至此,YarnApplicationMasterRunner 重要的流程已经说完,细节东西太多,就不再说了,有时间再看,接下来看YarnTaskManager的部分***## 3. YarnTaskManager启动流程分析***接上面nodeManagerClient.startContainer(container, taskManagerLaunchContext)将通知NM去启动container,NM根据taskManagerLaunchContext的启动信息,从HDFS下载YarnTaskManager启动过程依赖的jar和配置文件
    (container_tokens  default_container_executor_session.sh  default_container_executor.sh  flink-conf.yaml  flink.jar  launch_container.sh  lib  log4j.properties  logback.xml),然后shell执行launch_container.sh,最终用java -cp启动YarnTaskManager进程,启动进程的时候首先执行YarnTaskManager run方法,TaskManager会拿到JobManager的akka地址,然后发送注册消息,JobManager收到注册消息以后,注册成功之后就发送ack确认注册信息给TaskManager,然后TaskManger根据配置以及JobManager返回过来的信息构建一些真正干活的成员变量。过程:***
    > 
    YarnTaskManagerRunner.runYarnTaskManager(args, classOf[YarnTaskManager])-->
    TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager)-->
    TaskManager.runTaskManager -->
    TaskManager.startTaskManagerComponentsAndActor-->
    actorSystem.actorOf(tmProps, actorName)-->
    TaskManager.preStart-->
    StandaloneLeaderRetrievalService.start(TaskManager)-->
    TaskManger.notifyLeaderAddress-->
    TaskManager.handleJobManagerLeaderAddress-->
    TaskManager.triggerTaskManagerRegistration()
    TaskManager.handleRegistrationMessage-->
    instanceManager.registerTaskManager-->
    jobManager 发送消息AcknowledgeRegistration给TaskManager
    TaskManager.associateWithJobManager-->
    
    
    
    ![](//upload-images.jianshu.io/upload_images/3249301-8797abe4fead540e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-6ada492f0e25718b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-7d9e8a6893af056a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-e4116544f6c7a677.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-03e551a6ebf785c1.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![Paste_Image.png](//upload-images.jianshu.io/upload_images/3249301-155d80ccd8b5bed4.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![Paste_Image.png](//upload-images.jianshu.io/upload_images/3249301-40bbe88a86f090c8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![Paste_Image.png](//upload-images.jianshu.io/upload_images/3249301-2226948e829a5add.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![Paste_Image.png](//upload-images.jianshu.io/upload_images/3249301-b9c444d706737b6f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-3ff9f4e9243c1264.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-08823728261241ec.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![associateWithJobManager](//upload-images.jianshu.io/upload_images/3249301-f3aa8048869975c8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-1bb10a80436aa199.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)![](//upload-images.jianshu.io/upload_images/3249301-442effae29613f2c.png)###基本上Flink on yarn的流程就是这样,细节需要深入,有不正确的地方,希望给予指正。




    链接:https://www.jianshu.com/p/8a3177095072


    免费体验云安全(易盾)内容安全、验证码等服务


    更多网易技术、产品、运营经验分享请点击




    相关文章:
    【推荐】 Android TV 开发 (1)
    【推荐】 针对云主机卡死问题的定位分析方法
    【推荐】 TheBeamModel:Stream&Tables翻译(上)

  • 相关阅读:
    Network (poj1144)
    C. Hongcow Builds A Nation
    ZYB loves Xor I(hud5269)
    D. Chloe and pleasant prizes
    Game(hdu5218)
    约瑟夫环的递推方法
    Misaki's Kiss again(hdu5175)
    Exploration(hdu5222)
    B. Arpa's weak amphitheater and Mehrdad's valuable Hoses
    C. Arpa's loud Owf and Mehrdad's evil plan
  • 原文地址:https://www.cnblogs.com/zyfd/p/9882370.html
Copyright © 2011-2022 走看看