  • Converting an RDD to a DataFrame

    Method 1: Create DataFrames via a case class (reflection)

    TestDataFrame1.scala

    package com.bky
    
    // Define a case class; it acts as the table schema
    case class Dept(var id: Int, var position: String, var location: String)
    
    // SparkSession is the unified entry point for Spark SQL
    import org.apache.spark.sql.SparkSession
    
    /**
      * Method 1: Create DataFrames via a case class (reflection)
      */
    object TestDataFrame1 {
    
      def main(args: Array[String]): Unit = {
    
        /**
          * Create the session directly with SparkSession. It wraps
          * SparkContext, SparkConf, and SQLContext; for backward
          * compatibility, SQLContext and HiveContext are still available.
          */
        val spark = SparkSession
          .builder()                 // builder for the session
          .appName("TestDataFrame1") // application name
          .master("local[2]")        // run locally with 2 threads
          .getOrCreate()             // reuse an existing session or create one
    
        import spark.implicits._ // implicit conversions such as .toDF
        // Read the local file and map each line onto the case class.
        // Note: spark.read.textFile returns a Dataset[String], so deptRDD
        // here is really a Dataset[Dept].
        val deptRDD = spark.read.textFile("/Users/hadoop/data/dept.txt")
          .map(line => Dept(line.split("\t")(0).toInt,
            line.split("\t")(1),
            line.split("\t")(2).trim))
    
        // Convert to a DataFrame (reflection infers the schema from Dept)
        val df = deptRDD.toDF()
    
        // Register the DataFrame as a temporary view
        df.createOrReplaceTempView("dept")
    
        // Query the view with SQL
        spark.sql("select * from dept").show()
    
      }
    }
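
    The same query can also be expressed with the DataFrame API instead of SQL. A minimal sketch, assuming it runs inside main() after df is created; the sample dept.txt rows and the filter predicate are made up for illustration:

    // Hypothetical tab-separated input (/Users/hadoop/data/dept.txt):
    //   1   developer   Beijing
    //   2   tester      Shanghai
    df.printSchema()            // inspect the schema inferred from Dept
    df.select("id", "position") // project columns
      .filter($"id" > 1)        // hypothetical predicate; needs spark.implicits._
      .show()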
    

    Condensed version: TestDataFrame1.scala

    package com.bky
    
    import org.apache.spark.sql.SparkSession
    
    object TestDataFrame1 extends App {
        val spark = SparkSession
          .builder()
          .appName("TestDataFrame1")
          .master("local[2]")
          .getOrCreate()
    
        import spark.implicits._
        val deptRDD = spark.read.textFile("/Users/hadoop/data/dept.txt")
          .map(line => Dept(line.split("\t")(0).toInt,
            line.split("\t")(1),
            line.split("\t")(2).trim))
      
        val df = deptRDD.toDF()   
        df.createOrReplaceTempView("dept")
        spark.sql("select * from dept").show()
    }
    
    case class Dept(var id:Int, var position:String, var location:String)
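
    When the data is already in memory, the same reflection mechanism works on a local collection. A hedged sketch, assuming spark.implicits._ is in scope; the sample rows are invented:

    // Hypothetical in-memory variant: toDF() infers the schema from Dept
    val localDF = Seq(
      Dept(1, "developer", "Beijing"), // made-up sample rows
      Dept(2, "tester", "Shanghai")
    ).toDF()
    localDF.show()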
    

    Method 2: Create DataFrames via StructType (programmatic interface)

    TestDataFrame2.scala

    package com.bky
    
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{Row, SparkSession}
    
    /**
      * Method 2: Create DataFrames via StructType (programmatic interface)
      */
    object TestDataFrame2 extends App {
    
      val spark = SparkSession
        .builder()
        .appName("TestDataFrame2")
        .master("local[2]")
        .getOrCreate()
    
      /**
        * Map each raw line to a Row (requires the
        * org.apache.spark.sql.Row import above). sparkContext.textFile is
        * used to get an RDD[String]: mapping to Row on an RDD needs no
        * Encoder, unlike Dataset.map.
        */
      val path = "/Users/hadoop/data/dept.txt"
      val fileRDD = spark.sparkContext.textFile(path)
      val rowRDD = fileRDD.map(line => {
        val fields = line.split("\t")
        Row(fields(0).toInt, fields(1), fields(2).trim)
      })
    
      // Define the schema with StructType:
      // field name, field type, nullable
      val innerStruct = StructType(
        StructField("id", IntegerType, true) ::
          StructField("position", StringType, true) ::
          StructField("location", StringType, true) :: Nil
      )
    
      // Apply the schema to the RDD[Row] to produce the DataFrame
      val df = spark.createDataFrame(rowRDD, innerStruct)
      df.createOrReplaceTempView("dept")
      spark.sql("select * from dept").show()
    
    }
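
    The programmatic interface is most useful when the schema is only known at runtime. A hedged sketch of deriving the same schema from a string of column names; the schemaString value is hypothetical:

    // Hypothetical: build the schema dynamically, treating every
    // field as a nullable StringType
    val schemaString = "id position location"
    val dynamicSchema = StructType(
      schemaString.split(" ").map(name => StructField(name, StringType, true))
    )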
    
    

    Method 3: Create DataFrames from a JSON file

    TestDataFrame3.scala

    package com.bky
    
    import org.apache.spark.sql.SparkSession
    
    /**
      * Method 3: Create DataFrames from a JSON file
      */
    object TestDataFrame3 extends App {
    
      val spark = SparkSession
        .builder()
        .master("local[2]")
        .appName("TestDataFrame3")
        .getOrCreate()
    
      val path = "/Users/hadoop/data/test.json"
      // spark.read.json returns a DataFrame directly, no conversion needed;
      // by default it expects one JSON object per line (JSON Lines)
      val jsonDF = spark.read.json(path)
      jsonDF.createOrReplaceTempView("test")
      spark.sql("select * from test").show()
    }
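
    For illustration, the same reader also accepts an in-memory Dataset of JSON strings (Spark 2.2+). A hedged sketch; the sample records are invented and toDS() assumes spark.implicits._:

    // Hypothetical: read JSON from an in-memory Dataset[String]
    import spark.implicits._
    val jsonDS = Seq(
      """{"id": 1, "position": "developer", "location": "Beijing"}""",
      """{"id": 2, "position": "tester", "location": "Shanghai"}"""
    ).toDS()
    spark.read.json(jsonDS).show()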
    
    