zoukankan      html  css  js  c++  java
  • TaskTracker发送Heartbeat以及接受HeartbeatResponse

              我们通过在命令行输入start-all.sh启动hadoop服务,该脚本通过SSH运行各个节点上的TaskTracker类的main()来启动TaskTracker,它是作为一个单独的JVM来运行的。TaskTracker类实现了MapReduce模型中的TaskTracker的功能。TaskTracker的main函数如下: 

        main(){ 
        TaskTracker tt = new  TaskTracker(); 
            tt. run(); 
    .......

     }

     TaskTracker的run()方法主要调用了offerService(),其主要代码如下: 

    while (running && !shuttingDown) {//TaskTracker运行期间一直执行,每隔一段时间就想JobTracker发送一次心跳包,并接受JobTracker发送的HeartbeatResponse
          try {
          //判断是否该发送下一个心跳包
            while (remaining > 0) {
    ...
            } 
            // TaskTracker 刚启动:
            // 1. 验证buildVersion
            // 2. 为TaskTracker创建一个System Directory
            if(justInited) {
              String jobTrackerBV = jobClient.getBuildVersion();
            ...
              systemDirectory = new Path(dir);
              systemFS = systemDirectory.getFileSystem(fConf);
            }
    //每个一段时间检查一下TaskTrack的工作目录
            now = System.currentTimeMillis();
            if (now > (lastCheckDirsTime + diskHealthCheckInterval)) {
              localStorage.checkDirs();
              lastCheckDirsTime = now;
              int numFailures = localStorage.numFailures();
              // Re-init the task tracker if there were any new failures
              if (numFailures > lastNumFailures) {
                lastNumFailures = numFailures;
                return State.STALE;
              }
            }
             //发送心跳包,并向JobTracker请求Task
            HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
     ...
     //从心跳包的响应中获取它要执行的任务
            TaskTrackerAction[] actions = heartbeatResponse.getActions();
     
    //将相应的Task放入到特定的Queue
     if (actions != null){ 
              for(TaskTrackerAction action: actions) {
    //将要运行一个Task
                if (action instanceof LaunchTaskAction) {
                  addToTaskQueue((LaunchTaskAction)action);
                } else if (action instanceof CommitTaskAction) {//
                  CommitTaskAction commitAction = (CommitTaskAction)action;
                  if (!commitResponses.contains(commitAction.getTaskID())) {
                    LOG.info("Received commit task action for " + 
                              commitAction.getTaskID());
                    commitResponses.add(commitAction.getTaskID());
                  }
                } else {//待清理的Task
                  tasksToCleanup.put(action);
                }
              }
            }
    } 
  • 相关阅读:
    服务监控信息到底是“主动推送”还是“被动扫描”???
    spring boot metrics信息推送开发
    spring boot +RabbitMQ +InfluxDB+Grafara监控实践
    "敏捷革命"读书笔记
    对于搞技术的人怎样针对自己看什么书
    HBase数据库相关基本知识
    spring cloud 微服务日志跟踪 sleuth logback elk 整合
    日志收集(ElasticSearch)串联查询 MDC
    关于” 记一次logback传输日志到logstash根据自定义设置动态创建ElasticSearch索引” 这篇博客相关的优化采坑记录
    记一次logback传输日志到logstash根据自定义设置动态创建ElasticSearch索引
  • 原文地址:https://www.cnblogs.com/yueliming/p/3009869.html
Copyright © 2011-2022 走看看