  • (Repost) Converting RDDs to DataFrames

    Original post: http://blog.csdn.net/Gavin_chun/article/details/78663826

    Approach 1: reflection. Not recommended in production: a case class could only define 22 fields (a Scala 2.10 limitation, lifted in Scala 2.11), which restricts the schemas this approach can express.

    Approach 2: programmatic, generally in three steps:
    1. Create an RDD[Row] from the original RDD;
    2. Build a StructType schema that matches the structure of the Rows created in step 1;
    3. Apply the schema to the RDD[Row] via SparkSession's createDataFrame method.

    package com.spark.sql
    
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    
    /**
      * Created with IntelliJ IDEA.
      * Description: 
      * Author: A_ChunUnique
      * Date: 2017/11/28
      * Time: 14:27
      *
      **/
    object CovertRdd {
    
      case class Person(name: String, age: Long)
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("RDD Covert DF").getOrCreate()
        //runInferSchemaExample(spark)
        runProgrammaticSchemaExample(spark)
      }
    
      private def runInferSchemaExample(spark: SparkSession): Unit = {
        /*
         * Approach 1: convert an RDD to a DataFrame via reflection
         */
        import spark.implicits._
        val peopleDF = spark.sparkContext
          .textFile("file:///D:/ruoze/people.txt")
          .map(_.split(","))
          .map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
        peopleDF.createOrReplaceTempView("people")
        val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
        teenagersDF.map(teenager => "Name: " + teenager(0) + ", Age: " + teenager(1)).show()
      }
    
      /*
       * Approach 2: convert an RDD to a DataFrame programmatically
       */
      private def runProgrammaticSchemaExample(spark: SparkSession): Unit = {
        import spark.implicits._
        // Create an RDD
        val peopleRDD = spark.sparkContext.textFile("file:///D:/ruoze/people.txt")
        // The schema is encoded in a string
        val schemaString = "name age"
        // Generate the schema based on the string of schema
        val fields = schemaString.split(" ")
          .map(fieldName => StructField(fieldName, StringType, nullable = true))
        val schema = StructType(fields)
        // Convert records of the RDD (people) to Rows
        val rowRDD = peopleRDD
          .map(_.split(","))
          .map(attributes => Row(attributes(0), attributes(1).trim))
    
        // Apply the schema to the RDD
        val peopleDF = spark.createDataFrame(rowRDD, schema)
    
        // Creates a temporary view using the DataFrame
        peopleDF.createOrReplaceTempView("people")
    
        // SQL can be run over a temporary view created using DataFrames
        val results = spark.sql("SELECT name FROM people")
    
        // The results of SQL queries are DataFrames and support all the normal RDD operations
        // The columns of a row in the result can be accessed by field index or by field name
        results.map(attributes => "Name: " + attributes(0)).show()
      }
    }
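
    Note that the programmatic example above declares every column as StringType, so age ends up as a string. If you need real column types, convert each value when building the Rows and declare the matching type in the schema. A minimal sketch under that assumption (same comma-separated name,age file; the path is the one used above):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    // Sketch: the same conversion as above, but with age typed as an integer
    def typedSchemaExample(spark: SparkSession): Unit = {
      val schema = StructType(Seq(
        StructField("name", StringType, nullable = true),
        StructField("age", IntegerType, nullable = true) // declared type must match the Row value
      ))
      val rowRDD: RDD[Row] = spark.sparkContext
        .textFile("file:///D:/ruoze/people.txt")
        .map(_.split(","))
        .map(a => Row(a(0), a(1).trim.toInt)) // toInt here because the schema says IntegerType
      val peopleDF = spark.createDataFrame(rowRDD, schema)
      peopleDF.printSchema() // age: integer instead of string
    }

    A mismatch between a Row value and its declared type only fails at runtime, so the programmatic approach trades the compile-time checking of the case-class approach for flexibility.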