zoukankan      html  css  js  c++  java
  • spark-sql性能优化之——动态实现多个列应用同一个函数

    在对一个dataframe的多个列实现应用同一个函数时,是否能动态的指定?

    例如:

    对A,B,C三列实现分组统计

    1.初始化spark,构建DF

      val spark = SparkSession.builder()
        .appName("name")
        .master("local[2]")
        .getOrCreate()
      val df = spark.read.json("src\main\resources\json.txt")

    2.静态实现

      val newDF = df
        .withColumn("cumA", sum("A").over(Window.partitionBy("ID").orderBy("time")))
        .withColumn("cumB", sum("B").over(Window.partitionBy("ID").orderBy("time")))
        .withColumn("cumC", sum("C").over(Window.partitionBy("ID").orderBy("time")))

    3. 动态实现

    3.1 方法一:select 实现

      import spark.implicits._
    
      df.select($"*" +: Seq("A", "B", "C").map(c =>
        sum(c).over(Window.partitionBy("ID").orderBy("time")).alias(s"cum$c")
      ): _*)
    
      //定义函数
       def withColumns(cols : Seq[String],df : DataFrame,f : String => Column) = {
         df.select($"*" +: cols.map(c => f(c)) : _*)
      }

    3.2 方法二:foldLeft实现

      
    Seq("A", "B", "C").foldLeft(df)((df, c) => df.withColumn(s"cum$c", sum(c).over(Window.partitionBy("ID").orderBy("time"))) )
    //定义函数 def withColumn(cols : Seq[String],df : DataFrame,f : String => Column, name : String => String = identity) = { cols.foldLeft(df)((df,c) => df.withColumn(name(c),f(c))) }
  • 相关阅读:
    wxpython的安装

    拓扑排序
    树,二叉树,森林,三者的转换
    二叉树的遍历
    最短路径
    图的遍历
    图的最小生成树
    哈夫曼树的应用
    哈夫曼树
  • 原文地址:https://www.cnblogs.com/yyy-blog/p/10530739.html
Copyright © 2011-2022 走看看