Spark 实战
目录
一、Spark分桶写出到HDFS
- Spark没有分桶sink:Spark并没有像Flink那样提供分桶sink,所以就需要自定义OutputFormat类;
- 自定义output类:MultipleTextOutputFormat
// 并行写出,每个分区同时写数据到磁盘文件,如果两个分区要写入数据到同一个文件,则后写入的会覆盖先写入的,因此要避免不同分区往同一个文件中写数据
class MultipleOutputFormat extends MultipleTextOutputFormat[String, String] {
val filePathSepChar = "/"
// 根据数据中的key和value来计算路径
override def generateFileNameForKeyValue(key: String, value: String, name: String): String = {
val dayHour = key.split(" ")
val file = dayHour(2) // 文件编号就是分区编号,这样一个分区就只写到一个文件中
val path = dayHour(0) + filePathSepChar + dayHour(1) + filePathSepChar + "part" + file
path
}
/* 不写入key */
override def generateActualKey(key: String, value: String): String = {
null
}
}
- 使用saveAsHadoopFile:并传入自定义的OutputFormat类
val midPart = source.mapPartitionsWithIndex((idx, items) => {
val num = idx % 2
val dir = addHour("2020-07-15 00", num) // 目录
val path = dir + " " + idx // 拼上分区编号作为文件名,避免多个分区同时写入同一个文件,从而导致覆盖(数据丢失)
items.map(row => (path, row))
})
midPart.foreach(println)
// 由于MultipleTextOutputFormat是分区并行落盘,所以当两个分区写入同一个文件时,就会产生覆盖,所以这里通过HashPartitioner将相同key分到同一个分区,避免覆盖
// 新的分区个数最好与上面的目标分区分数相同(2),否则多出的分区是空的
// val res = midPart.partitionBy(new HashPartitioner(2))
midPart.saveAsHadoopFile(outputPathStr, classOf[String], classOf[String], classOf[MultipleOutputFormat])
二、常用算子
2.1 DF与RDD之间的转换
-
背景:相对于spark core来说,spark sql 会生成执行计划,并且会做优化,因此最好用DataFrame代替RDD;而DSL分格相对于SQL来说,可重构和可扩展性都要强很多,所以下面就介绍DSL风格的常用算子;
-
RDD转DF:
import spark.implicits._
// 普通类型
val rdd: RDD[(Int, String, Int)] = null
val df = rdd.toDF("id", "name", "age")
val inputFields = new List("id", "name", "age") // 实际开发中,最好将列名抽离出来
val df = rdd.toDF(inputFields: _*)
// 复杂类型
val rdd: RDD[((String, String, Int), Long)] = null
val df = rdd.toDF("key", "value") // 其中key是struct(String,String,Int)的结构体
- DF转RDD:
val df: DataFrame = rdd.toDF("id", "name", "age")
val rdd: RDD[Row] = df.rdd
val mid = rdd.map(row=>{
val id = row.getAs[Int]("id")
val name = row.getAs[String]("name")
val list = row.getAs[Seq[String]]("list") // 不能使用 Array[String]
})
- DF转DS:一般是使用样例类来作为ds的schema;
val ds: Dataset[(String, Int)] = df.as[(String, Int)]
2.2 RDD常用算子
2.2.1 转换算子
- map类:对每条数据做转换;
val res = rdd.map(x=>x) // 分区内每条数据调用一次;
// 一次将一个分区的数据读进内存,然后再一条条处理;如果map里面要创建(解析json对象、jdbc对象、时间
// 转换对象)大对象或者耗时的对象,最好是使用mapPartition算子,性能有提升,但是要注意OOM问题;
val res = rdd.mapPartition(iter=>iter.map(x=>x))
// 与mapPartition类似,只不过多了分区的编号index;
val res = rdd.mapPartitionWithIndwx((index, iter)=>iter.map(x=>(index, x)))
val res = rdd.flatMap(x=>x.split(" ")) // 将返回的Array[(String, Int)]类型压平为(String, Int)类型;
val res = rdd.mapValue(x=>x) // kv算子,对value做map操作;
- 聚合类:
// kv算子,根据k分组;rdd中的groupByKey算子不会做combine优化;
val res: RDD[(String, Iterable[Int])] = rdd.groupByKey()
// kv算子,根据k分组,并聚合v;会使用combine优化;
val res = rdd.reduceByKey(_+_)
// kv算子,根据k分组,自定义分区内、分区间合并规则;用来优化分组topN问题,非常灵活;
// 其他聚合算子底层也是调用这个算子;
val res = rdd.combineByKey(craeteCombiner, mergeValue, mergeCombiners)
// 加了初始值,自定义分区间聚合、分区间聚合;
val res = rdd2.aggregateByKey(10)((part, value) => agg + 2 * value, (part1, part2) => agg + value)
val res = rdd.foldByKey(10)((agg, value)=>agg+value) // 加了初始值
val res = rdd.cogroup(rdd2) // 返回RDD[(key, (Iterable[v1], iterable[v2]))],是sparkCore的join实现方式
- join类:
// 只有kv类型的rdd才可以join,并且会自动根据key连接;
val res = rdd.join(rdd2) // 内连接,(key, (v1, v2))
val res = rdd.leftOuterJoin(rdd2) // 左连接
val res = rdd.fullOuterJoin(rdd2) // 全连接
val res = rdd.cartesian(rdd2) // 笛卡尔积;分区数是 rdd分区数 * rdd2分区数;不是shuffle算子;
- 重分区:
// 不触发shuffle的重分区,不能扩大分区数;
// 如果上一个stage0的shuffle write是1000个分区,那么当前stage1会启动1000个task来处理,一个task
// 处理一个分区;但是如果当前stage1有coalesce(100),那么当前stage1只会启动100个task,一个task处理
//10个分区(10个分区作为一组);因此coalesce后的分区数不要与原分区数相差太大,否则运行很慢甚至OOM;
val res = rdd.coalesce(100)
val res = rdd.repartition(100) // 触发shuffle的重分区,根据随机key分区,等价于rdd.coalesce(100, true);
val res = rdd.partitionBy(new HashPartitioner(100)) // kv算子,根据key的hash值分区,可以自定义分区器;
// 重分区,并使分区内有序;比先repartition再sort的性能高,因为这个是在shuffle时边shuffle边排序;
val res = rdd.repartitionAndSortWithinPartitions(new HashPartitioner(3)) // kv算子
- 其他:
val res = rdd.filter(x=>x=="name") // 过滤
val res = rdd.distinct() // 去重
// 先是分区间有序,然后分区内有序,相同key会放到同一个分区;
// sortBy采用抽样来确定分区的边界,从而使分区数据均衡;抽样时用到了collect会生成job;
// 1,sample:创建RangePartitioner,对输入数据的key做sample来估算key的分布,然后排序切分出range
// 2,shuffle write:用上面的RangePartitioner对数据重分区,使相同范围数据在同一分区,即分区间有序;
// 3,shuffle read:每个reduce拉取自己分区的数据,然后分区内排序;
// 最后就达到了分区间有序和分区内有序,也就是全局有序;
// 难点就在于如何确定range的边界,以及快速将一个值映射到partition里,解决思路是:通过抽样来确定
// range的边界,通过字典树来构建索引,来快速找到分区;
val res = rdd.sortBy(x=>x._1, ascending=false) // 排序,第二个参数:是否升序;
// 由行动算子触发;task在读取分区时,先从cacheManager判断是否有缓存,有就直接获取,没有就计算
// 获取缓存时,先通过(rddId, 分区Id)到blockManager中查找block信息,有就直接获取,没有就计算
// 此时重新计算会加锁,因为可能会有多个线程同时读取(例如笛卡尔积),获取到锁的线程会计算该分区的数据
// 并缓存起来,后续的线程直接读缓存;用show和take触发的缓存,只会缓存计算的分区,不会缓存所有分区
val res = rdd.persist(StorageLevel) // 缓存,默认是内存
val res = rdd.union(rdd2) // 并集,不会触发shuffle
val res = rdd.intersection(rdd2) // 交集
val res = rdd.subtract(rdd2) // 差集
val res = rdd.glom() // 将一个分区的数据合并为一个数组
val res = rdd.sample(false, 0.1) // 抽样,(是否可放回,抽样比例[0, 1]),0.1=10%;
val res = rdd.toDebugString // 打印rdd的
2.2.2 行动算子
- 有返回值:
val res = rdd.count() // 求数量
val res = rdd.collect() // 将数据拉取到driver端
val res = rdd.take(10) // 返回前10条数据,只计算部分分区
val res = rdd.reduce((x, y)=>x+y) // 聚合算子,直接返回聚合后的结果
val res = rdd.countByKey() // kv算子,统计key出现的次数,返回返回Map[(String, Long)]
val res = rdd.countByValue() // 统计数据出现的次数,返回Map[(String, Long)]
- 无返回值:
val res = rdd.foreach() // 遍历每条数据,类似于map,但是没有返回值
val res = rdd.saveAsTextFile("path") // 保存到磁盘
2.3 DataFrame常用算子
- select:select中只能有列名,因为参数类型为Column;
df.select("name", "age")
df.select(col("name").as("rename")) // col("name") 等价于 $"name"
df.select($"name".as("rename"))
df.select(Column("*")) // 获取所有列
// 可以在select中使用的方法
df.select(concat($"name", $"id")) // concat中只能是列名,不能是字符串;concat_ws同理
// 获取struct类型中的字段;例如:(id, (name, age))类型的RDD转为(id, info)的DF,则DF的info字段就是
// struct类型,此时struct中的字段名默认是 '_1', '_2';可以通过col("info").getField("_1")来获取name字段;
df.select(col("struct").getField("id"))
- selectExpr:执行字符串格式的SQL表达式;
df.selectExpr("concat(name, 'sd') as name", "id / 10 as id") // 直接解析SQL字符串
- where / filter:过滤
df.where($"id" =!= 2)
df.where($"id" === 3)
df.where($"id">=3 and $"name" =!= "a") // and 等价于 &&
df.where("id>=2 and name != 'a' ") // 可以直接执行SQL字符串
- sort:排序
// 全局排序;先是分区有序,然后分区内有序,相同key会放到同一个分区;
df.sort($"name".desc)
df.sortWithinPartitions($"id") // 分区内排序
- group by:分组,分组后只能接聚合算子
df.groupBy($"name").agg("id" -> "max", "id" -> "sum") // "id"->"max" 等价于 max(id)
df.groupBy($"name").agg(collect_set($"id").as("ids"), count(lit(1)).as("cnt"))
- join:连接
// 注意,这里不能把 Seq("name")改为 $"name",会报错;使用Seq("key")时,右表的key会自动去掉;
df1.join(df2, Seq("name"), "left")
df2.as("dfa").join(df2.as("dfb"), $"dfa.name" === $"dfb.name", "left") // 给表取别名,然后自定义连接字段
df1.join(df2) // 不加连接条件的就是笛卡尔积,必须设spark.sql.crossJoin.enabled为true;
- over():开窗函数
val window = Window.partitionBy("name").orderBy($"id".desc)
val mid = df.withColumn("rnk", row_number().over(window))
- 对列的操作
df.withColumn("gid", $"id"+1) // 增加一列,列名为gid,如果gid列名存在,则会更新该列;
df.withColumnRenamed("name", "new_name") // 重命名
df.drop("name") // 删除一列
- read:
// 读取CSV文件,option("header", "true")表示csv文件中包含header信息,自动加载第一行作为header
val csv = spark.read.option("header", "true").csv("path")
// 读取resource中的文件,并转为rdd;spark的read只支持读取本地和分布式文件,所以用下面的方式;
val source = this.getClass.getClassLoader.getResourceAsStream("sample.csv")
val lines = Source.fromInputStream(source)("UTF-8").getLines()
val rdd = spark.sparkContext.parallelize(lines)
三、自定义函数
- UDF:自定义基本函数
// 通过实名函数创建
spark.sqlContext.udf.register("myudf", myUdf _)
// 通过匿名函数创建
spark.sqlContext.udf.register("myudf", (a: String, b: Int)=>{"sd_" + a + b})
// 使用
df.selectExpr("myudf(name, id) as str")
def myUdf(a: String, b: Int): String ={ // 自定义函数最多支持22个参数
val res = "sd_" + a + b
res
}
- UDAF:自定义聚合函数;需要实现UserDefinedAggregateFunction或者 类型安全的Aggregator接口
// 注册自定义聚合函数
spark.sqlContext.udf.register("avgudaf", new AvgUDAF)
df.groupBy().agg("id" -> "avgudaf")
// 定义自定义聚合函数
class AvgUDAF extends UserDefinedAggregateFunction{
//输入的数据类型
override def inputSchema: StructType = {
StructType(Array(StructField("id",IntegerType)))
}
//中间聚合处理时,所处理的数据类型
override def bufferSchema: StructType = {
StructType(Array(StructField("sum", DoubleType), StructField("count", IntegerType)))
}
//函数的返回类型
override def dataType: DataType = {
DoubleType
}
// 是否为幂等函数
override def deterministic: Boolean = {
true
}
//为每个分组的数据初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0)=0.0 // 初始化sum
buffer(1)=0 // 初始化count
}
//指的是,每个分组,有新的值进来时,如何进行分组的聚合计算
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val in = input.getInt(0).toDouble // 获取输入的id
buffer(0) = buffer.getDouble(0) + in // buffer(0)就是sum,buffer(1)就是count
buffer(1) = buffer.getInt(1) + 1
}
//由于Spark是分布式的,所以一个分组的数据,可能会在不同的节点上进行局部聚合,就是update
//但是最后一个分组,在各节点上的聚合值,要进行Merge,也就是合并
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val sum1 = buffer1.getDouble(0)
val sum2 = buffer2.getDouble(0)
val count1 = buffer1.getInt(1)
val count2 = buffer2.getInt(1)
buffer1(0) = sum1 + sum2
buffer1(1) = count1 + count2
}
//一个分组的聚合值,如何通过中间的聚合值,最后返回一个最终的聚合值
override def evaluate(buffer: Row): Any = {
val sum = buffer.getDouble(0)
val count = buffer.getInt(1)
val avg = (sum / count).formatted("%.2f").toDouble
avg
}
}