zoukankan      html  css  js  c++  java
  • 转】RDD与DataFrame的转换

      原博文出自于:  http://www.cnblogs.com/namhwik/p/5967910.html

    RDD与DataFrame转换
    1. 通过反射的方式来推断RDD元素中的元数据。因为RDD本身一条数据本身是没有元数据的,例如Person,而Person有name,id等,而record是不知道这些的,但是变成DataFrame背后一定知道,通过反射的方式就可以了解到背后这些元数据,进而转换成DataFrame。
    如何反射?
    Scala: 通过case class映射,在case class里面说我们这个RDD里面每个record的不同列的元数据是什么。(废弃)
    当样本类不能提前确定时(例如,当记录的结构由字符串或文本数据集编码而成,它在解析时,字段将会对不同的用户有不同的投影结果),SchemaRDD 可以由以下三个步骤创建: 
    当JavaBean不能被预先定义的时候,编程创建DataFrame分为三步:

    复制代码

     //   从原来的RDD创建一个Row格式的RDD
     //    创建与RDD 中Rows结构匹配的StructType,通过该StructType创建表示RDD 的Schema
     //   通过SQLContext提供的createDataFrame方法创建DataFrame,方法参数为RDD 的Schema
    val conf = new SparkConf().setMaster ("local").setAppName ("Test1") val sc = new SparkContext (conf) val sqlContext = new SQLContext(sc) // import sqlContext.implicits._ case class Person(name:String,age:Int) val people = sc.textFile ("d:/people.txt") val schemaString = "name age" val schema = StructType ( schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,true)) ) val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) val peopleSchemaRDD = sqlContext.createDataFrame(rowRDD, schema) peopleSchemaRDD .registerTempTable("people" ) val results = sqlContext . sql ("SELECT name FROM people" ) results.printSchema() println(results.count()) results.map(t => "Name: " + t(0)).collect().foreach(println)
    复制代码


    //1.利用反射来推断包含特定类型对象的RDD的schema。这种方法会简化代码并且在你已经知道schema的时候非常适用。

    复制代码
    //2.   先创建一个bean类,然后将Rdd转换成DataFrame
     case class Person(name: String, age: Int)
      def main (args : Array[String]) : Unit =
      {
        val conf = new SparkConf().setMaster ("local").setAppName ("Test1")
        val sc = new SparkContext (conf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
    
        val people = sc.textFile("d:/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
        people.registerTempTable("people")
        val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
        teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
        teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
        teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
    复制代码
  • 相关阅读:
    思念
    空白
    curl json string with variable All In One
    virtual scroll list All In One
    corejs & RegExp error All In One
    socket.io All In One
    vue camelCase vs PascalCase vs kebabcase All In One
    element ui 表单校验,非必填字段校验 All In One
    github 定时任务 UTC 时间不准确 bug All In One
    input range & color picker All In One
  • 原文地址:https://www.cnblogs.com/zlslch/p/6040469.html
Copyright © 2011-2022 走看看