zoukankan      html  css  js  c++  java
  • Spark:JavaRDD 转化为 Dataset<Row>的两种方案

    JavaRDD 转化为 Dataset<Row>方案一:

    实体类作为schema定义规范,使用反射,实现JavaRDD转化为Dataset<Row>

    Student.java实体类:

    import java.io.Serializable;
    
    @SuppressWarnings("serial")
    public class Student implements Serializable {
        private String sid;
        private String sname;
        private int sage;
    
        public String getSid() {
            return sid;
        }
    
        public void setSid(String sid) {
            this.sid = sid;
        }
    
        public String getSname() {
            return sname;
        }
    
        public void setSname(String sname) {
            this.sname = sname;
        }
    
        public int getSage() {
            return sage;
        }
    
        public void setSage(int sage) {
            this.sage = sage;
        }
    
        @Override
        public String toString() {
            return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
        }
    }

    实现代码:

            SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
            final JavaSparkContext ctx = JavaSparkContext.fromSparkContext(spark.sparkContext());
            JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
    
            JavaRDD<Student> rowRDD = source.map(new Function<String, Student>() {
                public Student call(String line) throws Exception {
                    String parts[] = line.split(",");
                    Student stu = new Student();
                    stu.setSid(parts[0]);
                    stu.setSname(parts[1]);
                    stu.setSage(Integer.valueOf(parts[2]));
                    return stu;
                }
            });
    
            Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
            df.select("sid", "sname", "sage").coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");

    JavaRDD 转化为 Dataset<Row>方案二:

    使用schema生成方案

            SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
            final JavaSparkContext ctx = JavaSparkContext.fromSparkContext(spark.sparkContext());
            JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
    
            JavaRDD<Row> rowRDD = source.map(new Function<String, Row>() {
                public Row call(String line) throws Exception {
                    String[] parts = line.split(",");
                    String sid = parts[0];
                    String sname = parts[1];
                    int sage = Integer.parseInt(parts[2]);
    
                    return RowFactory.create(sid, sname, sage);
                }
            });
    
            ArrayList<StructField> fields = new ArrayList<StructField>();
            StructField field = null;
            field = DataTypes.createStructField("sid", DataTypes.StringType, true);
            fields.add(field);
            field = DataTypes.createStructField("sname", DataTypes.StringType, true);
            fields.add(field);
            field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);
            fields.add(field);
    
            StructType schema = DataTypes.createStructType(fields);
    
            Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
            df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
  • 相关阅读:
    Euraka适合初学者的简单小demo
    springboot中常用的依赖
    SpringBoot的入门程序
    spring-data-solr查询
    SpringBoot整合Redis
    SpringBoot整合MyBatis
    使用swagger2生成文档
    SpringBoot整合Spring Data JPA
    SpringBoot构建RESTful API
    SpringBoot属性配置
  • 原文地址:https://www.cnblogs.com/yy3b2007com/p/9365843.html
Copyright © 2011-2022 走看看