zoukankan      html  css  js  c++  java
  • 利用SparkLauncher在代码中调用Spark作业

    背景

    项目需要处理很多文件,而一些文件很大有几十GB,因此考虑对于这种文件,专门编写Spark程序处理,为了程序的统一处理,需要在代码中调用Spark作业来处理大文件。

    实现方案

    经过调研,发现可以使用Spark提供的SparkLauncher类进行Spark作业的提交,这个类的使用有很多参数需要注意,经过项目验证后,本文给出相对完整的使用方式以及说明

    首先项目中要添加pom依赖,注意加上自己的版本

    <dependency>
    	<groupId>org.apache.spark</groupId>
         <artifactId>spark-launcher_2.11</artifactId>
    </dependency>
    

    其次,可以把Spark作业本身的一些参数放在配置文件里,灵活修改,我这里是配置kerberos安全认证的CDH集群,Spark作业提交时使用的模式为yarn-client,主要使用到了一下配置,配置中的路径这里是作为例子随便填的,实际按照自己环境填写,另外,整个应用是在CDH客户端节点执行的。每个配置项都有说明:

    #spark application use
    #driver的日志输出
    driverLogDir=/root/test/logs/
    #kerberos认证keytab文件
    keytab=/root/test/dw_hbkal.keytab
    # keyberos认证主体
    principal=dw_hbkal
    # yarn集群上运行spark作业
    master=yarn
    # yarn-client模式
    deployMode=client
    # spark-executor个数和内存配置
    minExecutors=16
    maxExecutors=16
    executorMemory=1g
    # driver内存配置
    driverMemory=256M
    # spark-executor使用的core数量配置
    executorCores=2
    # spark作业的主类
    mainClass=com.unionpay.css.fcmp.compare.cp.spark.nonprikey.FileCompare
    # spark作业的jar包
    jarPath=/root/test/my-spark-job-1.0-SNAPSHOT.jar
    # spark作业依赖的第三方jar
    extjars=/root/test/mysql-connector-java-8.0.27.jar,/root/test/jedis-2.8.1.jar
    # CHD客户端上存放的集群配置文件,表明向哪个集群提交spark作业
    HADOOP_CONF_DIR=/root/CDH/bjc/CDH/etc/conf/hadoop-conf
    JAVA_HOME=/usr/java/jdk1.8.0_141
    SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark
    # spark作业执行的yarn队列
    yarnQueue=mysparkqueue
    

    上述配置可以在代码中读取,并结合SparkLauncher一起使用,可以参看以下例子代码:

    //负责发起spark作业
    public class SparkJobService{
    
        private static final Logger logger = LoggerFactory.getLogger(SparkJobService.class);
        static Config config;
        //spark任务参数
        static String keytabPath;
        static String principal ;
        static String master;
        static String deployMode;
        static String minExecutods;
        static String maxExecutors;
        static String executorMemory;
        static String driverMemory;
        static String executorCores;
        static String mainClass;
        static String jarPath;
        static String extjars;
        static String yarnQueue;
        static String HADOOP_CONF_DIR;
        static String JAVA_HOME;
        static String SPARK_HOME;
        static String driverLogDir;
    
        static {
            config = new Config("job.properties");
            keytabPath = config.getString("keytab");
            principal = config.getString("principal");
            master = config.getString("master");
            deployMode = config.getString("deployMode");
            minExecutods = config.getString("minExecutods");
            maxExecutors = config.getString("maxExecutors");
            executorMemory = config.getString("executorMemory");
            driverMemory = config.getString("driverMemory");
            executorCores = config.getString("executorCores");
            mainClass = config.getString("mainClass");
            jarPath = config.getString("jarPath");
            extjars = config.getString("extjars");
            yarnQueue = config.getString("yarnQueue");
            HADOOP_CONF_DIR=config.getString("HADOOP_CONF_DIR");
            JAVA_HOME = config.getString("JAVA_HOME");
            SPARK_HOME = config.getString("SPARK_HOME");
            driverLogDir = config.getString("driverLogDir");
        }
    
        public static void main(String[] args) {
            try{
                //spark任务设置
                //如果在系统环境变量中添加了,可以不加
                HashMap<String,String> env = new HashMap();
                env.put("HADOOP_CONF_DIR",HADOOP_CONF_DIR);
                env.put("JAVA_HOME",JAVA_HOME);
                env.put("SPARK_HOME",SPARK_HOME);
    			
    			String jobArgs1 = "test1";
    			String jobArgs2 = "test2"
    			//......
    
                SparkLauncher launcher = new SparkLauncher(env).addSparkArg("--keytab",keytabPath).addSparkArg("--principal",principal).setMaster(master).setDeployMode(deployMode)
                        .setConf("spark.dynamicAllocation.minExecutors",minExecutods).setConf("spark.dynamicAllocation.maxExecutors",maxExecutors).setConf("spark.driver.memory",driverMemory).setConf("spark.executor.memory",executorMemory).setConf("spark.executor.cores",executorCores)
                        .setConf("spark.yarn.queue",yarnQueue)
                        .setAppResource(jarPath).setMainClass(mainClass).addAppArgs(jobArgs1,jobArgs2);
    
                //spark job中依赖jar,如mysql-connector.jar...
                for(String jarName : extjars.split(",")){
                    launcher.addJar(jarName);
                }
                launcher.setAppName("SparkJob");
                //spark本地driver日志
                launcher.redirectError(new File(driverLogDir + "spark_driver.log"));
                final String[] jobId = new String[]{""};
                //用来等待spark作业结束
                CountDownLatch latch = new CountDownLatch(1);
                SparkAppHandle sparkAppHandle = launcher.setVerbose(false).startApplication(new SparkAppHandle.Listener() {
                    @Override
                    public void stateChanged(SparkAppHandle sparkAppHandle) {
                        SparkAppHandle.State state = sparkAppHandle.getState();
                        switch (state){
                            case SUBMITTED:
                                logger.info("提交spark作业成功");
    							//yarn上spark作业的jobId
                                jobId[0] = sparkAppHandle.getAppId();
                                break;
                            case FINISHED:
                                logger.info("spark job success");
                                break;
                            case FAILED:
                            case KILLED:
                            case LOST:
                                logger.info("spark job failed");
                        }
                        if (state.isFinal())
                            latch.countDown();
                    }
    
                    @Override
                    public void infoChanged(SparkAppHandle sparkAppHandle) {
                    }
                });
    			//等待Spark作业执行结束
                latch.await();
            }catch (Exception e){
                logger.error("error",e);
            }finally {
    			//...
            }
        }
    }
    

    上述代码中,尤其注意spark作业参数是怎么配置的,不同的参数使用的是不同的方法调用,一些参数使用addSparkArg方法添加,一些使用setConf添加。特别提示,如果是传给spark应用本身的参数,需要使用addAppArgs方法传递,该方法形参为变长参数。

    另外,代码中设置了spark本地driver日志路径,这样可以方便产看日志。通过SparkAppHandle的stateChanged回调函数,获得spark作业的执行状态,本例子中需要等待spark作业执行结束,因此提交作业之后,通过CountDownLatch机制来等待,在stateChanged中,当发现spark作业为结束状态,计数器减一,整个程序结束。

    以上便是一种代码中调用Spark作业的一种实现方案,有问题可以一起交流。

  • 相关阅读:
    oracle客户端plsql安装配置
    vue基础-vue-cli(vue脚手架
    ES5、6、7浅析
    webservice的使用
    使用intellj idea的hibernate生成注解实体类
    spring源码分析
    Total Eclipse(并查集)
    《大道至简》读后感
    2020年8月3日Java学习日记
    2020年8月2日Java学习日记
  • 原文地址:https://www.cnblogs.com/darange/p/15750403.html
Copyright © 2011-2022 走看看