In an earlier post I covered submitting Spark jobs through the YARN API and getting the applicationId back from the submission call; see 《Spark2.3(四十):如何使用java通过yarn api调度spark app,并根据appId监控任务,关闭任务,获取任务日志》 (Spark 2.3 (40): how to schedule a Spark app from Java via the YARN API, and use the appId to monitor the job, kill it, and fetch its logs).
However, I prefer the approach described in this article: use Java to invoke the spark-submit.sh shell script and scrape the applicationId from spark-submit.sh's console output. The Hadoop API approach requires a properly configured environment, and the packages to pull in differ by Hadoop version.
Invoking a shell command from Java:

To invoke a shell command from Java, use:

Process p = Runtime.getRuntime().exec(cmd);  // cmd is a String[]
Runtime.exec() spawns a native process and returns an instance of a Process subclass, which can be used to control the process and query its state.

Because a child process created by Runtime.exec() has no terminal or console of its own, its standard I/O (stdin, stdout, stderr) is redirected to the parent process through

p.getOutputStream(), p.getInputStream(), p.getErrorStream()

The caller must use these streams to feed input to the child process and to read its output.

For example: Runtime.getRuntime().exec("ls")
- One further concern: Runtime.getRuntime().exec() can stall (block).

The caller has to drain stdout and stderr itself, and there is no way to know which of the two the child will write to first. If you read from the wrong one first, you may block indefinitely and never see the output.

For example: if you read stdout first but the child writes to stderr, the child eventually blocks waiting for its stderr buffer to be drained before it ever writes anything to stdout, and both sides deadlock.
- Solution:

Drain stdout and stderr on two separate threads.

Reference code:
import java.util.*;
import java.io.*;

class StreamGobbler extends Thread {
    InputStream is;
    String type;

    StreamGobbler(InputStream is, String type) {
        this.is = is;
        this.type = type;
    }

    public void run() {
        try {
            InputStreamReader isr = new InputStreamReader(is);
            BufferedReader br = new BufferedReader(isr);
            String line = null;
            while ((line = br.readLine()) != null)
                System.out.println(type + ">" + line);
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }
}

public class ExecRunner {
    public static void main(String[] args) {
        if (args.length < 1) {
            System.out.println("USAGE: java ExecRunner <cmd>");
            System.exit(1);
        }

        try {
            String osName = System.getProperty("os.name");
            String[] cmd = new String[3];
            if (osName.equals("Windows NT")) {
                cmd[0] = "cmd.exe";
                cmd[1] = "/C";
                cmd[2] = args[0];
            } else if (osName.equals("Windows 95")) {
                cmd[0] = "command.com";
                cmd[1] = "/C";
                cmd[2] = args[0];
            } else {
                // On Unix, split the command string on spaces.
                // (Fixed: the original referenced an undefined variable `command`; it should be args[0].)
                StringTokenizer st = new StringTokenizer(args[0], " ");
                cmd = new String[st.countTokens()];
                int token = 0;
                while (st.hasMoreTokens()) {
                    cmd[token++] = st.nextToken();
                }
            }

            Runtime rt = Runtime.getRuntime();
            System.out.println("Execing " + String.join(" ", cmd));
            Process proc = rt.exec(cmd);

            // Drain stderr and stdout on separate threads to avoid blocking.
            StreamGobbler errorGobbler = new StreamGobbler(proc.getErrorStream(), "ERROR");
            StreamGobbler outputGobbler = new StreamGobbler(proc.getInputStream(), "OUTPUT");
            errorGobbler.start();
            outputGobbler.start();

            // Wait for the child and report its exit value.
            int exitVal = proc.waitFor();
            System.out.println("ExitValue: " + exitVal);
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }
}
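For reference, the runner would be invoked as, e.g., java ExecRunner "ls -l"; on Unix the single quoted argument is split on spaces into the command array before being handed to Runtime.exec().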
Calling spark-submit.sh from Java

The spark-submit wrapper script, submit_test.sh:
#!/bin/sh
jarspath=''

for file in `ls /home/dx/djj/sparkjars/*.jar`
do
  jarspath=${file},$jarspath
done
jarspath=${jarspath%?}

echo $jarspath

/home1/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.dx.test.BroadcastTest \
  --properties-file ./conf/spark-properties-mrs.conf \
  --jars $jarspath \
  --num-executors 10 \
  --executor-memory 3G \
  --executor-cores 1 \
  --driver-memory 2G \
  --driver-java-options "-XX:+TraceClassPaths" \
  ./test.jar $1 $2 $3 $4
Note: when testing the YARN submission modes, switch the --deploy-mode argument as needed:

cluster mode: --deploy-mode cluster
client mode: --deploy-mode client
To obtain the applicationId from spark-submit, we have to filter it out of spark-submit's printed output (that is, out of the Process object's stdout/stderr). If you have ever submitted a Spark job with spark-submit.sh, you will have noticed that the applicationId is printed to the console during execution. (A simpler regex-based sketch is shown after the log samples below.)
- In YARN client mode (--deploy-mode client), the applicationId appears at this point in the spark-submit.sh output:
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@215a34b4{/static,null,AVAILABLE,@Spark}
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2e380628{/,null,AVAILABLE,@Spark}
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1eaf1e62{/api,null,AVAILABLE,@Spark}
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@652ab8d9{/jobs/job/kill,null,AVAILABLE,@Spark}
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@51e0301d{/stages/stage/kill,null,AVAILABLE,@Spark}
19/04/02 11:38:31 INFO client.RMProxy: Connecting to ResourceManager at vm10.60.0.11.com.cn/10.60.0.11:8032
[Opened /usr/java/jdk1.8.0_152/jre/lib/jce.jar]
[Opened /usr/java/jdk1.8.0_152/jre/lib/charsets.jar]
19/04/02 11:40:24 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0829
- In YARN cluster mode (--deploy-mode cluster), the applicationId appears at this point in the spark-submit.sh output:
19/04/02 11:40:22 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:23 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:24 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:25 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:26 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:27 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:28 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:29 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:30 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:31 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:32 INFO yarn.Client: Application report for application_1548381669007_0828 (state: RUNNING)
19/04/02 11:40:33 INFO yarn.Client: Application report for application_1548381669007_0828 (state: RUNNING)
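Incidentally, in both modes the id follows the fixed pattern application_<clusterTimestamp>_<sequence>, so a regex is a simpler way to pull it out of a line. The sketch below is an alternative, not the mechanism this article's code uses (the class name AppIdRegexFilter is illustrative); a caller would still need to gate on "Submitted application" or "(state: RUNNING)" lines, as the filter function below does, to avoid matching ACCEPTED reports:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

class AppIdRegexFilter {
    // YARN application ids have the form application_<clusterTimestamp>_<sequence>.
    private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");

    /** Returns the first applicationId found in the line, or null. */
    static String extract(String line) {
        Matcher m = APP_ID.matcher(line);
        return m.find() ? m.group() : null;
    }

    public static void main(String[] args) {
        String line = "19/04/02 11:40:24 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0829";
        System.out.println(extract(line)); // prints application_1548381669007_0829
    }
}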
The applicationId filter function is implemented as follows:
/**
 * @param line one line of stdout/stderr output.
 */
private String filterApplicationId(String line, boolean isCluster) {
    String applicationId = null;
    line = line.toLowerCase();

    // --deploy-mode client:
    // 19/02/15 17:43:35 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0051
    // 19/04/01 14:13:57 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0781
    boolean isIndexSparkOwnLog = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase()) != -1;

    // --deploy-mode cluster:
    // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: ACCEPTED)
    // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: RUNNING)
    boolean isIndexSparkOwn2Log = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase()) != -1;
    boolean isIndexSparkRunningLog = line.indexOf("(state: RUNNING)".toLowerCase()) != -1;

    if (isIndexSparkOwnLog || (isIndexSparkOwn2Log && isIndexSparkRunningLog)) {
        if (isIndexSparkOwnLog && !isCluster) {
            int idx = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase());
            applicationId = line.substring(idx + "INFO impl.YarnClientImpl: Submitted application ".length());
        } else if (isIndexSparkOwn2Log && isCluster) {
            int idx = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase());
            applicationId = line.substring(idx + "INFO yarn.Client: Application report for ".length());
            // Strip the trailing " (state: running)" suffix.
            applicationId = applicationId.replace(" (state: RUNNING)".toLowerCase(), "");
        }
    }

    if (applicationId != null && applicationId.startsWith("application_")) {
        System.out.println("====================================Index of applicationId:" + applicationId);
        System.out.println("====================================Index of applicationId:Complete ...");
    }
    return applicationId;
}
If the filter matches, the applicationId is returned; otherwise the function returns null.
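For example, against the sample log lines shown earlier: the client-mode line "19/04/02 11:40:24 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0829" with isCluster=false yields "application_1548381669007_0829"; a cluster-mode "(state: ACCEPTED)" report yields null, because only RUNNING reports satisfy the outer condition; and "19/04/02 11:40:32 INFO yarn.Client: Application report for application_1548381669007_0828 (state: RUNNING)" with isCluster=true yields "application_1548381669007_0828" once the " (state: running)" suffix is stripped.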
The Callable that consumes the stdout/stderr streams is defined as follows:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;

class StreamFilterTask implements Callable<String> {
    private InputStream inputStream;
    private ConcurrentLinkedQueue<String> queue;
    private String streamType = null;
    private boolean isCluster;

    private StreamFilterTask() {
    }

    public StreamFilterTask(InputStream inputStream, ConcurrentLinkedQueue<String> queue,
            String streamType, boolean isCluster) {
        this.inputStream = inputStream;
        this.queue = queue;
        this.streamType = streamType;
        this.isCluster = isCluster;
    }

    @Override
    public String call() throws Exception {
        BufferedReader br = null;
        try {
            br = new BufferedReader(new InputStreamReader(inputStream));
            String line = null;
            while ((line = br.readLine()) != null) {
                System.out.println(line);
                // Cap the queue at roughly the most recent 1000 lines.
                // Note: size() traverses the whole collection, so prefer isEmpty() where a size check can be avoided.
                if (this.streamType.equalsIgnoreCase("error")) {
                    if (queue.size() > 1000) {
                        // Retrieves and removes the head of the queue (returns null if empty).
                        queue.poll();
                    }
                    // Inserts at the tail; the queue is unbounded, so offer() never returns false.
                    queue.offer(line);
                }
                String applicationId = filterApplicationId(line, isCluster);
                if (applicationId != null && applicationId.startsWith("application_")) {
                    return applicationId;
                }
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        } finally {
            if (br != null) {
                try {
                    br.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        return null;
    }

    // filterApplicationId(String line, boolean isCluster) is identical to the
    // implementation shown above and is omitted here to avoid repeating it.
}
The SubmitSpark class:

This class drives the script through a Process object, filters the applicationId out of the Process's stdout/stderr, and bounds the maximum wait with Process.waitFor(timeout, TimeUnit).
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SubmitSpark {
    public String submit(String filePath, long timeoutMinutes, String charsetName) {
        String applicatioId = null;
        String command = filePath;
        boolean isCluster = false;

        // Scan the script to detect whether it submits with --deploy-mode cluster.
        BufferedReader bufferedReader = null;
        try {
            bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), charsetName));
            String line = null;
            while ((line = bufferedReader.readLine()) != null) {
                if (line.replace("  ", " ").toLowerCase().indexOf("--deploy-mode cluster") != -1) {
                    isCluster = true;
                    break;
                }
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (bufferedReader != null) {
                try {
                    bufferedReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        StringTokenizer st = new StringTokenizer(command, " ");
        String[] cmd = new String[st.countTokens()];
        int token = 0;
        while (st.hasMoreTokens()) {
            cmd[token++] = st.nextToken();
        }

        Runtime rt = Runtime.getRuntime();
        System.out.println("Execing " + command);

        Process proc = null;
        try {
            proc = rt.exec(cmd);
            ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
            ExecutorService executor = Executors.newFixedThreadPool(2);
            // The Futures hold the reader threads' results; get() must not be called here,
            // because get() blocks until the corresponding thread completes.
            // any output?
            Future<String> futureInput = executor.submit(
                    new StreamFilterTask(proc.getInputStream(), queue, "input", isCluster));
            // any error message?
            Future<String> futureError = executor.submit(
                    new StreamFilterTask(proc.getErrorStream(), queue, "error", isCluster));

            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ", start proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);");
            boolean exitVal = proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
            System.out.println("exitVal:" + exitVal);
            proc.destroyForcibly();
            System.out.println("proc.isAlive():" + proc.isAlive());
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ", end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);");

            // Whether --deploy-mode is cluster or client, the applicationId is read from
            // getErrorStream(), so as soon as the submission succeeds it is returned;
            // otherwise (e.g. insufficient resources) we wait until the timeout fails.
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ", start fetching applicatioId = futureError.get();:");
            if (futureError.get() != null) {
                applicatioId = futureError.get();
            }
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ", end fetching applicatioId = futureError.get();:" + applicatioId);

            // In cluster mode futureInput.get() would block, so this code must stay commented out:
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
            //         + ", start fetching applicatioId = futureInput.get();:");
            // if (futureInput.get() != null) {
            //     applicatioId = futureInput.get();
            // }
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
            //         + ", end fetching applicatioId = futureInput.get();:" + applicatioId);

            // Optional: kill the child process by looking up its pid via reflection.
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
            //         + ", start fetching process id");
            // long pid = -1;
            // try {
            //     Class<?> clazz = Class.forName("java.lang.UNIXProcess");
            //     Field field = clazz.getDeclaredField("pid");
            //     field.setAccessible(true);
            //     pid = (Integer) field.get(proc);
            // } catch (Throwable e) {
            //     e.printStackTrace();
            // }
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
            //         + ", end fetching process id:" + pid);
            //
            // System.out.println("proc.isAlive():" + proc.isAlive());
            // String[] killCmd = { "sh", "-c", "kill -9 " + pid };
            // Runtime.getRuntime().exec(killCmd).waitFor();

            System.out.println("Complete:" + applicatioId);
        } catch (Throwable t) {
            t.printStackTrace();
        } finally {
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ", start if (proc != null && proc.isAlive())");
            if (proc != null && proc.isAlive()) {
                proc.destroyForcibly();
                System.out.println("proc.isAlive():" + proc.isAlive());
            }
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ", end if (proc != null && proc.isAlive())");
        }
        return applicatioId;
    }
}
Notes:

1) The timeout must not be set too short. If it is, the process is killed before the job ever reaches YARN, so the submission is cut off mid-flight and no applicationId can be returned.

2) Even after SubmitSpark returns an applicationId, the driving program (java -cp test.jar com.dx.test.Submit, which runs the spark-submit.sh shell) does not exit: the Process's stdout/stderr are in fact still open.

3) Enabling the commented-out kill-process code, which closes stdout/stderr, still does not make java -cp test.jar com.dx.test.Submit exit. With that code enabled, once the Process object is destroyed (and provided the Spark job has already reached YARN), the program catches the stdout/stderr errors, but the Spark job on YARN is not killed: in yarn-client mode the job keeps running until the driving Java program is ended by hand, and in yarn-cluster mode, once the job is on YARN, even ending the Java program cannot stop it. The kill-process code is therefore optional. (A ProcessBuilder-based workaround for the open-stream problem is sketched below.)
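If holding the child's stream handles in the JVM is the root of the problem, one alternative sketch (not what this article's code does; the class name RedirectSubmit and the file name submit.log are illustrative) is to launch the script with ProcessBuilder and redirect its output straight to a file, so the JVM holds no pipes to the child, then scan the file for the applicationId:

import java.io.File;
import java.lang.ProcessBuilder.Redirect;
import java.util.concurrent.TimeUnit;

class RedirectSubmit {
    // Sketch: run the submit script with stdout/stderr appended to a log file,
    // so this JVM never holds pipes that keep reader threads alive.
    static void run(String scriptPath) throws Exception {
        File log = new File("submit.log"); // illustrative file name
        ProcessBuilder pb = new ProcessBuilder("sh", scriptPath);
        pb.redirectErrorStream(true);              // merge stderr into stdout
        pb.redirectOutput(Redirect.appendTo(log)); // child writes straight to the file
        Process proc = pb.start();
        if (!proc.waitFor(5, TimeUnit.MINUTES)) {
            proc.destroyForcibly();
        }
        // The applicationId can then be filtered out of submit.log with the
        // same matching logic shown earlier.
    }

    public static void main(String[] args) throws Exception {
        run("./submit_test.sh");
    }
}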
Test:

package com.dx.test;

public class Submit {
    public static void main(String[] args) {
        String filePath = "./submit_test.sh";
        String charsetName = "utf-8";
        long timeoutMinutes = 5;

        SubmitSpark submitSpark = new SubmitSpark();
        String applicationId = submitSpark.submit(filePath, timeoutMinutes, charsetName);
        System.out.println("return the applicationId:" + applicationId);
    }
}
Timeout set to 2 minutes

- With yarn --deploy-mode client, execution hits the following problem:
19/04/02 10:54:49 INFO client.RMProxy: Connecting to ResourceManager at vm10.60.0.11.com.cn/10.60.0.11:8032
[Opened /usr/java/jdk1.8.0_152/jre/lib/jce.jar]
exitVal:false
proc.isAlive():false
2019-04-02 10:56:38, end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
2019-04-02 10:56:38, start fetching applicationId:
java.io.IOException: Stream closed
    at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.io.BufferedReader.readLine(BufferedReader.java:324)
    at java.io.BufferedReader.readLine(BufferedReader.java:389)
    at com.dx.test.StreamFilterTask.call(Submit.java:148)
    at com.dx.test.StreamFilterTask.call(Submit.java:1)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
java.io.IOException: Stream closed
    at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.io.BufferedReader.readLine(BufferedReader.java:324)
    at java.io.BufferedReader.readLine(BufferedReader.java:389)
    at com.dx.test.StreamFilterTask.call(Submit.java:148)
    at com.dx.test.StreamFilterTask.call(Submit.java:1)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2019-04-02 10:56:52, end fetching applicationId:null
2019-04-02 10:56:52, start fetching process id
2019-04-02 10:56:52, end fetching process id:13994
proc.isAlive():false
Complete:null
2019-04-02 10:56:52, start if (proc != null && proc.isAlive())
2019-04-02 10:56:52, end if (proc != null && proc.isAlive())
return the applicationId:null
Note: this test ran with the kill-process code enabled; the result is the same with it disabled.

1) No applicationId is obtained, even though the job may already have been submitted to YARN [in this run, though, the output shows the process was killed before the job reached YARN].

2) The console then sits blocked; pressing CTRL+C to end java -cp ./test.jar com.dx.test.Submit also kills the Spark job on YARN.
- With yarn --deploy-mode cluster, execution hits the following problem:
19/04/02 10:57:00 INFO yarn.Client: Uploading resource file:/home1/boco/duanjiajun/sparkjars/bcprov-jdk15on-1.52.jar -> hdfs://vm10.60.0.11.com.cn:8020/user/boco/.sparkStaging/application_1548381669007_0816/bcprov-jdk15on-1.52.jar
exitVal:false
proc.isAlive():false
2019-04-02 10:57:01, end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
2019-04-02 10:57:01, start fetching applicationId:
java.io.IOException: Stream closed
    at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.io.BufferedReader.readLine(BufferedReader.java:324)
    at java.io.BufferedReader.readLine(BufferedReader.java:389)
    at com.dx.test.StreamFilterTask.call(Submit.java:148)
    at com.dx.test.StreamFilterTask.call(Submit.java:1)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2019-04-02 10:57:01, end fetching applicationId:null
2019-04-02 10:57:01, start fetching process id
2019-04-02 10:57:01, end fetching process id:14359
proc.isAlive():false
Complete:null
2019-04-02 10:57:01, start if (proc != null && proc.isAlive())
2019-04-02 10:57:01, end if (proc != null && proc.isAlive())
return the applicationId:null
Note: this test ran with the kill-process code enabled; the result is the same with it disabled.

1) No applicationId is obtained, even though the job may already have been submitted to YARN [in this run, though, the output shows the process was killed before the job reached YARN].

2) The console then sits blocked; pressing CTRL+C to end java -cp ./test.jar com.dx.test.Submit does not kill the Spark job on YARN if the job was already submitted.
Timeout set to 5 minutes

- With --deploy-mode cluster:
19/04/02 11:40:30 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:31 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:32 INFO yarn.Client: Application report for application_1548381669007_0828 (state: RUNNING)
====================================Index of applicationId:application_1548381669007_0828
====================================Index of applicationId:Complete ...
exitVal:false
proc.isAlive():true
2019-04-02 11:42:59, end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
2019-04-02 11:42:59, start fetching applicatioId = futureError.get();:
2019-04-02 11:42:59, end fetching applicatioId = futureError.get();:application_1548381669007_0828
Complete:application_1548381669007_0828
2019-04-02 11:42:59, start if (proc != null && proc.isAlive())
2019-04-02 11:42:59, end if (proc != null && proc.isAlive())
return the applicationId:application_1548381669007_0828
^Cbash-4.1$
Ending the process by hand at this point does not terminate the Spark job on YARN.

- With --deploy-mode client:
19/04/02 11:40:24 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0829
====================================Index of applicationId:application_1548381669007_0829
====================================Index of applicationId:Complete ...
the value is :86
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
 |-- int_id: long (nullable = true)

root
 |-- int_id: string (nullable = false)
 |-- job_result: string (nullable = true)

Query started: a82ad759-8b14-4d58-93a3-8bed7617dd9c
-------------------------------------------
Batch: 0
-------------------------------------------
listener...application_1548381669007_0829
+------+----------+
|int_id|job_result|
+------+----------+
|     0|      null|
|     1|      1,86|
|     2|      2,86|
|     3|      3,86|
|     4|      4,86|
|     5|      5,86|
|     6|      6,86|
|     7|      7,86|
|     8|      8,86|
|     9|      9,86|
|    10|     10,86|
|    11|      null|
|    12|      null|
|    13|      null|
|    14|      null|
|     0|      null|
|     1|      1,86|
|     2|      2,86|
|     3|      3,86|
|     4|      4,86|
+------+----------+
only showing top 20 rows

...

listener...application_1548381669007_0829
Query made progress: {
  "id" : "a82ad759-8b14-4d58-93a3-8bed7617dd9c",
  "runId" : "a53447f1-056e-4d84-b27e-7007829bc1e2",
  "name" : null,
  "timestamp" : "2019-04-02T03:43:10.001Z",
  "batchId" : 9,
  "numInputRows" : 1000,
  "inputRowsPerSecond" : 100.0,
  "processedRowsPerSecond" : 1584.7860538827258,
  "durationMs" : {
    "addBatch" : 417,
    "getBatch" : 21,
    "getOffset" : 0,
    "queryPlanning" : 38,
    "triggerExecution" : 631,
    "walCommit" : 154
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateSource[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=64]",
    "startOffset" : 107,
    "endOffset" : 117,
    "numInputRows" : 1000,
    "inputRowsPerSecond" : 100.0,
    "processedRowsPerSecond" : 1584.7860538827258
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@58975f19"
  }
}
the value is :83
Trigger accumulator value: 10
Load count accumulator value: 11
exitVal:false
proc.isAlive():false
2019-04-02 11:43:19, end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
2019-04-02 11:43:19, start fetching applicatioId = futureError.get();:
2019-04-02 11:43:19, end fetching applicatioId = futureError.get();:application_1548381669007_0829
Complete:application_1548381669007_0829
2019-04-02 11:43:19, start if (proc != null && proc.isAlive())
2019-04-02 11:43:19, end if (proc != null && proc.isAlive())
return the applicationId:application_1548381669007_0829
java.io.IOException: Stream closed
    at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.io.BufferedReader.readLine(BufferedReader.java:324)
    at java.io.BufferedReader.readLine(BufferedReader.java:389)
    at com.dx.test.StreamFilterTask.call(Submit.java:162)
    at com.dx.test.StreamFilterTask.call(Submit.java:1)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
# This exception is caught and printed by the program; it does not terminate it.
^Cbash-4.1$
Ending the process by hand at this point does terminate the Spark job on YARN.