  • Spark: Two Ways to Convert a JavaRDD to a Dataset<Row>

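    Approach 1: converting a JavaRDD to a Dataset<Row>

    An entity class serves as the schema definition. Spark uses reflection on that class to convert the JavaRDD into a Dataset<Row>.

    The Student.java entity class: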

    import java.io.Serializable;
    
    @SuppressWarnings("serial")
    public class Student implements Serializable {
        private String sid;
        private String sname;
        private int sage;
    
        public String getSid() {
            return sid;
        }
    
        public void setSid(String sid) {
            this.sid = sid;
        }
    
        public String getSname() {
            return sname;
        }
    
        public void setSname(String sname) {
            this.sname = sname;
        }
    
        public int getSage() {
            return sage;
        }
    
        public void setSage(int sage) {
            this.sage = sage;
        }
    
        @Override
        public String toString() {
            return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
        }
    }
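As a rough illustration of what "using reflection" means here, the sketch below lists the JavaBean properties that standard Java introspection finds on a Student-like class. It is plain Java with no Spark dependency. The class name `BeanColumns`, the method `describe`, and the nested copy of `Student` are hypothetical names introduced only for this example. Spark's own inference is more involved, so treat this as an approximation of the idea rather than Spark's actual code.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BeanColumns {

    // Hypothetical stand-in for the Student entity shown above.
    public static class Student implements java.io.Serializable {
        private static final long serialVersionUID = 1L;
        private String sid;
        private String sname;
        private int sage;

        public String getSid() { return sid; }
        public void setSid(String sid) { this.sid = sid; }
        public String getSname() { return sname; }
        public void setSname(String sname) { this.sname = sname; }
        public int getSage() { return sage; }
        public void setSage(int sage) { this.sage = sage; }
    }

    // Returns "name:type" for every readable bean property, sorted by name.
    public static List<String> describe(Class<?> beanClass) {
        try {
            // Passing Object.class as the stop class leaves out the built-in "class" property.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            List<String> columns = new ArrayList<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null) {
                    columns.add(pd.getName() + ":" + pd.getPropertyType().getSimpleName());
                }
            }
            Collections.sort(columns);
            return columns;
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Cannot introspect " + beanClass, e);
        }
    }

    public static void main(String[] args) {
        // prints [sage:int, sid:String, sname:String]
        System.out.println(describe(Student.class));
    }
}
```

The property names come from the getters, not from the private fields, so a field without a getter would not show up as a column. The names are listed alphabetically here, not in declaration order.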

    Implementation:

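            // Imports (top of the file)
            import org.apache.spark.api.java.JavaRDD;
            import org.apache.spark.api.java.function.Function;
            import org.apache.spark.sql.Dataset;
            import org.apache.spark.sql.Row;
            import org.apache.spark.sql.SaveMode;
            import org.apache.spark.sql.SparkSession;
    
            // Inside a method such as main:
            SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
            // Each line of stuInfo.txt is expected to look like: sid,sname,sage
            JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
    
            // Parse each text line into a Student bean.
            JavaRDD<Student> rowRDD = source.map(new Function<String, Student>() {
                @Override
                public Student call(String line) throws Exception {
                    String[] parts = line.split(",");
                    Student stu = new Student();
                    stu.setSid(parts[0]);
                    stu.setSname(parts[1]);
                    stu.setSage(Integer.parseInt(parts[2].trim()));
                    return stu;
                }
            });
    
            // Spark derives the schema from the getters of Student via reflection.
            Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
            // select(...) fixes the column order; coalesce(1) writes a single part file;
            // SaveMode.Append adds to any data already in the output directory.
            df.select("sid", "sname", "sage").coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");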
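The map step above assumes every line has exactly three comma-separated fields and a numeric age; a malformed line would throw and fail the job. The following is a minimal sketch of a more defensive parser, written in plain Java so it can be tried without Spark. `StudentLineParser`, `parse`, the immutable nested `Student`, and the decision to return an empty `Optional` for bad lines are all assumptions made for this example, not part of the original code.

```java
import java.util.Optional;

public class StudentLineParser {

    // Hypothetical immutable variant of the Student entity, used only in this sketch.
    public static final class Student {
        private final String sid;
        private final String sname;
        private final int sage;

        public Student(String sid, String sname, int sage) {
            this.sid = sid;
            this.sname = sname;
            this.sage = sage;
        }

        public String getSid() { return sid; }
        public String getSname() { return sname; }
        public int getSage() { return sage; }
    }

    // Parses "sid,sname,sage"; returns Optional.empty() for malformed input.
    public static Optional<Student> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        // A limit of -1 keeps trailing empty fields, so "a,b," still yields three parts.
        String[] parts = line.split(",", -1);
        if (parts.length != 3) {
            return Optional.empty();
        }
        try {
            int sage = Integer.parseInt(parts[2].trim());
            return Optional.of(new Student(parts[0].trim(), parts[1].trim(), sage));
        } catch (NumberFormatException e) {
            // The age field was not a valid integer.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("1001,Tom,18").isPresent());  // prints true
        System.out.println(parse("1001,Tom").isPresent());     // prints false
        System.out.println(parse("1001,Tom,abc").isPresent()); // prints false
    }
}
```

In a Spark job, one possible use is to filter out the empty results before building the DataFrame, so that a few bad lines do not abort the whole run.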

    Approach 2: converting a JavaRDD to a Dataset<Row>

    Build the schema explicitly in code.

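            // Imports (top of the file)
            import java.util.ArrayList;
            import java.util.List;
            import org.apache.spark.api.java.JavaRDD;
            import org.apache.spark.api.java.function.Function;
            import org.apache.spark.sql.Dataset;
            import org.apache.spark.sql.Row;
            import org.apache.spark.sql.RowFactory;
            import org.apache.spark.sql.SaveMode;
            import org.apache.spark.sql.SparkSession;
            import org.apache.spark.sql.types.DataTypes;
            import org.apache.spark.sql.types.StructField;
            import org.apache.spark.sql.types.StructType;
    
            // Inside a method such as main:
            SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
            // Each line of stuInfo.txt is expected to look like: sid,sname,sage
            JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
    
            // Parse each text line into a generic Row; the value order must match the schema below.
            JavaRDD<Row> rowRDD = source.map(new Function<String, Row>() {
                @Override
                public Row call(String line) throws Exception {
                    String[] parts = line.split(",");
                    String sid = parts[0];
                    String sname = parts[1];
                    int sage = Integer.parseInt(parts[2].trim());
    
                    return RowFactory.create(sid, sname, sage);
                }
            });
    
            // Declare each column: name, data type, nullable.
            List<StructField> fields = new ArrayList<>();
            fields.add(DataTypes.createStructField("sid", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("sname", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("sage", DataTypes.IntegerType, true));
    
            StructType schema = DataTypes.createStructType(fields);
    
            // Combine the rows with the explicit schema, then write a single Parquet part file.
            Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
            df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");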
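In the second approach, the values passed to `RowFactory.create` have to line up by position and type with the fields declared in the `StructType`. One way to reduce the risk of the two drifting apart is to drive the conversion from a single field list. The sketch below shows that idea in plain Java, without Spark. `SchemaDrivenParser`, `FieldSpec`, `FIELDS`, and `toValues` are hypothetical names introduced for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class SchemaDrivenParser {

    // One column: its name plus the function that converts the raw text into a typed value.
    public static final class FieldSpec {
        final String name;
        final Function<String, Object> converter;

        FieldSpec(String name, Function<String, Object> converter) {
            this.name = name;
            this.converter = converter;
        }
    }

    // Single source of truth for column order and types (mirrors sid, sname, sage).
    public static final List<FieldSpec> FIELDS = Arrays.asList(
            new FieldSpec("sid", s -> s),
            new FieldSpec("sname", s -> s),
            new FieldSpec("sage", s -> Integer.valueOf(s.trim())));

    // Converts "sid,sname,sage" into typed values in the order declared by FIELDS.
    public static Object[] toValues(String line) {
        String[] parts = line.split(",", -1);
        if (parts.length != FIELDS.size()) {
            throw new IllegalArgumentException(
                    "Expected " + FIELDS.size() + " fields but got " + parts.length + ": " + line);
        }
        Object[] values = new Object[parts.length];
        for (int i = 0; i < parts.length; i++) {
            values[i] = FIELDS.get(i).converter.apply(parts[i]);
        }
        return values;
    }

    public static void main(String[] args) {
        // prints [1001, Tom, 18]
        System.out.println(Arrays.toString(toValues("1001,Tom,18")));
    }
}
```

Because `RowFactory.create` takes a varargs `Object...` parameter, an array like the one returned by `toValues` could in principle be passed to it directly. The same list could also carry a Spark data type per field and be used to build the `StructType`; that extension is left out here to keep the sketch free of dependencies.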
  • Original article: https://www.cnblogs.com/yy3b2007com/p/9365843.html