The monitorApplication method of org.apache.spark.deploy.yarn.Client (Client.scala), with the fix already applied:
/**
 * Report the state of an application until it has exited, either successfully or
 * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,
 * KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED,
 * or KILLED).
 *
 * @param appId ID of the application to monitor.
 * @param returnOnRunning Whether to also return the application state when it is RUNNING.
 * @param logApplicationReport Whether to log details of the application report every iteration.
 * @return A pair of the yarn application state and the final application state.
 */
def monitorApplication(
    appId: ApplicationId,
    returnOnRunning: Boolean = false,
    logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
  val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
  var lastState: YarnApplicationState = null
  while (true) {
    Thread.sleep(interval)
    val report: ApplicationReport =
      try {
        getApplicationReport(appId)
      } catch {
        case e: ApplicationNotFoundException =>
          logError(s"Application $appId not found.")
          return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)
        case NonFatal(e) =>
          logError(s"Failed to contact YARN for application $appId.", e)
          return (YarnApplicationState.FAILED, FinalApplicationStatus.FAILED)
      }
    val state = report.getYarnApplicationState
    if (logApplicationReport) {
      logInfo(s"Application report for $appId (state: $state)")
      // If DEBUG is enabled, log report details every iteration
      // Otherwise, log them every time the application changes state
      if (log.isDebugEnabled) {
        logDebug(formatReportDetails(report))
      } else if (lastState != state) {
        logInfo(formatReportDetails(report))
      }
    }
    if (lastState != state) {
      state match {
        case YarnApplicationState.RUNNING =>
          reportLauncherState(SparkAppHandle.State.RUNNING)
        case YarnApplicationState.FINISHED =>
          // reportLauncherState(SparkAppHandle.State.FINISHED)
          report.getFinalApplicationStatus match {
            case FinalApplicationStatus.FAILED =>
              reportLauncherState(SparkAppHandle.State.FAILED)
            case FinalApplicationStatus.KILLED =>
              reportLauncherState(SparkAppHandle.State.KILLED)
            case _ =>
              reportLauncherState(SparkAppHandle.State.FINISHED)
          }
        case YarnApplicationState.FAILED =>
          reportLauncherState(SparkAppHandle.State.FAILED)
        case YarnApplicationState.KILLED =>
          reportLauncherState(SparkAppHandle.State.KILLED)
        case _ =>
      }
    }
    if (state == YarnApplicationState.FINISHED ||
        state == YarnApplicationState.FAILED ||
        state == YarnApplicationState.KILLED) {
      cleanupStagingDir(appId)
      return (state, report.getFinalApplicationStatus)
    }
    if (returnOnRunning && state == YarnApplicationState.RUNNING) {
      return (state, report.getFinalApplicationStatus)
    }
    lastState = state
  }
  // Never reached, but keeps compiler happy
  throw new SparkException("While loop is depleted! This should never happen...")
}
The part that matters is this block:
if (lastState != state) {
  state match {
    case YarnApplicationState.RUNNING =>
      reportLauncherState(SparkAppHandle.State.RUNNING)
    case YarnApplicationState.FINISHED =>
      // reportLauncherState(SparkAppHandle.State.FINISHED)
      report.getFinalApplicationStatus match {
        case FinalApplicationStatus.FAILED =>
          reportLauncherState(SparkAppHandle.State.FAILED)
        case FinalApplicationStatus.KILLED =>
          reportLauncherState(SparkAppHandle.State.KILLED)
        case _ =>
          reportLauncherState(SparkAppHandle.State.FINISHED)
      }
    case YarnApplicationState.FAILED =>
      reportLauncherState(SparkAppHandle.State.FAILED)
    case YarnApplicationState.KILLED =>
      reportLauncherState(SparkAppHandle.State.KILLED)
    case _ =>
  }
}
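When the YARN state is FINISHED, the original code does not break the outcome down any further. Comment out the original reportLauncherState(SparkAppHandle.State.FINISHED) call and replace it with: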
report.getFinalApplicationStatus match {
  case FinalApplicationStatus.FAILED =>
    reportLauncherState(SparkAppHandle.State.FAILED)
  case FinalApplicationStatus.KILLED =>
    reportLauncherState(SparkAppHandle.State.KILLED)
  case _ =>
    reportLauncherState(SparkAppHandle.State.FINISHED)
}
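The reason is that an application whose YARN state is FINISHED can carry any of several final statuses: KILLED, FAILED, and SUCCEEDED are all possible.
If only a single FINISHED state is reported to the SparkAppHandle of SparkLauncher, our own code cannot tell whether the Spark job succeeded or failed. It only knows that the job has finished.
So the finished state has to be broken down further. Then, when we submit a job with SparkLauncher, we can monitor it and raise an alert when it fails.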
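As a rough sketch of what the caller side could look like once the patch is in place, the listener below reacts to the final state reported through SparkAppHandle. The jar path, the main class name, and the println-based "alert" are placeholders made up for illustration; they are not from the original code. It assumes the spark-launcher library is on the classpath and that spark-submit can be located (for example via SPARK_HOME).

```scala
import java.util.concurrent.CountDownLatch

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object LauncherMonitorSketch {
  def main(args: Array[String]): Unit = {
    // Released once the application reaches a final state.
    val done = new CountDownLatch(1)

    val listener = new SparkAppHandle.Listener {
      override def stateChanged(handle: SparkAppHandle): Unit = {
        val state = handle.getState
        if (state.isFinal) {
          state match {
            case SparkAppHandle.State.FINISHED =>
              println(s"Application ${handle.getAppId} finished successfully")
            case other =>
              // Placeholder alert: replace with a real notification channel.
              System.err.println(
                s"ALERT: application ${handle.getAppId} ended in state $other")
          }
          done.countDown()
        }
      }

      // Not needed for this sketch.
      override def infoChanged(handle: SparkAppHandle): Unit = ()
    }

    new SparkLauncher()
      .setAppResource("/path/to/your-app.jar") // hypothetical path
      .setMainClass("com.example.YourApp")     // hypothetical class
      .setMaster("yarn")
      .setDeployMode("cluster")
      .startApplication(listener)

    // Block until the listener has seen a final state.
    done.await()
  }
}
```

Without the patch, a job whose final status is FAILED or KILLED would still arrive here as FINISHED, so the alert branch would not fire for it.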
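This bug is present in Spark 1.6.0, so the Spark builds shipped with CDH 5.7 through CDH 5.9 all have it. Newer versions have fixed it.
If you are on the CDH build of Spark, change the code yourself, recompile and repackage it, and deploy your own Spark on YARN service.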