  • Fixing a bug in the YARN client

    The monitorApplication method in org.apache.spark.deploy.yarn.Client.scala:

    /**
     * Report the state of an application until it has exited, either successfully or
     * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,
     * KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED,
     * or KILLED).
     *
     * @param appId ID of the application to monitor.
     * @param returnOnRunning Whether to also return the application state when it is RUNNING.
     * @param logApplicationReport Whether to log details of the application report every iteration.
     * @return A pair of the yarn application state and the final application state.
     */
    def monitorApplication(
        appId: ApplicationId,
        returnOnRunning: Boolean = false,
        logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
      val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
      var lastState: YarnApplicationState = null
      while (true) {
        Thread.sleep(interval)
        val report: ApplicationReport =
          try {
            getApplicationReport(appId)
          } catch {
            case e: ApplicationNotFoundException =>
              logError(s"Application $appId not found.")
              return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)
            case NonFatal(e) =>
              logError(s"Failed to contact YARN for application $appId.", e)
              return (YarnApplicationState.FAILED, FinalApplicationStatus.FAILED)
          }
        val state = report.getYarnApplicationState
        if (logApplicationReport) {
          logInfo(s"Application report for $appId (state: $state)")
          // If DEBUG is enabled, log report details every iteration
          // Otherwise, log them every time the application changes state
          if (log.isDebugEnabled) {
            logDebug(formatReportDetails(report))
          } else if (lastState != state) {
            logInfo(formatReportDetails(report))
          }
        }
        if (lastState != state) {
          state match {
            case YarnApplicationState.RUNNING =>
              reportLauncherState(SparkAppHandle.State.RUNNING)
            case YarnApplicationState.FINISHED =>
              // reportLauncherState(SparkAppHandle.State.FINISHED)
              report.getFinalApplicationStatus match {
                case FinalApplicationStatus.FAILED =>
                  reportLauncherState(SparkAppHandle.State.FAILED)
                case FinalApplicationStatus.KILLED =>
                  reportLauncherState(SparkAppHandle.State.KILLED)
                case _ =>
                  reportLauncherState(SparkAppHandle.State.FINISHED)
              }
            case YarnApplicationState.FAILED =>
              reportLauncherState(SparkAppHandle.State.FAILED)
            case YarnApplicationState.KILLED =>
              reportLauncherState(SparkAppHandle.State.KILLED)
            case _ =>
          }
        }
        if (state == YarnApplicationState.FINISHED ||
            state == YarnApplicationState.FAILED ||
            state == YarnApplicationState.KILLED) {
          cleanupStagingDir(appId)
          return (state, report.getFinalApplicationStatus)
        }
        if (returnOnRunning && state == YarnApplicationState.RUNNING) {
          return (state, report.getFinalApplicationStatus)
        }
        lastState = state
      }
      // Never reached, but keeps compiler happy
      throw new SparkException("While loop is depleted! This should never happen...")
    }


    The relevant part is this block:

        if (lastState != state) {
          state match {
            case YarnApplicationState.RUNNING =>
              reportLauncherState(SparkAppHandle.State.RUNNING)
            case YarnApplicationState.FINISHED =>
              // reportLauncherState(SparkAppHandle.State.FINISHED)
              report.getFinalApplicationStatus match {
                case FinalApplicationStatus.FAILED =>
                  reportLauncherState(SparkAppHandle.State.FAILED)
                case FinalApplicationStatus.KILLED =>
                  reportLauncherState(SparkAppHandle.State.KILLED)
                case _ =>
                  reportLauncherState(SparkAppHandle.State.FINISHED)
              }
            case YarnApplicationState.FAILED =>
              reportLauncherState(SparkAppHandle.State.FAILED)
            case YarnApplicationState.KILLED =>
              reportLauncherState(SparkAppHandle.State.KILLED)
            case _ =>
          }
        }

    When the YARN state is FINISHED, the original code did not distinguish the possible outcomes finely enough. The fix comments out the original reportLauncherState(SparkAppHandle.State.FINISHED) and replaces it with:

    report.getFinalApplicationStatus match {
      case FinalApplicationStatus.FAILED =>
        reportLauncherState(SparkAppHandle.State.FAILED)
      case FinalApplicationStatus.KILLED =>
        reportLauncherState(SparkAppHandle.State.KILLED)
      case _ =>
        reportLauncherState(SparkAppHandle.State.FINISHED)
    }

    This matters because a finished application can carry one of several final statuses: KILLED, FAILED, and SUCCEEDED are all possible final states.
    If only a single FINISHED state is passed back to SparkLauncher's SparkAppHandle, code on our side cannot tell whether the Spark job actually succeeded or failed; it only knows that the job completed.
    Breaking the completion state down this way means that a job submitted through SparkLauncher can be monitored, and an alert can be raised when it fails.
    This bug exists in Spark 1.6.0, which is the version shipped with CDH 5.7 through CDH 5.9; it has been fixed in newer releases.
    If you are running a CDH build of Spark, patch the code as above, recompile and repackage it, and deploy your own Spark on YARN service.
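
    As a minimal sketch of the monitoring side, here is how a driver program might submit a job through SparkLauncher and alert on failure once the patched client reports fine-grained final states. The jar path, main class, and alert printout are illustrative placeholders, not part of the original fix:

    import java.util.concurrent.CountDownLatch

    import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

    object LauncherAlertSketch {
      def main(args: Array[String]): Unit = {
        val done = new CountDownLatch(1)

        new SparkLauncher()
          .setMaster("yarn")
          .setDeployMode("cluster")
          .setAppResource("/path/to/your-app.jar") // placeholder jar
          .setMainClass("com.example.YourApp")     // placeholder main class
          .startApplication(new SparkAppHandle.Listener {
            override def stateChanged(handle: SparkAppHandle): Unit = {
              val state = handle.getState
              state match {
                // With the patch above, a failed or killed job is no longer
                // reported as a bare FINISHED, so we can alert on it here.
                case SparkAppHandle.State.FAILED | SparkAppHandle.State.KILLED =>
                  System.err.println(s"Job ${handle.getAppId} ended in $state, raise an alert")
                case SparkAppHandle.State.FINISHED =>
                  println(s"Job ${handle.getAppId} finished successfully")
                case _ => // intermediate states: CONNECTED, SUBMITTED, RUNNING, ...
              }
              if (state.isFinal) done.countDown()
            }
            override def infoChanged(handle: SparkAppHandle): Unit = ()
          })

        done.await() // block until the application reaches a final state
      }
    }

    Without the patch, the FAILED and KILLED branches above would never fire for a YARN application that reached the FINISHED state, which is exactly the gap the fix closes.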

  • Original article: https://www.cnblogs.com/itboys/p/9998622.html