zoukankan      html  css  js  c++  java
  • 如何在Java应用中提交Spark任务?

    最近看到有几个Github友关注了Streaming的监控工程——Teddy,所以思来想去还是优化下代码,不能让别人看笑话啊。于是就想改一下之前觉得最丑陋的一个地方——任务提交。

    本博客内容基于Spark2.2版本~在阅读文章并想实际操作前,请确保你有:

    1. 一台配置好Spark和yarn的服务器
    2. 支持正常spark-submit --master yarn xxxx的任务提交

    老版本

    老版本任务提交是基于 ** 启动本地进程,执行脚本spark-submit xxx ** 的方式做的。其中一个关键的问题就是获得提交Spark任务的Application-id,因为这个id是跟任务状态的跟踪有关系的。如果你的资源管理框架用的是yarn,应该知道每个运行的任务都有一个applicaiton_id,这个id的生成规则是:

    appplication_时间戳_数字
    

    老版本的spark通过修改SparkConf参数spark.app.id就可以手动指定id,新版本的代码是直接读取的taskBackend中的applicationId()方法,这个方法具体的实现是根据实现类来定的。在yarn中,是通过Yarn的YarnClusterSchedulerBackend实现的,具体的实现逻辑可以参考对应的链接。

    感兴趣的同学可以看一下,生成applicaiton_id的逻辑在hadoop-yarn工程的ContainerId中定义。

    总结一句话就是,想要自定义id,甭想了!!!!

    于是当时脑袋瓜不灵光的我,就想到那就等应用创建好了之后,直接写到数据库里面呗。怎么写呢?

    1. 我事先生成一个自定义的id,当做参数传递到spark应用里面;
    2. 等spark初始化后,就可以通过sparkContext取得对应的application_id以及url
    3. 然后再driver连接数据库,插入一条关联关系

    新版本

    还是归结于互联网时代的信息大爆炸,我看到群友的聊天,知道了SparkLauncer这个东西,调查后发现他可以基于Java代码自动提交Spark任务。SparkLauncher支持两种模式:

    1. new SparkLauncher().launch() 直接启动一个Process,效果跟以前一样
    2. new SparkLauncher().startApplicaiton(监听器) 返回一个SparkAppHandler,并(可选)传入一个监听器

    当然是更倾向于第二种啦,因为好处很多:

    1. 自带输出重定向(Output,Error都有,支持写到文件里面),超级爽的功能
    2. 可以自定义监听器,当信息或者状态变更时,都能进行操作(对我没啥用)
    3. 返回的SparkAppHandler支持 暂停、停止、断连、获得AppId、获得State等多种功能,我就想要这个!!!!

    一步一步,代码展示

    首先创建一个最基本的Spark程序:

    import org.apache.spark.sql.SparkSession;
    import java.util.ArrayList;
    import java.util.List;
    
    public class HelloWorld {
        public static void main(String[] args) throws InterruptedException {
            SparkSession spark = SparkSession
                    .builder()
                    //.master("yarn")
                    //.appName("hello-wrold")
                    //.config("spark.some.config.option", "some-value")
                    .getOrCreate();
    
            List<Person> persons = new ArrayList<>();
    
            persons.add(new Person("zhangsan", 22, "male"));
            persons.add(new Person("lisi", 25, "male"));
            persons.add(new Person("wangwu", 23, "female"));
    
    
            spark.createDataFrame(persons, Person.class).show(false);
    
            spark.close();
    
        }
    }
    

    然后创建SparkLauncher类:

    import org.apache.spark.launcher.SparkAppHandle;
    import org.apache.spark.launcher.SparkLauncher;
    
    import java.io.IOException;
    
    public class Launcher {
        public static void main(String[] args) throws IOException {
            SparkAppHandle handler = new SparkLauncher()
                    .setAppName("hello-world")
                    .setSparkHome(args[0])
                    .setMaster(args[1])
                    .setConf("spark.driver.memory", "2g")
                    .setConf("spark.executor.memory", "1g")
                    .setConf("spark.executor.cores", "3")
                    .setAppResource("/home/xinghailong/launcher/launcher_test.jar")
                    .setMainClass("HelloWorld")
                    .addAppArgs("I come from Launcher")
                    .setDeployMode("cluster")
                    .startApplication(new SparkAppHandle.Listener(){
                        @Override
                        public void stateChanged(SparkAppHandle handle) {
                            System.out.println("**********  state  changed  **********");
                        }
    
                        @Override
                        public void infoChanged(SparkAppHandle handle) {
                            System.out.println("**********  info  changed  **********");
                        }
                    });
    
    
            while(!"FINISHED".equalsIgnoreCase(handler.getState().toString()) && !"FAILED".equalsIgnoreCase(handler.getState().toString())){
                System.out.println("id    "+handler.getAppId());
                System.out.println("state "+handler.getState());
    
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    然后打包工程,打包过程可以参考之前的博客:
    http://www.cnblogs.com/xing901022/p/7891867.html

    打包完成后上传到部署Spark的服务器上。由于SparkLauncher所在的类引用了SparkLauncher,所以还需要把这个jar也上传到服务器上。

    [xinghailong@hnode10 launcher]$ ls
    launcher_test.jar  spark-launcher_2.11-2.2.0.jar
    [xinghailong@hnode10 launcher]$ pwd
    /home/xinghailong/launcher
    

    由于SparkLauncher需要指定SPARK_HOME,因此如果你的机器可以执行spark-submit,那么就看一下spark-submit里面,SPARK_HOME是在哪

    [xinghailong@hnode10 launcher]$ which spark2-submit
    /var/lib/hadoop-hdfs/bin/spark2-submit
    

    最后几行就能看到:

    export SPARK2_HOME=/var/lib/hadoop-hdfs/app/spark
    
    # disable randomized hash for string in Python 3.3+
    export PYTHONHASHSEED=0
    
    exec "${SPARK2_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
    

    综上,我们需要的是:

    1. 一个自定义的Jar,里面包含spark应用和SparkLauncher类
    2. 一个SparkLauncher的jar,spark-launcher_2.11-2.2.0.jar 版本根据你自己的来就行
    3. 一个当前目录的路径
    4. 一个SARK_HOME环境变量指定的目录

    然后执行命令启动测试:

    java -Djava.ext.dirs=/home/xinghailong/launcher -cp launcher_test.jar Launcher /var/lib/hadoop-hdfs/app/spark yarn
    

    说明:

    1. -Djava.ext.dirs 设置当前目录为java类加载的目录
    2. 传入两个参数,一个是SPARK_HOME;一个是启动模式

    观察删除发现成功启动运行了:

    id    null
    state UNKNOWN
    Mar 10, 2018 12:00:52 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: 18/03/10 12:00:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    **********  state  changed  **********
    ...省略一大堆拷贝jar的日志
    **********  info  changed  **********
    **********  state  changed  **********
    Mar 10, 2018 12:00:55 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: 18/03/10 12:00:55 INFO yarn.Client: Application report for application_1518263195995_37615 (state: ACCEPTED)
    ... 省略一堆重定向的日志
    application_1518263195995_37615 (state: ACCEPTED)
    id    application_1518263195995_37615
    state SUBMITTED
    Mar 10, 2018 12:01:00 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: 18/03/10 12:01:00 INFO yarn.Client: Application report for application_1518263195995_37615 (state: RUNNING)
    **********  state  changed  **********
    ... 省略一堆重定向的日志
    INFO: 	 user: hdfs
    **********  state  changed  **********
    Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Shutdown hook called
    Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect
    INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-f07e0213-61fa-4710-90f5-2fd2030e0701
    

    总结

    这样就实现了基于Java应用提交Spark任务,并获得其Appliation_id和状态进行定位跟踪的需求了。

    作者:xingoo
    本文版权归作者和博客园共有。欢迎转载,但必须保留此段声明,且在文章页面明显位置给出原文连接!

     

     
  • 相关阅读:
    1144 The Missing Number (20分)
    1145 Hashing
    1146 Topological Order (25分)
    1147 Heaps (30分)
    1148 Werewolf
    1149 Dangerous Goods Packaging (25分)
    TypeReference
    Supervisor安装与配置()二
    谷粒商城ES调用(十九)
    Found interface org.elasticsearch.common.bytes.BytesReference, but class was expected
  • 原文地址:https://www.cnblogs.com/javalinux/p/15094833.html
Copyright © 2011-2022 走看看