zoukankan      html  css  js  c++  java
  • 2,Spark分桶sink、常用算子、以及自定义函数(UDF、UDAF)

    Spark 实战

    一、Spark分桶写出到HDFS

    • Spark没有分桶sink:Spark并没有像Flink那样提供分桶sink,所以就需要自定义OutputFormat类;
    • 自定义output类:MultipleTextOutputFormat
    // 并行写出,每个分区同时写数据到磁盘文件,如果两个分区要写入数据到同一个文件,则后写入的会覆盖先写入的,因此要避免不同分区往同一个文件中写数据
    class MultipleOutputFormat extends MultipleTextOutputFormat[String, String] {
      val filePathSepChar = "/"
    
      // 根据数据中的key和value来计算路径
      override def generateFileNameForKeyValue(key: String, value: String, name: String): String = {
        val dayHour = key.split(" ")
        val file = dayHour(2) // 文件编号就是分区编号,这样一个分区就只写到一个文件中
        val path = dayHour(0) + filePathSepChar + dayHour(1) + filePathSepChar + "part" + file
        path
      }
    
        /* 不写入key */
        override def generateActualKey(key: String, value: String): String = {
          null
        }
    }
    
    • 使用saveAsHadoopFile:并传入自定义的OutputFormat类
     val midPart = source.mapPartitionsWithIndex((idx, items) => {
          val num = idx % 2
          val dir = addHour("2020-07-15 00", num) // 目录
          val path = dir + " " + idx  // 拼上分区编号作为文件名,避免多个分区同时写入同一个文件,从而导致覆盖(数据丢失)
          items.map(row => (path, row))
        })
        midPart.foreach(println)
    
        // 由于MultipleTextOutputFormat是分区并行落盘,所以当两个分区写入同一个文件时,就会产生覆盖,所以这里通过HashPartitioner将相同key分到同一个分区,避免覆盖
        // 新的分区个数最好与上面的目标分区分数相同(2),否则多出的分区是空的
        //    val res = midPart.partitionBy(new HashPartitioner(2))
        midPart.saveAsHadoopFile(outputPathStr, classOf[String], classOf[String], classOf[MultipleOutputFormat])
    


    二、常用算子

    2.1 DF与RDD之间的转换

    • 背景:相对于spark core来说,spark sql 会生成执行计划,并且会做优化,因此最好用DataFrame代替RDD;而DSL分格相对于SQL来说,可重构和可扩展性都要强很多,所以下面就介绍DSL风格的常用算子;

    • RDD转DF:

    import spark.implicits._
    // 普通类型
    val rdd: RDD[(Int, String, Int)] = null    
    val df = rdd.toDF("id", "name", "age")
    val inputFields = new List("id", "name", "age")  // 实际开发中,最好将列名抽离出来
    val df = rdd.toDF(inputFields: _*)
    // 复杂类型
    val rdd: RDD[((String, String, Int), Long)] = null  
    val df = rdd.toDF("key", "value")   // 其中key是struct(String,String,Int)的结构体
    
    • DF转RDD:
    val df: DataFrame = rdd.toDF("id", "name", "age")
    val rdd: RDD[Row] = df.rdd
    val mid = rdd.map(row=>{
        val id = row.getAs[Int]("id")
        val name = row.getAs[String]("name")
        val list = row.getAs[Seq[String]]("list")   // 不能使用 Array[String]
    })
    
    • DF转DS:一般是使用样例类来作为ds的schema;
    val ds: Dataset[(String, Int)] = df.as[(String, Int)]
    

    2.2 RDD常用算子

    2.2.1 转换算子
    • map类:对每条数据做转换;
    val res = rdd.map(x=>x)  // 分区内每条数据调用一次;
    // 一次将一个分区的数据读进内存,然后再一条条处理;如果map里面要创建(解析json对象、jdbc对象、时间
    // 转换对象)大对象或者耗时的对象,最好是使用mapPartition算子,性能有提升,但是要注意OOM问题;
    val res = rdd.mapPartition(iter=>iter.map(x=>x))
    // 与mapPartition类似,只不过多了分区的编号index;
    val res = rdd.mapPartitionWithIndwx((index, iter)=>iter.map(x=>(index, x)))
    val res = rdd.flatMap(x=>x.split(" "))  // 将返回的Array[(String, Int)]类型压平为(String, Int)类型;
    val res = rdd.mapValue(x=>x) // kv算子,对value做map操作;
    
    • 聚合类:
    // kv算子,根据k分组;rdd中的groupByKey算子不会做combine优化;
    val res: RDD[(String, Iterable[Int])] = rdd.groupByKey() 
    // kv算子,根据k分组,并聚合v;会使用combine优化;
    val res = rdd.reduceByKey(_+_)
    // kv算子,根据k分组,自定义分区内、分区间合并规则;用来优化分组topN问题,非常灵活;
    // 其他聚合算子底层也是调用这个算子;
    val res = rdd.combineByKey(craeteCombiner, mergeValue, mergeCombiners)
    // 加了初始值,自定义分区间聚合、分区间聚合;
    val res = rdd2.aggregateByKey(10)((part, value) => agg + 2 * value, (part1, part2) => agg + value)
    val res = rdd.foldByKey(10)((agg, value)=>agg+value)  // 加了初始值
    val res = rdd.cogroup(rdd2)   // 返回RDD[(key, (Iterable[v1], iterable[v2]))],是sparkCore的join实现方式
    
    • join类:
    // 只有kv类型的rdd才可以join,并且会自动根据key连接;
    val res = rdd.join(rdd2)  // 内连接,(key, (v1, v2))
    val res = rdd.leftOuterJoin(rdd2)  // 左连接
    val res = rdd.fullOuterJoin(rdd2)  // 全连接
    val res = rdd.cartesian(rdd2)  // 笛卡尔积;分区数是 rdd分区数 * rdd2分区数;不是shuffle算子;
    
    • 重分区:
    // 不触发shuffle的重分区,不能扩大分区数;
    // 如果上一个stage0的shuffle write是1000个分区,那么当前stage1会启动1000个task来处理,一个task
    // 处理一个分区;但是如果当前stage1有coalesce(100),那么当前stage1只会启动100个task,一个task处理
    //10个分区(10个分区作为一组);因此coalesce后的分区数不要与原分区数相差太大,否则运行很慢甚至OOM;
    val res = rdd.coalesce(100)   
    val res = rdd.repartition(100)  // 触发shuffle的重分区,根据随机key分区,等价于rdd.coalesce(100, true);
    val res = rdd.partitionBy(new HashPartitioner(100)) // kv算子,根据key的hash值分区,可以自定义分区器;
    // 重分区,并使分区内有序;比先repartition再sort的性能高,因为这个是在shuffle时边shuffle边排序;
    val res = rdd.repartitionAndSortWithinPartitions(new HashPartitioner(3))  // kv算子
    
    • 其他:
    val res = rdd.filter(x=>x=="name")  // 过滤
    val res = rdd.distinct()  // 去重
    // 先是分区间有序,然后分区内有序,相同key会放到同一个分区;
    // sortBy采用抽样来确定分区的边界,从而使分区数据均衡;抽样时用到了collect会生成job;
    // 1,sample:创建RangePartitioner,对输入数据的key做sample来估算key的分布,然后排序切分出range
    // 2,shuffle write:用上面的RangePartitioner对数据重分区,使相同范围数据在同一分区,即分区间有序;
    // 3,shuffle read:每个reduce拉取自己分区的数据,然后分区内排序;
    // 最后就达到了分区间有序和分区内有序,也就是全局有序;
    // 难点就在于如何确定range的边界,以及快速将一个值映射到partition里,解决思路是:通过抽样来确定
    // range的边界,通过字典树来构建索引,来快速找到分区;
    val res = rdd.sortBy(x=>x._1, ascending=false)  // 排序,第二个参数:是否升序;
    // 由行动算子触发;task在读取分区时,先从cacheManager判断是否有缓存,有就直接获取,没有就计算
    // 获取缓存时,先通过(rddId, 分区Id)到blockManager中查找block信息,有就直接获取,没有就计算
    // 此时重新计算会加锁,因为可能会有多个线程同时读取(例如笛卡尔积),获取到锁的线程会计算该分区的数据
    // 并缓存起来,后续的线程直接读缓存;用show和take触发的缓存,只会缓存计算的分区,不会缓存所有分区
    val res = rdd.persist(StorageLevel)  // 缓存,默认是内存
    val res = rdd.union(rdd2)  // 并集,不会触发shuffle
    val res = rdd.intersection(rdd2)  // 交集
    val res = rdd.subtract(rdd2)  // 差集
    val res = rdd.glom()  // 将一个分区的数据合并为一个数组
    val res = rdd.sample(false, 0.1)  // 抽样,(是否可放回,抽样比例[0, 1]),0.1=10%;
    val res = rdd.toDebugString  // 打印rdd的
    

    2.2.2 行动算子
    • 有返回值:
    val res = rdd.count()  // 求数量
    val res = rdd.collect()  // 将数据拉取到driver端
    val res = rdd.take(10)  // 返回前10条数据,只计算部分分区
    val res = rdd.reduce((x, y)=>x+y)  // 聚合算子,直接返回聚合后的结果
    val res = rdd.countByKey()  // kv算子,统计key出现的次数,返回返回Map[(String, Long)]
    val res = rdd.countByValue()  // 统计数据出现的次数,返回Map[(String, Long)]
    
    • 无返回值:
    val res = rdd.foreach()  // 遍历每条数据,类似于map,但是没有返回值
    val res = rdd.saveAsTextFile("path")  // 保存到磁盘
    


    2.3 DataFrame常用算子

    • select:select中只能有列名,因为参数类型为Column;
    df.select("name", "age")
    df.select(col("name").as("rename"))    // col("name") 等价于 $"name"
    df.select($"name".as("rename"))
    df.select(Column("*"))  // 获取所有列
    // 可以在select中使用的方法
    df.select(concat($"name", $"id"))     // concat中只能是列名,不能是字符串;concat_ws同理
    // 获取struct类型中的字段;例如:(id, (name, age))类型的RDD转为(id, info)的DF,则DF的info字段就是
    // struct类型,此时struct中的字段名默认是 '_1', '_2';可以通过col("info").getField("_1")来获取name字段;
    df.select(col("struct").getField("id")) 
    
    • selectExpr:执行字符串格式的SQL表达式;
    df.selectExpr("concat(name, 'sd') as name", "id / 10 as id")   // 直接解析SQL字符串
    
    • where / filter:过滤
    df.where($"id" =!= 2)
    df.where($"id" === 3)
    df.where($"id">=3 and $"name" =!= "a")  // and 等价于 &&
    df.where("id>=2 and name != 'a' ")   // 可以直接执行SQL字符串
    
    • sort:排序
    // 全局排序;先是分区有序,然后分区内有序,相同key会放到同一个分区;
    df.sort($"name".desc)   
    df.sortWithinPartitions($"id")   // 分区内排序
    
    • group by:分组,分组后只能接聚合算子
    df.groupBy($"name").agg("id" -> "max", "id" -> "sum")  //  "id"->"max" 等价于 max(id)
    df.groupBy($"name").agg(collect_set($"id").as("ids"), count(lit(1)).as("cnt"))
    
    • join:连接
    // 注意,这里不能把 Seq("name")改为  $"name",会报错;使用Seq("key")时,右表的key会自动去掉;
    df1.join(df2, Seq("name"), "left")   
    df2.as("dfa").join(df2.as("dfb"), $"dfa.name" === $"dfb.name", "left")  // 给表取别名,然后自定义连接字段
    df1.join(df2)  // 不加连接条件的就是笛卡尔积,必须设spark.sql.crossJoin.enabled为true;
    
    • over():开窗函数
    val window = Window.partitionBy("name").orderBy($"id".desc)
    val mid = df.withColumn("rnk", row_number().over(window))
    
    • 对列的操作
    df.withColumn("gid", $"id"+1)   // 增加一列,列名为gid,如果gid列名存在,则会更新该列;
    df.withColumnRenamed("name", "new_name")   // 重命名
    df.drop("name")  // 删除一列
    
    • read:
    // 读取CSV文件,option("header", "true")表示csv文件中包含header信息,自动加载第一行作为header
    val csv = spark.read.option("header", "true").csv("path")
    // 读取resource中的文件,并转为rdd;spark的read只支持读取本地和分布式文件,所以用下面的方式;
    val source = this.getClass.getClassLoader.getResourceAsStream("sample.csv")
    val lines = Source.fromInputStream(source)("UTF-8").getLines()
    val rdd = spark.sparkContext.parallelize(lines)
    


    三、自定义函数

    • UDF:自定义基本函数
    // 通过实名函数创建
    spark.sqlContext.udf.register("myudf", myUdf _)
    // 通过匿名函数创建
    spark.sqlContext.udf.register("myudf", (a: String, b: Int)=>{"sd_" + a + b})
    // 使用
    df.selectExpr("myudf(name, id) as str")
    
    def myUdf(a: String, b: Int): String ={   // 自定义函数最多支持22个参数
        val res = "sd_" + a + b
        res
      }
    
    • UDAF:自定义聚合函数;需要实现UserDefinedAggregateFunction或者 类型安全的Aggregator接口
    // 注册自定义聚合函数
    spark.sqlContext.udf.register("avgudaf", new AvgUDAF)
    df.groupBy().agg("id" -> "avgudaf")
    
    // 定义自定义聚合函数
    class AvgUDAF extends UserDefinedAggregateFunction{
      //输入的数据类型
      override def inputSchema: StructType = {
        StructType(Array(StructField("id",IntegerType)))
      }
      //中间聚合处理时,所处理的数据类型
      override def bufferSchema: StructType = {
        StructType(Array(StructField("sum", DoubleType), StructField("count", IntegerType)))
      }
      //函数的返回类型
      override def dataType: DataType = {
        DoubleType
      }
      // 是否为幂等函数
      override def deterministic: Boolean = {
        true
      }
      //为每个分组的数据初始化
      override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0)=0.0  // 初始化sum
        buffer(1)=0    // 初始化count
      }
      //指的是,每个分组,有新的值进来时,如何进行分组的聚合计算
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        val in = input.getInt(0).toDouble   // 获取输入的id
        buffer(0) = buffer.getDouble(0) + in   // buffer(0)就是sum,buffer(1)就是count
        buffer(1) = buffer.getInt(1) + 1
      }
      //由于Spark是分布式的,所以一个分组的数据,可能会在不同的节点上进行局部聚合,就是update
      //但是最后一个分组,在各节点上的聚合值,要进行Merge,也就是合并
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        val sum1 = buffer1.getDouble(0)
        val sum2 = buffer2.getDouble(0)
        val count1 = buffer1.getInt(1)
        val count2 = buffer2.getInt(1)
        buffer1(0) = sum1 + sum2
        buffer1(1) = count1 + count2
      }
      //一个分组的聚合值,如何通过中间的聚合值,最后返回一个最终的聚合值
      override def evaluate(buffer: Row): Any = {
        val sum = buffer.getDouble(0)
        val count = buffer.getInt(1)
        val avg = (sum / count).formatted("%.2f").toDouble
        avg
      }
    }
    
  • 相关阅读:
    day55---前端基础之BOM操作和DOM操作
    每日作业5/8
    数据库之索引
    数据库之视图、触发器、事务、存储过程、内置函数、流程控制
    每日作业5/7
    数据备份与pymysql模块
    Navicat与MySQL使用
    每日作业5/6
    数据库之多表查询
    数据库之单表查询
  • 原文地址:https://www.cnblogs.com/shendeng23/p/15240728.html
Copyright © 2011-2022 走看看