shuffle
场景:增大分区、提高并行度,解决数据倾斜
DataStream → DataStream
分区元素随机均匀分发到下游分区,网络开销比较大
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.generateSequence(1,10).setParallelism(1) println(stream.getParallelism) stream.shuffle.print() env.execute()
console result: 上游数据比较随意的分发到下游
2> 1
1> 4
7> 10
4> 6
6> 3
5> 7
8> 2
1> 5
1> 8
1> 9
rebalance
场景:增大分区、提高并行度,解决数据倾斜 DataStream → DataStream 轮询分区元素,均匀的将元素分发到下游分区,下游每个分区的数据比较均匀,在发生数据倾斜时非常有用,网络开销比较大 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(3) val stream = env.generateSequence(1,100) val shuffleStream = stream.rebalance shuffleStream.print() env.execute() console result:上游数据比较均匀的分发到下游,当数据量足够大的时候,数据就比较平均
8> 6 3> 1 5> 3 7> 5 1> 7 2> 8 6> 4 4> 2 3> 9 4> 10
rescale
场景:减少分区 防止发生大量的网络传输 不会发生全量的重分区 DataStream → DataStream 通过轮询分区元素,将一个元素集合从上游分区发送给下游分区,发送单位是集合,而不是一个个元素 注意:rescale发生的是本地数据传输,而不需要通过网络传输数据,比如taskmanager的槽数。简单来说,上游的数据只会发送给本TaskManager中的下游 val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.generateSequence(1,10).setParallelism(2) stream.writeAsText("./data/stream1").setParallelism(2) stream.rescale.writeAsText("./data/stream2").setParallelism(4) env.execute() console result:stream1:1内容 分发给stream2:1和stream2:2 stream1:1 1 3 5 7 9 stream1:2 2 4 6 8 10
stream2:1
1
5
9
stream2:2
3
7
stream2:3
2
6
10
stream2:4
4
8
broadcast
场景:需要使用映射表、并且映射表会经常发生变动的场景
DataStream → DataStream
上游中每一个元素内容广播到下游每一个分区中
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.broadcast.writeAsText("./data/stream2").setParallelism(4)
env.execute()
console result:stream1:1、2内容广播到了下游每个分区中
stream1:1
1
3
5
7
9
stream1:2
2
4
6
8
10
stream2:有4个文件,每个都是全量数据
1
3
5
7
9
2
4
6
8
10
场景:
1. 广播变量每个task只有一份且是全量数据,正常任务每个subTak执行的时候都会去重新拉数据,时间跨度长
2. map端join
global
场景:并行度降为1 DataStream → DataStream 上游分区的数据只分发给下游的第一个分区 val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.generateSequence(1,10).setParallelism(2) stream.writeAsText("./data/stream1").setParallelism(2) stream.global.writeAsText("./data/stream2").setParallelism(4) env.execute() console result:stream1:1、2内容只分发给了stream2:1 stream1:1 1 3 5 7 9 stream1:2 2 4 6 8 10 stream2:1 1 3 5 7 9 2 4 6 8 10
forward
场景:一对一的数据分发,map、flatMap、filter 等都是这种分区策略 DataStream → DataStream 上游分区数据分发到下游对应分区中 partition1->partition1 partition2->partition2 注意:必须保证上下游分区数(并行度)一致,不然会有如下异常: Forward partitioning does not allow change of parallelism * Upstream operation: Source: Sequence Source-1 parallelism: 2, * downstream operation: Sink: Unnamed-4 parallelism: 4 * stream.forward.writeAsText("./data/stream2").setParallelism(4) val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.generateSequence(1,10).setParallelism(2) stream.writeAsText("./data/stream1").setParallelism(2) stream.forward.writeAsText("./data/stream2").setParallelism(2) env.execute() console result:stream1:1->stream2:1、stream1:2->stream2:2 stream1:1 1 3 5 7 9 stream1:2 2 4 6 8 10 stream2:1 1 3 5 7 9 stream2:2 2 4 6 8 10
keyBy
场景:与业务场景匹配
DataStream → DataStream
根据上游分区元素的Hash值与下游分区数取模计算出,将当前元素分发到下游哪一个分区
原理:MathUtils.murmurHash(keyHash)(每个元素的Hash值) % maxParallelism(下游分区数)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.keyBy(0).writeAsText("./data/stream2").setParallelism(2)
env.execute()
console result:根据元素Hash值分发到下游分区中
PartitionCustom
DataStream → DataStream
通过自定义的分区器,来决定元素是如何从上游分区分发到下游分区
object ShuffleOperator {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val stream = env.generateSequence(1, 10).map((_, 1))
stream.writeAsText("./data/stream1")
stream.partitionCustom(new customPartitioner(), 0)
.writeAsText("./data/stream2").setParallelism(4)
env.execute()
}
class customPartitioner extends Partitioner[Long] {
override def partition(key: Long, numPartitions: Int): Int = {
key.toInt % numPartitions
}
}
}