  • Spark: Two Ways to Convert a JavaRDD to Dataset<Row>

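    Converting a JavaRDD to Dataset<Row>, approach 1:

    Use an entity class as the schema definition. Spark reads the class through reflection and converts the JavaRDD into a Dataset<Row>.

    The Student.java entity class: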

    import java.io.Serializable;
    
    @SuppressWarnings("serial")
    public class Student implements Serializable {
        private String sid;
        private String sname;
        private int sage;
    
        public String getSid() {
            return sid;
        }
    
        public void setSid(String sid) {
            this.sid = sid;
        }
    
        public String getSname() {
            return sname;
        }
    
        public void setSname(String sname) {
            this.sname = sname;
        }
    
        public int getSage() {
            return sage;
        }
    
        public void setSage(int sage) {
            this.sage = sage;
        }
    
        @Override
        public String toString() {
            return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
        }
    }
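
    The reflection step relies on the class following JavaBean conventions (getter/setter pairs). As a rough illustration only, the sketch below uses the JDK's java.beans.Introspector to list the properties such a bean exposes. This is not Spark's own code; BeanSchemaSketch and describe are hypothetical names, and the nested Student is a trimmed copy of the class above so the example is self-contained.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.LinkedHashMap;
import java.util.Map;

class BeanSchemaSketch {

    // Trimmed copy of the Student bean, kept here so the sketch compiles on its own.
    public static class Student implements java.io.Serializable {
        private String sid;
        private String sname;
        private int sage;

        public String getSid() { return sid; }
        public void setSid(String sid) { this.sid = sid; }
        public String getSname() { return sname; }
        public void setSname(String sname) { this.sname = sname; }
        public int getSage() { return sage; }
        public void setSage(int sage) { this.sage = sage; }
    }

    // Hypothetical helper: maps each bean property that has a getter to its Java type name.
    static Map<String, String> describe(Class<?> beanClass) {
        try {
            // Stop at Object so the built-in "class" property is not reported.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            Map<String, String> columns = new LinkedHashMap<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null) {
                    columns.put(pd.getName(), pd.getPropertyType().getSimpleName());
                }
            }
            return columns;
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Cannot introspect " + beanClass, e);
        }
    }

    public static void main(String[] args) {
        // Prints one "name: type" line per discovered property.
        describe(Student.class).forEach((name, type) -> System.out.println(name + ": " + type));
    }
}
```

    A field without a getter would not show up here at all. The inferred columns also tend to come back ordered by property name rather than declaration order, which would explain the explicit select("sid", "sname", "sage") in the implementation.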

    Implementation:

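    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

            SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
            // Each input line is expected to look like: sid,sname,sage
            JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();

            // Parse every line into a Student bean.
            JavaRDD<Student> rowRDD = source.map(new Function<String, Student>() {
                @Override
                public Student call(String line) throws Exception {
                    String[] parts = line.split(",");
                    Student stu = new Student();
                    stu.setSid(parts[0]);
                    stu.setSname(parts[1]);
                    stu.setSage(Integer.parseInt(parts[2]));
                    return stu;
                }
            });

            // Spark derives the schema from the Student bean via reflection.
            Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
            // select() fixes the column order; coalesce(1) reduces the output to a single partition.
            df.select("sid", "sname", "sage").coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");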
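
    The map function above assumes every line has exactly three comma-separated fields and that the third is a valid integer; a short line or a non-numeric age would make the job fail. A minimal defensive sketch, using only the JDK, might look like the following. StudentLineParser and Parsed are hypothetical names, and the rule that sid must be non-empty is an assumption, not something the original states.

```java
import java.util.Optional;

class StudentLineParser {

    // Hypothetical immutable holder for one parsed record (stands in for Student).
    static final class Parsed {
        final String sid;
        final String sname;
        final int sage;

        Parsed(String sid, String sname, int sage) {
            this.sid = sid;
            this.sname = sname;
            this.sage = sage;
        }
    }

    // Returns the parsed record, or empty if the line is malformed.
    static Optional<Parsed> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        // Limit -1 keeps trailing empty fields, so "a,b," still yields three parts.
        String[] parts = line.split(",", -1);
        if (parts.length != 3) {
            return Optional.empty();
        }
        String sid = parts[0].trim();
        String sname = parts[1].trim();
        if (sid.isEmpty()) {
            return Optional.empty(); // assumed rule: an id is required
        }
        try {
            int sage = Integer.parseInt(parts[2].trim());
            return Optional.of(new Parsed(sid, sname, sage));
        } catch (NumberFormatException e) {
            return Optional.empty(); // age was not a valid integer
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("1001,Alice,20").isPresent());
        System.out.println(parse("1001,Alice,abc").isPresent());
    }
}
```

    In the Spark job, one option would be to filter out lines for which parse returns empty before mapping them to Student objects.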

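    Converting a JavaRDD to Dataset<Row>, approach 2:

    Build the schema explicitly (a StructType made of StructFields) and apply it to a JavaRDD<Row>.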

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

            SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
            // Each input line is expected to look like: sid,sname,sage
            JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();

            // Parse every line into a generic Row; the value order must match the schema below.
            JavaRDD<Row> rowRDD = source.map(new Function<String, Row>() {
                @Override
                public Row call(String line) throws Exception {
                    String[] parts = line.split(",");
                    String sid = parts[0];
                    String sname = parts[1];
                    int sage = Integer.parseInt(parts[2]);

                    return RowFactory.create(sid, sname, sage);
                }
            });

            // Declare the schema explicitly: column name, data type, nullable.
            List<StructField> fields = new ArrayList<>();
            fields.add(DataTypes.createStructField("sid", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("sname", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("sage", DataTypes.IntegerType, true));

            StructType schema = DataTypes.createStructType(fields);

            Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
            df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
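
    With this approach the values passed to RowFactory.create are matched to the schema fields by position, so both the order and the Java types have to line up (String for StringType, Integer for IntegerType). A mismatch generally shows up only at runtime. The sketch below imitates that check with plain JDK classes to make the rule concrete. It is an illustration, not Spark's validation logic; RowSchemaCheck, Field, studentSchema and firstMismatch are all hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class RowSchemaCheck {

    // Hypothetical stand-in for a StructField: column name, expected Java type, nullability.
    static final class Field {
        final String name;
        final Class<?> type;
        final boolean nullable;

        Field(String name, Class<?> type, boolean nullable) {
            this.name = name;
            this.type = type;
            this.nullable = nullable;
        }
    }

    // Mirrors the schema of approach 2: sid and sname are strings, sage is an integer.
    static List<Field> studentSchema() {
        return Arrays.asList(
                new Field("sid", String.class, true),
                new Field("sname", String.class, true),
                new Field("sage", Integer.class, true));
    }

    // Returns a description of the first problem found, or empty if the row fits the schema.
    static Optional<String> firstMismatch(List<Field> schema, Object... row) {
        if (row.length != schema.size()) {
            return Optional.of("expected " + schema.size() + " values, got " + row.length);
        }
        for (int i = 0; i < row.length; i++) {
            Field field = schema.get(i);
            Object value = row[i];
            if (value == null) {
                if (!field.nullable) {
                    return Optional.of(field.name + " must not be null");
                }
            } else if (!field.type.isInstance(value)) {
                return Optional.of(field.name + " expects " + field.type.getSimpleName()
                        + ", got " + value.getClass().getSimpleName());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Field> schema = studentSchema();
        // A well-formed row, then one where the age was left as a String.
        System.out.println(firstMismatch(schema, "1001", "Alice", 20));
        System.out.println(firstMismatch(schema, "1001", "Alice", "20"));
    }
}
```

    The second call reports a problem because the age is passed as a String, which is the kind of slip that forgetting Integer.parseInt in the map function would cause.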
  • Original article: https://www.cnblogs.com/yy3b2007com/p/9365843.html