  • Spark: two ways to convert a JavaRDD into a Dataset&lt;Row&gt;

    JavaRDD to Dataset&lt;Row&gt;, approach 1:

    Use an entity class as the schema definition: Spark inspects the JavaBean's getters via reflection to infer the column names and types, turning the JavaRDD into a Dataset&lt;Row&gt;. The bean must be public, implement Serializable, and follow getter/setter conventions.

    The Student.java entity class:

    import java.io.Serializable;
    
    // JavaBean whose getters define the Dataset<Row> columns: sid, sname, sage
    @SuppressWarnings("serial")
    public class Student implements Serializable {
        private String sid;
        private String sname;
        private int sage;
    
        public String getSid() {
            return sid;
        }
    
        public void setSid(String sid) {
            this.sid = sid;
        }
    
        public String getSname() {
            return sname;
        }
    
        public void setSname(String sname) {
            this.sname = sname;
        }
    
        public int getSage() {
            return sage;
        }
    
        public void setSage(int sage) {
            this.sage = sage;
        }
    
        @Override
        public String toString() {
            return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
        }
    }
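
    The original post does not show the input file. Judging from the split(",") parsing and the field order, stuInfo.txt is assumed here to contain comma-separated lines of sid,sname,sage, for example (hypothetical data):

            1,Tom,18
            2,Jerry,17
            3,Lucy,19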

    Implementation code:

            // Required imports: org.apache.spark.api.java.JavaRDD,
            // org.apache.spark.api.java.function.Function, org.apache.spark.sql.*
            SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
            JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
    
            // Parse each comma-separated line into a Student bean
            JavaRDD<Student> rowRDD = source.map(new Function<String, Student>() {
                public Student call(String line) throws Exception {
                    String[] parts = line.split(",");
                    Student stu = new Student();
                    stu.setSid(parts[0]);
                    stu.setSname(parts[1]);
                    stu.setSage(Integer.parseInt(parts[2]));
                    return stu;
                }
            });
    
            // Let Spark infer the schema from the Student bean via reflection
            Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
            df.select("sid", "sname", "sage").coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
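
    To sanity-check the result, you can read the Parquet output back and inspect its schema and rows; a minimal sketch using the same path as above:

            // Read the written Parquet back and verify schema and contents
            Dataset<Row> check = spark.read().parquet("parquet.res");
            check.printSchema(); // sid: string, sname: string, sage: integer
            check.show();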

    JavaRDD to Dataset&lt;Row&gt;, approach 2:

    Build the schema programmatically: describe the columns with StructField/StructType and pair the schema with a JavaRDD&lt;Row&gt;.

            // Additional imports: java.util.List, java.util.ArrayList,
            // org.apache.spark.sql.RowFactory, org.apache.spark.sql.types.*
            SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark").getOrCreate();
            JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
    
            // Parse each comma-separated line into a generic Row (no bean class needed)
            JavaRDD<Row> rowRDD = source.map(new Function<String, Row>() {
                public Row call(String line) throws Exception {
                    String[] parts = line.split(",");
                    String sid = parts[0];
                    String sname = parts[1];
                    int sage = Integer.parseInt(parts[2]);
    
                    return RowFactory.create(sid, sname, sage);
                }
            });
    
            // Describe the columns explicitly; field order must match RowFactory.create(...)
            List<StructField> fields = new ArrayList<StructField>();
            fields.add(DataTypes.createStructField("sid", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("sname", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("sage", DataTypes.IntegerType, true));
    
            StructType schema = DataTypes.createStructType(fields);
    
            // Attach the schema to the RDD of Rows
            Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
            df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
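
    The programmatic variant is the one to reach for when column names or types are only known at runtime. With either approach, the resulting Dataset&lt;Row&gt; can be registered as a view and queried with Spark SQL; a minimal sketch:

            // Register the Dataset as a temp view and query it with SQL
            df.createOrReplaceTempView("student");
            spark.sql("SELECT sname, sage FROM student WHERE sage >= 18").show();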
  • Original post: https://www.cnblogs.com/yy3b2007com/p/9365843.html