背景
近期在研究使用
java api
的方式来调用Spark
程序,通过句柄的方式来完成监控Job运行状态、及时杀死Job等功能。官方文档直接指出使用Java/Scala创建Job的方式——利用SparkLauncher
类。 环境:Hadoop2.6;Spark 1.6
SparkLauncher类概览
该类通过主要包含一些设置Spark Job参数相关的成员变量与函数,另外,还包括启动Spark Job的两个函数(launch()
和startApplication()
)。
其中,launch()
函数只是简单拼接spark-submit命令,并以子进程的方式启动;startApplication()
通过将命令提交给ChildProcAppHandle
,通过其创建Launch Server
与Driver
中Launch Backend
之间的连接,返回的ChildProcAppHandle
可以实现用户与程序之间实时交互的效果。
ChildProcAppHandle
类可以对提交后的job
有一些交互式操作,如获取JobIb(Application****)
、获取Job
运行状态、杀死Job
等。
综上,可以创建一个SparkLauncher,配置Spark参数,通过startApplication()
的方式提交Job
,然后监控获得作业的运行状态(State
)。
SparkLauncher launcher = new SparkLauncher();
launcher.setAppName(appName);
launcher.setMaster("yarn-cluster");
// launcher.setMaster("local");
launcher.setAppResource("wordcount.jar");
launcher.setMainClass("WordCount");
launcher.setConf(SparkLauncher.DRIVER_MEMORY, "2g");
launcher.setConf(SparkLauncher.EXECUTOR_MEMORY, "2g");
launcher.setConf(SparkLauncher.EXECUTOR_CORES, "3");
//jar包路径为在整个文件系统中的路径。
launcher.addAppArgs(new String[]{"",""});//SparkApp Main args
launcher.setVerbose(true);
SparkAppHandle handle = launcher.startApplication();
while(handle.getState() != SparkAppHandle.State.FINISHED) {
Thread.sleep(1000L);
System.out.println("applicationId is: "+ handle.getAppId());
System.out.println("current state: "+ handle.getState());
// handle.stop();
}
实践过程中遇到问题
在实践时,由于SparkAppHandle
中State
有如下枚举值:
想当然认为JOB
运行成功状态就是FINISHED
,失败会是FAILED
。运行一个确实能成功运行的Job
的确没有出现问题,跟预期一样。但是,在运行一个输出路径在HDFS上已存在的JOB
时,后台得到的State
并不是一开始想当然的FAILED
,而同样是FINISHED
。于是各种查,终于在GITHUB
上找到链接,原来这个问题在Spark2.0中已经修复,在1.6中是已存在的bug。
细看代码yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
,进行对比。
Spark 1.6
Spark 2.0
很明显,Spark 2.0
中对FINISHED
的状态进行了细分,让Yarn
将更准确的State
状态信息传回。