zoukankan      html  css  js  c++  java
  • 转 RDDs转DF

    原文链接:http://blog.csdn.net/Gavin_chun/article/details/78663826

    一、方式1:反射的方法,但是生产上不建议使用。因为case class只能定义22个字段,有所限制。

    二、方式2:编程的方式,一般三步走。 
    1、从原始RDD创建一个RDD[Row]; 
    2、在步骤1中创建的RDD[Row]中,创建一个与scheam匹配的结构 
    3、通过SparkSession提供的createDataFrame方法将schema应用于RDD[Row]

    package com.spark.sql
    
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    
    /**
      * Created with IntelliJ IDEA.
      * Description: 
      * Author: A_ChunUnique
      * Date: 2017/11/28
      * Time: 14:27
      *
      **/
    object CovertRdd {
    
      case class Person(name: String, age: Long)
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("RDD Covert DF").getOrCreate()
        //runInferSchemaExample(spark)
        runProgrammaticSchemaExample(spark)
      }
    
      private def runInferSchemaExample(spark: SparkSession): Unit = {
        /*
        * 方法1:通过反射方法 将RDD转成一个DF
        * */
        import spark.implicits._
        val peopleDF = spark.sparkContext
          .textFile("file:///D:/ruoze/people.txt")
          .map(_.split(","))
          .map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
        peopleDF.createOrReplaceTempView("people")
        val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
        teenagersDF.map(teenager => "Name: " + teenager(0) + ",Age:" + teenager(1)).show()
      }
    
      /*
        * 方法1:通过编程方法,将RDD转成一个DF
        * */
      private def runProgrammaticSchemaExample(spark: SparkSession): Unit = {
        import spark.implicits._
        // Create an RDD
        val peopleRDD = spark.sparkContext.textFile("file:///D:/ruoze/people.txt")
        // The schema is encoded in a string
        val schemaString = "name age"
        // Generate the schema based on the string of schema
        val fields = schemaString.split(" ")
          .map(fieldName => StructField(fieldName, StringType, nullable = true))
        val schema = StructType(fields)
        // Convert records of the RDD (people) to Rows
        val rowRDD = peopleRDD
          .map(_.split(","))
          .map(attributes => Row(attributes(0), attributes(1).trim))
    
        // Apply the schema to the RDD
        val peopleDF = spark.createDataFrame(rowRDD, schema)
    
        // Creates a temporary view using the DataFrame
        peopleDF.createOrReplaceTempView("people")
    
        // SQL can be run over a temporary view created using DataFrames
        val results = spark.sql("SELECT name FROM people")
    
        // The results of SQL queries are DataFrames and support all the normal RDD operations
        // The columns of a row in the result can be accessed by field index or by field name
        results.map(attributes => "Name: " + attributes(0)).show()
      }
    }
  • 相关阅读:
    【Unity】自定义编辑器窗口——拓展编辑器功能
    【Unity】AssetBundle的使用——打包/解包
    【Unity】使用Resources类管理资源
    【Unity】使用AssetDatabase编辑器资源管理
    【Unity】协程Coroutine及Yield常见用法
    【Unity】制作简易定时器(Timer)
    python3使用csv模块读写csv文件
    Python3使用csv模块csv.writer().writerow()保存csv文件,产生空行的问题
    MongoDB服务无法启动,发生服务特定错误:100
    ValueError: update only works with $ operators
  • 原文地址:https://www.cnblogs.com/kxgdby/p/7956584.html
Copyright © 2011-2022 走看看