zoukankan      html  css  js  c++  java
  • Spark SQL:用反射的方式将 RDD 转换成 Dataset/DataFrame

    java

     1 public class ReflectionDemo {
     2     private static SparkConf conf = new SparkConf().setAppName("reflectdemo").setMaster("local");
     3     private static JavaSparkContext jsc = new JavaSparkContext(conf);
     4 
     5     private static SparkSession session = new SparkSession(jsc.sc());
     6 
     7     public static void main(String[] args) throws AnalysisException {
     8 
     9         // rdd
    10         JavaRDD<String> rdd = jsc.textFile("./src/main/java/cn/tele/spark_sql/rdd2dataset/students.txt");
    11 
    12         // 封装rdd
    13         JavaRDD<Student> mapRdd = rdd.map(new Function<String, Student>() {
    14 
    15             private static final long serialVersionUID = 1L;
    16 
    17             @Override
    18             public Student call(String v1) throws Exception {
    19                 String[] fields = v1.split(",");
    20                 Student student = new Student(Integer.valueOf(fields[0]), fields[1], Integer.valueOf(fields[2]));
    21                 return student;
    22             }
    23         });
    24 
    25         // 通过反射的方式进行转换
    26         Dataset<Row> dataset = session.createDataFrame(mapRdd, Student.class);
    27         
    28         // 注册为临时信息表
    29         // dataset.registerTempTable("students");
    30 
    31         dataset.createOrReplaceTempView("studentsView");
    32 
    33         Dataset<Row> result = session.sql("select * from studentsView where age<=18");
    34         // result.show();
    35 
    36         // 把dataset转换成rdd
    37         JavaRDD<Student> javaRDD = result.javaRDD().map(new Function<Row, Student>() {
    38 
    39             private static final long serialVersionUID = 1L;
    40 
    41             @Override
    42             public Student call(Row v1) throws Exception {
    43                 return new Student(v1.getAs("id"), v1.getAs("name"), v1.getAs("age"));
    44             }
    45         });
    46 
    47         javaRDD.foreach(new VoidFunction<Student>() {
    48 
    49             private static final long serialVersionUID = 1L;
    50 
    51             @Override
    52             public void call(Student t) throws Exception {
    53                 System.out.println(t);
    54             }
    55         });
    56         session.stop();
    57         jsc.close();
    58     }
    59 }

    scala

     /**
      * One student record; its fields define the DataFrame columns when an
      * RDD of `Student` is converted with `toDF()`.
      *
      * @param id   numeric student id
      * @param name student name
      * @param age  age in years
      */
     final case class Student(id: Int, name: String, age: Int)
     2 
     /**
      * Demo: convert an RDD of `Student` case-class instances into a DataFrame by
      * reflection, query it with Spark SQL, and map the resulting rows back to an RDD.
      *
      * Input is a text file with one student per line in the form `id,name,age`.
      */
     object ReflectionDemo {
       def main(args: Array[String]): Unit = {
         // Local import: SparkSession replaces the deprecated SQLContext entry point.
         import org.apache.spark.sql.SparkSession

         val conf = new SparkConf().setAppName("reflectdemo").setMaster("local")
         val spark = SparkSession.builder().config(conf).getOrCreate()

         try {
           // Brings the implicit conversions (e.g. rdd.toDF()) into scope.
           import spark.implicits._

           // Source RDD: one raw text line per student, read with 2 partitions.
           val rdd = spark.sparkContext
             .textFile("./src/main/scala/cn/tele/spark_sql/rdd2dataframe/students.txt", 2)

           // Parse each line into a Student; the schema is inferred from the case class.
           val dataframe = rdd.map { line =>
             val arr = line.split(",")
             Student(arr(0).trim.toInt, arr(1).trim, arr(2).trim.toInt)
           }.toDF()

           // Register a temporary view so the data can be queried with SQL.
           dataframe.createOrReplaceTempView("students")

           //  dataframe.show()

           val df = spark.sql("select * from students where age<=18")

           // Convert the DataFrame back to an RDD of Student, reading columns by name.
           val newRdd = df.rdd.map { row =>
             Student(row.getAs[Int]("id"), row.getAs[String]("name"), row.getAs[Int]("age"))
           }

           newRdd.foreach(println(_))
         } finally {
           // Always release Spark resources, even when a job fails.
           spark.stop()
         }
       }

     }
  • 相关阅读:
    CSS网站变灰
    长列表优化之滚动替换数据方案小记
    JS把数组中相同元素组合成一个新的数组问题
    yahoo CSS reset
    IE调试网页之三:使用 F12 工具控制台查看错误和状态 (Windows)
    KMP算法的JavaScript实现
    Android系统版本SDK_INT与版本对应关系
    利用jQuery的deferred异步按顺序加载JS文件
    Javascript图像处理之平滑处理
    IE调试网页之七:使用探查器工具分析代码性能 (Windows)
  • 原文地址:https://www.cnblogs.com/tele-share/p/10370665.html
Copyright © 2011-2022 走看看