zoukankan      html  css  js  c++  java
  • Spark提高篇——RDD/DataSet/DataFrame(二)

    欢迎大家关注我的公众号,“互联网西门二少”,我将继续输出我的技术干货~

    该部分分为两篇,分别介绍RDD与Dataset/DataFrame:

    一、RDD

    二、DataSet/DataFrame

    该篇主要介绍DataSet与DataFrame。

    一、生成DataFrame

    1.1.通过case class构造DataFrame

    package com.personal.test
    
    import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
    
    object DataFrameTest {
    
      case class Person(id: Int, name: String, age: Int)
    
      def main(args: Array[String]): Unit = {
        val dataPath = "hdfs://192.168.60.164:9000/user/name/input"
    
        val spark = SparkSession
          .builder()
          .appName("DataFrameTest")
          .getOrCreate()
    
        import spark.implicits._
        val caseClassDF = Seq(Person(1, "lily", 18), Person(2, "lucy", 20)).toDF("id", "name", "age")
        println("=========== show: ============")
        caseClassDF.show()
        println("=========== schema: ===========")
        caseClassDF.printSchema()
    
        spark.stop()
      }
    }

            这里通过“import spark.implicits._”使用了隐式Encoder,否则无法调用“.toDF()”,这里的spark为上面定义的sparkSession变量,并不是“import org.apache.spark”,注意不要混淆。

            运行结果:

    1534072368_74_w963_h367.png

            可以看到,我们将两个Person实例封装为DataFrame实例,之后便可以像操作表/视图一样来对其进行其他处理了。

    1.2.通过数值构造DataFrame

    package com.personal.test
    
    import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
    
    object DataFrameTest {
      def main(args: Array[String]): Unit = {
        val dataPath = "hdfs://192.168.60.164:9000/user/name/input"
    
        val spark = SparkSession
          .builder()
          .appName("DataFrameTest")
          .getOrCreate()
    
        import spark.implicits._
        val valueDF = Seq(1, 2, 3).toDF("id")
        println("=========== show: ============")
        valueDF.show()
        println("=========== schema: ===========")
        valueDF.printSchema()
    
        spark.stop()
      }
    }

            通过这个和上面的例子可以看到,我们可以通过Seq将任何值类型对象转换为DataFrame对象,Seq类似于Java的List。

            运行结果:

    1534072404_69_w1439_h713.png

    1.3.通过SparkSession读取数据

    package com.personal.test
    
    import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
    
    object DataFrameTest {
    
      case class Person(id: Int, name: String, age: Int)
    
      def main(args: Array[String]): Unit = {
        val dataPath = "hdfs://192.168.60.164:9000/user/name/input"
    
        val spark = SparkSession
          .builder()
          .appName("DataFrameTest")
          .getOrCreate()
    
        // 默认将整行定义为value: String, 可以为空
        val hdfsDF = spark.read.text(dataPath)
    
        println("=========== show: ============")
        hdfsDF.show()
        println("=========== schema: ===========")
        hdfsDF.printSchema()
    
        val personEncoder: Encoder[Person] = Encoders.product
    
        val personDF = hdfsDF.map(row => {
          //val value = row.getAs[String]("value")
          val value = row.getString(0)
          val fields = value.trim.split(",")
          Person(Integer.parseInt(fields(0)), fields(1), Integer.parseInt(fields(2)))
        })(personEncoder).toDF()
    
        println("=========== show: ============")
        personDF.show()
        println("=========== schema: ===========")
        personDF.printSchema()
    
        spark.stop()
      }
    }

            运行结果:

            可以看到,“spark.read.text(dataPath)”默认将文件中的一行定义为String类型的value字段,可以通过get(0)、getString(0)或getAs[String]("value")来获取value的内容。       

            这里我们没有使用“import spark.implicits._”将Person隐式Encoder,而是通过“val personEncoder: Encoder[Person] = Encoders.product”显式定义一个Encoder[Person]类型的变量,并调用“hdfsDF.map(...)(personEncoder)”来显式Encoder,并在map之后调用".toDF"将map的结果转换为DataFrame,map的结果为DataSet类型,所以DataSet可以直接调用“.toDF”转换为DataFrame。如果使用“import spark.implicits._” ,就不需要定义“personEncoder”变量,也不需要为map的最后一个参数赋值。

    1.4.通过RDD转换为DataFrame

    package com.personal.test
    
    import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
    
    object DataFrameTest {
    
      case class Person(id: Int, name: String, age: Int)
    
      def main(args: Array[String]): Unit = {
        val dataPath = "hdfs://192.168.60.164:9000/user/name/input"
    
        val spark = SparkSession
          .builder()
          .appName("DataFrameTest")
          .getOrCreate()
    
        import spark.implicits._
        val rddDF = spark.sparkContext.textFile(dataPath)
          .map(row => row.split(","))
          .map(fields => Person(Integer.parseInt(fields(0)), fields(1), Integer.parseInt(fields(2))))
          .toDF("id", "name", "age")
    
        println("=========== show: ============")
        rddDF.show()
        println("=========== schema: ===========")
        rddDF.printSchema()
    
        spark.stop()
      }
    }

            可以看到,RDD转换为DataFrame与通过Seq生成DataFrame一样,都需要“import spark.implicits._”,否则无法调用“.toDF”。

            运行结果:

    1534074813_9_w1440_h815.png

    二、生成DataSet

    2.1.通过case class构造DataSet

    package com.personal.test
    
    import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
    
    object DataSetTest {
    
      case class Person(id: Int, name: String, age: Int)
    
      def main(args: Array[String]): Unit = {
        val dataPath = "hdfs://192.168.60.164:9000/user/name/input"
    
        val spark = SparkSession
          .builder()
          .appName("DataSetTest")
          .getOrCreate()
    
        import spark.implicits._
        val caseClassDs = Seq(Person(1, "lily", 18), Person(2, "lucy", 20))
          .toDS()
    
        println("=========== show: ============")
        caseClassDs.show()
        println("=========== schema: ===========")
        caseClassDs.printSchema()
    
        spark.stop()
      }
    }

             可以看到,类似于1.1,只需要将“.toDF”换为“.toDS”即可得到DataSet类型的结果。

            运行结果:

    1534076444_12_w877_h348.png

            可以看到其结构等于DF。

    2.2.通过数值构造DataSet

    package com.personal.test
    
    import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
    
    object DataSetTest {
    
      case class Person(id: Int, name: String, age: Int)
    
      def main(args: Array[String]): Unit = {
        val dataPath = "hdfs://192.168.60.164:9000/user/name/input"
    
        val spark = SparkSession
          .builder()
          .appName("DataSetTest")
          .getOrCreate()
    
        import spark.implicits._
    
        val valueDs = Seq(1, 2, 3).toDS()
    
        println("=========== show: ============")
        valueDs.show()
        println("=========== schema: ===========")
        valueDs.printSchema()
    
        spark.stop()
      }
    }

             运行结果:

    1534076566_68_w854_h328.png

            可以看到,其结构类似于DF,但是列名默认为value。

    2.3.通过SparkSession读取数据

    package com.personal.test
    
    import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
    
    object DataSetTest {
    
      case class Person(id: Int, name: String, age: Int)
    
      def main(args: Array[String]): Unit = {
        val dataPath = "hdfs://192.168.60.164:9000/user/name/input"
    
        val spark = SparkSession
          .builder()
          .appName("DataSetTest")
          .getOrCreate()
    
        
        // 默认将整行定义为value: String, 可以为空
        val hdfsDF = spark.read.text(dataPath)
    
        println("=========== show: ============")
        hdfsDF.show()
        println("=========== schema: ===========")
        hdfsDF.printSchema()
    
        val personEncoder: Encoder[Person] = Encoders.product
    
        val personDs = hdfsDF.map(row => {
          val value = row.getAs[String]("value")
          //val value = row.getString(0)
          val fields = value.trim.split(",")
          Person(Integer.parseInt(fields(0)), fields(1), Integer.parseInt(fields(2)))
        })(personEncoder)
    
        println("=========== show: ============")
        personDs.show()
        println("=========== schema: ===========")
        personDs.printSchema()
    
        spark.stop()
      }
    }

            通过1.3与2.3可以看到,SparkSession读取文件(SparkSession.read.*)得到的为DataFrame,DataFrame经过map、filter等操作后得到的为DataSet,DataSet又可以通过“.toDF”转换为DataFrame,这也印证了官网对DataFrame的定义:

    type DataFrame = DataSet[Row]

    2.4.通过RDD转换为DataSet

    package com.personal.test
    
    import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
    
    object DataSetTest {
    
      case class Person(id: Int, name: String, age: Int)
    
      def main(args: Array[String]): Unit = {
        val dataPath = "hdfs://192.168.60.164:9000/user/name/input"
    
        val spark = SparkSession
          .builder()
          .appName("DataSetTest")
          .getOrCreate()
    
        import spark.implicits._
        val rddDS = spark.sparkContext.textFile(dataPath)
          .map(row => row.split(","))
          .map(fields => Person(Integer.parseInt(fields(0)), fields(1), Integer.parseInt(fields(2))))
          .toDS()
    
        println("=========== show: ============")
        rddDS.show()
        println("=========== schema: ===========")
        rddDS.printSchema()
    
        spark.stop()
      }
    }

             类似于1.4,只需要将“.toDF”替换为“.toDS”即可得到DataSet类型的结果。

            通过以上(一)、(二)内容我们看到了如何将文件、数值、对象、RDD转换为DataSet/DataFrame,以及DataSet与DataFrame之间如何互转,如何隐式/显式使用Encoder。

    接下来介绍基于DataSet/DataFrame可以进行哪些操作。

    三、基于DataSet/DataFrame的操作

            除了上例中用到的map,还有filter、count、first、foreach、reduce等同样可以基于RDD进行的操作,此外,还有几个特殊操作:

    3.1. select

            select可以接受一个或多个参数,用于从DataSet/DataFrame中获取指定字段。

    3.2. createOrReplaceTempView

            createOrReplaceTempView用于将DataSet/DataFrame的数据临时保存为视图,方便后续使用SparkSession.sql()进行操作,Session结束时生命周期结束。

    3.3. printSchema

            如(一)(二)中示例所示,printSchema用于打印DataSet/DataFrame数据集的树形结构定义。

    3.4. withColumnRenamed

            withColumnRenamed用于对列重命名,类似于sql语句“select a as aa, b as bb from table”中的as。

    3.5. join

            join用于按指定的join表达式与join类型(默认为inner join)将另一个DataSet/DataFrame与当前DataSet/DataFrame合并。

            这里举一个不完整的例子,用以演示基于DataSet/DataFrame的操作。

        val exposureLogEncoder: Encoder[ExposureLog] = Encoders.product
        val exposureLogTupleEncoder: Encoder[Tuple1[ExposureLog]] = Encoders.product
        // 计数器
        val dataCounter = spark.sparkContext.longAccumulator("dataCounter")
        val legalDataCounter = spark.sparkContext.longAccumulator("legalDataCounter")
        val illegalAvCounter = spark.sparkContext.longAccumulator("illegalAvCounter")
        val illegalKvCounter = spark.sparkContext.longAccumulator("illegalKvCounter")
        val illegalReportkeyCounter = spark.sparkContext.longAccumulator("illegalReportkeyCounter")
    
        // 1.曝光日志: select id,ei,av,ui,kv from mta_t_boss_v1_5362
        val exposureLogDF = HiveUtil.readTableByPartition(tdw, exposure, spark, partition)
        if (exposureLogDF == null) {
          System.exit(2)
        }
        val exposureLogDS = exposureLogDF.select("id", "ei", "ei", "av", "ui", "kv", "log_time")
          .filter(row => {
            dataCounter.add(1)
            val av = row.getAs[String]("av")
            if (av == null
              || av.trim.startsWith("1.6.2")
              || av.compareTo("0.9.5")<0) {
              illegalAvCounter.add(1)
              false
            }
            else true
          })
          .filter(row => {
            val kv = row.getAs[String]("kv")
            if (kv == null || kv.trim.length == 0) {
              illegalKvCounter.add(1)
              false
            }
            else true
          })
          .map(row =>parseExposure(row))(exposureLogTupleEncoder)
          .filter(exposure => {
            val obj = exposure._1
            if (obj == null) {
              illegalReportkeyCounter.add(1)
              false
            }
            else {
              legalDataCounter.add(1)
              true
            }
          })
          .map(row => row._1)(exposureLogEncoder)
    
        val resultCount = exposureLogDS.persist().count()
        println(s"Data count: ${dataCounter.value}")
        println(s"Legal data count: ${legalDataCounter.value}")
        println(s"Result count: ${resultCount}")
        println(s"Illegal av count: ${illegalAvCounter.value}")
        println(s"Illegal kv count: ${illegalKvCounter.value}")
        println(s"Illegal reportKey count: ${illegalReportkeyCounter.value}")
    
    
        // 1.save log info to hdfs
        exposureLogDS.persist().map(exposure => exposure.toString())(Encoders.STRING)
          .write.mode(SaveMode.Overwrite).text(logSavePath)
        println(s"[INFO] save log to ${logSavePath} success.")
    
        exposureLogDS.persist().select("sign", "channel").createOrReplaceTempView("log")
        val sql = "select sign, channel, count(*) as data_count " +
          "from log " +
          "group by sign, channel"
        val aggDF = ss.sql(sql)
    
        aggDF.printSchema()
        // 2.save log statistics info to hdfs
        aggDF.map(row => row.mkString(","))(Encoders.STRING)
          .repartition(1)
          .write.mode(SaveMode.Overwrite)
          .text(logStatisticsInfoSavePath)
        println(s"[INFO] save statistics info to ${logStatisticsInfoSavePath} success.")

            注:HiveUtil.readTableByPartition()为自定义函数,用于从hive中读取指定数据库/表/分区的数据,结果为DataFrame类型。

            上例从hive中读取数据后,使用select获取指定字段,然后使用filter根据指定字段进行非法数据过滤,之后再调用map进行数据预处理、解析等工作,之后在调用filter进行空数据过滤,最后使用map对Tuple1拆箱。之后将处理结果通过map构造为字符串并保存到hdfs,同时使用createOrReplaceTempView创建临时视图,再通过SparkSession.sql对视图进行聚合操作,以统计sign,channel纬度的记录数,之后使用printSchema打印sql操作后数据集的schema结构,最后将聚合后的统计信息通过map构造为字符串保存到hdfs。可以看到日常数据处理过程中会经常遇到如上例一般的需求。

            另外,提一下,使用Accumulator的时候要保证只执行一次action操作,否则需要执行cache或者persist来保证计数器不重复计数,如上例中重复使用了exposureLogDS,如果不执行persist/cache会导致计数器重复计数。

            另外,注意例中第一个map返回的结构为Tuple1[ExposureLog],之所以将ExposureLog又包了一层,是因为“Product type is represented as a row, and the entire row can not be null in Spark SQL like normal databases”,所以如果需要返回null对象,就需要对其装箱,使返回值为非空对象,再在后续流程(如最后一个filter 、map)中拆箱。

    其他操作可以参考官网API:

    http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset

  • 相关阅读:
    【译】第33节---种子数据
    python文件的读写
    python-----实现数据库安装和连接操作
    学习进度(第三周)
    解 idea(.IntelliJIdea2019.3)双击打不开的原因
    python爬虫------处理cookie的相关请求
    安装retrying模块出现 (Retry(total=4, connect=None, read=None, redirect=None, status=None))
    java----统计疫情可视化展示--echarts(三)
    java----统计疫情可视化展示--echarts(二)
    java----统计疫情可视化展示--echarts
  • 原文地址:https://www.cnblogs.com/deep-learning-stacks/p/10213034.html
Copyright © 2011-2022 走看看