zoukankan      html  css  js  c++  java
  • Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(二)

    本文继《Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)》,接着讲述MapReduce作业在MRAppMaster上处理总流程,继上篇讲到作业初始化之后的作业启动,关于作业初始化主体流程的详细介绍,请参见《Yarn源码分析之MRAppMaster上MapReduce作业初始化解析》一文。

            (三)启动

            作业的启动是通过MRAppMaster的startJobs()方法实现的,其代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  /** 
    2.   * This can be overridden to instantiate multiple jobs and create a  
    3.   * workflow. 
    4.   * 
    5.   * TODO:  Rework the design to actually support this.  Currently much of the 
    6.   * job stuff has been moved to init() above to support uberization (MR-1220). 
    7.   * In a typical workflow, one presumably would want to uberize only a subset 
    8.   * of the jobs (the "small" ones), which is awkward with the current design. 
    9.   */  
    10.  @SuppressWarnings("unchecked")  
    11.  protected void startJobs() {  
    12.    /** create a job-start event to get this ball rolling */  
    13. // 构造作业启动事件JobStartEvent实例startJobEvent  
    14.    JobEvent startJobEvent = new JobStartEvent(job.getID(),  
    15.        recoveredJobStartTime);  
    16.    /** send the job-start event. this triggers the job execution. */  
    17.    // 将作业启动事件JobStartEvent实例startJobEvent交由事件分发器dispatcher的事件处理器处理  
    18.    dispatcher.getEventHandler().handle(startJobEvent);  
    19.  }  

            很简单,首先构造作业启动事件JobStartEvent实例startJobEvent,然后将作业启动事件JobStartEvent实例startJobEvent交由事件分发器dispatcher的事件处理器处理。我们首先看下事件分发器dispatcher是如何初始化的,其在MRAppMaster服务初始化的serviceInit()方法中,关键代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. dispatcher = createDispatcher();  

            再来看下createDispatcher()方法,如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. protected Dispatcher createDispatcher() {  
    2.   return new AsyncDispatcher();  
    3. }  

            就是创建一个AsyncDispatcher对象,其代表的是一个事件异步分发器AsyncDispatcher,我们曾经在《Yarn源码分析之事件异步分发器AsyncDispatcher》一文中专门介绍过这个AsyncDispatcher。AsyncDispatcher其实是一个生产者-消费者模型的事件异步分发器。在其内部有一个待分发事件队列eventQueue,并有一个GenericEventHandler类型的事件处理器handlerInstance,由其handle()方法负责将外部事件event添加到待分发队列eventQueue中,等到AsyncDispatcher中的消费者eventHandlingThread不断的获取待分发队列eventQueue中的事件,分发并交由之前注册的事件类型对应的事件处理器处理。关于这部分的内容请阅读《Yarn源码分析之事件异步分发器AsyncDispatcher》一文,此处不再做过多介绍。

            那么dispatcher中是如何注册JobStartEvent事件的处理器的呢?注册的事件处理器又是谁呢?还是在服务初始化的方法中,如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. //register the event dispatchers  
    2. dispatcher.register(JobEventType.class, jobEventDispatcher);  

            通过查看JobStartEvent的源码我们知道,JobStartEvent继承自JobEvent,它也是一种JobEvent,所以其处理会交给jobEventDispatcher来处理。细心的读者获取会发现,在此之前,dispatcher已经注册过一个JobEventType对应的事件处理器,NoopEventHandler类型的eater了,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. NoopEventHandler eater = new NoopEventHandler();  
    2. //We do not have a JobEventDispatcher in this path  
    3. dispatcher.register(JobEventType.class, eater);  

            我们先看下NoopEventHandler的定义,如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * Eats events that are not needed in some error cases. 
    3.  */  
    4. private static class NoopEventHandler implements EventHandler<Event> {  
    5.   
    6.   @Override  
    7.   public void handle(Event event) {  
    8.     //Empty  
    9.   }  
    10. }  

            四个字,空空如也!那么,读者在这里可能就有疑问了,到底是由jobEventDispatcher还是eater来处理作业启动JobStartEvent事件内。这里要说的是,这两次注册实际上是形成了一个JobEventType事件类型的链式事件处理器,它会将事件挨个通过链式事件处理器中的每个处理器进行处理,这在《Yarn源码分析之事件异步分发器AsyncDispatcher》一文中的register()方法介绍中也提到过了,读者可自行查看。而这里,既然eater为空,不对事件做任何处理,我们还是看看jobEventDispatcher吧。

            那么,jobEventDispatcher是如何定义及初始化的呢?其实这个jobEventDispatcher在Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)中的作业初始化事件JobEventType.JOB_INIT处理时已经讲到过了,它就是一个JobEventDispatcher对象,这里再回顾一下,其定义如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private class JobEventDispatcher implements EventHandler<JobEvent> {  
    2.   @SuppressWarnings("unchecked")  
    3.   @Override  
    4.   public void handle(JobEvent event) {  
    5.     // 从应用运行上下文信息context中根据jobId获取Job实例,即JobImpl对象,调用其handle()方法,处理对应事件  
    6.     ((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event);  
    7.   }  
    8. }  

            它实际上并不真正干活,而是从应用运行上下文信息context中根据jobId获取Job实例,即JobImpl对象,调用其handle()方法,处理对应事件。那么在处理作业初始化事件时我们也提到过了,它是根据作业状态机的doTransition()方法根据事件类型来处理的,关于作业状态机,我们这里还是不做介绍,你还是只要知道作业启动事件是通过JobImpl的静态内部类StartTransition的transition()方法来处理的就行,其代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. public static class StartTransition  
    2. implements SingleArcTransition<JobImpl, JobEvent> {  
    3.   /** 
    4.    * This transition executes in the event-dispatcher thread, though it's 
    5.    * triggered in MRAppMaster's startJobs() method. 
    6.    */  
    7.   @Override  
    8.   public void transition(JobImpl job, JobEvent event) {  
    9.     JobStartEvent jse = (JobStartEvent) event;  
    10.      
    11.     // 设置作业的起始时间startTime  
    12.     if (jse.getRecoveredJobStartTime() != 0) {  
    13.       job.startTime = jse.getRecoveredJobStartTime();  
    14.     } else {  
    15.       job.startTime = job.clock.getTime();  
    16.     }  
    17.       
    18.     // 创建作业已初始化事件JobInitedEvent实例jie  
    19.     JobInitedEvent jie =  
    20.       new JobInitedEvent(job.oldJobId,  
    21.            job.startTime,  
    22.            job.numMapTasks, job.numReduceTasks,  
    23.            job.getState().toString(),  
    24.            job.isUber());  
    25.       
    26.     // 将作业已初始化事件JobInitedEvent实例jie包装成作业历史事件JobHistoryEvent,并交给作业的事件处理器eventHandler处理  
    27.     job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));  
    28.       
    29.     // 创建作业信息变更事件JobInfoChangeEvent实例jice  
    30.     JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,  
    31.         job.appSubmitTime, job.startTime);  
    32.       
    33.     // 将作业信息变更事件JobInfoChangeEvent实例jice包装成作业历史事件JobHistoryEvent,并交给作业的事件处理器eventHandler处理  
    34.     job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));  
    35.       
    36.     // 调用作业度量指标metrics的runningJob()方法,标识作业已开始运行  
    37.     job.metrics.runningJob(job);  
    38.   
    39.     // 构造提交作业Setup事件CommitterJobSetupEvent,并交由作业的事件处理器eventHandler处理  
    40.     job.eventHandler.handle(new CommitterJobSetupEvent(  
    41.             job.jobId, job.jobContext));  
    42.   }  
    43. }  

            去掉关于作业历史信息等不是十分关键的细节,整体主体流程如下:

            1、设置作业的起始时间startTime;

            2、构造提交作业Setup事件CommitterJobSetupEvent,并交由作业的事件处理器eventHandler处理。

            那么,作业的事件处理器eventHandler是什么呢?它又是如何处理提交作业Setup事件CommitterJobSetupEvent的呢?

            我们先看下作业的事件处理器eventHandler,在MRAppMaster服务启动时创建作业JobImpl实例时,eventHandler是通过传入的dispatcher.getEventHandler()来初始化的,基于上面的陈述,这我们就不用讲了吧。

            我们还是看下dispatcher是如何注册事件CommitterJobSetupEvent对应的事件处理器的吧,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. dispatcher.register(CommitterEventType.class, committerEventHandler);  

            我们知道,CommitterJobSetupEvent继承自CommitterEvent,所以它实际上是通过committerEventHandler来处理的,那么什么是committerEventHandler呢?其初始化如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. //service to handle the output committer  
    2. committerEventHandler = createCommitterEventHandler(context, committer);  

            通过调用createCommitterEventHandler()方法,构造了一个CommitterEventHandler实例,如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. protected EventHandler<CommitterEvent> createCommitterEventHandler(  
    2.     AppContext context, OutputCommitter committer) {  
    3.   return new CommitterEventHandler(context, committer,  
    4.       getRMHeartbeatHandler(), jobClassLoader);  
    5. }  

            关于CommitterEventHandler的介绍,我们后续会写相关文章进行详细的介绍,这里,你只要知道,它类似AsyncDispatcher,也是一个生产者-消费者模式的事件分发器,而最终是通过其内部EventProcessor类型的事件处理线程eventHandlingThread来处理的,在EventProcessor中,有针对JOB_SETUP事件处理的逻辑,关键代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. switch (event.getType()) {  
    2. case JOB_SETUP:  
    3.   handleJobSetup((CommitterJobSetupEvent) event);  
    4.   break;  

            继续追踪handleJobSetup()方法,如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. @SuppressWarnings("unchecked")  
    2. protected void handleJobSetup(CommitterJobSetupEvent event) {  
    3.   try {  
    4.     committer.setupJob(event.getJobContext());  
    5.     context.getEventHandler().handle(  
    6.         new JobSetupCompletedEvent(event.getJobID()));  
    7.   } catch (Exception e) {  
    8.     LOG.warn("Job setup failed", e);  
    9.     context.getEventHandler().handle(new JobSetupFailedEvent(  
    10.         event.getJobID(), StringUtils.stringifyException(e)));  
    11.   }  
    12. }  

            它做了两件事情,如下:

            1、调用committer的setupJob()方法处理该CommitterJobSetupEvent事件;

            2、又构造了一个JobSetupCompletedEvent事件,交由应用运行上下文context的事件处理器进行处理。

            而首先要说的是,committer、context均是由MRAppMaster在创建CommitterEventHandler时传入的,其对应的对象类型分别是:

            1、committer:

                  1.1、新版API是通过OutputFormat组件的getOutputCommitter()方法获取的;

                  1.2、旧版API是通过参数mapred.output.committer.class获取的,参数未配置默认为FileOutputCommitter。

            2、context:RunningAppContext。

            对于committer,我们这里以较为通用的FileOutputCommitter为例,看下其setupJob()方法,如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. /** 
    2.  * Create the temporary directory that is the root of all of the task  
    3.  * work directories. 
    4.  * @param context the job's context 
    5.  */  
    6. public void setupJob(JobContext context) throws IOException {  
    7.   if (hasOutputPath()) {  
    8.     Path jobAttemptPath = getJobAttemptPath(context);  
    9.     FileSystem fs = jobAttemptPath.getFileSystem(  
    10.         context.getConfiguration());  
    11.     if (!fs.mkdirs(jobAttemptPath)) {  
    12.       LOG.error("Mkdirs failed to create " + jobAttemptPath);  
    13.     }  
    14.   } else {  
    15.     LOG.warn("Output Path is null in setupJob()");  
    16.   }  
    17. }  

            实际上就做了一件事情,创建作业中所有任务工作的临时根目录。

            再来看下context是如何处理JobSetupCompletedEvent的,还记得之前我们讲述的,RunningAppContext实际上什么都不干,而是交给了JobImpl对应的作业状态机了吗?我们就看下JobImpl中是如何处理JobSetupCompletedEvent事件的,其对应的处理在其静态内部类SetupCompletedTransition的transition()中,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private static class SetupCompletedTransition  
    2.     implements SingleArcTransition<JobImpl, JobEvent> {  
    3.   @Override  
    4.   public void transition(JobImpl job, JobEvent event) {  
    5.       
    6.     // 通过设置作业的setupProgress为1,标记作业setup已完成  
    7.     job.setupProgress = 1.0f;  
    8.       
    9.     // 调度作业的Map Task  
    10.     job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);  
    11.     // 调度作业的Reduce Task  
    12.     job.scheduleTasks(job.reduceTasks, true);  
    13.   
    14.     // If we have no tasks, just transition to job completed  
    15.     // 如果没有task了,则生成JOB_COMPLETED事件并交由作业的事件处理器eventHandler进行处理  
    16.     if (job.numReduceTasks == 0 && job.numMapTasks == 0) {  
    17.       job.eventHandler.handle(new JobEvent(job.jobId,  
    18.           JobEventType.JOB_COMPLETED));  
    19.     }  
    20.   }  
    21. }  

            是不是很简单,而且也很理所当然?处理流程如下:

            1、通过设置作业的setupProgress为1,标记作业setup已完成;

            2、调度作业的Map Task;

            3、调度作业的Reduce Task;

            4、如果没有task了,则生成JOB_COMPLETED事件并交由作业的事件处理器eventHandler进行处理。

            未完待续,后续作业启动部分内容详细描述、作业停止等内容,请关注《Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(三)》。

  • 相关阅读:
    sqlite数据库如何远程连接?
    redis的两种持久化方案
    Netty入门系列(1) --使用Netty搭建服务端和客户端
    使用MQ消息队列的优缺点
    Netty基础系列(3) --彻底理解NIO
    Netty基础系列(2) --彻底理解阻塞非阻塞与同步异步的区别
    Netty基础系列(1) --linux网路I/O模型
    Jedis异常解决:NOAUTH Authentication required
    java并发编程(2) --Synchronized与Volatile区别
    java并发编程(1) --并发基础及其锁的原理
  • 原文地址:https://www.cnblogs.com/jirimutu01/p/5556374.html
Copyright © 2011-2022 走看看