zoukankan      html  css  js  c++  java
  • RDD转DataFrame常用的两种方式

            随着Spark1.4.x的更新,Spark提供更高阶的对象DataFrame,提供了比RDD更丰富的API操作,同时也支持RDD转DataFrame(下面简称“DF”),但是要注意,不是任意类型对象组成的RDD都可以转换成DF,,只有当组成RDD[T]的每一个T对象内部具有鲜明的字段结构时,才能隐式或者显示地创建DF所需要的Schema(结构信息),从而进行RDD->DF转换。下面介绍两种常用的RDD转DF的方式(示例Spark版本为2.2.0)。

    准备文件:person3.txt

    20,Michael,173,150
    30,Andy,168,140
    19,Justin,145,145
    18,Sam,175,122
    17,Jean,173,134
    22,Jerry,180,99
    26,Fasttson,150,145
    23,Mivak,177,118
    16,Heaat,168,129
    36,Franc,165,138

    一、使用反射机制推理出schema

            使用这种方式,往往是因为RDD[T]的T对象就已经是具有典型的一维表结构的字段结构对象,因此SparkSql可以自动推断出schema。常用的操作是使用样例对象(case class)。

    package com.cjs
     
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
     
    object RDDToDF1 {
        case class Person(name:String, age:Int)
     
        def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
     
            val conf = new SparkConf()
                .set("spark.sql.warehouse.dir","file:///e:/tmp/spark-warehouse")
                .set("spark.some.config.option","some-value")
     
            val ss = SparkSession
                .builder()
                .config(conf)
                .appName("RDDToDF")
                .master("local[2]")
                .getOrCreate()
     
            val path = "E:\IntelliJ Idea\sparkSql_practice\src\main\scala\com\cjs\person3.txt"
     
            import ss.implicits._
            val sc = ss.sparkContext
     
            val RDDPerson1 = sc.textFile(path)
            val RDDPerson = RDDPerson1.map(_.split(",")).map(p => Person(p(1),p(0).trim.toInt)) 
     
            val DFPerson = RDDPerson.toDF()
     
            DFPerson.printSchema()
     
            DFPerson.select($"name",$"age").show()
     
        }
     
    }

    运行结果:

    二、自定义schema

            这种方式是通过编程接口(StructType),允许创建一个schema,然后将其应用到RDD[Row]中。相对来说,这种方式的步骤会比较繁琐,但更自由,更灵活。因为有很多时候样例对象(case class)不能提前定义,或者一个文本的数据集的字段对不同的用户来说,需要被解释为不同的字段名,那么通过这种方式可以灵活定义schema。

    package com.cjs
     
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types._
     
    object RDDToDF2 {
        def main(args: Array[String]): Unit = {
            Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
     
            val conf = new SparkConf()
                .set("spark.sql.warehouse.dir","file:///e:/tmp/spark-warehouse")
                .set("spqrk.some.config.option","some-value")
     
            val ss = SparkSession
                .builder()
                .config(conf)
                .appName("RDD_To_DF2")
                .master("local[2]")
                .getOrCreate()
     
            import ss.implicits._
     
            val schemaString = "name,age"   //1、创建schema所需要的字段名的字符串,用特殊符号连接,如“,”
     
            //2、遍历schema的fileName,将每个fileName封装成StructField对象,实质是定义每一个file的数据类型、属性。 Array[StructField]
            val fields = schemaString.split(",").map(fileName => StructField(fileName, StringType, nullable = true))
            //3、将 Array[StructField]转化成schema
            val schema = StructType(fields)
     
            val sc = ss.sparkContext
            val path = "E:\IntelliJ Idea\sparkSql_practice\src\main\scala\com\cjs\person3.txt"
            val peopleRDD = sc.textFile(path)
            //4、构建RDD[Row]
            val rowRDD = peopleRDD.map(_.split(",")).map(att=>Row(att(1),att(0).trim))
     
            val peopleDF = ss.createDataFrame(rowRDD, schema)
     
            peopleDF.select($"name",$"age").show()
        }
     
    }

    运行结果:

  • 相关阅读:
    【美团面试题】——图遍历
    ubuntu15.04 + ROS jade
    3P(PS、PR、PDF编辑器Acrobat)中的基基本操作(三)
    谈谈你对组件的看法
    Web开发中跨域的几种解决方案
    JS事件模型
    zepto的clone方法于textarea和select的bug修复
    SASS使用总结
    git用法总结
    去除inline-block元素间间距
  • 原文地址:https://www.cnblogs.com/SysoCjs/p/11466103.html
Copyright © 2011-2022 走看看