  • SparkSQL: dynamically setting a schema to convert an RDD into a Dataset/DataFrame

    The recipe: map the raw RDD to an RDD of Row, describe the columns at runtime with a StructType, and pass both to createDataFrame.
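
    Both versions below read a comma-separated text file and parse each line as id,name,age. A hypothetical students.txt (contents assumed for illustration, not given in the original post) might look like:

    1,leo,17
    2,marry,18
    3,jack,19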

    Java

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class DynamicDemo {
        private static SparkConf conf = new SparkConf().setAppName("dynamicdemo").setMaster("local");
        private static JavaSparkContext jsc = new JavaSparkContext(conf);
        private static SparkSession session = new SparkSession(jsc.sc());

        public static void main(String[] args) {
            // create the raw RDD from the text file
            JavaRDD<String> rdd = jsc.textFile("./src/main/java/cn/tele/spark_sql/rdd2dataset/students.txt");

            // map each "id,name,age" line to a Row
            JavaRDD<Row> rowRdd = rdd.map(new Function<String, Row>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Row call(String v1) throws Exception {
                    String[] fields = v1.split(",");
                    return RowFactory.create(Integer.valueOf(fields[0]), fields[1], Integer.valueOf(fields[2]));
                }
            });

            // build the schema at runtime
            StructType schema = DataTypes
                    .createStructType(Arrays.asList(DataTypes.createStructField("id", DataTypes.IntegerType, false),
                            DataTypes.createStructField("name", DataTypes.StringType, false),
                            DataTypes.createStructField("age", DataTypes.IntegerType, false)));

            // convert: Row RDD + schema -> Dataset<Row>
            Dataset<Row> dataset = session.createDataFrame(rowRdd, schema);

            dataset.createOrReplaceTempView("students");

            Dataset<Row> result = session.sql("select * from students where age<=18");
            result.show();

            session.stop();
            jsc.close();
        }
    }
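
    A note on the session setup above: new SparkSession(jsc.sc()) happens to be callable from Java, but SparkSession.builder() is the entry point the Spark API documents. A minimal sketch of the builder variant (Scala, Spark 2.x assumed):

    import org.apache.spark.sql.SparkSession

    val session = SparkSession.builder()
      .appName("dynamicdemo")
      .master("local")
      .getOrCreate()

    createDataFrame, createOrReplaceTempView and sql are then used on the session exactly as in the listing above.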

    Scala

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.types.{DataTypes, StructField}

    object DynamicDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("dynamicdemo").setMaster("local")

        val sc = new SparkContext(conf)

        val sqlContext = new SQLContext(sc)

        // create the raw RDD from the text file
        val rdd = sc.textFile("./src/main/scala/cn/tele/spark_sql/rdd2dataframe/students.txt", 8)

        // map each "id,name,age" line to a Row
        val rowRdd = rdd.map(lines => {
          val arr = lines.split(",")
          Row(arr(0).trim().toInt, arr(1), arr(2).trim().toInt)
        })

        // build the schema at runtime
        // (DataTypes.createStructField("id", DataTypes.IntegerType, false) etc. would work equally well)
        val schema = DataTypes.createStructType(Array(
          StructField("id", DataTypes.IntegerType, false),
          StructField("name", DataTypes.StringType, false),
          StructField("age", DataTypes.IntegerType, false)))

        // convert: Row RDD + schema -> DataFrame
        val dataframe = sqlContext.createDataFrame(rowRdd, schema)

        dataframe.createOrReplaceTempView("students")

        val result = sqlContext.sql("select * from students where age<=18")
        result.show()

        sc.stop()
      }
    }
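
    The payoff of the programmatic approach (versus inferring the schema by reflection from a case class) is that the StructType can be assembled from information only known at runtime. A minimal sketch, assuming the column list arrives in a hypothetical header string (not part of the original post):

    import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

    // hypothetical: column names arriving at runtime, e.g. from a config entry or a header line
    val header = "id,name,age"
    val schema = StructType(header.split(",").map { name =>
      // assumption for this sketch: "name" is the only string column, the rest are ints
      val dataType = if (name == "name") DataTypes.StringType else DataTypes.IntegerType
      StructField(name, dataType, nullable = false)
    })

    The resulting schema is identical to the hard-coded one above, but the field list never has to appear in the source code.
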
  • Original post: https://www.cnblogs.com/tele-share/p/10371158.html