  • SparkSQL Parquet: merging schemas (metadata). Two Parquet files with different schemas, (name, age) and (name, grade), are written to the same directory and then read back with the mergeSchema option enabled, so the resulting schema is the union of both; rows get null for columns that are absent from the file they came from.

    java

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    import scala.Tuple2;

    public class ParquetMergeSchema {
        private static SparkConf conf = new SparkConf().setAppName("parquetmergeschema").setMaster("local");
        private static JavaSparkContext jsc = new JavaSparkContext(conf);
        private static SparkSession session = new SparkSession(jsc.sc());

        public static void main(String[] args) {
            // first dataset: (name, age)
            JavaRDD<Tuple2<String, Object>> rdd1 = jsc.parallelize(
                    Arrays.asList(new Tuple2<String, Object>("jack", 21), new Tuple2<String, Object>("lucy", 20)));

            JavaRDD<Row> row1 = rdd1.map(new Function<Tuple2<String, Object>, Row>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Row call(Tuple2<String, Object> v1) throws Exception {
                    return RowFactory.create(v1._1, v1._2);
                }
            });

            // second dataset: (name, grade)
            JavaRDD<Tuple2<String, Object>> rdd2 = jsc.parallelize(
                    Arrays.asList(new Tuple2<String, Object>("jack", "A"), new Tuple2<String, Object>("yeye", "B")));

            JavaRDD<Row> row2 = rdd2.map(new Function<Tuple2<String, Object>, Row>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Row call(Tuple2<String, Object> v1) throws Exception {
                    return RowFactory.create(v1._1, v1._2);
                }
            });

            StructType schema1 = DataTypes
                    .createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, false),
                            DataTypes.createStructField("age", DataTypes.IntegerType, false)));

            StructType schema2 = DataTypes
                    .createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, false),
                            DataTypes.createStructField("grade", DataTypes.StringType, false)));

            // convert the RDDs to Datasets
            Dataset<Row> ds1 = session.createDataFrame(row1, schema1);
            Dataset<Row> ds2 = session.createDataFrame(row2, schema2);

            // write both Datasets into the same directory as Parquet (the default data source)
            ds1.write().mode(SaveMode.Append).save("./src/main/java/cn/tele/spark_sql/parquet/mergetest");
            ds2.write().mode(SaveMode.Append).save("./src/main/java/cn/tele/spark_sql/parquet/mergetest");

            // read the Parquet directory with mergeSchema=true so the two schemas are merged
            Dataset<Row> dataset = session.read().option("mergeSchema", true)
                    .load("./src/main/java/cn/tele/spark_sql/parquet/mergetest");

            dataset.printSchema();
            dataset.show();

            session.stop();
            jsc.close();
        }
    }
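
    Schema merging can also be enabled globally instead of per read, through the spark.sql.parquet.mergeSchema setting. A minimal sketch, reusing the session and output directory from the Java example above (.parquet(...) is equivalent to .load(...) here because Parquet is the default data source):

        // assumption: the directory was already written by the example above
        session.conf().set("spark.sql.parquet.mergeSchema", "true");
        Dataset<Row> merged = session.read()
                .parquet("./src/main/java/cn/tele/spark_sql/parquet/mergetest");
        merged.printSchema(); // union of both schemas: name, age, grade
        merged.show();        // columns missing from a file are filled with null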

    scala

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{Row, SQLContext, SaveMode}
    import org.apache.spark.sql.types.{DataTypes, StructField}

    object ParquetMergeSchema {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("parquetmergeschema").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        // two RDDs of Rows: (name, age) and (name, grade)
        val rdd1 = sc.parallelize(Array(("jack", 18), ("tele", 20)), 2).map(tuple => Row(tuple._1, tuple._2))
        val rdd2 = sc.parallelize(Array(("tele", "A"), ("wyc", "A"), ("yeye", "C")), 2).map(tuple => Row(tuple._1, tuple._2))

        // schemas
        val schema1 = DataTypes.createStructType(Array(
          StructField("name", DataTypes.StringType, false),
          StructField("age", DataTypes.IntegerType, false)))

        val schema2 = DataTypes.createStructType(Array(
          StructField("name", DataTypes.StringType, false),
          StructField("grade", DataTypes.StringType, false)))

        // convert the RDDs to DataFrames
        val df1 = sqlContext.createDataFrame(rdd1, schema1)
        val df2 = sqlContext.createDataFrame(rdd2, schema2)

        // write both DataFrames into the same directory as Parquet
        df1.write.mode(SaveMode.Append).save("./src/main/scala/cn/tele/spark_sql/parquet/mergetest")
        df2.write.mode(SaveMode.Append).save("./src/main/scala/cn/tele/spark_sql/parquet/mergetest")

        // read the directory with mergeSchema enabled so the two schemas are merged
        val df = sqlContext.read.option("mergeSchema", true).parquet("./src/main/scala/cn/tele/spark_sql/parquet/mergetest")
        df.printSchema()
        df.show()

        sc.stop()
      }
    }
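
    Beyond printSchema and show, the merged result can be checked with SQL by registering it as a temporary view. An illustrative sketch in Java, continuing from the Java example above (the view name people_merged is made up for the example):

        // dataset is the merged Dataset<Row> from the Java example above
        dataset.createOrReplaceTempView("people_merged");
        // rows from the (name, age) file have grade = null, and vice versa for age
        session.sql("SELECT name, age, grade FROM people_merged").show();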
  • Original article: https://www.cnblogs.com/tele-share/p/10390972.html