Flink Dataflows Partitioning Strategies (Part 4)

    shuffle

Use case: increasing the number of partitions / raising parallelism; useful for mitigating data skew

    DataStream → DataStream

Elements are distributed randomly but uniformly across the downstream partitions; the network overhead is relatively high.



import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Single-parallelism source, so every element passes through one upstream partition
val stream = env.generateSequence(1, 10).setParallelism(1)
println(stream.getParallelism)
// shuffle: randomly distribute elements to the downstream (default-parallelism) sink
stream.shuffle.print()
env.execute()

console result: upstream data is distributed to the downstream partitions more or less at random

    2> 1
    1> 4
    7> 10
    4> 6
    6> 3
    5> 7
    8> 2
    1> 5
    1> 8
    1> 9

     

    rebalance

Use case: increasing the number of partitions / raising parallelism; useful for mitigating data skew
DataStream → DataStream
Elements are sent to the downstream partitions round-robin, so each partition receives a fairly even share. This is very useful when data skew occurs, though the network overhead is relatively high.
    
    
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
val stream = env.generateSequence(1, 100)
// rebalance: round-robin redistribution across all downstream partitions
val rebalanceStream = stream.rebalance
rebalanceStream.print()
env.execute()
    
console result: upstream data is distributed evenly to the downstream partitions; with a large enough data volume the distribution becomes quite balanced
    8> 6
    3> 1
    5> 3
    7> 5
    1> 7
    2> 8
    6> 4
    4> 2
    3> 9
    4> 10

    rescale

Use case: changing the number of partitions while avoiding heavy network traffic; a full repartition does not take place
DataStream → DataStream
Elements are round-robined from each upstream partition to a fixed subset of the downstream partitions, so routing happens between groups of partitions rather than from every upstream partition to every downstream one
Note: rescale favors local data transfer instead of going over the network (depending, for example, on the TaskManager slot layout). Roughly speaking, an upstream partition only sends to downstream partitions within the same TaskManager.
    
    
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1, 10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
// rescale: each of the 2 upstream partitions feeds its own pair of the 4 downstream partitions
stream.rescale.writeAsText("./data/stream2").setParallelism(4)
env.execute()
    
console result: the contents of stream1:1 are distributed to stream2:1 and stream2:2 (and those of stream1:2 to stream2:3 and stream2:4)
    
    stream1:1
    1
    3
    5
    7
    9
    
    stream1:2
    2
    4
    6
    8
    10

    stream2:1

    1
    5
    9

    stream2:2

    3
    7

    stream2:3

    2
    6
    10

    stream2:4

    4
    8

    broadcast

Use case: a mapping (lookup) table is needed and the table changes frequently
DataStream → DataStream
Every upstream element is broadcast to every downstream partition

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1, 10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
// broadcast: every element is replicated to all 4 downstream partitions
stream.broadcast.writeAsText("./data/stream2").setParallelism(4)
env.execute()

console result: the contents of stream1:1 and stream1:2 are broadcast to every downstream partition

    stream1:1

    1
    3
    5
    7
    9

    stream1:2

    2
    4
    6
    8
    10

    
    

stream2: there are 4 files, each containing the full data set

    1
    3
    5
    7
    9
    2
    4
    6
    8
    10

Use cases:

1. A broadcast variable is held once per task as a full copy of the data; otherwise each subtask would re-pull the data every time it runs, which takes a long time

2. Map-side joins (see the sketch below)
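
A hedged sketch of the map-side join case: connecting a fact stream to a broadcast dimension stream lets every parallel subtask hold the full dimension table and join locally, without shuffling the fact records. The stream contents, tuple layout, and in-memory table below are illustrative assumptions, not part of the original example (and facts arriving before their dimension row are simply dropped here).

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable

object BroadcastJoinSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Hypothetical dimension records: (id, name), broadcast to every subtask
    val dims = env.fromElements((1L, "apple"), (2L, "banana"))
    // Fact records referencing the same ids
    val facts = env.fromElements((1L, "order-a"), (2L, "order-b"), (1L, "order-c"))

    facts
      .connect(dims.broadcast)
      .flatMap(new CoFlatMapFunction[(Long, String), (Long, String), String] {
        // Per-subtask copy of the dimension table, filled from the broadcast side
        private val table = mutable.Map[Long, String]()

        override def flatMap1(fact: (Long, String), out: Collector[String]): Unit =
          table.get(fact._1).foreach(name => out.collect(s"${fact._2} -> $name"))

        override def flatMap2(dim: (Long, String), out: Collector[String]): Unit =
          table(dim._1) = dim._2
      })
      .print()

    env.execute()
  }
}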

    global

Use case: reducing the effective parallelism to 1
DataStream → DataStream
All upstream partitions send their data only to the first downstream partition
    
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1, 10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
// global: all elements go to downstream partition 1, regardless of sink parallelism
stream.global.writeAsText("./data/stream2").setParallelism(4)
env.execute()
    
console result: the contents of stream1:1 and stream1:2 are sent only to stream2:1
    
    stream1:1
    1
    3
    5
    7
    9
    
    stream1:2
    2
    4
    6
    8
    10
    
    stream2:1
    1
    3
    5
    7
    9
    2
    4
    6
    8
    10

    forward

Use case: one-to-one data forwarding; map, flatMap, filter, etc. use this partitioning strategy by default
DataStream → DataStream
Each upstream partition sends its data to the corresponding downstream partition:
partition1 -> partition1
partition2 -> partition2
Note: the upstream and downstream partition counts (parallelism) must match, otherwise an exception like the following is thrown:
    
    
Forward partitioning does not allow change of parallelism
Upstream operation: Source: Sequence Source-1 parallelism: 2,
downstream operation: Sink: Unnamed-4 parallelism: 4
(triggered by: stream.forward.writeAsText("./data/stream2").setParallelism(4))
    
    
    
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1, 10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
// forward: parallelism 2 -> 2, each upstream partition maps to one downstream partition
stream.forward.writeAsText("./data/stream2").setParallelism(2)
env.execute()
    
console result: stream1:1 -> stream2:1, stream1:2 -> stream2:2
    stream1:1
    1
    3
    5
    7
    9
    
    stream1:2
    2
    4
    6
    8
    10
    
    stream2:1
    1
    3
    5
    7
    9
    
    stream2:2
    2
    4
    6
    8
    10

    keyBy

Use case: determined by the business logic (grouping by key)
DataStream → KeyedStream
The hash of each element's key, taken modulo, determines which downstream partition the element is sent to

How it works:
MathUtils.murmurHash(keyHash) % maxParallelism gives the element's key group (keyHash is the hash of the element's key; maxParallelism is the maximum parallelism, i.e. the number of key groups, not the downstream partition count); the key group is then mapped onto a downstream partition.
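
A minimal sketch of that assignment, mirroring the helpers in Flink's KeyGroupRangeAssignment (the method names here are illustrative, not the exact source):

import org.apache.flink.util.MathUtils

// Key group for a key's hash: spread with murmurHash, then take it
// modulo the maximum parallelism (the number of key groups)
def keyGroupFor(keyHash: Int, maxParallelism: Int): Int =
  MathUtils.murmurHash(keyHash) % maxParallelism

// Map the key group onto the actual downstream partition (operator index)
def operatorIndexFor(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
  keyGroup * parallelism / maxParallelism

// e.g. with maxParallelism = 128 and a sink parallelism of 2:
// operatorIndexFor(keyGroupFor("foo".hashCode, 128), 128, 2)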
    
    

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1, 10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
// keyBy needs a key selector here: the source emits plain Longs, not tuples,
// so positional keyBy(0) would fail; key by the value itself instead
stream.keyBy(v => v).writeAsText("./data/stream2").setParallelism(2)
env.execute()

console result: elements are distributed to the downstream partitions according to their key's hash value

partitionCustom

DataStream → DataStream
A user-defined partitioner decides how elements are routed from upstream partitions to downstream partitions
    
    
    
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

object ShuffleOperator {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    // Pair each value with a count so field 0 (the Long) can serve as the key
    val stream = env.generateSequence(1, 10).map((_, 1))
    stream.writeAsText("./data/stream1")
    // Route each element with the custom partitioner, keyed on field 0
    stream.partitionCustom(new CustomPartitioner(), 0)
      .writeAsText("./data/stream2").setParallelism(4)
    env.execute()
  }

  // Partition index = key modulo the number of downstream partitions
  class CustomPartitioner extends Partitioner[Long] {
    override def partition(key: Long, numPartitions: Int): Int = {
      (key % numPartitions).toInt
    }
  }
}
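
With the setup above (keys 1 to 10 and a sink parallelism of 4), element n should land in downstream partition n % 4, e.g. 4 and 8 in partition 0 and 1, 5, 9 in partition 1; the exact output file names depend on how the sink numbers its subtasks.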