zoukankan      html  css  js  c++  java
  • MapReduce作业运行机制

    一,作业的提交

    Job.waitForCompletion(true):

     1)此方法调用submit(). 在Submit()方法里面连接JobTracker,即生成一个内部JobSummitter(实际上是new JobClient(),在new JobClient()里面生成一个JobSubmissionProtocol接口(JobTracker实现了此接口)对象jobSubmitClient(是它连接或对应着JobTracker)),在Submit()方法里面也调用JobClient.submitJobInternal(conf)方法返回一个RunningJob(步骤1);

     2)参数true说明要调用方法jobClient.monitorAndPrintJob()即检查作业的运行情况(每秒一次),如果有变化就报告给控制台(console)。

    jobClient.submitJobInternal()所实现的提交作业过程如下:

      a) 向Jobtracker请求一个新的job ID;(步骤2)

      b) 检查作业的输出说明,如果未指定或已存在则不提交作业并抛错误给程序;

      c) 计算并生成作业的输入分片,如果路径不存在则不提交作业并抛错误给程序;

      d) 将运行作业所需要的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中以job id命名的目录下(即HDFS中)。作业jar副本较多(mapred.submit.replication = 10);(步骤3)

      e) 告知jobtracker作业准备执行(真正的提交作业jobSubmitClient.submitJob())(步骤4)

    二,作业的初始化

    jobtracker接收到对其submitJob()方法的调用后,将其放入内部队列,交由job scheduler进行调度,并对其进行初始化,包括创建一个正在运行作业的对象---封装任务和记录信息(步骤5)

    为了创建任务运行列表,job scheduler首先从共享文件系统中获取已计算好的输入分片信息(步骤6),然后为每个分片创建一个map任务;

    创建的reduce任务数量由Job的mapred.reduce.task属性决定(setNumReduceTasks()设置),schedule创建相应数量的reduce任务。

    任务在此时被指定ID。

    除了map和reduce任务,还有setupJob和cleanupJob需要建立:由tasktrackers在所有map开始前和所有reduce结束后分别执行,这两个方法在OutputCommitter中(默认是FileOutputCommitter)。setupJob()创建输出目录和任务的临时工作目录,cleanupJob()删除临时工作目录。

    三,任务的分配

    每个tasktracker定期发送心跳给jobtracker,告知自己还活着,并附带消息说明自己是否已准备好接受新任务。jobtracker以此来分配任务,并使用心跳的返回值与tasktracker通信(步骤7)。Jobtracker利用调度算法先选择一个job然后再选此job的一个task分配给tasktracker.

    每个tasktracker会有固定数量的map和reduce任务槽,数量有tasktracker核的数量和内存大小来决定。jobtracker会先将tasktracker的所有的map槽填满,然后才填此tasktracker的reduce任务槽。Jobtracker分配map任务时会选取与输入分片最近的tasktracker,分配reduce任务用不着考虑数据本地化。

    四,任务的执行

    用到上面提到的setupJob()

    a) tasktracker分配到一个任务后,首先从HDFS中把作业的jar文件及运行所需要的全部文件(DistributedCache设置的)复制到tasktracker本地(步骤8);

    b) 接下来tasktracker为任务新建一个本地工作目录,并把jar文件的内容解压到这个文件夹下;

    c) tasktracker新建一个taskRunner实例来运行该任务(步骤9);

     TaskRunner启动一个新的JVM来运行每个任务(步骤10),以便客户的map/reduce不会影响tasktracker。

    五,进度和状态的更新

    一个作业和它的每个任务都有一个状态,包括:作业或任务的运行状态(running, successful, failed),map和reduce的进度,计数器值,状态消息或描述。

    Child JVM有独立的线程每隔3秒检查任务更新标志,如果有更新就会报告给此tasktracker;

    tasktracker每隔5秒给jobtracker发心跳;

    job tracker合并这些更新,产生一个表明所有运行作业及其任务状态的全局试图。

    JobClient.monitorAndPrintJob()每秒查询这些信息。

    六,作业的完成

    当jobtracker收到最后一个任务(this will be the special job cleanup task)的完成报告后,便把job状态设置为successful。

    Job得到完成信息便从waitForCompletion()返回。

    最后,jobtracker清空作业的工作状态,并指示tasktracker也清空作业的工作状态(如删除中间输出)

    (权威指南上这样说这两句红字,不矛盾吗?  应该是不同层次的的cleanup task, 第一个可能是mapreduce内部的cleanup())

  • 相关阅读:
    C#
    C#
    C#
    python——socket网络编程
    Python——面向对象
    Python——函数
    Python——列表深浅拷贝
    Python——文件操作
    多级菜单(增强版)
    Python 编码机制
  • 原文地址:https://www.cnblogs.com/liangzh/p/2568701.html
Copyright © 2011-2022 走看看