zoukankan      html  css  js  c++  java
  • spark 系列之二 Dataframe的使用

    本篇主要是Dataframe的相关使用

    RDD在我们的使用过程,每一行存储的是一个RDD对象。

    RDD中有很多算子,可以供我们使用。比如最简单的wordcount,我们只需要简单的三个算子就可以完成hadoop写若干行代码才能完成的事,开发效率大大提升。

    我们上一讲提到的算子有map,reduceByKey,flatMap,groupByKey,mapValues,sortBy,sortByKey

    但是把数据的一行作为一个对象(黑盒),总觉得不够灵活,于是又引进了DataFrame,这个跟python pandas 里面的DataFrame对象有点类似。

    先说明RDD和DataFrame在构建上的区别

    上代码:

        /**
         * 创建sparksession 对象
         */
        val sparkSession = SparkSession.builder()
                          .master("local")
                          .appName("wordCount")
                          .getOrCreate()
    
        /**
         * 引入隐式转换
         */
        import sparkSession.implicits._
        val inputFile =  "file:///D:/software_download/spark_text/word.txt"
    
        /**
         * 这种方式是把文件先读成DataFrame 然后转化成 Dataset,再转化成RDD,再使用RDD的算子
         */
        sparkSession.read.option("charset","UTF-8")
                          .text(inputFile)
                          .as[String]
                          .rdd
                          .flatMap(line => line.split(" "))
                          .map(word => (word, 1))
                          .reduceByKey((a, b) => a + b)
                          .foreach(println)
    
        /**
         * 这种方式是把文件直接读成RDD,然后使用RDD的算子。
         */
        sparkSession.sparkContext.textFile(inputFile)
                         .flatMap(line => line.split(" "))
                         .map(word => (word, 1))
                         .reduceByKey((a, b) => a + b)
                         .foreach(println)

    上面代码中as[String]的操作是把dataframe 转换成了 dataset ,具体dataFrame和dataset有什么区别,可以先看下这篇文章。https://zhuanlan.zhihu.com/p/29830732

    下面是DataFrame的一些常用操作。

    /**
         * 这种方式是把文件直接读成RDD,然后使用RDD的算子。
         */
        sparkSession.sparkContext.textFile(inputFile)
                         .flatMap(line => line.split(" "))
                         .map(word => (word, 1))
                         .reduceByKey((a, b) => a + b)
                         .foreach(println)
    
        /**
         * 下面来读取一个csv文件,实践一些DataFrame的常规操作
         */
        //第一种方式
        val inputcsvFile="file:///D:/software_download/meta-nlp-competitor-car-model-mentioned-20201202.csv"
        val df = sparkSession.read.format("csv")
                          .option("header","true")
                          .option("charset","UTF-8")
                          .load(inputcsvFile)
        //第二种方式
        val df1 = sparkSession.read
                          .option("header","true")
                          .option("charset","UTF-8")
                          .csv(inputcsvFile)
        df.printSchema()
    
        // 像写sql一样读取 brandName,modelID,modelName,mainBrandName,haveOnSale 字段,并把字段brandName 改名为brandName_bn
        val dataFrame = df.select(df("brandName") as ("brandName_bn"), df("modelID"), df("modelName"), df("mainBrandName"), df("haveOnSale"))
        dataFrame.show(5)
    
        /**
         *把df注册成car_model表,使用sql语句查询,实现与上面相同的功能
         */
        df.createOrReplaceTempView("car_model")
    
        val sql_df = sparkSession.sql(
                                    """select
                                      |brandName as brandName_bn,modelID,modelName,mainBrandName,haveOnSale
                                      |from car_model
                                      |""".stripMargin)
        sql_df.show(5)
    
        /**
         * dataframe 的一些常规操作
         */
    
        dataFrame.filter(df("brandName").contains("汽车") && df("modelID")>5000).show()
        dataFrame.filter(df("brandName").contains("汽车") and df("modelID")>5000).sort(df("modelID").desc).show()
        dataFrame.filter(df("brandName").contains("汽车")).filter(df("modelID")>5000).show()
    
        /**
         * 更改字段类型,由于读取csv的默认的类型都是String类型,但是有时候我们需要转换成其它类型进行操作
         * 比如我们把modelID类型转换成long型
         */
    
        dataFrame.printSchema()
        val dataFrame_change = dataFrame.withColumn("modelID", col("modelID").cast(LongType))
        dataFrame_change.printSchema()

    上一篇的问题解答:

        val rdd = sparkSession.sparkContext.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
        /**
         * x._1 表示value中的第一个值,v._2 表示value中的第二个值
         */
        rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).foreach(println)

     下一讲:讲DataFrame与RDD之间的一些转化操作

  • 相关阅读:
    【leetcode】71. Simplify Path
    【leetcode】891. Sum of Subsequence Widths
    【leetcode】68. Text Justification
    【leetcode】84. Largest Rectangle in Histogram
    【leetcode】726. Number of Atoms
    【leetcode】429. N-ary Tree Level Order Traversal
    【leetcode】436. Find Right Interval
    【leetcode】778. Swim in Rising Water
    BEC listen and translation exercise 9
    BEC listen and translation exercise 8
  • 原文地址:https://www.cnblogs.com/suzhenxiang/p/14189378.html
Copyright © 2011-2022 走看看