Flink DataStream Partitioning Strategies (Part 4)

    shuffle

Use case: increasing the number of partitions / raising parallelism, and mitigating data skew

    DataStream → DataStream

Elements are distributed randomly and uniformly across the downstream partitions; the network overhead is relatively high.



val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(1)
println(stream.getParallelism)
stream.shuffle.print()
env.execute()

Console result: upstream data is distributed to the downstream partitions more or less at random.

    2> 1
    1> 4
    7> 10
    4> 6
    6> 3
    5> 7
    8> 2
    1> 5
    1> 8
    1> 9

     

    rebalance

Use case: increasing the number of partitions / raising parallelism, and mitigating data skew
DataStream → DataStream
Elements are distributed round-robin, so each downstream partition receives a fairly even share. This is very useful when data skew occurs; the network overhead is relatively high.
    
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(3)
    val stream = env.generateSequence(1,100)
val rebalanceStream = stream.rebalance
rebalanceStream.print()
    env.execute()
    
Console result: upstream data is distributed evenly to the downstream partitions; the larger the data volume, the more balanced the distribution becomes.
    8> 6
    3> 1
    5> 3
    7> 5
    1> 7
    2> 8
    6> 4
    4> 2
    3> 9
    4> 10
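Round-robin distribution itself can be sketched in a few lines of plain Scala (the `RoundRobinSketch` name and `assign` helper are illustrative, not Flink API): element i goes to channel (start + i) % numChannels, which is why every downstream partition ends up with an equal share.

```scala
// Minimal sketch of rebalance: elements are dealt out round-robin,
// so channel loads differ by at most one element.
object RoundRobinSketch {
  def assign(elements: Seq[Long], numChannels: Int,
             start: Int = 0): Map[Int, Seq[Long]] =
    elements.zipWithIndex
      .map { case (e, i) => ((start + i) % numChannels, e) }
      .groupBy(_._1)
      .map { case (ch, pairs) => ch -> pairs.map(_._2) }
}
```

For 8 elements over 4 channels, every channel receives exactly 2 elements, unlike shuffle, where equality only holds in expectation.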

    rescale

Use case: reducing the number of partitions while avoiding heavy network traffic; no full repartitioning takes place
DataStream → DataStream
Each upstream partition round-robins its elements over a fixed subset of the downstream partitions rather than over all of them, so data is forwarded in groups instead of being scattered element by element across every partition.
Note: rescale uses local data transfer and does not need to ship data over the network (the grouping depends on factors such as the TaskManager's slot count). Put simply, an upstream subtask only sends data to downstream subtasks in the same TaskManager.
    
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.generateSequence(1,10).setParallelism(2)
    stream.writeAsText("./data/stream1").setParallelism(2)
    stream.rescale.writeAsText("./data/stream2").setParallelism(4)
    env.execute()
    
Console result: the contents of stream1:1 go to stream2:1 and stream2:2 (and those of stream1:2 to stream2:3 and stream2:4).
    
    stream1:1
    1
    3
    5
    7
    9
    
    stream1:2
    2
    4
    6
    8
    10

    stream2:1

    1
    5
    9

    stream2:2

    3
    7

    stream2:3

    2
    6
    10

    stream2:4

    4
    8
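The output above matches a simple model: with upstream parallelism 2 and downstream parallelism 4, each upstream subtask owns a block of 4 / 2 = 2 consecutive downstream channels and round-robins only within that block. A plain-Scala sketch (names are illustrative, not Flink API):

```scala
// Minimal sketch of rescale: upstream subtask `upstreamIdx` round-robins
// only over its own block of downstream channels, never over all of them.
object RescaleSketch {
  def assign(upstreamIdx: Int, elements: Seq[Long],
             upstreamParallelism: Int, downstreamParallelism: Int): Map[Int, Seq[Long]] = {
    val groupSize = downstreamParallelism / upstreamParallelism
    val base = upstreamIdx * groupSize
    elements.zipWithIndex
      .map { case (e, i) => (base + i % groupSize, e) }
      .groupBy(_._1)
      .map { case (ch, pairs) => ch -> pairs.map(_._2) }
  }
}
```

Feeding upstream subtask 0 the elements 1, 3, 5, 7, 9 reproduces stream2:1 = {1, 5, 9} and stream2:2 = {3, 7} exactly.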

    broadcast

Use case: jobs that need a mapping (lookup) table, especially one that changes frequently
DataStream → DataStream
Every upstream element is broadcast to every downstream partition.

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.generateSequence(1,10).setParallelism(2)
    stream.writeAsText("./data/stream1").setParallelism(2)
    stream.broadcast.writeAsText("./data/stream2").setParallelism(4)
    env.execute()

Console result: the contents of both stream1:1 and stream1:2 are broadcast to every downstream partition.

    stream1:1

    1
    3
    5
    7
    9

    stream1:2

    2
    4
    6
    8
    10

    
    

stream2: there are 4 files, each containing the full data set

    1
    3
    5
    7
    9
    2
    4
    6
    8
    10

Use cases:

1. A broadcast variable keeps exactly one full copy of the data per task; without it, every subtask would re-pull the data each time it runs, which takes far longer.

2. Map-side joins.

    global

Use case: effectively reducing the parallelism to 1
DataStream → DataStream
All upstream partitions send their data to the first downstream partition only.
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.generateSequence(1,10).setParallelism(2)
    stream.writeAsText("./data/stream1").setParallelism(2)
    stream.global.writeAsText("./data/stream2").setParallelism(4)
    env.execute()
    
Console result: the contents of stream1:1 and stream1:2 all end up in stream2:1.
    
    stream1:1
    1
    3
    5
    7
    9
    
    stream1:2
    2
    4
    6
    8
    10
    
    stream2:1
    1
    3
    5
    7
    9
    2
    4
    6
    8
    10

    forward

Use case: one-to-one data forwarding; map, flatMap, filter and similar operators use this strategy by default
DataStream → DataStream
Each upstream partition's data is forwarded to the corresponding downstream partition:
partition1 -> partition1
partition2 -> partition2
Note: the upstream and downstream partition counts (parallelism) must match, otherwise you get an exception like the following:
    
    
Forward partitioning does not allow change of parallelism
Upstream operation: Source: Sequence Source-1 parallelism: 2,
downstream operation: Sink: Unnamed-4 parallelism: 4
// triggered by: stream.forward.writeAsText("./data/stream2").setParallelism(4)
    
    
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.generateSequence(1,10).setParallelism(2)
    stream.writeAsText("./data/stream1").setParallelism(2)
    stream.forward.writeAsText("./data/stream2").setParallelism(2)
    env.execute()
    
Console result: stream1:1 -> stream2:1 and stream1:2 -> stream2:2.
    stream1:1
    1
    3
    5
    7
    9
    
    stream1:2
    2
    4
    6
    8
    10
    
    stream2:1
    1
    3
    5
    7
    9
    
    stream2:2
    2
    4
    6
    8
    10

    keyBy

Use case: whenever routing must follow the business key
DataStream → DataStream
The hash of each element's key, taken modulo the number of downstream partitions, determines which downstream partition the element is sent to.

How it works:
MathUtils.murmurHash(keyHash) (the hash of each element's key) % maxParallelism (the maximum parallelism, i.e. the number of key groups)
    
    

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.generateSequence(1,10).setParallelism(2)
    stream.writeAsText("./data/stream1").setParallelism(2)
    stream.keyBy(0).writeAsText("./data/stream2").setParallelism(2)
    env.execute()

Console result: elements are distributed to downstream partitions according to their key's hash.
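The routing formula can be sketched in plain Scala. This is a simplified model: `mix` is a stand-in for Flink's `MathUtils.murmurHash` (the real constants differ), and the key-group-to-subtask step `keyGroup * parallelism / maxParallelism` follows Flink's key-group assignment; treat the whole object as illustrative, not Flink API.

```scala
// Sketch of keyBy routing: hash the key, take it modulo maxParallelism
// to get a key group, then map the key group to a downstream subtask.
object KeyGroupSketch {
  // Stand-in for MathUtils.murmurHash: a simple avalanche mix,
  // clamped non-negative so the modulo below stays in range.
  def mix(h0: Int): Int = {
    var h = h0
    h ^= h >>> 16
    h *= 0x85ebca6b
    h ^= h >>> 13
    h *= 0xc2b2ae35
    h ^= h >>> 16
    h & Int.MaxValue
  }

  def targetSubtask(key: Any, maxParallelism: Int, parallelism: Int): Int = {
    val keyGroup = mix(key.hashCode) % maxParallelism
    keyGroup * parallelism / maxParallelism
  }
}
```

Unlike rebalance, the same key always lands on the same subtask, which is what makes keyed state possible, and also why skewed keys produce skewed partitions.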

    PartitionCustom

    DataStream → DataStream
A user-defined partitioner decides how elements are routed from upstream partitions to downstream partitions.
    
    
    
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

object ShuffleOperator {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    val stream = env.generateSequence(1, 10).map((_, 1))
    stream.writeAsText("./data/stream1")
    // key position 0 selects the Long field of the (Long, Int) tuple
    stream.partitionCustom(new CustomPartitioner(), 0)
      .writeAsText("./data/stream2").setParallelism(4)
    env.execute()
  }

  // routes each element to partition (key % numPartitions)
  class CustomPartitioner extends Partitioner[Long] {
    override def partition(key: Long, numPartitions: Int): Int = {
      key.toInt % numPartitions
    }
  }
}
Original post: https://www.cnblogs.com/bigdata-familyMeals/p/14882153.html