  • Submitting Spark jobs programmatically through the SparkLauncher Java API

    1. Environment and software versions

    hadoop-version:hadoop-2.9.0.tar.gz 

    spark-version:spark-2.2.0-bin-hadoop2.7.tgz

    java-version:jdk1.8.0_151

    Cluster environment: single-node, pseudo-distributed.

    2. Background

    While learning Spark, the references I read describe two main ways (that I know of) to submit a Spark job. The first is from the command line, using the spark-submit tool that ships with Spark; the official site and most references submit jobs this way. A sample command:

    ./spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3  ../spark-demo.jar

    The parameters are not explained here; see the official documentation.

    The second way is to submit programmatically through the Java API. No command line is needed: you can simply click Run in IDEA on the main class that contains the job. Spark provides an API with SparkLauncher as the single entry point. This is very convenient (imagine a task that must run repeatedly, but you cannot write a Linux script; submitting via the Java API solves that, and it can also be integrated with Spring so the application runs in Tomcat). The official example: http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html
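    For orientation, here is a minimal sketch (the class name SubmitSketch is mine) of how the spark-submit invocation above maps onto the SparkLauncher API; the fully worked, runnable demos follow in section 4:

    package com.learn.spark;
     
    import org.apache.spark.launcher.SparkLauncher;
     
    public class SubmitSketch {
        public static void main(String[] args) throws Exception {
            // Each setter mirrors one flag of the spark-submit command above.
            Process spark = new SparkLauncher()
                    .setAppResource("../spark-demo.jar")           // the application jar
                    .setMainClass("com.learn.spark.SimpleApp")     // --class
                    .setMaster("yarn")                             // --master
                    .setDeployMode("client")                       // --deploy-mode
                    .setConf(SparkLauncher.DRIVER_MEMORY, "2g")    // --driver-memory
                    .setConf(SparkLauncher.EXECUTOR_MEMORY, "2g")  // --executor-memory
                    .setConf(SparkLauncher.EXECUTOR_CORES, "3")    // --executor-cores
                    .launch();
            spark.waitFor();
        }
    }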

    3. Purpose of this article

    Why write this article when the official site already provides a demo and API docs? Because the official demo would not run on my machine: the program finished with no output at all, or printed JAVA_HOME is not set even though I had called the method that sets it. So this post records the demos that actually ran after searching and some thinking of my own.

    4. Demos

    Following the official example, there are two approaches.

    The first approach calls the startApplication method on a SparkLauncher instance. Even with every setting correct, a naive run fails: startApplication has LauncherServer start a process that talks to the cluster, and this appears to be asynchronous, so the main thread may exit before that process has even come up, and the run fails. The fix is either to let the main thread sleep for a while after calling new SparkLauncher().startApplication(), or to block on a latch as in the following example:

    package com.learn.spark;
     
    import org.apache.spark.launcher.SparkAppHandle;
    import org.apache.spark.launcher.SparkLauncher;
     
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.concurrent.CountDownLatch;
     
    public class LanuncherAppV {
        public static void main(String[] args) throws IOException, InterruptedException {
     
            HashMap<String, String> env = new HashMap<>();
            // These two variables must be set.
            env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
            env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
            // Optional:
            //env.put("YARN_CONF_DIR","");
            CountDownLatch countDownLatch = new CountDownLatch(1);
            // Calling setJavaHome() did not help: the "JAVA_HOME is not set"
            // error persisted; passing JAVA_HOME through the env map works.
            SparkAppHandle handle = new SparkLauncher(env)
                    .setSparkHome("/usr/local/spark")
                    .setAppResource("/usr/local/spark/spark-demo.jar")
                    .setMainClass("com.learn.spark.SimpleApp")
                    .setMaster("yarn")
                    .setDeployMode("cluster")
                    .setConf("spark.app.id", "11222")
                    .setConf("spark.driver.memory", "2g")
                    .setConf("spark.executor.memory", "1g")
                    .setConf("spark.executor.instances", "32")
                    .setConf("spark.executor.cores", "3")
                    .setConf("spark.default.parallelism", "10")
                    .setConf("spark.driver.allowMultipleContexts", "true")
                    .setVerbose(true).startApplication(new SparkAppHandle.Listener() {
                        // Listen for state changes; when the job ends (for whatever
                        // reason), isFinal() returns true, otherwise false.
                        @Override
                        public void stateChanged(SparkAppHandle sparkAppHandle) {
                            if (sparkAppHandle.getState().isFinal()) {
                                countDownLatch.countDown();
                            }
                            System.out.println("state:" + sparkAppHandle.getState().toString());
                        }
     
                        @Override
                        public void infoChanged(SparkAppHandle sparkAppHandle) {
                            System.out.println("Info:" + sparkAppHandle.getState().toString());
                        }
                    });
            System.out.println("The task is executing, please wait ....");
            // Block until the job reaches a final state.
            countDownLatch.await();
            System.out.println("The task is finished!");
        }
    }

     Note: in cluster deploy mode you will not see anything the job writes to standard output; write results to HDFS instead. In client mode the output is visible.
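
    As an alternative to the latch, the main thread can simply poll the handle until the job reaches a final state (the sleep-based fix mentioned above). A minimal sketch, with a class name and poll interval of my own and the launcher configuration abbreviated to the demo's settings:

    package com.learn.spark;
     
    import org.apache.spark.launcher.SparkAppHandle;
    import org.apache.spark.launcher.SparkLauncher;
     
    import java.util.HashMap;
     
    public class LauncherAppPolling {
        public static void main(String[] args) throws Exception {
            // Same required environment as the demo above.
            HashMap<String, String> env = new HashMap<>();
            env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
            env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
     
            SparkAppHandle handle = new SparkLauncher(env)
                    .setSparkHome("/usr/local/spark")
                    .setAppResource("/usr/local/spark/spark-demo.jar")
                    .setMainClass("com.learn.spark.SimpleApp")
                    .setMaster("yarn")
                    .setDeployMode("cluster")
                    .startApplication();
            // Keep the main thread alive until the job ends, instead of
            // blocking on a CountDownLatch.
            while (!handle.getState().isFinal()) {
                System.out.println("state:" + handle.getState());
                Thread.sleep(1000);
            }
            System.out.println("final state:" + handle.getState());
        }
    }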
    The second approach obtains a Process via SparkLauncher.launch() and then calls process.waitFor() to wait for it to finish. With this approach you have to manage the process's output yourself, which is more work; the upside is that everything stays under your control, and the output you capture is the same detailed output you get when submitting from the command line. Implementation:

    package com.learn.spark;
     
    import org.apache.spark.launcher.SparkLauncher;
     
    import java.io.IOException;
    import java.util.HashMap;
     
    public class LauncherApp {
     
        public static void main(String[] args) throws IOException, InterruptedException {
     
            HashMap<String, String> env = new HashMap<>();
            // These two variables must be set.
            env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
            env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
            //env.put("YARN_CONF_DIR","");
     
            SparkLauncher handle = new SparkLauncher(env)
                    .setSparkHome("/usr/local/spark")
                    .setAppResource("/usr/local/spark/spark-demo.jar")
                    .setMainClass("com.learn.spark.SimpleApp")
                    .setMaster("yarn")
                    .setDeployMode("cluster")
                    .setConf("spark.app.id", "11222")
                    .setConf("spark.driver.memory", "2g")
                    .setConf("spark.executor.memory", "1g")
                    .setConf("spark.executor.instances", "32")
                    .setConf("spark.executor.cores", "3")
                    .setConf("spark.default.parallelism", "10")
                    .setConf("spark.driver.allowMultipleContexts", "true")
                    .setVerbose(true);
     
            Process process = handle.launch();
            // Drain stdout and stderr on separate threads so the child
            // process does not block on full pipe buffers.
            InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");
            Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
            inputThread.start();
     
            InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");
            Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
            errorThread.start();
     
            System.out.println("Waiting for finish...");
            int exitCode = process.waitFor();
            System.out.println("Finished! Exit code:" + exitCode);
        }
    }

    The custom InputStreamReaderRunnable class used above is implemented as follows:

    package com.learn.spark;
     
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
     
    // Reads an InputStream line by line on its own thread and echoes it to stdout.
    public class InputStreamReaderRunnable implements Runnable {
     
        private BufferedReader reader;
     
        private String name;
     
        public InputStreamReaderRunnable(InputStream is, String name) {
            this.reader = new BufferedReader(new InputStreamReader(is));
            this.name = name;
        }
     
        public void run() {
            System.out.println("InputStream " + name + ":");
            try {
                String line = reader.readLine();
                while (line != null) {
                    System.out.println(line);
                    line = reader.readLine();
                }
                reader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
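
    As an aside, on Spark 2.x SparkLauncher can also redirect the child process's output itself via redirectOutput/redirectError, which avoids the hand-rolled reader threads entirely. A minimal sketch, with log file paths of my own choosing:

    package com.learn.spark;
     
    import java.io.File;
    import java.util.HashMap;
     
    import org.apache.spark.launcher.SparkLauncher;
     
    public class LauncherAppRedirect {
        public static void main(String[] args) throws Exception {
            HashMap<String, String> env = new HashMap<>();
            env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
            env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
     
            Process process = new SparkLauncher(env)
                    .setSparkHome("/usr/local/spark")
                    .setAppResource("/usr/local/spark/spark-demo.jar")
                    .setMainClass("com.learn.spark.SimpleApp")
                    .setMaster("yarn")
                    .setDeployMode("cluster")
                    // Send the launcher's stdout/stderr to files instead of pipes.
                    .redirectOutput(new File("/tmp/spark-demo-out.log"))
                    .redirectError(new File("/tmp/spark-demo-err.log"))
                    .launch();
            int exitCode = process.waitFor();
            System.out.println("Finished! Exit code:" + exitCode);
        }
    }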
  • Original post: https://www.cnblogs.com/itboys/p/10015968.html