zoukankan      html  css  js  c++  java
  • Spark SQL 中的 example 学习(3)

    UserDefinedTypedAggregation.scala(用户自定义的类型安全聚合函数)

    
    import org.apache.spark.sql.expressions.Aggregator
    import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
    
    /**
     * Example of a type-safe, user-defined aggregation over a `Dataset[Employee]`:
     * computes the average salary with a custom [[Aggregator]].
     */
    object UserDefinedTypedAggregation {

      /** One input row: an employee's name and salary. */
      case class Employee(name: String, salary: Long)

      /** Aggregation buffer: running salary total and number of rows folded in so far. */
      case class Average(var sum: Long, var count: Long)

      // $example on:type_custom_aggregation$
      /**
       * Aggregator producing the mean salary of the input employees.
       *
       * `reduce` and `merge` mutate and return their first argument instead of allocating a new
       * `Average`; the Aggregator contract allows this for performance.
       */
      object MyAverage extends Aggregator[Employee, Average, Double] {

        /** A zero value for this aggregation. Should satisfy the property that any b + zero = b. */
        override def zero: Average = Average(0L, 0L)

        /**
         * Combines one employee into the buffer. For performance, this modifies `buffer`
         * and returns it instead of constructing a new object.
         */
        override def reduce(buffer: Average, employee: Employee): Average = {
          buffer.sum += employee.salary
          buffer.count += 1
          buffer
        }

        /** Merges two intermediate buffers by accumulating `b2` into `b1` and returning `b1`. */
        override def merge(b1: Average, b2: Average): Average = {
          b1.sum += b2.sum
          b1.count += b2.count
          b1
        }

        /**
         * Transforms the output of the reduction into the mean salary.
         * When no rows were aggregated (count == 0) this evaluates 0.0 / 0, i.e. `Double.NaN`.
         */
        override def finish(reduction: Average): Double =
          reduction.sum.toDouble / reduction.count

        /** Specifies the Encoder for the intermediate value type. */
        override def bufferEncoder: Encoder[Average] = Encoders.product

        /** Specifies the Encoder for the final output value type. */
        override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
      }
      // $example off:type_custom_aggregation$

      /** Input file used when no path is passed on the command line (machine-specific sample path). */
      private val DefaultEmployeesPath =
        "/Users/hadoop/app/spark/examples/src/main/resources/employees.json"

      /**
       * Reads employees from JSON, prints them, then prints their average salary.
       *
       * @param args optional; `args(0)` overrides the input JSON path
       */
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("Spark SQL user-defined Datasets aggregation example")
          .master("local")
          .getOrCreate()

        // Stop the session even if reading or aggregating fails, so resources are released.
        try {
          import spark.implicits._

          val path = args.headOption.getOrElse(DefaultEmployeesPath)
          val ds = spark.read.json(path).as[Employee]
          ds.show()

          val averageSalary = MyAverage.toColumn.name("average_salary")
          val result = ds.select(averageSalary)
          result.show()
        } finally {
          spark.stop()
        }
      }
    }
    
    

    屏幕快照 2019-05-14 03.57.12

  • 相关阅读:
    iOS离屏渲染简书
    iOS Waxpatch项目(动态更新)
    waxpatch修改任意类的用法
    ios waxpatch lua语法
    ios WaxPatch热更新原理
    WaxPatch中demo注意问题
    ios wax热更新之安装wax(xcode7.3.1)
    获取第三方键盘高度(包括自带键盘高度)
    25个增强iOS应用程序性能的提示和技巧(高级篇)(2)
    JS基础_一元运算符
  • 原文地址:https://www.cnblogs.com/suixingc/p/sparksql-zhong-deexample-xue-xi-3.html
Copyright © 2011-2022 走看看