zoukankan      html  css  js  c++  java
  • Spark SQL：用反射的方式将 RDD 转换成 Dataset/DataFrame

    Java

     1 public class ReflectionDemo {
     2     private static SparkConf conf = new SparkConf().setAppName("reflectdemo").setMaster("local");
     3     private static JavaSparkContext jsc = new JavaSparkContext(conf);
     4 
     5     private static SparkSession session = new SparkSession(jsc.sc());
     6 
     7     public static void main(String[] args) throws AnalysisException {
     8 
     9         // rdd
    10         JavaRDD<String> rdd = jsc.textFile("./src/main/java/cn/tele/spark_sql/rdd2dataset/students.txt");
    11 
    12         // 封装rdd
    13         JavaRDD<Student> mapRdd = rdd.map(new Function<String, Student>() {
    14 
    15             private static final long serialVersionUID = 1L;
    16 
    17             @Override
    18             public Student call(String v1) throws Exception {
    19                 String[] fields = v1.split(",");
    20                 Student student = new Student(Integer.valueOf(fields[0]), fields[1], Integer.valueOf(fields[2]));
    21                 return student;
    22             }
    23         });
    24 
    25         // 通过反射的方式进行转换
    26         Dataset<Row> dataset = session.createDataFrame(mapRdd, Student.class);
    27         
    28         // 注册为临时信息表
    29         // dataset.registerTempTable("students");
    30 
    31         dataset.createOrReplaceTempView("studentsView");
    32 
    33         Dataset<Row> result = session.sql("select * from studentsView where age<=18");
    34         // result.show();
    35 
    36         // 把dataset转换成rdd
    37         JavaRDD<Student> javaRDD = result.javaRDD().map(new Function<Row, Student>() {
    38 
    39             private static final long serialVersionUID = 1L;
    40 
    41             @Override
    42             public Student call(Row v1) throws Exception {
    43                 return new Student(v1.getAs("id"), v1.getAs("name"), v1.getAs("age"));
    44             }
    45         });
    46 
    47         javaRDD.foreach(new VoidFunction<Student>() {
    48 
    49             private static final long serialVersionUID = 1L;
    50 
    51             @Override
    52             public void call(Student t) throws Exception {
    53                 System.out.println(t);
    54             }
    55         });
    56         session.stop();
    57         jsc.close();
    58     }
    59 }

    Scala

     /** One student record parsed from a "id,name,age" line; its fields become the DataFrame columns. */
     final case class Student(id: Int, name: String, age: Int)
     /**
      * Turns an RDD of [[Student]] case-class instances into a DataFrame, relying on the
      * schema Spark infers from the case class by reflection. It then filters the DataFrame
      * with SQL and converts the result back into an RDD of [[Student]].
      */
     object ReflectionDemo {
       def main(args: Array[String]): Unit = {
         // SparkSession replaces the SQLContext that has been deprecated since Spark 2.0.
         import org.apache.spark.sql.SparkSession

         val spark = SparkSession.builder()
           .appName("reflectdemo")
           .master("local")
           .getOrCreate()

         // Brings toDF() and the case-class encoders used by as[Student] into scope.
         import spark.implicits._

         try {
           // Source RDD with 2 minimum partitions; each line is parsed as "id,name,age".
           val rdd = spark.sparkContext.textFile("./src/main/scala/cn/tele/spark_sql/rdd2dataframe/students.txt", 2)

           // Convert the RDD to a DataFrame; column names and types come from Student's fields.
           val dataframe = rdd.map { line =>
             val arr = line.split(",")
             Student(arr(0).trim.toInt, arr(1).trim, arr(2).trim.toInt)
           }.toDF()

           // Register a temporary view so the data can be queried with SQL.
           dataframe.createOrReplaceTempView("students")

           val df = spark.sql("select * from students where age<=18")

           // Typed conversion back to Student replaces the manual Row.getAs mapping.
           val newRdd = df.as[Student].rdd

           // With master "local" the tasks run in the driver JVM, so this prints to the console.
           newRdd.foreach(println(_))
         } finally {
           // The original never stopped its SparkContext; release it even on failure.
           spark.stop()
         }
       }
     }
  • 相关阅读:
    MyBatisPlus-快速入门
    Spring Cloud Alibaba的使用
    SpringCloud-Bus组件的使用
    Python刷题:最长回文子串(字符串)
    Python刷题:求最大连续bit数(位运算)
    Python刷题:用二进制方式求集合S的所有子集(位运算)
    Python刷题:集合S(k)求|x-y|最小时的x和y(位运算)
    Python刷题:常用二进制操作(位运算)
    Stripe支付介绍在asp.net mvc中开发对接,图文加代码说明
    从今天起做个真正的程序员
  • 原文地址:https://www.cnblogs.com/tele-share/p/10370665.html
Copyright © 2011-2022 走看看