zoukankan      html  css  js  c++  java
  • Yarn源码分析之MapReduce作业中任务Task调度整体流程(一)

     v2版本的MapReduce作业中,作业JOB_SETUP_COMPLETED事件的发生,即作业SETUP阶段完成事件,会触发作业由SETUP状态转换到RUNNING状态,而作业状态转换中涉及作业信息的处理,是由SetupCompletedTransition来完成的,它主要做了四件事:

            1、通过设置作业Job的成员变量setupProgress为1,标记作业setup已完成;

            2、调度作业Job的Map Task;

            3、调度作业的JobReduce Task;

            4、如果没有task了,则生成JOB_COMPLETED事件并交由作业的事件处理器eventHandler进行处理。

            本文,我们就将研究作业Job中Task是如何被调度的。

            首先看下SetupCompletedTransition中transition()方法关于作业Job中Task调度的代码,如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 调度作业Job的Map Task  
    2. job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);  
    3. // 调度作业Job的Reduce Task  
    4. job.scheduleTasks(job.reduceTasks, true);  

            它实际上是通过Job,也就是JobImpl的scheduleTasks()完成的,这个方法需要两个参数,第一个是作业Job待调度任务的任务ID集合taskIDs,第二个参数是表示是否恢复任务输出的标志位recoverTaskOutput,对于Map-Only型作业中Map任务和所有类型作业的Reduce任务,都需要恢复,标志位recoverTaskOutput为true,具体代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  protected void scheduleTasks(Set<TaskId> taskIDs,  
    2.      boolean recoverTaskOutput) {  
    3.     
    4. // 遍历传入的任务集合taskIDs中的每个TaskId,对taskID做以下处理:  
    5.    for (TaskId taskID : taskIDs) {  
    6.       
    7.      // 根据taskID从集合completedTasksFromPreviousRun中移除对应元素,并获取被移除的元素TaskInfo实例taskInfo  
    8.      TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID);  
    9.        
    10.      if (taskInfo != null) {// 若存在taskID对应任务信息TaskInfo实例taskInfo  
    11.         
    12.     // 构造T_RECOVER类型任务恢复事件TaskRecoverEvent,交给eventHandler处理,标志位recoverTaskOutput表示是否恢复任务的输出,  
    13.     // 对于Map-Only型Map任务和所有的Reduce任务,都需要恢复,标志位recoverTaskOutput为true  
    14.        eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo,  
    15.            committer, recoverTaskOutput));  
    16.      } else {  
    17.         
    18.     // 否则,构造T_SCHEDULE类型任务调度事件TaskEvent,交给eventHandler处理  
    19.        eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE));  
    20.      }  
    21.    }  
    22.  }  

            scheduleTasks()方法遍历传入的任务集合taskIDs中的每个TaskId实例taskID,对taskID做以下处理:

           1、根据taskID从集合completedTasksFromPreviousRun中移除对应元素,并获取被移除的元素TaskInfo实例taskInfo;

           2、若存在taskID对应任务信息TaskInfo实例taskInfo,构造T_RECOVER类型任务恢复事件TaskRecoverEvent,交给eventHandler处理,标志位recoverTaskOutput表示是否恢复任务的输出,对于Map-Only型Map任务和所有的Reduce任务,都需要恢复,标志位recoverTaskOutput为true;

           3、否则,构造T_SCHEDULE类型任务调度事件TaskEvent,交给eventHandler处理。

            我们先看T_SCHEDULE类型任务调度事件TaskEvent的处理,它是交由Job的eventHandler来处理的,而这个eventHandler是在Job被创建时(即构造JobImpl实例时)由MRAppMaster的dispatcher来赋值的,而在MRAppMaster中,dispatcher被创建后就会注册任务事件的处理器TaskEventDispatcher实例,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. dispatcher.register(TaskEventType.class, new TaskEventDispatcher());  

            而这个任务事件处理器TaskEventDispatcher中处理任务事件TaskEvent的handle()方法定义如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private class TaskEventDispatcher implements EventHandler<TaskEvent> {  
    2.   @SuppressWarnings("unchecked")  
    3.   @Override  
    4.   public void handle(TaskEvent event) {  
    5.     Task task = context.getJob(event.getTaskID().getJobId()).getTask(  
    6.         event.getTaskID());  
    7.     ((EventHandler<TaskEvent>)task).handle(event);  
    8.   }  
    9. }  

            它实际上是通过作业Job中相关任务Task的handle()方法来处理的,而这个任务Task的实现则是TaskImpl,其中对于各种任务事件的处理,也是类似作业Job,由一个任务Task的状态机进行处理,关于任务Task的状态机,我们会有专门的文章进行介绍,这里,您只需要知道在TaskImpl中,对于上述两种任务状态机中任务状态的转换、触发事件及事件处理者定义如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  private static final StateMachineFactory  
    2.               <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>   
    3.            stateMachineFactory   
    4.           = new StateMachineFactory<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>  
    5.               (TaskStateInternal.NEW)  
    6.       
    7. // 省略部分代码  
    8.        .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED,   
    9.                  TaskEventType.T_SCHEDULE, new InitialScheduleTransition())       
    10. // 省略部分代码  
    11. .addTransition(TaskStateInternal.NEW,  
    12.                   EnumSet.of(TaskStateInternal.FAILED,  
    13.                   TaskStateInternal.KILLED,  
    14.                   TaskStateInternal.RUNNING,  
    15.                   TaskStateInternal.SUCCEEDED),  
    16.                   TaskEventType.T_RECOVER, new RecoverTransition())  
    17. // 省略部分代码  

            由此可见,对于T_RECOVER类型任务恢复事件TaskRecoverEvent,Task状态机指定由RecoverTransition处理,并且任务Task的状态会由NEW转换为RUNNING、FAILED、KILLED、SUCCEEDED等,而对于T_SCHEDULE类型任务调度事件TaskEvent,则由Task状态机指定为InitialScheduleTransition处理,并且任务Task的状态会由NEW转换为SCHEDULED。下面,我们挨个进行分析。

            一、T_SCHEDULE类型任务调度事件TaskEvent

            由InitialScheduleTransition进行处理,任务Task的状态会由NEW转换为SCHEDULED,InitialScheduleTransition代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private static class InitialScheduleTransition  
    2.   implements SingleArcTransition<TaskImpl, TaskEvent> {  
    3.   
    4.   @Override  
    5.   public void transition(TaskImpl task, TaskEvent event) {  
    6.       
    7.     // 添加并调度任务运行尝试TaskAttempt,Avataar.VIRGIN表示它是第一个Attempt,  
    8.     // 而剩余的Avataar.SPECULATIVE表示它是为拖后腿任务开启的一个Attempt,即推测执行原理  
    9.     task.addAndScheduleAttempt(Avataar.VIRGIN);  
    10.     // 设置任务的调度时间scheduledTime为当前时间  
    11.     task.scheduledTime = task.clock.getTime();  
    12.     // 发送任务启动事件  
    13.     task.sendTaskStartedEvent();  
    14.   }  
    15. }  

             InitialScheduleTransition的处理逻辑比较简单,大体如下:

            1、调用addAndScheduleAttempt()方法,添加并调度任务运行尝试TaskAttempt,Avataar.VIRGIN表示它是第一个Attempt,而剩余的Avataar.SPECULATIVE表示它是为拖后腿任务开启的一个Attempt,即推测执行原理;

            2、设置任务的调度时间scheduledTime为当前时间;

            3、发送任务启动事件。

            其中,1中的addAndScheduleAttempt()方法实现如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  // This is always called in the Write Lock  
    2.  private void addAndScheduleAttempt(Avataar avataar) {  
    3.     
    4. // 调用addAttempt()方法,创建一个任务运行尝试TaskAttempt实例attempt,  
    5. // 并将其添加到attempt集合attempts中,还会设置attempt的Avataar属性  
    6.    TaskAttempt attempt = addAttempt(avataar);  
    7.      
    8.    // 将attempt的id添加到正在执行的attempt集合inProgressAttempts中  
    9.    inProgressAttempts.add(attempt.getID());  
    10.      
    11.    //schedule the nextAttemptNumber  
    12.    // 调度TaskAttempt  
    13.      
    14.    // 如果集合failedAttempts大小大于0,说明该Task之前有TaskAttempt失败过,此次为重新调度,  
    15.    // TaskAttemp事件类型为TA_RESCHEDULE,  
    16.    if (failedAttempts.size() > 0) {  
    17.      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),  
    18.          TaskAttemptEventType.TA_RESCHEDULE));  
    19.    } else {  
    20.      // 否则为TaskAttemp事件类型为TA_SCHEDULE  
    21.      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),  
    22.          TaskAttemptEventType.TA_SCHEDULE));  
    23.    }  
    24.  }  

            addAndScheduleAttempt()方法处理逻辑如下:

            1、调用addAttempt()方法,创建一个任务运行尝试TaskAttempt实例attempt,并将其添加到attempt集合attempts中,还会设置attempt的Avataar属性;

            2、将attempt的id添加到正在执行的attempt集合inProgressAttempts中;

            3、调度TaskAttempt:如果集合failedAttempts大小大于0,说明该Task之前有TaskAttempt失败过,此次为重新调度,TaskAttemp事件类型为TA_RESCHEDULE,否则为TaskAttemp事件类型为TA_SCHEDULE。

            而addAttempt()方法实现如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private TaskAttemptImpl addAttempt(Avataar avataar) {  
    2.    
    3. / 调用createAttempt()方法创建任务运行尝试TaskAttemptImpl实例attempt  
    4.   TaskAttemptImpl attempt = createAttempt();  
    5.     
    6.   // 设置attempt的Avataar属性  
    7.   attempt.setAvataar(avataar);  
    8.     
    9.   // 记录debug级别日志信息:Created attempt ... ...  
    10.   if (LOG.isDebugEnabled()) {  
    11.     LOG.debug("Created attempt " + attempt.getID());  
    12.   }  
    13.     
    14.   // 将创建的任务运行尝试TaskAttemptImpl实例attempt与其ID的对应关系添加到TaskImpl的任务运行尝试集合attempts中,  
    15.   // attempts先被初始化为Collections.emptyMap()  
    16.   // this.attempts = Collections.emptyMap();  
    17.   switch (attempts.size()) {  
    18.     case 0:  
    19.         
    20.     // 如果attempts大小为0,即为Collections.emptyMap(),则将其更换为Collections.singletonMap(),并加入该TaskAttemptImpl实例attempt  
    21.       attempts = Collections.singletonMap(attempt.getID(),  
    22.           (TaskAttempt) attempt);  
    23.       break;  
    24.         
    25.     case 1:  
    26.         
    27.     // 如果attempts大小为1,即为Collections.singletonMap(),则将其替换为LinkedHashMap,并加入之前和现在的TaskAttemptImpl实例attempt  
    28.       Map<TaskAttemptId, TaskAttempt> newAttempts  
    29.           = new LinkedHashMap<TaskAttemptId, TaskAttempt>(maxAttempts);  
    30.       newAttempts.putAll(attempts);  
    31.       attempts = newAttempts;  
    32.       attempts.put(attempt.getID(), attempt);  
    33.       break;  
    34.   
    35.     default:  
    36.     // 如果attempts大小大于1,说明其实一个LinkedHashMap,直接put吧  
    37.       attempts.put(attempt.getID(), attempt);  
    38.       break;  
    39.   }  
    40.   
    41.   // 累加TaskAttempt计数器nextAttemptNumber  
    42.   ++nextAttemptNumber;  
    43.     
    44.   // 返回TaskAttemptImpl实例attempt  
    45.   return attempt;  
    46. }  

            其处理逻辑如下:

            1、调用createAttempt()方法创建任务运行尝试TaskAttemptImpl实例attempt;

            2、设置attempt的Avataar属性;

            3、记录debug级别日志信息:Created attempt ... ...;

            4、将创建的任务运行尝试TaskAttemptImpl实例attempt与其ID的对应关系添加到TaskImpl的任务运行尝试集合attempts中,attempts先被初始化为Collections.emptyMap():

                  4.1、如果attempts大小为0,即为Collections.emptyMap(),则将其更换为Collections.singletonMap(),并加入该TaskAttemptImpl实例attempt;

                  4.2、如果attempts大小为1,即为Collections.singletonMap(),则将其替换为LinkedHashMap,并加入之前和现在的TaskAttemptImpl实例attempt;

                  4.3、如果attempts大小大于1,说明其实一个LinkedHashMap,直接put吧;

            5、累加TaskAttempt计数器nextAttemptNumber;

            6、返回TaskAttemptImpl实例attempt。

             继续往下追踪createAttempt()方法,其在TaskImpl中代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. protected abstract TaskAttemptImpl createAttempt();  

            这是一个抽象方法,由其子类实现,而它的子类有两个,表示Map任务的MapTaskImpl和表示Reduce任务的ReduceTaskImpl,其createAttempt()方法分别实现如下:

             1、MapTaskImpl.createAttempt()

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. @Override  
    2. protected TaskAttemptImpl createAttempt() {  
    3.   return new MapTaskAttemptImpl(getID(), nextAttemptNumber,  
    4.       eventHandler, jobFile,  
    5.       partition, taskSplitMetaInfo, conf, taskAttemptListener,  
    6.       jobToken, credentials, clock, appContext);  
    7. }  

            生成一个MapTaskAttemptImpl实例,传入表示Attempt序号的nextAttemptNumber、事件处理器eventHandler、作业文件jobFile、分区信息partition、分片元数据信息taskSplitMetaInfo等关键变量。

            2、ReduceTaskImpl.createAttempt()

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. @Override  
    2. protected TaskAttemptImpl createAttempt() {  
    3.   return new ReduceTaskAttemptImpl(getID(), nextAttemptNumber,  
    4.       eventHandler, jobFile,  
    5.       partition, numMapTasks, conf, taskAttemptListener,  
    6.       jobToken, credentials, clock, appContext);  
    7. }  

            生成一个ReduceTaskAttemptImpl实例,除不需要分片元数据信息taskSplitMetaInfo,和需要一个Map任务数numMapTasks外,其他与MapTaskAttemptImpl基本相同。

            TaskAttempt生成了,接下来就应该进行调度执行了。我们再折回去看看addAndScheduleAttempt()方法中,发送的TA_SCHEDULE或TA_RESCHEDULE类型的TaskAttemptEvent,其与JobImpl、TaskImpl一样,是由TaskAttempt状态机负责处理的,如下所示:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 在事件TaskAttemptEventType.TA_SCHEDULE的触发下,经过RequestContainerTransition的处理,  
    2. // TaskAttempt的状态由NEW转换成UNASSIGNED  
    3. .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,  
    4.     TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false))  
    5.   
    6. // 在事件TaskAttemptEventType.TA_SCHEDULE的触发下,经过RequestContainerTransition的处理,  
    7. // TaskAttempt的状态由NEW转换成UNASSIGNED  
    8. .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,  
    9.     TaskAttemptEventType.TA_RESCHEDULE, new RequestContainerTransition(true))  
    10. // 上述二者的区别是RequestContainerTransition传入的标志位rescheduled,前者为false,后者为true  

            在事件TaskAttemptEventType.TA_SCHEDULE的触发下,经过RequestContainerTransition的处理,TaskAttempt的状态由NEW转换成UNASSIGNED;在事件TaskAttemptEventType.TA_SCHEDULE的触发下,经过RequestContainerTransition的处理,TaskAttempt的状态由NEW转换成UNASSIGNED;上述二者的区别是RequestContainerTransition传入的标志位rescheduled,前者为false,后者为true。

            我们再看下RequestContainerTransition的实现,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. @SuppressWarnings("unchecked")  
    2.   @Override  
    3.   public void transition(TaskAttemptImpl taskAttempt,   
    4.       TaskAttemptEvent event) {  
    5.     // Tell any speculator that we're requesting a container  
    6.       
    7.     // taskAttempt的事件处理器eventHandler处理SpeculatorEvent事件,告诉所有的speculator,此时正在申请一个容器  
    8.     taskAttempt.eventHandler.handle  
    9.         (new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1));  
    10.     //request for container  
    11.       
    12.     // 申请容器  
    13.     if (rescheduled) {// Task的Attempt重新调度  
    14.         
    15.     // 构造容器申请事件ContainerRequestEvent,并交由taskAttempt的事件处理器eventHandler处理,  
    16.     // 这个eventHandler实际上是MRAppMaster中的dispatcher,依次经过TaskImpl、TaskAttemptImpl的创建传递过来的,  
    17.       taskAttempt.eventHandler.handle(  
    18.           ContainerRequestEvent.createContainerRequestEventForFailedContainer(  
    19.               taskAttempt.attemptId,   
    20.               taskAttempt.resourceCapability));  
    21.     } else {// Task的Attempt第一次调度  
    22.         
    23.     // 构造容器申请事件ContainerRequestEvent,并交由taskAttempt的事件处理器eventHandler处理,  
    24.       taskAttempt.eventHandler.handle(new ContainerRequestEvent(  
    25.           taskAttempt.attemptId, taskAttempt.resourceCapability,  
    26.           taskAttempt.dataLocalHosts.toArray(  
    27.               new String[taskAttempt.dataLocalHosts.size()]),  
    28.           taskAttempt.dataLocalRacks.toArray(  
    29.               new String[taskAttempt.dataLocalRacks.size()])));  
    30.     }  
    31.       
    32.     // 两者创建的ContainerRequestEvent事件的区别是,rescheduled时,不需要考虑Node和Lock位置属性,因为此时Attempt之前已经失败过,此时应当能够以完成Attempt为首要任务,  
    33.     // 同时,两者的事件类型都是ContainerAllocator.EventType.CONTAINER_REQ,  
    34.     // MRAppMaster中的dispatcher针对该事件ContainerAllocator.EventType注册的事件处理器是LocalContainerAllocator或RMContainerAllocator  
    35.   }  

            RequestContainerTransition的transition()方法处理逻辑如下:

            1、TaskAttempt的事件处理器eventHandler处理SpeculatorEvent事件,告诉所有的speculator,此时正在申请一个容器;

            2、申请容器:

                  2.1、如果是Task的Attempt重新调度,构造容器申请事件ContainerRequestEvent,并交由taskAttempt的事件处理器eventHandler处理,这个eventHandler实际上是MRAppMaster中的dispatcher,依次经过TaskImpl、TaskAttemptImpl的创建传递过来的;

                  2.2、否则如果是Task的Attempt第一次调度,构造容器申请事件ContainerRequestEvent,并交由taskAttempt的事件处理器eventHandler处理。

            两者创建的ContainerRequestEvent事件的区别是,rescheduled时,不需要考虑Node和Lock位置属性,因为此时Attempt之前已经失败过,此时应当能够以完成Attempt为首要任务,同时,两者的事件类型都是ContainerAllocator.EventType.CONTAINER_REQ,MRAppMaster中的dispatcher针对该事件ContainerAllocator.EventType注册的事件处理器是LocalContainerAllocator或RMContainerAllocator。

            关于Yarn容器等资源申请与分配RMContainerAllocator的介绍,我会在以后的文章中为大家讲解,这里,你只需要了解其执行的大体流程即可:

            1、RMContainerAllocator首先间接继承自AbstractService,它是Hadoop中的一种服务,有服务初始化serviceInit()及服务启动serviceStart()方法要执行;

            2、RMContainerAllocator针对容器请求分配事件,是一个双重生产者-消费者模式,第一层生产者通过其handle()方法,将容器请求分配ContainerAllocatorEvent加入其内部eventQueue队列,第一层消费者通过其内部事件处理线程eventHandlingThread,不断的从事件队列eventQueue中take事件进行消费,而消费的方式是做为第二层生产者,将事件按照任务类型放入调度请求列表scheduledRequests、pendingReduces中,scheduledRequests是一个复杂的区分Map和Reduce任务的会立即被调度的请求列表,而pendingReduces只是存储等待被调度的Reduce任务请求的列表,其会根据Yarn中资源情况和Map任务完成情况确定是将事件移送至(即rampUp)scheduledRequests,还是从scheduledRequests移回Reduce任务调度请求至pendingReduces(即rampDown),而第二层的消费者则是RMContainerAllocator祖先父类RMCommunicator中的心跳线程allocatorThread,它周期性的调用heartbeat()方法,从Yarn的RM中获取可用资源,然后消费scheduledRequests列表中的请求,进行容器分配;

            3、RMContainerAllocator中,对于Map任务来说,它经历的数据结构,或者生命周期为scheduled->assigned->completed,而Reduce任务则是pending->scheduled->assigned->completed;

            4、经过一些的复杂逻辑后,包括综合判断资源情况、任务本地性、优先调度失败任务、Map任务完成比例、针对拖后退的任务进行推测执行等,无论是Map任务还是Reduce任务,最终在分配到容器Container后,都会发送一个TaskAttemptContainerAssignedEvent事件,交由TaskAttemptImpl的状态机中ContainerAssignedTransition进行处理,而其方法则最终会构造ContainerRemoteLaunchEvent事件,进行Container远程加载,在远程或本机或本进程Container中Launch任务尝试进行任务的执行。

            关于RMContainerAllocator,因为其结构、处理逻辑比较复杂,我会专门写文章进行分析,敬请期待!


            二、T_RECOVER类型任务恢复事件TaskRecoverEvent

            未完待续!敬请关注后续文章!

  • 相关阅读:
    js统一设置富文本中的图片宽度
    springboot 使用Filter
    js判断当前浏览器
    es安装ik后报错无法启动 read write
    logback Filter LevelFilter ThresholdFilter
    使用vue构建一个可视化大数据平台
    常用ES6-ES10知识点总结
    常见的web安全问题总结
    web性能优化指南
    使用node+express+mongodb实现用户注册、登录和验证功能
  • 原文地址:https://www.cnblogs.com/jirimutu01/p/5556387.html
Copyright © 2011-2022 走看看