zoukankan      html  css  js  c++  java
  • 使用dataframe解决spark TopN问题:分组、排序、取TopN和join相关问题

    package com.profile.main
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import org.apache.log4j.{Level, Logger}
    import com.profile.tools.{DateTools, JdbcTools, LogTools, SparkTools}
    import com.dhd.comment.Constant
    import com.profile.comment.Comments
    /**
    * 测试类 //使用dataframe解决spark TopN问题:分组、排序、取TopN
    * @author
    * date 2017-09-27 14:55
    */
    object Test {

    def main(args: Array[String]): Unit = {
    val sc=SparkTools.getSparkContext
        //设置日志级别,避免输出大量无关的信息
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    1、使用dataframe解决spark TopN问题

    val df = sc.parallelize(Seq(
    (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
    (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
    (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
    (3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue")

    df.show
    /*
    +----+--------+----------+
    |Hour|Category|TotalValue|
    +----+--------+----------+
    | 0| cat26| 30.9|
    | 0| cat13| 22.1|
    | 0| cat95| 19.6|
    | 0| cat105| 1.3|
    | 1| cat67| 28.5|
    | 1| cat4| 26.8|
    | 1| cat13| 12.6|
    | 1| cat23| 5.3|
    | 2| cat56| 39.6|
    | 2| cat40| 29.7|
    | 2| cat187| 27.9|
    | 2| cat68| 9.8|
    | 3| cat8| 35.6|
    +----+--------+----------+
    */

    /* val w = Window.partitionBy($"Hour").orderBy($"TotalValue".desc)
    //取Top1
    val dfTop1 = df.withColumn("rn", rowNumber.over(w)).where($"rn" === 1).drop("rn")
    //注意:row_number()在spark1.x版本中为rowNumber(),在2.x版本为row_number()
    //取Top3
    val dfTop3 = df.withColumn("rn", rowNumber.over(w)).where($"rn" <= 3).drop("rn")

    dfTop1.show*/
    /*
    +----+--------+----------+
    |Hour|Category|TotalValue|
    +----+--------+----------+
    | 1| cat67| 28.5|
    | 3| cat8| 35.6|
    | 2| cat56| 39.6|
    | 0| cat26| 30.9|
    +----+--------+----------+
    */
    // dfTop3.show
    /*
    +----+--------+----------+
    |Hour|Category|TotalValue|
    +----+--------+----------+
    | 1| cat67| 28.5|
    | 1| cat4| 26.8|
    | 1| cat13| 12.6|
    | 3| cat8| 35.6|
    | 2| cat56| 39.6|
    | 2| cat40| 29.7|
    | 2| cat187| 27.9|
    | 0| cat26| 30.9|
    | 0| cat13| 22.1|
    | 0| cat95| 19.6|
    +----+--------+----------+
    */


    //使用RDD解决spark TopN问题:分组、排序、取TopN

    val rdd1 = sc.parallelize(Seq(
    (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
    (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
    (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
    (3,"cat8",35.6)))

    val rdd2 = rdd1.map(x => (x._1,(x._2, x._3))).groupByKey()
    /*
    rdd2.collect
    res9: Array[(Int, Iterable[(String, Double)])] = Array((0,CompactBuffer((cat26,30.9), (cat13,22.1), (cat95,19.6), (cat105,1.3))),
    (1,CompactBuffer((cat67,28.5), (cat4,26.8), (cat13,12.6), (cat23,5.3))),
    (2,CompactBuffer((cat56,39.6), (cat40,29.7), (cat187,27.9), (cat68,9.8))),(3,CompactBuffer((cat8,35.6))))

    */
    val N_value = 1 //取前3

    val rdd3 = rdd2.map( x => {
    val i2 = x._2.toBuffer
    val i2_2 = i2.sortBy(_._2)
    if (i2_2.length > N_value) i2_2.remove(0, (i2_2.length - N_value))
    (x._1, i2_2.toIterable)
    })

    /*
    rdd3.collect
    res8: Array[(Int, Iterable[(String, Double)])] = Array((0,ArrayBuffer((cat95,19.6), (cat13,22.1), (cat26,30.9))),
    (1,ArrayBuffer((cat13,12.6), (cat4,26.8), (cat67,28.5))),
    (2,ArrayBuffer((cat187,27.9), (cat40,29.7), (cat56,39.6))),(3,ArrayBuffer((cat8,35.6))))
    */

    val rdd4 = rdd3.flatMap(x => {
    val y = x._2
    for (w <- y) yield (x._1, w._1, w._2)
    })

    rdd4.collect
    /*
    res3: Array[(Int, String, Double)] = Array((0,cat95,19.6), (0,cat13,22.1), (0,cat26,30.9),
    (1,cat13,12.6), (1,cat4,26.8), (1,cat67,28.5),
    (2,cat187,27.9), (2,cat40,29.7), (2,cat56,39.6), (3,cat8,35.6))
    */

    rdd4.toDF("Hour", "Category", "TotalValue").show
    /* +----+--------+----------+
    |Hour|Category|TotalValue|
    +----+--------+----------+
    | 0| cat95| 19.6|
    | 0| cat13| 22.1|
    | 0| cat26| 30.9|
    | 2| cat187| 27.9|
    | 2| cat40| 29.7|
    | 2| cat56| 39.6|
    | 1| cat13| 12.6|
    | 1| cat4| 26.8|
    | 1| cat67| 28.5|
    | 3| cat8| 35.6|
    +----+--------+----------+*/

    }
    2、下面再来看DataFrame的join操作。

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * spark-DataFrame学习记录-[2]解决spark-dataframe的JOIN操作之后产生重复列(Reference '***' is ambiguous问题解决)
     */
    object DataFrameSQL_2 {
        def main(args: Array[String]) {

        val conf = new SparkConf()
        conf.setAppName("test").setMaster("local")

        val sc = new SparkContext(conf)

       //设置日志级别
      Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
      Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)

      val sqlContext = new HiveContext(sc)
      import sqlContext.implicits._

      val df = sc.parallelize(Array(
      ("one", "A", 1), ("one", "B", 2), ("two", "A", 3), ("two", "B", 4)
      )).toDF("key1", "key2", "value")
      df.show()
    // +----+----+-----+
    // |key1|key2|value|
    // +----+----+-----+
    // | one| A| 1|
    // | one| B| 2|
    // | two| A| 3|
    // | two| B| 4|
    // +----+----+-----+

    val df2 = sc.parallelize(Array(
       ("one", "A", 5), ("two", "A", 6)
       )).toDF("key1", "key2", "value2")
    df2.show()

    // +----+----+------+
    // |key1|key2|value2|
    // +----+----+------+
    // | one| A| 5|
    // | two| A| 6|
    // +----+----+------+


    //多列join可以用&&或者and,对其进行JOIN操作之后,发现多产生了KEY1和KEY2这样的两个字段
    val joined = df.join(df2, df("key1") === df2("key1") && df("key2") === df2("key2"), "left_outer")

    //这样也可

    //val joined = df.join(df2, df("key1") === df2("key1")  and df("key2") === df2("key2"), "left_outer") 
    joined.show()

    // +----+----+-----+----+----+------+
    // |key1|key2|value|key1|key2|value2|
    // +----+----+-----+----+----+------+
    // | two| A| 3| two| A| 6|
    // | two| B| 4|null|null| null|
    // | one| A| 1| one| A| 5|
    // | one| B| 2|null|null| null|
    // +----+----+-----+----+----+------+

    1. 假如这两个字段同时存在,那么就会报错,如下:org.apache.spark.sql.AnalysisException: Reference 'key2' is ambiguous  
    2. 因此,如何在JOIN之后删除列的,可以通过修改JOIN的表达式,完全可以避免这个问题。主要是通过Seq这个对象来实现。 

    //多列join最好还是使用Seq()

    df.join(df2, Seq("key1", "key2"), "left_outer").show()

    // +----+----+-----+------+
    // |key1|key2|value|value2|
    // +----+----+-----+------+
    // | two| A| 3| 6|
    // | two| B| 4| null|
    // | one| A| 1| 5|
    // | one| B| 2| null|
    // +----+----+-----+------+

    //多列列名相同,则可以修改列名
    val df22 = df2.withColumnRenamed("key1","k1").withColumnRenamed("key2","k2")

    df.join(df22,df("key1") === df22("k1") && df("key2") === df22("k2"), "left_outer").show()
    // +----+----+-----+----+----+------+
    // |key1|key2|value| k1| k2|value2|
    // +----+----+-----+----+----+------+
    // | two| A| 3| two| A| 6|
    // | two| B| 4|null|null| null|
    // | one| A| 1| one| A| 5|
    // | one| B| 2|null|null| null|
    // +----+----+-----+----+----+------+

    sc.stop()

    }

    }


    3、下面再来看RDD的join操作。
    /建立一个基本的键值对RDD,包含ID和名称,其中ID为1、2、3、4
    val Arrayrdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"),("3","Scala"),("4","Java")),2)
    //建立一个行业薪水的键值对RDD,包含ID和薪水,其中ID为1、2、3、5
    val Arrayrdd2 = sc.makeRDD(Array(("1","30K"),("2","15K"),("3","25K"),("5","10K")),2)

    println("//下面做Join操作,预期要得到(1,×)、(2,×)、(3,×)")
    val joinRDD=Arrayrdd1.join(Arrayrdd2).collect.foreach(println)

    println("//下面做leftOutJoin操作,预期要得到(1,×)、(2,×)、(3,×)、(4,×)")
    val leftJoinRDD=Arrayrdd1.leftOuterJoin(Arrayrdd2).collect.foreach(println)
    println("//下面做rightOutJoin操作,预期要得到(1,×)、(2,×)、(3,×)、(5,×)")
    val rightJoinRDD=Arrayrdd1.rightOuterJoin(Arrayrdd2).collect.foreach(println)
    结果输出如下:

    和预期的效果一样。

    4、DataFrame 的函数

    Action 操作
    1、 collect() ,返回值是一个数组,返回dataframe集合所有的行
    2、 collectAsList() 返回值是一个java类型的数组,返回dataframe集合所有的行
    3、 count() 返回一个number类型的,返回dataframe集合的行数
    4、 describe(cols: String*) 返回一个通过数学计算的类表值(count, mean, stddev, min, and max),这个可以传多个参数,中间用逗号分隔,如果有字段为空,那么不参与运算,只这对数值类型的字段。
    例如df.describe("age", "height").show()
    5、 first() 返回第一行 ,类型是row类型
    6、 head() 返回第一行 ,类型是row类型
    7、 head(n:Int)返回n行 ,类型是row 类型
    8、 show()返回dataframe集合的值 默认是20行,返回类型是unit
    9、 show(n:Int)返回n行,,返回值类型是unit
    10、 table(n:Int) 返回n行 ,类型是row 类型
    dataframe的基本操作
    1、 cache()同步数据的内存
    2、 columns 返回一个string类型的数组,返回值是所有列的名字
    3、 dtypes返回一个string类型的二维数组,返回值是所有列的名字以及类型
    4、 explan()打印执行计划 物理的
    5、 explain(n:Boolean) 输入值为 false 或者true ,返回值是unit 默认是false ,如果输入true 将会打印 逻辑的和物理的
    6、 isLocal 返回值是Boolean类型,如果允许模式是local返回true 否则返回false
    7、 persist(newlevel:StorageLevel) 返回一个dataframe.this.type 输入存储模型类型
    8、 printSchema() 打印出字段名称和类型 按照树状结构来打印
    9、 registerTempTable(tablename:String) 返回Unit ,将df的对象只放在一张表里面,这个表随着对象的删除而删除了
    10、 schema 返回structType 类型,将字段名称和类型按照结构体类型返回
    11、 toDF()返回一个新的dataframe类型的
    12、 toDF(colnames:String*)将参数中的几个字段返回一个新的dataframe类型的,
    13、 unpersist() 返回dataframe.this.type 类型,去除模式中的数据
    14、 unpersist(blocking:Boolean)返回dataframe.this.type类型 true 和unpersist是一样的作用false 是去除RDD

    集成查询:
    1、 agg(expers:column*) 返回dataframe类型 ,同数学计算求值
    df.agg(max("age"), avg("salary"))
    df.groupBy().agg(max("age"), avg("salary"))
    2、 agg(exprs: Map[String, String]) 返回dataframe类型 ,同数学计算求值 map类型的
    df.agg(Map("age" -> "max", "salary" -> "avg"))
    df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
    3、 agg(aggExpr: (String, String), aggExprs: (String, String)*) 返回dataframe类型 ,同数学计算求值
    df.agg(Map("age" -> "max", "salary" -> "avg"))
    df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
    4、 apply(colName: String) 返回column类型,捕获输入进去列的对象
    5、 as(alias: String) 返回一个新的dataframe类型,就是原来的一个别名
    6、 col(colName: String) 返回column类型,捕获输入进去列的对象
    7、 cube(col1: String, cols: String*) 返回一个GroupedData类型,根据某些字段来汇总
    8、 distinct 去重 返回一个dataframe类型
    9、 drop(col: Column) 删除某列 返回dataframe类型
    10、 dropDuplicates(colNames: Array[String]) 删除相同的列 返回一个dataframe
    11、 except(other: DataFrame) 返回一个dataframe,返回在当前集合存在的在其他集合不存在的
    12、 explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ⇒ TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B]) 返回值是dataframe类型,这个 将一个字段进行更多行的拆分
    df.explode("name","names") {name :String=> name.split(" ")}.show();
    将name字段根据空格来拆分,拆分的字段放在names里面
    13、 filter(conditionExpr: String): 刷选部分数据,返回dataframe类型 df.filter("age>10").show(); df.filter(df("age")>10).show(); df.where(df("age")>10).show(); 都可以
    14、 groupBy(col1: String, cols: String*) 根据某写字段来汇总返回groupedate类型 df.groupBy("age").agg(Map("age" ->"count")).show();df.groupBy("age").avg().show();都可以
    15、 intersect(other: DataFrame) 返回一个dataframe,在2个dataframe都存在的元素
    16、 join(right: DataFrame, joinExprs: Column, joinType: String)
    一个是关联的dataframe,第二个关联的条件,第三个关联的类型:inner, outer, left_outer, right_outer, leftsemi
    df.join(ds,df("name")===ds("name") and df("age")===ds("age"),"outer").show();
    17、 limit(n: Int) 返回dataframe类型 去n 条数据出来
    18、 na: DataFrameNaFunctions ,可以调用dataframenafunctions的功能区做过滤 df.na.drop().show(); 删除为空的行
    19、 orderBy(sortExprs: Column*) 做alise排序
    20、 select(cols:string*) dataframe 做字段的刷选 df.select($"colA", $"colB" + 1)
    21、 selectExpr(exprs: String*) 做字段的刷选 df.selectExpr("name","name as names","upper(name)","age+1").show();
    22、 sort(sortExprs: Column*) 排序 df.sort(df("age").desc).show(); 默认是asc
    23、 unionAll(other:Dataframe) 合并 df.unionAll(ds).show();
    24、 withColumnRenamed(existingName: String, newName: String) 修改列表 df.withColumnRenamed("name","names").show();
    25、 withColumn(colName: String, col: Column) 增加一列 df.withColumn("aa",df("name")).show();



    }
  • 相关阅读:
    HDU 1124 Factorial
    hdu 1690 Bus System
    hdu 1113 Word Amalgamation
    POJ 2482 Stars in Your Window
    hdu 1385 ZOJ 1456 Minimum Transport Cost(经典floyd)
    hdu 1907 John
    VMware 虚拟机 安装 UBuntu 9.10 命令模式转换成窗口模试
    #pragma CODE_SEG __NEAR_SEG NON_BANKED详解
    Ubuntu 下Hadoop 伪分布式 hadoop0.20.2.tar.gz 的安装
    文件拷贝代码以及疑问
  • 原文地址:https://www.cnblogs.com/hd-zg/p/7874291.html
Copyright © 2011-2022 走看看