zoukankan      html  css  js  c++  java
  • java通过SparkSession连接spark-sql

    SparkSession配置获取客户端

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.Serializable;
    
    public class SparkTool implements Serializable {
        private static final Logger LOGGER = LoggerFactory.getLogger(SparkTool.class);
    
        public static String appName ="root";
        private static JavaSparkContext jsc = null;
        private static SparkSession spark = null;
    
        private static void initSpark() {
            if (jsc == null || spark == null) {
    
                SparkConf  sparkConf = new SparkConf();
                sparkConf.set("spark.driver.allowMultipleContexts", "true");
                sparkConf.set("spark.eventLog.enabled", "true");
                sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
                sparkConf.set("spark.hadoop.validateOutputSpecs", "false");
                sparkConf.set("hive.mapred.supports.subdirectories", "true");
                sparkConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
    
                spark = SparkSession.builder().appName(appName).config(sparkConf).enableHiveSupport().getOrCreate();
                jsc = new JavaSparkContext(spark.sparkContext());
            }
    
        }
    
        public static JavaSparkContext getJsc() {
            if (jsc == null) {
                initSpark();
            }
            return jsc;
        }
    
        public static SparkSession getSession() {
            if (spark == null ) {
                initSpark();
            }
            return spark;
    
        }
    
    }

    通过sparkSession执行sql

     public List<TableInfo> selectTableInfoFromSpark(String abstractSql){
            List<TableInfo> tableInfoList = new ArrayList<TableInfo>();
            TableInfo tableInfo = new TableInfo();
            SparkSession spark = SparkTool.getSession();
            Dataset<Row> dataset = spark.sql(abstractSql);
            List<Row> rowList = dataset.collectAsList();
            for(Row row : rowList){
                tableInfo.setColumnName(row.getString(1));
                tableInfo.setColumnType(row.getString(2));
                tableInfo.setColumnComment(row.getString(3));
                tableInfoList.add(tableInfo);
            }
            return tableInfoList;
        }

          java 或者scala操作spark-sql时查询出来的数据有RDD、DataFrame、DataSet三种。

         这三种数据结构关系以及转换或者解析见博客:https://www.jianshu.com/p/71003b152a84

  • 相关阅读:
    1015词法分析
    0909作业
    华为云专家来公司
    SVN与Git的优点差异比较
    eclipse快捷键大全
    Servlet,HttpServletRequest 和 HttpServletResponse
    XML
    JDBC
    XMIND快捷键
    java数据
  • 原文地址:https://www.cnblogs.com/yangcao/p/12073203.html
Copyright © 2011-2022 走看看