zoukankan      html  css  js  c++  java
  • RDD转换成为DataFrame

    方式一: 通过case class创建DataFrames(反射)

    TestDataFrame1.scala

    package com.bky
    
    // 隐式类的导入
    // 定义case class,相当于表结构
    case class Dept(var id:Int, var position:String, var location:String)
    
    // 需要导入SparkSession这个包
    import org.apache.spark.sql.SparkSession
    
    /**
      * 方式一: 通过case class创建DataFrames(反射)
      */
    object TestDataFrame1 {
    
      def main(args: Array[String]): Unit = {
    
        /**
          * 直接使用SparkSession进行文件的创建。
          * 封装了SparkContext,SparkConf,SQLContext,
          * 为了向后兼容,SQLContext和HiveContext也被保存了下来
          */
        val spark = SparkSession
          .builder()  //构建sql
          .appName("TestDataFrame1") // 设置文件名
          .master("local[2]") // 设置executor
          .getOrCreate() //获取或创建
    
        import spark.implicits._  // 隐式转换
        // 将本地的数据读入RDD,将RDD与case class关联
        val deptRDD = spark.read.textFile("/Users/hadoop/data/dept.txt")
          .map(line => Dept(line.split("	")(0).toInt,
            line.split("	")(1),
            line.split("	")(2).trim))
    
        // 将RDD转换成DataFrames(反射)
        val df = deptRDD.toDF()
    
        // 将DataFrames创建成一个临时的视图
        df.createOrReplaceTempView("dept")
    
        // 使用SQL语句进行查询
        spark.sql("select * from dept").show()
    
      }
    }
    

    精简版 TestDataFrame1.scala

    package com.bky
    
    import org.apache.spark.sql.SparkSession
    
    object TestDataFrame1 extends App {
        val spark = SparkSession
          .builder()  //构建sql
          .appName("TestDataFrame1") 
          .master("local[2]") 
          .getOrCreate() 
    
        import spark.implicits._  
        val deptRDD = spark.read.textFile("/Users/hadoop/data/dept.txt")
          .map(line => Dept(line.split("	")(0).toInt,
            line.split("	")(1),
            line.split("	")(2).trim))
      
        val df = deptRDD.toDF()   
        df.createOrReplaceTempView("dept")
        spark.sql("select * from dept").show()
    }
    
    case class Dept(var id:Int, var position:String, var location:String)
    

    方式二:通过创建structType创建DataFrames(编程接口)

    TestDataFrame2.scala

    package com.bky
    
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{Row, SparkSession}
    
    /**
      *
      * 方式二:通过创建structType创建DataFrames(编程接口)
      */
    object TestDataFrame2 extends App {
    
      val spark = SparkSession
        .builder()
        .appName("TestDataFrame2")
        .master("local[2]")
        .getOrCreate()
    
      /**
        * 将RDD数据映射成Row,需要导入import org.apache.spark.sql.Row
        */
      import spark.implicits._
      val path = "/Users/hadoop/data/dept.txt"
      val fileRDD = spark.read.textFile(path)
      val rowRDD= fileRDD.map(line => {
        val fields = line.split("	")
        Row(fields(0).toInt, fields(1), fields(2).trim)
      })
    
      // 创建StructType来定义结构
      val innerStruct = StructType(
        // 字段名,字段类型,是否可以为空
          StructField("id", IntegerType, true) ::
            StructField("position", StringType, true) ::
            StructField("location", StringType, true) :: Nil
      )
    
      val df = spark.createDataFrame(innerStruct)
      df.createOrReplaceTempView("dept")
      spark.sql("select * from dept").show()
    
    }
    
    

    方式三:通过json文件创建DataFrames

    TestDataFrame3.scala

    package com.bky
    
    import org.apache.spark.sql.SparkSession
    
    /**
      * 方式三:通过json文件创建DataFrames
      */
    object TestDataFrame3 extends App {
    
      val spark = SparkSession
        .builder()
        .master("local[2]")
        .appName("TestDataFrame3")
        .getOrCreate()
    
      val path = "/Users/hadoop/data/test.json"
      val fileRDD = spark.read.json(path)
      fileRDD.createOrReplaceTempView("test")
      spark.sql("select * from test").show()
    }
    
    
  • 相关阅读:
    OpenJDK源码研究笔记(十二):JDBC中的元数据,数据库元数据(DatabaseMetaData),参数元数据(ParameterMetaData),结果集元数据(ResultSetMetaDa
    Java实现 LeetCode 257 二叉树的所有路径
    Java实现 LeetCode 257 二叉树的所有路径
    Java实现 LeetCode 257 二叉树的所有路径
    Java实现 LeetCode 242 有效的字母异位词
    Java实现 LeetCode 242 有效的字母异位词
    Java实现 LeetCode 242 有效的字母异位词
    Java实现 LeetCode 241 为运算表达式设计优先级
    Java实现 LeetCode 241 为运算表达式设计优先级
    Java实现 LeetCode 241 为运算表达式设计优先级
  • 原文地址:https://www.cnblogs.com/suixingc/p/rdd-zhuan-huan-cheng-weidataframe.html
Copyright © 2011-2022 走看看