zoukankan      html  css  js  c++  java
  • 利用SparkLauncher 类以JAVA API 编程的方式提交Spark job

    一.环境说明和使用软件的版本说明:

    hadoop-version:hadoop-2.9.0.tar.gz 

    spark-version:spark-2.2.0-bin-hadoop2.7.tgz

    java-version:jdk1.8.0_151

    集群环境:单机伪分布式环境。

    二.适用背景

     在学习Spark过程中,资料中介绍的提交Spark Job的方式主要有两种(我所知道的):第一种是通过命令行的方式提交Job,使用spark 自带的spark-submit工具提交,官网和大多数参考资料都是已这种方式提交的,提交命令示例如下:

    ./spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3  ../spark-demo.jar

    参数含义就不解释了,请参考官网资料。

    第二种提交方式是已JAVA API编程的方式提交,这种方式不需要使用命令行,直接可以在IDEA中点击Run 运行包含Job的Main类就行,Spark 提供了以SparkLanuncher 作为唯一入口的API来实现。这种方式很方便(试想如果某个任务需要重复执行,但是又不会写linux 脚本怎么搞?我想到的是以JAV API的方式提交Job, 还可以和Spring整合,让应用在tomcat中运行),官网的示例:http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html

    三.文章的目地

    官网已有demo和API的情况下写这篇文章的目地:官网给出的demo 放在本机跑不了。出现的现象是程序结束了,什么输出都没有或者输出JAVA_HOME is not set,虽然我调用方法设置了,然而没啥用,因此把我搜索和加上在自己思考后能够运行的demo记录下来。

    四.相关demo

       根据官网的示例这里有两种方式:

    第一种是调用SparkLanuncher实例的startApplication方法,但是这种方式在所有配置都正确的情况下使用运行都会失败的,原因是startApplication方法会调用LauncherServer启动一个进程与集群交互,这个操作貌似是异步的,所以可能结果是main主线程结束了这个进程都没有起起来,导致运行失败。解决办法是调用new SparkLanuncher().startApplication后需要让主线程休眠一定的时间后者是使用下面的例子:

    package com.learn.spark;
     
    import org.apache.spark.launcher.SparkAppHandle;
    import org.apache.spark.launcher.SparkLauncher;
     
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.concurrent.CountDownLatch;
     
    public class LanuncherAppV {
        public static void main(String[] args) throws IOException, InterruptedException {
     
     
            HashMap env = new HashMap();
            //这两个属性必须设置
            env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
            env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
            //可以不设置
            //env.put("YARN_CONF_DIR","");
            CountDownLatch countDownLatch = new CountDownLatch(1);
            //这里调用setJavaHome()方法后,JAVA_HOME is not set 错误依然存在
            SparkAppHandle handle = new SparkLauncher(env)
                    .setSparkHome("/usr/local/spark")
                    .setAppResource("/usr/local/spark/spark-demo.jar")
                    .setMainClass("com.learn.spark.SimpleApp")
                    .setMaster("yarn")
                    .setDeployMode("cluster")
                    .setConf("spark.app.id", "11222")
                    .setConf("spark.driver.memory", "2g")
                    .setConf("spark.executor.memory", "1g")
                    .setConf("spark.executor.instances", "32")
                    .setConf("spark.executor.cores", "3")
                    .setConf("spark.default.parallelism", "10")
                    .setConf("spark.driver.allowMultipleContexts", "true")
                    .setVerbose(true).startApplication(new SparkAppHandle.Listener() {
                        //这里监听任务状态,当任务结束时(不管是什么原因结束),isFinal()方法会返回true,否则返回false
                        @Override
                        public void stateChanged(SparkAppHandle sparkAppHandle) {
                            if (sparkAppHandle.getState().isFinal()) {
                                countDownLatch.countDown();
                            }
                            System.out.println("state:" + sparkAppHandle.getState().toString());
                        }
                        
                        
                        @Override
                        public void infoChanged(SparkAppHandle sparkAppHandle) {
                            System.out.println("Info:" + sparkAppHandle.getState().toString());
                        }
                    });
            System.out.println("The task is executing, please wait ....");
            //线程等待任务结束
            countDownLatch.await();
            System.out.println("The task is finished!");
     
     
        }
    }

     注意:如果部署模式是cluster,但是代码中有标准输出的话将看不到,需要把结果写到HDFS中,如果是client模式则可以看到输出。
    第二种方式是:通过SparkLanuncher.lanunch()方法获取一个进程,然后调用进程的process.waitFor()方法等待线程返回结果,但是使用这种方式需要自己管理运行过程中的输出信息,比较麻烦,好处是一切都在掌握之中,即获取的输出信息和通过命令提交的方式一样,很详细,实现如下:

    package com.learn.spark;
     
    import org.apache.spark.launcher.SparkAppHandle;
    import org.apache.spark.launcher.SparkLauncher;
     
    import java.io.IOException;
    import java.util.HashMap;
     
    public class LauncherApp {
     
        public static void main(String[] args) throws IOException, InterruptedException {
            
            HashMap env = new HashMap();
            //这两个属性必须设置
            env.put("HADOOP_CONF_DIR","/usr/local/hadoop/etc/overriterHaoopConf");
            env.put("JAVA_HOME","/usr/local/java/jdk1.8.0_151");
            //env.put("YARN_CONF_DIR","");
     
            SparkLauncher handle = new SparkLauncher(env)
                    .setSparkHome("/usr/local/spark")
                    .setAppResource("/usr/local/spark/spark-demo.jar")
                    .setMainClass("com.learn.spark.SimpleApp")
                    .setMaster("yarn")
                    .setDeployMode("cluster")
                    .setConf("spark.app.id", "11222")
                    .setConf("spark.driver.memory", "2g")
                    .setConf("spark.akka.frameSize", "200")
                    .setConf("spark.executor.memory", "1g")
                    .setConf("spark.executor.instances", "32")
                    .setConf("spark.executor.cores", "3")
                    .setConf("spark.default.parallelism", "10")
                    .setConf("spark.driver.allowMultipleContexts","true")
                    .setVerbose(true);
     
     
             Process process =handle.launch();
            InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");
            Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
            inputThread.start();
     
            InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");
            Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
            errorThread.start();
     
            System.out.println("Waiting for finish...");
            int exitCode = process.waitFor();
            System.out.println("Finished! Exit code:" + exitCode);
     
        }
    }

      使用的自定义InputStreamReaderRunnable类实现如下:

    package com.learn.spark;
     
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
     
    public class InputStreamReaderRunnable implements Runnable {
     
        private BufferedReader reader;
     
        private String name;
     
        public InputStreamReaderRunnable(InputStream is, String name) {
            this.reader = new BufferedReader(new InputStreamReader(is));
            this.name = name;
        }
     
        public void run() {
            System.out.println("InputStream " + name + ":");
            try {
                String line = reader.readLine();
                while (line != null) {
                    System.out.println(line);
                    line = reader.readLine();
                }
                reader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
  • 相关阅读:
    MySQL GTID复制Slave跳过错误事务Id以及复制排错问题总结
    Git基础命令整理
    原创-公司项目部署交付环境预检查shell脚本
    解决SecureCRT超时自动断开的问题
    Linux设置显示中文和设置字体
    高等代数4 线性方程组
    高等代数3 行列式
    高等代数2 向量组
    高等代数1 矩阵
    离散数学4 组合数学
  • 原文地址:https://www.cnblogs.com/itboys/p/10015968.html
Copyright © 2011-2022 走看看