zoukankan      html  css  js  c++  java
  • hadoop2 作业执行过程之map过程

    在执行MAP任务之前,先了解一下它的容器和它容器的领导:container和nodemanager


    NodeManager

    NodeManager(NM)是YARN中每个节点上的代理,它管理Hadoop集群中的单个计算节点,包括与ResourceManager保持通信,监督Container的生命周期管理,监控每个Container的资源使用(内存、CPU等)情况,追踪节点健康状况,管理日志和不同应用程序用到的附属服务(auxiliary service)

    它包含以下几大组件:

    1.NodeStatusUpdater

    当NM启动时,该组件向RM注册,并发送节点上的可用资源。接下来,NM与RM通信,汇报Container的状态更新,包括节点上正在运行的Container、已完成的Container等,此外,RM可能向NodeStatusUpdater发信号,杀死处于运行中的Container

    NodeStatusUpdater是NM和RM通信的唯一通道,周期性地调用RPC函数nodeHeartbeat()向RM汇报节点上的各种信息

    2.ContainerManager

    它是NM中的核心组件,实现类是ContainerManagerImpl。它有几个组件组成,各自负责一部分功能,意管理运行在该节点上的所有Container

    2.1RPC Server

    它负责从AM上接收RPC请求以启动Container或者停止。

    供AM使用的接口分别是:startContainer()、stopContainer()、getContainerStatus()

    2.2ResourceLocalizationService

    负责(从HDFS)安全下载(采用多线程)和组织Container需要的各种文件资源。

    2.3ContainerLauncher

    维护了一个线程池,随时准备并在必要时尽快启动Container。同时会接收来自RM或者AM的清理Container请求,清理相应进程

    2.4AuxServices

    NM提供了一个框架以通过配置附属服务扩展自己的功能,这些服务是与NM其他服务隔离开的。

    2.5ContainerMonitor

    当一个Container启动以后,该组件便开始观察它在运行过程中的资源利用。NM启动一个Container后,ContainerMonitor会将改Container进程对应的一个pid添加到监控列表中

    2.6LogHandler

    一个可插拔组件,用户通过它可以宣传将Container日志写到本地磁盘或者打包上传到一个文件系统中。

    3.ContainerExecutor

    与地产操作系统交互,安全存放Container需要的文件和目录,进而以以一种安全的方式启动或者清除Container进程

    4.NodeHealthCheckerService

    周期性地运行一个配置好的脚步检查节点的健康状况,任何系统健康方面的改变都会通知NodeStatusUpdater传递给RM

    5.Security

    5.1ApplicationACLsManager 为所有面向用户的API提供安全检查

    5.2ContainerTokenSecretManager 检查收到各种请求的合法性,确保这些请求已被RM授权

    6.WebServer

    web展示


    Container

    Container的概念

    首先它和Linux的Container完全不同。它的使用是启动AM的时候和运行Task的时候,但是它的涉及则是RM向资源调度器申请启动AM的资源时和AM向RM的资源调度器申请启动Task资源时;

    当向资源调度器申请资源时,需向它发送一个ResourceRequest列表,其中描述了一个资源单元的详细请求,而资源调度器为之返回分配的资源来描述Container。每个ResourceRequest可看做一个可序列化的Java对象

    复制代码
    message ResourceRequestProto {
      optional PriorityProto priority = 1; // 资源优先级
      optional string resource_name = 2; // 资源名称(期望资源所在的host、rack名称等)
      optional ResourceProto capability = 3; // 资源量(仅支持CPU和内存两种资源)
      optional int32 num_containers = 4; // 满足以上条件的资源个数
      optional bool relax_locality = 5 [default = true];  //是否支持本地性松弛
    }
    复制代码

    这些资源默认是本地松弛的,即申请优先级为10,资源名称为“node11”,资源量为<2GB,1CPU>的5份资源时,如果node11上没有满足要求的资源,则优先找node11同机架上其他资源,继而找其他机架

    AM收到一个或者多个Container后,再次将改Container进一步分配给内部的某个任务,一旦确定任务后,AM需将任务运行环境(包括运行命令、环境变量、依赖的外部文件等)连同Container中的资源封装到ContainerLaunchContext对象中,进而与对应的NM通信,以启动该任务

    复制代码
    message ContainerLaunchContextProto {
      repeated StringLocalResourceMapProto localResources = 1; //Container启动以来的外部资源
      optional bytes tokens = 2;
      repeated StringBytesMapProto service_data = 3;
      repeated StringStringMapProto environment = 4; //Container启动所需的环境变量
      repeated string command = 5; //Container内部运行的任务启动命令,如果是MapReduce的话,Map/Reduce Task启动命令就在该字段中
      repeated ApplicationACLMapProto application_ACLs = 6;
    }
    复制代码

    Container启动步骤:

    1.资源本地化

    在本地拷贝一份运行Container所需的所有资源(通过Distributed Cache实现);

    为Container创建经隔离的工作目录,并在这些目录中准备好所有资源;

    YARN将资源分为两类:一类是public级别的资源,放在公共目录下,由所有用户共享,另一类是private级别的资源,这类资源时用户私有的,只能在所属用户的各个作业间共享。

    2.启动Container

    启动Container是由ContainerLauncher完成的;

    3.运行Container

    由ContainerExecutor完成

    4.资源回收

    由ResourceLocalizationService服务完成,该过程与资源本地化正好相反,它负责撤销Container运行过程中使用的各种资源。


    MAP

    mapper就是在运行Container的时候执行的。主角上场。

    Map任务是一类将输入记录转换为中间格式记录集的独立任务。Mapper类中的map方法将输入键值对映射到一组中间格式的键值对集合

    Container启动以后会根据AM传过来的任务信息启动一个YarnChild进程来运行任务,YarnChild直接调用分给它的jvmTask,而jvmTask则判断是map任务还是reduce任务来分别执行MapTask和ReduceTask来运行Map过程和Reduce过程

    每个task都会使用一个进程占用一个JVM来执行,org.apache.hadoop.mapred.Child方法是具体的JVM启动类

    taskFinal.run(job, umbilical); // run the task
    复制代码
    if (taskComing) {
          boolean isMap = in.readBoolean();
          if (isMap) {
            t = new MapTask();
          } else {
            t = new ReduceTask();
          }
          t.readFields(in);
        }
    复制代码

    这里的taskFinal就是jvmTask

    自定义的Map类继承自Mapper,由MapTask的run()方法来运行

    复制代码
     1   @Override
     2   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     3     throws IOException, ClassNotFoundException, InterruptedException {
     4     this.umbilical = umbilical;
     5 
     6     if (isMapTask()) {
     7       // If there are no reducers then there won't be any sort. Hence the map 
     8       // phase will govern the entire attempt's progress.
     9       if (conf.getNumReduceTasks() == 0) {
    10         mapPhase = getProgress().addPhase("map", 1.0f);
    11       } else {
    12         // If there are reducers then the entire attempt's progress will be 
    13         // split between the map phase (67%) and the sort phase (33%).
    14         mapPhase = getProgress().addPhase("map", 0.667f);
    15         sortPhase  = getProgress().addPhase("sort", 0.333f);
    16       }
    17     }
    18     TaskReporter reporter = startReporter(umbilical);
    19  
    20     boolean useNewApi = job.getUseNewMapper();
    21     initialize(job, getJobID(), reporter, useNewApi);
    22 
    23     // check if it is a cleanupJobTask
    24     if (jobCleanup) {
    25       runJobCleanupTask(umbilical, reporter);
    26       return;
    27     }
    28     if (jobSetup) {
    29       runJobSetupTask(umbilical, reporter);
    30       return;
    31     }
    32     if (taskCleanup) {
    33       runTaskCleanupTask(umbilical, reporter);
    34       return;
    35     }
    36 
    37     if (useNewApi) {
    38       runNewMapper(job, splitMetaInfo, umbilical, reporter);
    39     } else {
    40       runOldMapper(job, splitMetaInfo, umbilical, reporter);
    41     }
    42     done(umbilical, reporter);
    43   }
    复制代码

    MapTask先判断是否有Reduce任务,如果没有的话,Map任务结束则整个提交的作业结束;如果有的话,当Map任务完成的时候设置当前进度为66.7%,Sort完成的时候设置进度为33.3%;

    之后启动TaskReporter,用于更新当前状态;

    之后初始化任务,设置当前任务的状态为RUNNING,设置输出目录等;

    之后判断任务是不是jobCleanup任务、jobSetup任务、taskCleanup任务,并做相应的处理;

    之后判断使用新旧哪套API,因为MapTask要兼容两套API;

    确定以后调用runNewMapper方法,执行具体的map;

    作业完成以后调用done方法,进行任务的清理、计数器的更新、任务状态更新等;


    hadoop2的话是使用runNewMapper()

    复制代码
     1   @SuppressWarnings("unchecked")
     2   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
     3   void runNewMapper(final JobConf job,
     4                     final TaskSplitIndex splitIndex,
     5                     final TaskUmbilicalProtocol umbilical,
     6                     TaskReporter reporter
     7                     ) throws IOException, ClassNotFoundException,
     8                              InterruptedException {
     9     // make a task context so we can get the classes
    10     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
    11       new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
    12                                                                   getTaskID(),
    13                                                                   reporter);
    14     // make a mapper
    15     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
    16       (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
    17         ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    18     // make the input format
    19     org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
    20       (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
    21         ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    22     // rebuild the input split
    23     org.apache.hadoop.mapreduce.InputSplit split = null;
    24     split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
    25         splitIndex.getStartOffset());
    26     LOG.info("Processing split: " + split);
    27 
    28     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
    29       new NewTrackingRecordReader<INKEY,INVALUE>
    30         (split, inputFormat, reporter, taskContext);
    31     
    32     job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    33     org.apache.hadoop.mapreduce.RecordWriter output = null;
    34     
    35     // get an output object
    36     if (job.getNumReduceTasks() == 0) {
    37       output = 
    38         new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    39     } else {
    40       output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    41     }
    42 
    43     org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
    44     mapContext = 
    45       new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
    46           input, output, 
    47           committer, 
    48           reporter, split);
    49 
    50     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
    51         mapperContext = 
    52           new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
    53               mapContext);
    54 
    55     try {
    56       input.initialize(split, mapperContext);
    57       mapper.run(mapperContext);
    58       mapPhase.complete();
    59       setPhase(TaskStatus.Phase.SORT);
    60       statusUpdate(umbilical);
    61       input.close();
    62       input = null;
    63       output.close(mapperContext);
    64       output = null;
    65     } finally {
    66       closeQuietly(input);
    67       closeQuietly(output, mapperContext);
    68     }
    69   }
    复制代码

    它的执行过程是:

    1.获取配置信息类对象TaskAttemptContextImplement、自己开发的Mapper实例、用户指定的InputFormat对象(默认是TextInputFormat)、任务对应的分片信息split;

    2.根据inputFormat构建一个NewTrackingRecordReader对象,这个对象中的RecordReader<K,V> real是LineRecordReader,用于读取分片中的内容,传递给Mapper的map方法处理;

    3.执行Mapper中的setup方法;

    4.循环执行map方法;

    5.执行cleanup方法;

    6.最后是输出流的关闭output.close(mapperContext),该方法会执行MapOutputBuffer.flush()操作,将剩余数据也通过sortAndSpill()方法写入本地文件,并在最后调用mergeParts()方法合并所有的spill文件。


    关于spill,spill是map中比较重要的设计

    spill过程包括输出、排序、溢写、合并等步骤;

    每个Map任务不断的以<key,value>对的形式把数据输出到内存中构造一个环形的数据结构。这个数据结构其实是一个字节数组,叫kvbuffer,这里面不仅有<Key,Value>数据,还有索引数据,并且给放置索引数据的区域起了一个kvmeta的别名

    当这个缓冲区满足一定条件后就会对缓冲区kvbuffer中的数据进行排序,先按分区编号partition进行升序,然后按照key进行升序。这样快速排序后数据以分区为单位聚集在一起,且同一分区的所有数据按照key有序。然后通过sortAndSpill方法写到本地文件和索引文件;如果有combiner,spill之前也会做一次聚集操作,等数据跑完通过归并合并所有spill文件和索引文件。

    Map阶段的结果都会存储在本地种(如果有reducer的话),非HDFS。

     转自:https://www.cnblogs.com/admln/p/hadoop2-work-excute-map.html

  • 相关阅读:
    数据安全:1.98亿购车者的购车记录公之于众
    【作者面对面问答】包邮送《Redis 5设计与源码分析》5本
    关于区块链学习资料
    《机器学习》周志华西瓜书学习笔记(九):聚类
    Bulletproof零知识证明
    数据接收合并
    数据接收合并
    数据接收合并
    数据接收合并
    Tomcat 8 Host-Manager配置访问的方法,全网唯一正确配置
  • 原文地址:https://www.cnblogs.com/javalinux/p/15055087.html
Copyright © 2011-2022 走看看