zoukankan      html  css  js  c++  java
  • YARN源码分析(一)-----ApplicationMaster

    转自:http://blog.csdn.net/androidlushangderen/article/details/48128955

    YARN学习系列:http://blog.csdn.net/Androidlushangderen/article/category/5780183

    前言

    在之前两周主要学了HDFS中的一些模块知识,其中的许多都或多或少有我们借鉴学习的地方,现在将目光转向另外一个块,被誉为MRv2,就是yarn,在Yarn中,解决了MR中JobTracker单点的问题,将此拆分成了ResourceManager和NodeManager这样的结构,在每个节点上,还会有ApplicationMaster来管理应用程序的整个生命周期,的确在Yarn中,多了许多优秀的设计,而今天,我主要分享的就是这个ApplicationMaster相关的一整套服务,他是隶属于ResoureManager的内部服务中的.了解了AM的启动机制,你将会更进一步了解Yarn的任务启动过程.

    ApplicationMaster管理涉及类

    ApplicationMaster管理涉及到了4大类,ApplicationMasterLauncher,AMLivelinessMonitor,ApplicationMasterService,以及ApplicationMaster自身类.下面介绍一下这些类的用途,在Yarn中,每个类都会有自己明确的功能模块的区分.

    1.ApplicationMasterLauncher--姑且叫做AM启动关闭事件处理器,他既是一个服务也是一个处理器,在这个类中,只处理2类事件,launch和cleanup事件.分别对应启动应用和关闭应用的情形.

    2.AMLivelinessMonitor--这个类从名字上可以看出他是监控类,监控的对象是AM存活状态的监控类,检测的方法与之前的HDFS一样,都是采用heartbeat的方式,如果有节点过期了,将会触发一次过期事件.

    3.ApplicationMasterService--AM请求服务处理类.AMS存在于ResourceManager,中,服务的对象是各个节点上的ApplicationMaster,负责接收各个AM的注册请求,更新心跳包信息等.

    4.ApplicationMaster--节点应用管理类,简单的说,ApplicationMaster负责管理整个应用的生命周期.

    简答的描述完AM管理的相关类,下面从源码级别分析一下几个流程.

    AM启动

    要想让AM启动,启动的背景当然是有用户提交了新的Application的时候,之后ApplicationMasterLauncher会生成Launch事件,与对应的nodemanager通信,让其准备启动的新的AM的Container.在这里,就用到了ApplicationMasterLauncher这个类,之前在上文中已经提到,此类就处理2类事件,Launch启动和Cleanup清洗事件,先来看看这个类的基本变量设置

    1. //Application应用事件处理器  
    2. public class ApplicationMasterLauncher extends AbstractService implements  
    3.     EventHandler<AMLauncherEvent> {  
    4.   private static final Log LOG = LogFactory.getLog(  
    5.       ApplicationMasterLauncher.class);  
    6.   private final ThreadPoolExecutor launcherPool;  
    7.   private LauncherThread launcherHandlingThread;  
    8.     
    9.   //事件队列  
    10.   private final BlockingQueue<Runnable> masterEvents  
    11.     = new LinkedBlockingQueue<Runnable>();  
    12.   //资源管理器上下文  
    13.   protected final RMContext context;  
    14.     
    15.   public ApplicationMasterLauncher(RMContext context) {  
    16.     super(ApplicationMasterLauncher.class.getName());  
    17.     this.context = context;  
    18.     //初始化线程池  
    19.     this.launcherPool = new ThreadPoolExecutor(10, 10, 1,   
    20.         TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());  
    21.     //新建处理线程  
    22.     this.launcherHandlingThread = new LauncherThread();  
    23.   }  

    还算比较简单,有一个masterEvents事件队列,还有执行线程以及所需的线程池执行环境。在RM相关的服务中,基本都是继承自AbstractService这个抽象服务类的。ApplicationMasterLauncher中主要处理2类事件,就是下面的展示的

    1. @Override  
    2.   public synchronized void  handle(AMLauncherEvent appEvent) {  
    3.     AMLauncherEventType event = appEvent.getType();  
    4.     RMAppAttempt application = appEvent.getAppAttempt();  
    5.     //处理来自ApplicationMaster获取到的请求,分为启动事件和清洗事件2种  
    6.     switch (event) {  
    7.     case LAUNCH:  
    8.       launch(application);  
    9.       break;  
    10.     case CLEANUP:  
    11.       cleanup(application);  
    12.     default:  
    13.       break;  
    14.     }  
    15.   }  

    然后调用具体的实现方法,以启动事件launch事件为例

    1. //添加应用启动事件  
    2.   private void launch(RMAppAttempt application) {  
    3.     Runnable launcher = createRunnableLauncher(application,   
    4.         AMLauncherEventType.LAUNCH);  
    5.     //将启动事件加入事件队列中  
    6.     masterEvents.add(launcher);  
    7.   }  

    这些事件被加入到事件队列之后,是如何被处理的呢,通过消息队列的形式,在一个独立的线程中逐一被执行

    1. //执行线程实现  
    2.   private class LauncherThread extends Thread {  
    3.       
    4.     public LauncherThread() {  
    5.       super("ApplicationMaster Launcher");  
    6.     }  
    7.   
    8.     @Override  
    9.     public void run() {  
    10.       while (!this.isInterrupted()) {  
    11.         Runnable toLaunch;  
    12.         try {  
    13.           //执行方法为从事件队列中逐一取出事件  
    14.           toLaunch = masterEvents.take();  
    15.           //放入线程池池中进行执行  
    16.           launcherPool.execute(toLaunch);  
    17.         } catch (InterruptedException e) {  
    18.           LOG.warn(this.getClass().getName() + " interrupted. Returning.");  
    19.           return;  
    20.         }  
    21.       }  
    22.     }  
    23.   }  

    如果论到事件的具体执行方式,就要看具体AMLauch是如何执行的,AMLauch本身就是一个runnable实例。

    1. /** 
    2.  * The launch of the AM itself. 
    3.  * Application事件执行器 
    4.  */  
    5. public class AMLauncher implements Runnable {  
    6.   
    7.   private static final Log LOG = LogFactory.getLog(AMLauncher.class);  
    8.   
    9.   private ContainerManagementProtocol containerMgrProxy;  
    10.   
    11.   private final RMAppAttempt application;  
    12.   private final Configuration conf;  
    13.   private final AMLauncherEventType eventType;  
    14.   private final RMContext rmContext;  
    15.   private final Container masterContainer;  

    在里面主要的run方法如下,就是按照事件类型进行区分操作

    1. @SuppressWarnings("unchecked")  
    2.   public void run() {  
    3.     //AMLauncher分2中事件分别处理  
    4.     switch (eventType) {  
    5.     case LAUNCH:  
    6.       try {  
    7.         LOG.info("Launching master" + application.getAppAttemptId());  
    8.         //调用启动方法  
    9.         launch();  
    10.         handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),  
    11.             RMAppAttemptEventType.LAUNCHED));  
    12.       ...  
    13.       break;  
    14.     case CLEANUP:  
    15.       try {  
    16.         LOG.info("Cleaning master " + application.getAppAttemptId());  
    17.         //调用作业清洗方法  
    18.         cleanup();  
    19.       ...  
    20.       break;  
    21.     default:  
    22.       LOG.warn("Received unknown event-type " + eventType + ". Ignoring.");  
    23.       break;  
    24.     }  
    25.   }  

    后面的launch操作会调用RPC函数与远程的NodeManager通信来启动Container。然后到了ApplicationMaster的run()启动方法,在启动方法中,会进行应用注册的方法,

    1. @SuppressWarnings({ "unchecked" })  
    2.   public boolean run() throws YarnException, IOException {  
    3.     LOG.info("Starting ApplicationMaster");  
    4.   
    5.     Credentials credentials =  
    6.         UserGroupInformation.getCurrentUser().getCredentials();  
    7.     DataOutputBuffer dob = new DataOutputBuffer();  
    8.     credentials.writeTokenStorageToStream(dob);  
    9.     // Now remove the AM->RM token so that containers cannot access it.  
    10.     Iterator<Token<?>> iter = credentials.getAllTokens().iterator();  
    11.     while (iter.hasNext()) {  
    12.       Token<?> token = iter.next();  
    13.       if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {  
    14.         iter.remove();  
    15.       }  
    16.     }  
    17.     allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());  
    18.   
    19.     //与ResourceManager通信,周期性发送心跳信息,包含了应用的最新信息  
    20.     AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();  
    21.     amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);  
    22.     amRMClient.init(conf);  
    23.     amRMClient.start();  
    24.     .....  
    25.   
    26.     // Register self with ResourceManager  
    27.     // This will start heartbeating to the RM  
    28.     //启动之后进行AM的注册  
    29.     appMasterHostname = NetUtils.getHostname();  
    30.     RegisterApplicationMasterResponse response = amRMClient  
    31.         .registerApplicationMaster(appMasterHostname, appMasterRpcPort,  
    32.             appMasterTrackingUrl);  
    33.     // Dump out information about cluster capability as seen by the  
    34.     // resource manager  
    35.     int maxMem = response.getMaximumResourceCapability().getMemory();  
    36.     LOG.info("Max mem capabililty of resources in this cluster " + maxMem);  
    37.   
    38.     // A resource ask cannot exceed the max.  
    39.     if (containerMemory > maxMem) {  
    40.       LOG.info("Container memory specified above max threshold of cluster."  
    41.           + " Using max value." + ", specified=" + containerMemory + ", max="  
    42.           + maxMem);  
    43.       containerMemory = maxMem;  
    44.     }  

    在这个操作中,会将自己注册到AMLivelinessMonitor中,此刻开始启动心跳监控。

    AMLiveLinessMonitor监控

    在这里把重心从ApplicationMaster转移到AMLivelinessMonitor上,首先这是一个激活状态的监控线程,此类线程都有一个共同的父类

    1. //应用存活状态监控线程  
    2. public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAttemptId> {  

    在AbstractlinessMonitor中定义监控类线程的一类特征和方法

    1. //进程存活状态监控类  
    2. public abstract class AbstractLivelinessMonitor<O> extends AbstractService {  
    3.   
    4.   private static final Log LOG = LogFactory.getLog(AbstractLivelinessMonitor.class);  
    5.   
    6.   //thread which runs periodically to see the last time since a heartbeat is  
    7.   //received.  
    8.   //检查线程  
    9.   private Thread checkerThread;  
    10.   private volatile boolean stopped;  
    11.   //默认超时时间5分钟  
    12.   public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins  
    13.   //超时时间  
    14.   private int expireInterval = DEFAULT_EXPIRE;  
    15.   //监控间隔检测时间,为超时时间的1/3  
    16.   private int monitorInterval = expireInterval/3;  
    17.   
    18.   private final Clock clock;  
    19.     
    20.   //保存了心跳检验的结果记录  
    21.   private Map<O, Long> running = new HashMap<O, Long>();  

    心跳检测本身非常的简单,做一次通信记录检查,然后更新一下,记录时间,当一个新的节点加入监控或解除监控操作

    1. //新的节点注册心跳监控  
    2.   public synchronized void register(O ob) {  
    3.     running.put(ob, clock.getTime());  
    4.   }  
    5.     
    6.   //节点移除心跳监控  
    7.   public synchronized void unregister(O ob) {  
    8.     running.remove(ob);  
    9.   }  

    每次做心跳周期检测的时候,调用下述方法

    1. //更新心跳监控检测最新时间  
    2.   public synchronized void receivedPing(O ob) {  
    3.     //only put for the registered objects  
    4.     if (running.containsKey(ob)) {  
    5.       running.put(ob, clock.getTime());  
    6.     }  
    7.   }  

    非常简单的更新方法,O ob对象在这里因场景而异,在AM监控中,为ApplicationID应用ID。在后面的AMS和AM的交互中会看到。新的应用加入AMLivelinessMonitor监控中后,后面的主要操作就是AMS与AM之间的交互操作了。

    AM与AMS

    在ApplicationMaster运行之后,会周期性的向ApplicationMasterService发送心跳信息,心跳信息包含有许多资源描述信息。

    1. //ApplicationMaster心跳信息更新  
    2.   @Override  
    3.   public AllocateResponse allocate(AllocateRequest request)  
    4.       throws YarnException, IOException {  
    5.   
    6.     ApplicationAttemptId appAttemptId = authorizeRequest();  
    7.     //进行心跳信息时间的更新  
    8.     this.amLivelinessMonitor.receivedPing(appAttemptId);  
    9.     ....  

    每次心跳信息一来,就会更新最新监控时间。在AMS也有对应的注册应用的方法

    1. //ApplicationMaster在ApplicationMasterService上服务上进行应用注册  
    2. @Override  
    3. public RegisterApplicationMasterResponse registerApplicationMaster(  
    4.     RegisterApplicationMasterRequest request) throws YarnException,  
    5.     IOException {  
    6.   
    7.   ApplicationAttemptId applicationAttemptId = authorizeRequest();  
    8.   
    9.   ApplicationId appID = applicationAttemptId.getApplicationId();  
    10.   .....  
    11.       
    12.     //在存活监控线程上进行心跳记录,更新检测时间,key为应用ID  
    13.     this.amLivelinessMonitor.receivedPing(applicationAttemptId);  
    14.     RMApp app = this.rmContext.getRMApps().get(appID);  
    15.       
    16.     // Setting the response id to 0 to identify if the  
    17.     // application master is register for the respective attemptid  
    18.     lastResponse.setResponseId(0);  
    19.     responseMap.put(applicationAttemptId, lastResponse);  
    20.     LOG.info("AM registration " + applicationAttemptId);  
    21.     this.rmContext  

    如果在心跳监控中出现过期的现象,就会触发一个expire事件,在AMLiveLinessMonitor中,这部分的工作是交给CheckThread执行的

    1. //进程存活状态监控类  
    2. public abstract class AbstractLivelinessMonitor<O> extends AbstractService {  
    3.   ...  
    4.   //thread which runs periodically to see the last time since a heartbeat is  
    5.   //received.  
    6.   //检查线程  
    7.   private Thread checkerThread;  
    8.   ....  
    9.   //默认超时时间5分钟  
    10.   public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins  
    11.   //超时时间  
    12.   private int expireInterval = DEFAULT_EXPIRE;  
    13.   //监控间隔检测时间,为超时时间的1/3  
    14.   private int monitorInterval = expireInterval/3;  
    15.   ....  
    16.   //保存了心跳检验的结果记录  
    17.   private Map<O, Long> running = new HashMap<O, Long>();  
    18.   ...  
    19.   
    20.   private class PingChecker implements Runnable {  
    21.   
    22.     @Override  
    23.     public void run() {  
    24.       while (!stopped && !Thread.currentThread().isInterrupted()) {  
    25.         synchronized (AbstractLivelinessMonitor.this) {  
    26.           Iterator<Map.Entry<O, Long>> iterator =   
    27.             running.entrySet().iterator();  
    28.   
    29.           //avoid calculating current time everytime in loop  
    30.           long currentTime = clock.getTime();  
    31.   
    32.           while (iterator.hasNext()) {  
    33.             Map.Entry<O, Long> entry = iterator.next();  
    34.             //进行超时检测  
    35.             if (currentTime > entry.getValue() + expireInterval) {  
    36.               iterator.remove();  
    37.               //调用超时处理方法,将处理事件交由调度器处理  
    38.               expire(entry.getKey());  
    39.               LOG.info("Expired:" + entry.getKey().toString() +   
    40.                       " Timed out after " + expireInterval/1000 + " secs");  
    41.             }  
    42.           }  
    43.         }  

    check线程主要做的事件就是遍历每个节点的最新心跳更新时间,通过计算差值进行判断是否过期,过期调用expire方法。此方法由其子类实现

    1. //应用存活状态监控线程  
    2. public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAttemptId> {  
    3.   //中央调度处理器  
    4.   private EventHandler dispatcher;  
    5.   ...  
    6.   
    7.   @Override  
    8.   protected void expire(ApplicationAttemptId id) {  
    9.      //一旦应用过期,处理器处理过期事件处理  
    10.     dispatcher.handle(  
    11.         new RMAppAttemptEvent(id, RMAppAttemptEventType.EXPIRE));  
    12.   }  
    13. }  

    产生应用超期事件,然后发给中央调度器去处理。之所以采用的这样的方式,是因为在RM中,所有的模块设计是以事件驱动的形式工作,最大程度的保证了各个模块间的解耦。不同模块通过不同的事件转变为不同的状态,可以理解为状态机的改变。最后用一张书中的截图简单的展示AM模块相关的调用过程。

    全部代码的分析请点击链接https://github.com/linyiqun/hadoop-yarn,后续将会继续更新YARN其他方面的代码分析。

    参考文献

    《Hadoop技术内部–HDFS结构设计与实现原理》.蔡斌等

  • 相关阅读:
    使用 @Autowired 的时候,到底是写接口还是实现类?
    socket的简单例子
    java 将文件夹所有的文件合并到指定的文件夹下
    java 复制某一文件夹下的所有文件到另一个文件夹下
    java Date日期总结的一些转换的方法
    java 可排序的数值得字符串,转化成list集合后进行的排序的几种方法
    java 查看文件的大小的方法
    java 从一个总的list集合中,去掉指定的集合元素,得到新的集合
    java 可拆成数组的字符串,去掉重复元素的一种方法
    将博客搬至CSDN
  • 原文地址:https://www.cnblogs.com/cxzdy/p/5044020.html
Copyright © 2011-2022 走看看