zoukankan      html  css  js  c++  java
  • [Hadoop]

      TaskTracker节点向JobTracker汇报当前节点的运行时信息时候,是将运行状态信息同心跳报告一起发送给JobTracker的,主要包括TaskTracker的基本信息、节点资源使用信息、各任务状态等。所以信息被序列化为TaskTrackerStatus实例对象。每次发送心跳报告的时候,会重新构造一个Status对象,并重置这些信息,而且需要主要的是每次发送的status对象的大小是不一定的,因为很多信息的发送是有时间间隔的。这些操作主要位于方法transmitHeartBeat的上半部分代码:

      1 HeartbeatResponse transmitHeartBeat(long now) throws IOException {   
      2  // 计算是否发送任务计数器信息,间隔时间为${COUNTER_UPDATE_INTERVAL}对应的值为60s,不支持配置
      3     boolean sendCounters;
      4     if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
      5       sendCounters = true;
      6       previousUpdate = now;
      7     }
      8     else {
      9       sendCounters = false;
     10     }
     11 
     12     // 
     13     // Check if the last heartbeat got through... 
     14     // if so then build the heartbeat information for the JobTracker;
     15     // else resend the previous status information.
     16     //
     17     if (status == null) {
     18       synchronized (this) {
     19         status = new TaskTrackerStatus(taskTrackerName, localHostname, 
     20                                        httpPort, 
     21                                        cloneAndResetRunningTaskStatuses(
     22                                          sendCounters), 
     23                                        failures, 
     24                                        maxMapSlots,
     25                                        maxReduceSlots); 
     26       }
     27     } else {
     28       LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
     29                "' with reponseId '" + heartbeatResponseId);
     30     }
     31       
     32     //
     33     // Check if we should ask for a new Task
     34     // 计算节点资源使用信息
     35     boolean askForNewTask;
     36     long localMinSpaceStart;
     37     synchronized (this) {
     38       askForNewTask = 
     39         ((status.countOccupiedMapSlots() < maxMapSlots || 
     40           status.countOccupiedReduceSlots() < maxReduceSlots) && 
     41          acceptNewTasks); 
     42       localMinSpaceStart = minSpaceStart;
     43     }
     44     if (askForNewTask) {
     45       askForNewTask = enoughFreeSpace(localMinSpaceStart);
     46       long freeDiskSpace = getFreeSpace();
     47       long totVmem = getTotalVirtualMemoryOnTT();
     48       long totPmem = getTotalPhysicalMemoryOnTT();
     49       long availableVmem = getAvailableVirtualMemoryOnTT();
     50       long availablePmem = getAvailablePhysicalMemoryOnTT();
     51       long cumuCpuTime = getCumulativeCpuTimeOnTT();
     52       long cpuFreq = getCpuFrequencyOnTT();
     53       int numCpu = getNumProcessorsOnTT();
     54       float cpuUsage = getCpuUsageOnTT();
     55 
     56       status.getResourceStatus().setAvailableSpace(freeDiskSpace);
     57       status.getResourceStatus().setTotalVirtualMemory(totVmem);
     58       status.getResourceStatus().setTotalPhysicalMemory(totPmem);
     59       status.getResourceStatus().setMapSlotMemorySizeOnTT(
     60           mapSlotMemorySizeOnTT);
     61       status.getResourceStatus().setReduceSlotMemorySizeOnTT(
     62           reduceSlotSizeMemoryOnTT);
     63       status.getResourceStatus().setAvailableVirtualMemory(availableVmem); 
     64       status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
     65       status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
     66       status.getResourceStatus().setCpuFrequency(cpuFreq);
     67       status.getResourceStatus().setNumProcessors(numCpu);
     68       status.getResourceStatus().setCpuUsage(cpuUsage);
     69     }
     70     //add node health information 添加节点健康状态    
     71     TaskTrackerHealthStatus healthStatus = status.getHealthStatus();
     72     synchronized (this) {
     73       if (healthChecker != null) {
     74         healthChecker.setHealthStatus(healthStatus);
     75       } else {
     76         healthStatus.setNodeHealthy(true);
     77         healthStatus.setLastReported(0L);
     78         healthStatus.setHealthReport("");
     79       }
     80     }
     81 
     82 ......
     83 ...//发送心跳报告
     84 .....
     85     synchronized (this) {
     86       for (TaskStatus taskStatus : status.getTaskReports()) {
     87         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
     88             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
     89             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
     90             !taskStatus.inTaskCleanupPhase()) {
     91           if (taskStatus.getIsMap()) {
     92             mapTotal--;
     93           } else {
     94             reduceTotal--;
     95           }
     96           myInstrumentation.completeTask(taskStatus.getTaskID());
     97           runningTasks.remove(taskStatus.getTaskID());
     98         }
     99       }
    100 
    101 .....
    102 // 其他代码
    103 }
    transmitHeartBeat

      1、创建TaskTrackerStatus实例对象status,创建代码如下:

    status = new TaskTrackerStatus(taskTrackerName, localHostname, httpPort, cloneAndResetRunningTaskStatuses(sendCounters), failures, maxMapSlots,maxReduceSlots); 

      创建status对象的时候参数分别是: taskTrackerName-->当前节点名称,value为{"tracker_" + localHostname + ":" + taskReportAddress},其中taskReportAddress是为task服务的监听地址。

            localHostname-->当前节点的指定的host名称,配置参数变量为slave.host.name,如果不指定该参数,那么从mapred.tasktracker.dns.interface和mapred.tasktracker.dns.nameserver指定的dns中获取,默认为本地hostname。

            httpPort-->Http监听的端口号

            cloneAndResetRunningTaskStatuses(sendCounters)-->根据是否进行任务计数器信息发送标志,clone真正运行的task状态信息

            failures-->当前节点上失败的任务次数,用于判断当前节点的完整性,当该值达到最大标准的时候,JobTracker不会再给该节点分配任务信息。

            maxMapSlots, maxReduceSlots-->该节点运行的最大slot个数。

      2、判断是否允许分配任务给该节点,这个是先通过判断当前节点的空闲slot个数,然后通过判断当前节点的磁盘剩余量来达到的。代码如下:

    askForNewTask = ((status.countOccupiedMapSlots() < maxMapSlots || status.countOccupiedReduceSlots() < maxReduceSlots) && acceptNewTasks);
    askForNewTask = enoughFreeSpace(localMinSpaceStart); // 其中localMinSpaceStart为配置中给定的${mapred.local.dir.minspacestart},默认为0

      当满足第一个条件:使用的slot个数小于总slot个数的时候,那么给JobTracker发送节点资源使用情况。当满足第二个条件的时候,允许JobTracker给当前节点分配任务。

      3、初始化资源使用情况,主要是设置一系列的磁盘、内存等资源信息等,代码如下:

          long freeDiskSpace = getFreeSpace(); // 获取剩余的磁盘大小
          long totVmem = getTotalVirtualMemoryOnTT(); // 获取总的虚拟内存
          long totPmem = getTotalPhysicalMemoryOnTT(); // 获取总的物理内存
          long availableVmem = getAvailableVirtualMemoryOnTT(); // 获取可用的虚拟内存
          long availablePmem = getAvailablePhysicalMemoryOnTT(); // 获取可用的物理内存
          long cumuCpuTime = getCumulativeCpuTimeOnTT(); // 获取累积cpu时间
          long cpuFreq = getCpuFrequencyOnTT(); // 获取cpu频率
          int numCpu = getNumProcessorsOnTT(); // 获取总的进程数
          float cpuUsage = getCpuUsageOnTT(); // 获取cpu可用比例
    
          status.getResourceStatus().setAvailableSpace(freeDiskSpace);
          status.getResourceStatus().setTotalVirtualMemory(totVmem);
          status.getResourceStatus().setTotalPhysicalMemory(totPmem);
          status.getResourceStatus().setMapSlotMemorySizeOnTT(mapSlotMemorySizeOnTT); // 设置map阶段slot允许的内存大小, ${mapred.cluster.map.memory.mb}
          status.getResourceStatus().setReduceSlotMemorySizeOnTT(reduceSlotSizeMemoryOnTT); // 设置reduce阶段slot允许的内存大小, ${mapred.cluster.reduce.memory.mb}
          status.getResourceStatus().setAvailableVirtualMemory(availableVmem); 
          status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
          status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
          status.getResourceStatus().setCpuFrequency(cpuFreq);
          status.getResourceStatus().setNumProcessors(numCpu);
          status.getResourceStatus().setCpuUsage(cpuUsage);

      4、获取当前节点的监控状态,获取当前节点的监控状态是有线程NodeHealthCheckerService来周期性的检查的,可以通过配置一个监控脚本来实现,默认为不实现。详细分析见TaskTracker源码分析(TaskTracker节点健康状况监控)

      5、发送心跳报告

      6、处理当前真正运行的Task,处理规则是:只要task不是出于运行、就绪、提交挂起或者cleanup阶段,那么就将该task设置为完成状态,从真正运行的task列表中移除,并针对该task是map阶段或者reduce阶段,分别对map/reduce solt进行操作。

      7、完成发送。

      发送的状态对象是org.apache.hadoop.mapred.TaskTrackerStatus,主要属性有:

      String trackerName; // task tracker 节点名称
      String host; // 主机名
      int httpPort; // http web监听端口
      int failures; // 在该节点上失败的task次数
      List<TaskStatus> taskReports; // 当前节点上真正运行的各个人物的状态
        
      volatile long lastSeen; // 上次汇报时间
      private int maxMapTasks; // 当前节点上允许的最大map slot个数
      private int maxReduceTasks; // 当前节点上允许的最大reduce slot个数
      private TaskTrackerHealthStatus healthStatus; // 当前节点的健康状态对象
       
      public static final int UNAVAILABLE = -1; // 是否不可用
      private ResourceStatus resStatus; // 当前节点的资源对象

      其中ResourceStatus和TaskTrackerHealthStatus分别表示当前节点的资源信息和状态信息,是一个简单的model类。在这里不做分析。

      TaskStatus类全称为org.apache.hadoop.mapred.TaskStatus。主要保存当前TaskTracker上运行的所有任务的运行状态,基本属性如下:

      private final TaskAttemptID taskid; // task任务id
      private float progress; // 任务执行进度,0-1.0
      private volatile State runState; // 任务运行所处状态,详见TaskStatus.State枚举类
      private String diagnosticInfo; // 诊断信息,一般为异常信息或者错误信息
      private String stateString; // 字符串信息的运行状态
      private String taskTracker; // 所属task tracker名称
      private int numSlots; // 运行该task所需的slot个数,默认为1
        
      private long startTime; // 任务启动时间
      private long finishTime;  // 任务完成时间
      private long outputSize = -1L; // 输出数据量
        
      private volatile Phase phase = Phase.STARTING; // 任务运行阶段,详见TaskStatus.Phase枚举类 
      private Counters counters; // 该任务中定义的计数器(包括系统自带计数器和用户自定义计数器)
      private boolean includeCounters; // 是否包含计数器,计数器没个60s发送一次,也就是说每隔60s,发送的数据中包含一次计数器
      private SortedRanges.Range nextRecordRange = new SortedRanges.Range(); // 下一个要处理的数据区间,用于定位坏记录所在的空间

      ===================================

      ResourceStatus实例对象resStatus的属性是由抽象类ResourceCalculatorPlugin来获取的,如果不指定该抽象类的具体实现类,那么获取的value值全部都是-1。在linux平台上,默认实现为LinuxResourceCalculatorPlugin类。

        // 创建获取资源的对象
        Class<? extends ResourceCalculatorPlugin> clazz = fConf.getClass(TT_RESOURCE_CALCULATOR_PLUGIN,
                null, ResourceCalculatorPlugin.class);
        resourceCalculatorPlugin = ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, fConf);

      另外,用户可以自定义该实现类,配置参数为${mapreduce.tasktracker.resourcecalculatorplugin},默认为空。获取代码如下:

    public static ResourceCalculatorPlugin getResourceCalculatorPlugin(
          Class<? extends ResourceCalculatorPlugin> clazz, Configuration conf) {
    
        if (clazz != null) {
          return ReflectionUtils.newInstance(clazz, conf); // 如果已经配置了class,那么直接使用配置的class
        }
    
        // No class given, try a os specific class
        try {
          String osName = System.getProperty("os.name"); // 获取操作系统
          if (osName.startsWith("Linux")) { // 如果是linux
            return new LinuxResourceCalculatorPlugin(); // 使用已经实现的一种
          }
        } catch (SecurityException se) {
          // Failed to get Operating System name.
          return null;
        }
    
        // Not supported on this system.
        return null;
      }

      在LinuxResourceCalculatorPlugin中,其实获取系统的资源信息都是通过读取proc虚拟文件系统中的一些信息来达成的,比如从/proc/meminfo中读取内存,从/proc/cpuinfo中读取cpu信息等。

  • 相关阅读:
    月食照片
    宾得镜头大全与发展史
    月食照片
    关于镜头系数的疑问
    经验和教训
    常用正则表达式
    12月19日
    部長面談
    周六
    异度空间
  • 原文地址:https://www.cnblogs.com/liuming1992/p/4829449.html
Copyright © 2011-2022 走看看