  • RDD & Dataset & DataFrame

    Creating a Dataset

    //Dog and Cat are assumed to live in com.twq.dataset (see the sketch after this listing)
    import com.twq.dataset.{Cat, Dog}
    import org.apache.spark.sql.{Encoders, SparkSession}

    object DatasetCreation {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("SparkSessionTest")
          .getOrCreate()
    
        import spark.implicits._
    
        //1: range; spark.range(start, end, step, numPartitions) yields a Dataset[java.lang.Long]
        val ds1 = spark.range(0, 10, 2, 2)
        ds1.show()
    
        val dogs = Seq(Dog("jitty", "red"), Dog("mytty", "yellow"))
        val cats = Seq(new Cat("jitty", 2), new Cat("mytty", 4))
    
        //2: create from a Seq[T]
        val data = dogs
        val ds = spark.createDataset(data)
        ds.show()
    
        //3: create from an RDD[T]
        val dogRDD = spark.sparkContext.parallelize(dogs)
        val dogDS = spark.createDataset(dogRDD)
        dogDS.show()
    
        val catRDD = spark.sparkContext.parallelize(cats)
        //the next line would not compile: Cat is a plain class, not a case class,
        //so no implicit Encoder[Cat] is in scope
        //val catDSWithoutEncoder = spark.createDataset(catRDD)
        val catDS = spark.createDataset(catRDD)(Encoders.bean(classOf[Cat]))
        catDS.show()
    
        //Encoders handle the conversion between JVM object types and Spark SQL's internal data types
        val intDs = Seq(1, 2, 3).toDS() // implicitly provided (spark.implicits.newIntEncoder)
        val seqIntDs = Seq(Seq(1), Seq(2), Seq(3)).toDS() // implicitly provided (spark.implicits.newIntSeqEncoder)
        val arrayIntDs = Seq(Array(1), Array(2), Array(3)).toDS() // implicitly provided (spark.implicits.newIntArrayEncoder)
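
        //illustrative addition (not in the original): tuples are Products, so they are
        //covered by the implicit product encoder (spark.implicits.newProductEncoder)
        val tupleDs = Seq((1, "one"), (2, "two")).toDS()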
    
        //the supported Encoders include:
        Encoders.product //tuples and case classes
        Encoders.scalaBoolean
        Encoders.scalaByte
        Encoders.scalaDouble
        Encoders.scalaFloat
        Encoders.scalaInt
        Encoders.scalaLong
        Encoders.scalaShort
    
        Encoders.bean(classOf[Cat])
    
        spark.stop()
      }
    }
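
    The Dog and Cat classes are never shown in the original post. A minimal sketch of what they presumably look like in com.twq.dataset (the package the later import points at): Dog is covered by the implicit product encoder because case classes are Products, while Cat must satisfy Encoders.bean, which expects a Java-bean-style class with a no-arg constructor and getters/setters.

    package com.twq.dataset

    import scala.beans.BeanProperty

    //a case class is a Product, so the implicit product encoder covers it
    case class Dog(name: String, color: String)

    //Encoders.bean needs a Java bean: a no-arg constructor plus getters and
    //setters, which @BeanProperty generates for var fields
    class Cat(@BeanProperty var name: String, @BeanProperty var age: Int) {
      def this() = this(null, 0)
    }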
    

      

    Creating a DataFrame

    //Dog and Cat are the classes sketched above, assumed to be in com.twq.dataset
    import com.twq.dataset.{Cat, Dog}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SparkSession}

    object DataFrameCreation {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("SparkSessionTest")
          .getOrCreate()
    
        //1: create from an RDD[A <: Product]; case classes and tuples are both subclasses of Product
        val rdd = spark.sparkContext.textFile("").map(line => {
          val splitData = line.split(",")
          Dog(splitData(0), splitData(1))
        })
    
        val tupleRDD = spark.sparkContext.parallelize(Seq(("jitty", 2), ("mytty", 4)))
    
        spark.createDataFrame(rdd)
        spark.createDataFrame(tupleRDD)
    
        val dogRDD = spark.sparkContext.parallelize(Seq(Dog("jitty", "red"), Dog("mytty", "yellow")))
        val dogDf = spark.createDataFrame(dogRDD)
        dogDf.show()
    
        //2: create from a Seq[A <: Product]
        val dogSeq = Seq(Dog("jitty", "red"), Dog("mytty", "yellow"))
        spark.createDataFrame(dogSeq).show()
    
        //3: create from an RDD[_] plus a class, where the class is a Java bean
        val catRDD = spark.sparkContext.parallelize(Seq(new Cat("jitty", 2), new Cat("mytty", 4)))
        //the next line would not compile: Cat is not a Product, so there is no implicit Encoder
        //val catDf = spark.createDataFrame(catRDD)
        val catDf = spark.createDataFrame(catRDD, classOf[Cat])
        catDf.show()
        catDf.createOrReplaceTempView("cat")
        spark.sql("select * from cat").show() //需要注意的是查询出来的cat的属性的顺序是不固定的
    
        //4: create from an RDD[Row] plus a schema
        val rowSeq = Seq("tom,30", "katy, 46").map(_.split(",")).map(p => Row(p(0), p(1).trim.toInt))
        val rowRDD = spark.sparkContext.parallelize(rowSeq)
        val schema =
          StructType(
            StructField("name", StringType, false) ::
              StructField("age", IntegerType, true) :: Nil)
        val dataFrame = spark.createDataFrame(rowRDD, schema)
        dataFrame.printSchema
        dataFrame.show()
    
        //5: create from an external data source
        //(BASE_PATH is assumed to be defined elsewhere, pointing at the test data directory)
        val df = spark.read.json(s"${BASE_PATH}/IoT_device_info.json")
        df.show()
    
        spark.stop()
      }
    }
    

      

    Converting between RDD, Dataset, and DataFrame

    package com.twq.dataset.creation

    import com.twq.dataset.Dog
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SparkSession}
    
    
    object RDDDatasetTransform {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("RDDDatasetTransform")
          .getOrCreate()
    
        val dogs = Seq(Dog("jitty", "red"), Dog("mytty", "yellow"))
    
        val dogRDD = spark.sparkContext.parallelize(dogs)
    
        //1: RDD to DataFrame
        import spark.implicits._
        val dogDF = dogRDD.toDF()
        dogDF.show()
    
        val renameSchemaDF = dogRDD.toDF("first_name", "lovest_color")
        renameSchemaDF.show()
    
        //2: DataFrame to RDD; the typed schema information is lost, leaving an RDD[Row]
        val dogRowRDD: RDD[Row] = dogDF.rdd
        dogRowRDD.collect()
        renameSchemaDF.rdd.collect()
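
        //illustrative addition (not in the original): to get typed objects back from an
        //RDD[Row], rebuild them field by field (assuming Dog(name, color) as sketched earlier)
        val typedDogRDD: RDD[Dog] = dogRowRDD.map(row =>
          Dog(row.getAs[String]("name"), row.getAs[String]("color")))
        typedDogRDD.collect()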
    
        //3: RDD to Dataset
        val dogDS = dogRDD.toDS()
        dogDS.show()
    
        //4: Dataset to RDD
        val dogRDDFromDs: RDD[Dog] = dogDS.rdd
        dogRDDFromDs.collect()
    
        //5: DataFrame to Dataset
        val dogDsFromDf = dogDF.as[Dog]
        dogDsFromDf.show()
    
        //6: Dataset to DataFrame
        val dogDfFromDs = dogDsFromDf.toDF()
        dogDfFromDs.show()
    
        spark.stop()
      }
    }
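
    The payoff of a Dataset over a DataFrame in these conversions is compile-time type safety. A short sketch (not from the original post, reusing dogDS and dogDF from the listing above and assuming the Dog fields sketched earlier):

    import org.apache.spark.sql.{DataFrame, Dataset}

    //typed: every element is a Dog, so a misspelled field name fails at compile time
    val dogNames: Dataset[String] = dogDS.map(_.name)

    //untyped: columns are addressed by string, so a misspelled column name only
    //fails at runtime with an AnalysisException
    val nameColumn: DataFrame = dogDF.select("name")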
    

      

    Defining a schema and working with complex data types

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{Row, SaveMode, SparkSession}

    object SchemaApiTest {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("SchemaApiTest")
          .master("local")
          .getOrCreate()
    
    
        //(BASE_PATH is assumed to be defined elsewhere, pointing at the test data directory)
        val iotDeviceDf = spark.read.json(s"${BASE_PATH}/IoT_device_info.json")

        iotDeviceDf.toString() //the toString of a DataFrame includes a schema summary

        //1: displaying the schema
        iotDeviceDf.schema
        iotDeviceDf.printSchema()
    
        //2: a schema can contain complex data types (maps, arrays, nested structs)
        val schema =
          StructType(
            StructField("name", StringType, false) ::
              StructField("age", IntegerType, true) ::
              StructField("map", MapType(StringType, StringType), true) ::
              StructField("array", ArrayType(StringType), true) ::
              StructField("struct",
                StructType(Seq(StructField("field1", StringType), StructField("field2", StringType))))
              :: Nil)
    
        val people =
          spark.sparkContext.parallelize(Seq("tom,30", "katy, 46")).map(_.split(",")).map(p =>
            Row(p(0), p(1).trim.toInt, Map(p(0) -> p(1)), Seq(p(0), p(1)), Row("value1", "value2")))
        val dataFrame = spark.createDataFrame(people, schema)
        dataFrame.printSchema
        dataFrame.show()
    
        dataFrame.select("map").collect().map(row => row.getAs[Map[String, String]]("map"))
        dataFrame.select("array").collect().map(row => row.getAs[Seq[String]]("array"))
        dataFrame.select("struct").collect().map(row => row.getAs[Row]("struct"))
    
    
        //what a schema is useful for
        val exampleSchema = new StructType().add("name", StringType).add("age", IntegerType)
        exampleSchema("name") //extract the StructField (name and data type) of the "name" field
        exampleSchema.fields //all fields with their type information
        exampleSchema.fieldNames //all field names
        exampleSchema.fieldIndex("name") //the index position of a field
    
        //1: inspect the schema of a parquet file
        val sessionDf = spark.read.parquet(s"${BASE_PATH}/trackerSession")
        sessionDf.schema
        sessionDf.printSchema()
    
        //2: check whether two parquet files have the same schema
        val changedSchemaFieldNames = sessionDf.schema.fieldNames.map(fieldName => {
          if (fieldName == "pageview_count") {
            "pv_count"
          } else fieldName
        })
        sessionDf.toDF(changedSchemaFieldNames:_*).write.mode(SaveMode.Overwrite).parquet(s"${BASE_PATH}/trackerSession_changeSchema")
        val schemaChangeSessionDf = spark.read.parquet(s"${BASE_PATH}/trackerSession_changeSchema")
        schemaChangeSessionDf.schema
        schemaChangeSessionDf.printSchema()
    
        val oldSchema = sessionDf.schema
        val changeSchema = schemaChangeSessionDf.schema

        oldSchema == changeSchema //false: the field names differ
    
        //3: the two parquet files have different schemas, so they must be unified before combining
        val allSessionError =
          spark.read.parquet(s"${BASE_PATH}/trackerSession", s"${BASE_PATH}/trackerSession_changeSchema")
        allSessionError.printSchema()
        allSessionError.show()
    
        //rename one side's columns so the schemas match, then union
        val allSessionRight = sessionDf.toDF(changeSchema.fieldNames:_*).union(schemaChangeSessionDf)
        allSessionRight.printSchema()
        allSessionRight.show()
    
        spark.stop()
    
      }
    }
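
    Since StructType equality is all-or-nothing, a small helper (a sketch, not in the original post) can report which field names actually differ between two schemas such as oldSchema and changeSchema above:

    import org.apache.spark.sql.types.StructType

    //compares two schemas positionally and returns the field-name pairs that differ;
    //assumes both schemas have the same number of fields in the same order
    def schemaDiff(a: StructType, b: StructType): Seq[(String, String)] =
      a.fieldNames.zip(b.fieldNames).filter { case (x, y) => x != y }.toSeq

    //for the session schemas above this would return Seq(("pageview_count", "pv_count"))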
    

      
