Two ways to convert an RDD to a DataFrame in Spark (implemented in Java and Scala)

    Part 1: Prepare the data source

        Create a file named student.txt in the project root with the following contents:

    1,zhangsan,20  
    2,lisi,21  
    3,wanger,19  
    4,fangliu,18

     Part 2: Implementation

         Java version:

        1. First, create a Student bean class that implements Serializable and overrides toString(); the code is as follows:

    import java.io.Serializable;  
      
    @SuppressWarnings("serial")  
    public class Student implements Serializable {  
      
        private String sid;  
        private String sname;  
        private int sage;  
        public String getSid() {  
            return sid;  
        }  
        public void setSid(String sid) {  
            this.sid = sid;  
        }  
        public String getSname() {  
            return sname;  
        }  
        public void setSname(String sname) {  
            this.sname = sname;  
        }  
        public int getSage() {  
            return sage;  
        }  
        public void setSage(int sage) {  
            this.sage = sage;  
        }  
        @Override  
        public String toString() {  
            return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";  
        }    
    }  

    2. Perform the conversion; the code is as follows:

    import java.util.ArrayList;  
      
    import org.apache.spark.SparkConf;  
    import org.apache.spark.api.java.JavaRDD;  
    import org.apache.spark.sql.Dataset;  
    import org.apache.spark.sql.Row;  
    import org.apache.spark.sql.RowFactory;  
    import org.apache.spark.sql.SaveMode;  
    import org.apache.spark.sql.SparkSession;  
    import org.apache.spark.sql.types.DataTypes;  
    import org.apache.spark.sql.types.StructField;  
    import org.apache.spark.sql.types.StructType;  
      
    public class TxtToParquetDemo {  
      
        public static void main(String[] args) {  
              
            SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");  
            SparkSession spark = SparkSession.builder().config(conf).getOrCreate();  
      
            reflectTransform(spark);    // via Java reflection  
            dynamicTransform(spark);    // via a programmatically specified schema  
        }  
          
        /** 
         * Converts via Java reflection: Spark infers the schema from the Student bean. 
         * @param spark the active SparkSession 
         */  
        private static void reflectTransform(SparkSession spark)  
        {  
            JavaRDD<String> source = spark.read().textFile("student.txt").javaRDD();  
              
            JavaRDD<Student> rowRDD = source.map(line -> {  
                String[] parts = line.split(",");  
      
                Student stu = new Student();  
                stu.setSid(parts[0]);  
                stu.setSname(parts[1]);  
                stu.setSage(Integer.parseInt(parts[2]));  
                return stu;  
            });  
              
            Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);  
            // the bean-derived schema orders columns alphabetically, so select() restores the intended order  
            df.select("sid", "sname", "sage")  
                    .coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");  
        }  
        /** 
         * Converts via a programmatically specified schema (StructType). 
         * @param spark the active SparkSession 
         */  
        private static void dynamicTransform(SparkSession spark)  
        {  
            JavaRDD<String> source = spark.read().textFile("student.txt").javaRDD();  
              
            JavaRDD<Row> rowRDD = source.map( line -> {  
                String[] parts = line.split(",");  
                String sid = parts[0];  
                String sname = parts[1];  
                int sage = Integer.parseInt(parts[2]);  
                  
                return RowFactory.create(  
                        sid,  
                        sname,  
                        sage  
                        );  
            });  
              
            ArrayList<StructField> fields = new ArrayList<>();  
            fields.add(DataTypes.createStructField("sid", DataTypes.StringType, true));  
            fields.add(DataTypes.createStructField("sname", DataTypes.StringType, true));  
            fields.add(DataTypes.createStructField("sage", DataTypes.IntegerType, true));  
              
            StructType schema = DataTypes.createStructType(fields);  
              
            Dataset<Row> df = spark.createDataFrame(rowRDD, schema);  
            df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");                    
        }      
    }  
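
    To check what was written, the Parquet output can be read back with the same session. A small helper along these lines (hypothetical, not part of the original demo) could be added to the class above and called at the end of main():

        /** 
         * Reads the Parquet output of both transforms back and prints it. 
         * Assumes the same SparkSession and working directory as above. 
         */  
        private static void verifyOutput(SparkSession spark) {  
            Dataset<Row> reflected = spark.read().parquet("parquet.res");  
            reflected.printSchema();    // sid, sname, sage, as selected above  
            reflected.show();  
  
            Dataset<Row> dynamic = spark.read().parquet("parquet.res1");  
            dynamic.show();             // columns as declared in the StructType  
        }  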

     Scala version:

    import org.apache.spark.sql.{Row, SparkSession}  
    import org.apache.spark.sql.types.{StringType, StructField, StructType}  
      
    object RDD2Dataset {  
        
      case class Student(id:Int,name:String,age:Int)  
      def main(args:Array[String]):Unit={  
          
        val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()  
        reflectCreate(spark)  
        dynamicCreate(spark)  
      }  
        
      /** 
       * Converts via reflection (case class); toDF() infers the schema from Student. 
       * @param spark the active SparkSession 
       */  
      private def reflectCreate(spark:SparkSession):Unit={  
        import spark.implicits._  
        val stuRDD=spark.sparkContext.textFile("student.txt")  
        //toDF() is an implicit conversion provided by spark.implicits._  
        val stuDf=stuRDD.map(_.split(",")).map(parts=>Student(parts(0).trim.toInt,parts(1),parts(2).trim.toInt)).toDF()  
        //stuDf.select("id","name","age").write.text("result") // select columns by name before writing to a file  
        stuDf.printSchema()  
        stuDf.createOrReplaceTempView("student")  
        val nameDf=spark.sql("select name from student where age<20")  
        //nameDf.write.text("result") // write the query result to a file  
        nameDf.show()  
      }  
        
      /** 
       * Converts via a programmatically specified schema (StructType). 
       * @param spark the active SparkSession 
       */  
      private def dynamicCreate(spark:SparkSession):Unit={  
        val stuRDD=spark.sparkContext.textFile("student.txt")  
        val schemaString="id,name,age"  
        // every field is declared as StringType here; Spark casts "age" for the numeric comparison below  
        val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))  
        val schema=StructType(fields)  
        val rowRDD=stuRDD.map(_.split(",")).map(parts=>Row(parts(0),parts(1),parts(2)))  
        val stuDf=spark.createDataFrame(rowRDD, schema)  
        stuDf.printSchema()  
        stuDf.createOrReplaceTempView("student")  
        val nameDf=spark.sql("select name from student where age<20")  
        //nameDf.write.text("result") // write the query result to a file  
        nameDf.show()  
      }  
    }  

  Note: 1. All of the code above has been tested, under Spark 2.1.0 and JDK 1.8.

         2. This code does not work on Spark versions earlier than 2.0, because it relies on SparkSession and the Dataset API introduced in Spark 2.0.
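
  For reference, here is a minimal sketch of what the reflection approach looks like on Spark 1.x, where SQLContext is the SQL entry point and the result type is DataFrame rather than Dataset<Row>. It reuses the Student bean from above and is shown only to illustrate the API difference, not as tested code:

    import org.apache.spark.SparkConf;  
    import org.apache.spark.api.java.JavaRDD;  
    import org.apache.spark.api.java.JavaSparkContext;  
    import org.apache.spark.sql.DataFrame;  
    import org.apache.spark.sql.SQLContext;  
  
    public class LegacyReflectDemo {  
  
        public static void main(String[] args) {  
            SparkConf conf = new SparkConf().setAppName("LegacyReflect").setMaster("local");  
            JavaSparkContext sc = new JavaSparkContext(conf);  
            // Spark 1.x has no SparkSession; SQLContext wraps the SparkContext instead  
            SQLContext sqlContext = new SQLContext(sc);  
  
            JavaRDD<Student> stuRDD = sc.textFile("student.txt").map(line -> {  
                String[] parts = line.split(",");  
                Student stu = new Student();  
                stu.setSid(parts[0]);  
                stu.setSname(parts[1]);  
                stu.setSage(Integer.parseInt(parts[2]));  
                return stu;  
            });  
  
            // in 1.x, createDataFrame returns DataFrame instead of Dataset<Row>  
            DataFrame df = sqlContext.createDataFrame(stuRDD, Student.class);  
            df.show();  
            sc.stop();  
        }  
    }  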
