zoukankan      html  css  js  c++  java
  • spark 系列之三 RDD,DataFrame与DataSet之间的转化

    本篇介绍 RDD,DataFrame与DataSet之间的转化

    在Object中构建 SparkSession

    object SparkRDD_DF {
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder()
          .appName("DatasetAPP")
          .master("local")
          .getOrCreate()
    
        val rd = new Rdd2DF(sparkSession)
        rd.RDD2DF_1()
        rd.RDD2DF_2()
      }
    }

    第一种方式:

    这种方式适合你再体验已经知道了数据的字段类型和字段名称的情况下。

    需要先提前构建一个 Info的 case class

    case class Info(id: Int, name: String, age: Int)

    然后

      def RDD2DF_1():Unit ={
        //RDD ==> DataFrame 方式一
        val sparkSession:SparkSession = this.sparkSession
        import sparkSession.implicits._
        sparkSession.sparkContext.textFile("file:///D:/software_download/spark_text/people.txt")
          .map(_.split(","))
          .map(line => Info(line(0).toInt, line(1), line(2).toInt))
          .toDF()
          .show()
      }

    第二种方式:

    适合你提前不知道数据的字段,以及字段类型

      def RDD2DF_2():Unit ={
        //RDD ==> DataFrame 方式二
        val peopleRDD = sparkSession.sparkContext.textFile("file:///D:/software_download/spark_text/people.txt")
          .map(_.split(","))
          .map(line => Row(line(0).toInt, line(1), line(2).toInt))
    
    
        val structType =  StructType(Array(StructField("id", IntegerType, true),
          StructField("name", StringType, true),
          StructField("age", IntegerType, true)))
    
        sparkSession.createDataFrame(peopleRDD, structType).show()
      }

    DataFrame 与DataSet之间的转化

    object SparkDataset {
      case class Sales(transactionId:Int,customerId:Int,itemId:Int,amountPaid:Double)
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("SparkDataset").master("local").getOrCreate()
        //csv文件路径
        val path = "file:///D:/software_download/spark_text/sales.csv"
        //导入隐式转换
        import spark.implicits._
        //spark如何解析csv文件(外部数据源功能)
        val csv_dataframe = spark.read
                                 .option("header","true")
                                 .option("inferSchema","true").csv(path)
    
        //把csv_dataframe转换成Dataset
        //csv_dataframe.select("amountPaid","itemId").show()
    
        val csv_dataset = csv_dataframe.as[Sales]
        csv_dataset.map(line => (line.itemId,line.amountPaid)).show
        spark.stop()
      }
    }

    DataFrame和RDD 的简单使用,前面已经介绍过。

    我们知道 通过DataFrame 可以方便的使用一些算子,以及把DataFrame注册成临时表,方便我们使用sql语句进行查询,大大降低了学习的成本

    那么既生瑜,何生亮,既然有了DataFrame,那我们还要DataSet干嘛呢?

    下一遍,我们深入的讲一下DataSet的出现的意义,以及使用场景:)

  • 相关阅读:
    万豪项目总结
    解决jquery animate({scrollTop$pos},500)与$(window).scroll方法冲突的问题
    一波水文来袭-让我们一起谈谈闭包【原创】
    JS模块化规范AMD之RequireJS
    JS模块化规范CMD之SeaJS
    邂逅Sass和Compass之Compass篇
    邂逅Sass和Compass之Sass篇
    idea 修改SVN账户信息
    idea 创建/加载 maven项目速度较慢
    gitlab新建分支,idea中无法找到
  • 原文地址:https://www.cnblogs.com/suzhenxiang/p/14203026.html
Copyright © 2011-2022 走看看