  • 049 DSL statements

    1. Notes

      

    2. SQL program

    package com.scala.it

    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.{SparkConf, SparkContext}

    import scala.math.BigDecimal.RoundingMode

    object SparkSQLDSLDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[*]")
          .setAppName("dsl")
        val sc = SparkContext.getOrCreate(conf)
        val sqlContext = new HiveContext(sc)

        // =================================================
        sqlContext.sql(
          """
            | SELECT
            |  deptno as no,
            |  SUM(sal) as sum_sal,
            |  AVG(sal) as avg_sal,
            |  SUM(mgr) as sum_mgr,
            |  AVG(mgr) as avg_mgr
            | FROM hadoop09.emp
            | GROUP BY deptno
            | ORDER BY deptno DESC
          """.stripMargin).show()
      }
    }

    3. Result

      

    4. Refactoring the program above with the DSL

    package com.scala.it

    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.{SparkConf, SparkContext}

    import scala.math.BigDecimal.RoundingMode

    object SparkSQLDSLDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[*]")
          .setAppName("dsl")
        val sc = SparkContext.getOrCreate(conf)
        val sqlContext = new HiveContext(sc)

        // =================================================
        sqlContext.sql(
          """
            | SELECT
            |  deptno as no,
            |  SUM(sal) as sum_sal,
            |  AVG(sal) as avg_sal,
            |  SUM(mgr) as sum_mgr,
            |  AVG(mgr) as avg_mgr
            | FROM hadoop09.emp
            | GROUP BY deptno
            | ORDER BY deptno DESC
          """.stripMargin).show()

        //=================================================
        // Read the table into a DataFrame and cache it
        val df = sqlContext.read.table("hadoop09.emp")
        df.cache()
        //=================================================
        import sqlContext.implicits._
        import org.apache.spark.sql.functions._

        //================================================= DSL version of the SQL above
        df.select("deptno", "sal", "mgr")
          .groupBy("deptno")
          .agg(
            sum("sal").as("sum_sal"),
            avg("sal").as("avg_sal"),
            sum("mgr").as("sum_mgr"),
            avg("mgr").as("avg_mgr")
          )
          .orderBy($"deptno".desc)
          .show()
      }
    }

    5. Result

      

    6. The select clause

      Columns can be referenced as plain strings, with col(...), or with the $"..." syntax.

      User-defined functions can also be used inside select (registered by name and called through selectExpr).

    package com.scala.it

    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.{SparkConf, SparkContext}

    import scala.math.BigDecimal.RoundingMode

    object SparkSQLDSLDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[*]")
          .setAppName("dsl")
        val sc = SparkContext.getOrCreate(conf)
        val sqlContext = new HiveContext(sc)

        // =================================================
        sqlContext.sql(
          """
            | SELECT
            |  deptno as no,
            |  SUM(sal) as sum_sal,
            |  AVG(sal) as avg_sal,
            |  SUM(mgr) as sum_mgr,
            |  AVG(mgr) as avg_mgr
            | FROM hadoop09.emp
            | GROUP BY deptno
            | ORDER BY deptno DESC
          """.stripMargin).show()

        //=================================================
        // Read the table into a DataFrame and cache it
        val df = sqlContext.read.table("hadoop09.emp")
        df.cache()
        //=================================================
        import sqlContext.implicits._
        import org.apache.spark.sql.functions._

        //================================================= select
        df.select("empno", "ename", "deptno").show()
        df.select(col("empno").as("id"), $"ename".as("name"), df("deptno")).show()
        df.select($"empno".as("id"), substring($"ename", 0, 1).as("name")).show()
        df.selectExpr("empno as id", "substring(ename,0,1) as name").show()

        // Using a user-defined function
        sqlContext.udf.register(
          "doubleValueFormat", // name of the UDF
          (value: Double, scale: Int) => {
            // body of the UDF
            BigDecimal.valueOf(value).setScale(scale, RoundingMode.HALF_DOWN).doubleValue()
          })
        df.selectExpr("doubleValueFormat(sal,2)").show()
      }
    }
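
      Registering the UDF by name works well with selectExpr; the same logic can also be wrapped as a Column function via org.apache.spark.sql.functions.udf and called directly in the DSL select. A minimal sketch, assuming the df, imports, and rounding logic from the listing above (the value name fmtSal is made up for illustration):

    // Sketch: the same rounding logic exposed as a DSL-usable UDF.
    // `udf` and `lit` come from org.apache.spark.sql.functions (imported above);
    // the name `fmtSal` is illustrative only.
    val fmtSal = udf((value: Double, scale: Int) =>
      BigDecimal.valueOf(value).setScale(scale, RoundingMode.HALF_DOWN).doubleValue())
    df.select($"empno", fmtSal($"sal", lit(2)).as("sal_2")).show()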

    7. The where clause

    //================================================= where
    df.where("sal > 1000 and sal < 2000").show()
    df.where($"sal" > 1000 && $"sal" < 2000).show()
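
      For reference, filter is an alias for where, and Column helpers such as between express the same kind of range check. A small sketch (note that between is inclusive on both ends, unlike the strict comparisons above):

    // Sketch: equivalent filters written with filter / between.
    df.filter($"sal" > 1000 && $"sal" < 2000).show()
    df.filter($"sal".between(1000, 2000)).show() // inclusive: also keeps sal = 1000 and sal = 2000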

    8. The groupBy clause

      Of the three forms shown in the listing below, the third is recommended and is also the most commonly used.

      User-defined aggregate functions are supported here as well.

    package com.scala.it

    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.{SparkConf, SparkContext}

    import scala.math.BigDecimal.RoundingMode

    object SparkSQLDSLDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[*]")
          .setAppName("dsl")
        val sc = SparkContext.getOrCreate(conf)
        val sqlContext = new HiveContext(sc)

        // =================================================
        sqlContext.sql(
          """
            | SELECT
            |  deptno as no,
            |  SUM(sal) as sum_sal,
            |  AVG(sal) as avg_sal,
            |  SUM(mgr) as sum_mgr,
            |  AVG(mgr) as avg_mgr
            | FROM hadoop09.emp
            | GROUP BY deptno
            | ORDER BY deptno DESC
          """.stripMargin).show()

        //=================================================
        // Read the table into a DataFrame and cache it
        val df = sqlContext.read.table("hadoop09.emp")
        df.cache()
        //=================================================
        import sqlContext.implicits._
        import org.apache.spark.sql.functions._

        //================================================= groupBy
        // This form is not recommended; the problem is noted below
        df.groupBy("deptno").agg(
          "sal" -> "min", // min(sal)
          "sal" -> "max", // max(sal) ===> overwrites other aggregates on the same column; workaround: rename
          "mgr" -> "max"  // max(mgr)
        ).show()

        sqlContext.udf.register("selfAvg", AvgUDAF)
        df.groupBy("deptno").agg(
          "sal" -> "selfAvg"
        ).toDF("deptno", "self_avg_sal").show()

        df.groupBy("deptno").agg(
          min("sal").as("min_sal"),
          max("sal").as("max_sal"),
          max("mgr")
        ).where("min_sal > 1200").show()

      }
    }
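
      The AvgUDAF object registered above is not shown in the post. As a rough idea of what such a user-defined aggregate could look like, here is a minimal sketch based on Spark's UserDefinedAggregateFunction API; the actual AvgUDAF in the original project may be implemented differently:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._

    // Sketch of a hand-written average UDAF (illustrative, not the original AvgUDAF).
    object AvgUDAF extends UserDefinedAggregateFunction {
      override def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
      override def bufferSchema: StructType = StructType(
        StructField("sum", DoubleType) :: StructField("count", LongType) :: Nil)
      override def dataType: DataType = DoubleType
      override def deterministic: Boolean = true

      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0.0 // running sum
        buffer(1) = 0L  // running count
      }
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0)) {
          buffer(0) = buffer.getDouble(0) + input.getDouble(0)
          buffer(1) = buffer.getLong(1) + 1L
        }
      }
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
        buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
      }
      override def evaluate(buffer: Row): Any =
        if (buffer.getLong(1) == 0L) null else buffer.getDouble(0) / buffer.getLong(1)
    }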

    9. Sorting: sort and orderBy

    //================================================= sorting
    // sort, orderBy ==> global ordering (even after repartition)
    // sortWithinPartitions ==> ordering only within each partition
    df.sort("sal").select("empno", "sal").show()
    df.repartition(3).sort($"sal".desc).select("empno", "sal").show()
    df.repartition(3).orderBy($"sal".desc).select("empno", "sal").show()
    df.repartition(3).sortWithinPartitions($"sal".desc).select("empno", "sal").show()
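
      To see the difference, one can inspect the contents of each partition directly through the underlying RDD; a quick sketch with glom (each inner array is one partition):

    // Sketch: with sortWithinPartitions, rows are descending by sal inside every
    // partition, but there is no ordering across the three partitions.
    df.repartition(3)
      .sortWithinPartitions($"sal".desc)
      .select("empno", "sal")
      .rdd
      .glom()
      .collect()
      .foreach(partition => println(partition.mkString(", ")))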

    10. Window functions

    //================================================= Hive window/analytic functions
    // The DataFrame must be built with a HiveContext
    // row_number implements the grouped top-N requirement: partition the data by some
    // columns, order within each partition, and keep the top N rows of every partition
    import org.apache.spark.sql.expressions.Window

    val window = Window.partitionBy("deptno").orderBy($"sal".desc)
    df.select(
      $"empno",
      $"ename",
      $"deptno",
      row_number().over(window).as("rnk")
    ).where("rnk <= 3").show()
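
      For comparison, the same per-department top-3 can be written as a plain HiveQL query through the HiveContext; a sketch of the equivalent statement:

    // Sketch: equivalent grouped top-N expressed in HiveQL.
    sqlContext.sql(
      """
        | SELECT empno, ename, deptno, rnk FROM (
        |   SELECT empno, ename, deptno,
        |          ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rnk
        |   FROM hadoop09.emp
        | ) t
        | WHERE rnk <= 3
      """.stripMargin).show()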

    II. The complete program

    package com.scala.it

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.{SparkConf, SparkContext}

    import scala.math.BigDecimal.RoundingMode

    object SparkSQLDSLDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[*]")
          .setAppName("dsl")
        val sc = SparkContext.getOrCreate(conf)
        val sqlContext = new HiveContext(sc)

        // =================================================
        sqlContext.sql(
          """
            | SELECT
            |  deptno as no,
            |  SUM(sal) as sum_sal,
            |  AVG(sal) as avg_sal,
            |  SUM(mgr) as sum_mgr,
            |  AVG(mgr) as avg_mgr
            | FROM hadoop09.emp
            | GROUP BY deptno
            | ORDER BY deptno DESC
          """.stripMargin).show()

        //=================================================
        // Read the table into a DataFrame and cache it
        val df = sqlContext.read.table("hadoop09.emp")
        df.cache()
        //=================================================
        import sqlContext.implicits._
        import org.apache.spark.sql.functions._

        //================================================= DSL version of the SQL above
        df.select("deptno", "sal", "mgr")
          .groupBy("deptno")
          .agg(
            sum("sal").as("sum_sal"),
            avg("sal").as("avg_sal"),
            sum("mgr").as("sum_mgr"),
            avg("mgr").as("avg_mgr")
          )
          .orderBy($"deptno".desc)
          .show()

        //================================================= select
        df.select("empno", "ename", "deptno").show()
        df.select(col("empno").as("id"), $"ename".as("name"), df("deptno")).show()
        df.select($"empno".as("id"), substring($"ename", 0, 1).as("name")).show()
        df.selectExpr("empno as id", "substring(ename,0,1) as name").show()

        // Using a user-defined function
        sqlContext.udf.register(
          "doubleValueFormat", // name of the UDF
          (value: Double, scale: Int) => {
            // body of the UDF
            BigDecimal.valueOf(value).setScale(scale, RoundingMode.HALF_DOWN).doubleValue()
          })
        df.selectExpr("doubleValueFormat(sal,2)").show()

        //================================================= where
        df.where("sal > 1000 and sal < 2000").show()
        df.where($"sal" > 1000 && $"sal" < 2000).show()

        //================================================= groupBy
        // This form is not recommended; the problem is noted below
        df.groupBy("deptno").agg(
          "sal" -> "min", // min(sal)
          "sal" -> "max", // max(sal) ===> overwrites other aggregates on the same column; workaround: rename
          "mgr" -> "max"  // max(mgr)
        ).show()

        sqlContext.udf.register("selfAvg", AvgUDAF)
        df.groupBy("deptno").agg(
          "sal" -> "selfAvg"
        ).toDF("deptno", "self_avg_sal").show()

        df.groupBy("deptno").agg(
          min("sal").as("min_sal"),
          max("sal").as("max_sal"),
          max("mgr")
        ).where("min_sal > 1200").show()

        //================================================= sorting
        // sort, orderBy ==> global ordering (even after repartition)
        // sortWithinPartitions ==> ordering only within each partition
        df.sort("sal").select("empno", "sal").show()
        df.repartition(3).sort($"sal".desc).select("empno", "sal").show()
        df.repartition(3).orderBy($"sal".desc).select("empno", "sal").show()
        df.repartition(3).sortWithinPartitions($"sal".desc).select("empno", "sal").show()

        //================================================= Hive window/analytic functions
        // The DataFrame must be built with a HiveContext
        // row_number implements the grouped top-N requirement: partition the data by some
        // columns, order within each partition, and keep the top N rows of every partition
        val window = Window.partitionBy("deptno").orderBy($"sal".desc)
        df.select(
          $"empno",
          $"ename",
          $"deptno",
          row_number().over(window).as("rnk")
        ).where("rnk <= 3").show()
      }
    }
  • Original article: https://www.cnblogs.com/juncaoit/p/9388049.html