zoukankan      html  css  js  c++  java
  • FLINK基础(135):DS算子与窗口(25)Flink DataStream 八大分区策略与自定义分区器(1)

    分区策略决定了一条数据如何发送给下游。Flink中默认提供了八大分区策略(也叫分区器)。

    本文基于Flink 1.9.0总结Flink DataStream中的八大分区策略以及手动实现一个自定义分区器。

    八大分区策略继承关系图

    八大分区策略继承关系图

    • ChannelSelector: 接口,决定将记录写入哪个Channel。有3个方法:
    1. void setup(int numberOfChannels): 初始化输出Channel的数量。
    2. int selectChannel(T record): 根据当前记录以及Channel总数,决定应将记录写入下游哪个Channel。八大分区策略的区别主要在这个方法的实现上。
    3. boolean isBroadcast(): 是否是广播模式。决定了是否将记录写入下游所有Channel
    • StreamPartitioner:抽象类,也是所有流分区器GlobalPartitioner,ShufflePartitioner,RebalancePartitioner,RescalePartitioner,BroadcastPartitioner,ForwardPartitioner,KeyGroupStreamPartitioner,CustomPartitioner的基类。

    注意:

    1. 这里以及下边提到的Channel可简单理解为下游Operator的某个实例。
    2. Flink 中改变并行度,默认RebalancePartitioner分区策略。
    3. 分区策略,可在Flink WebUI上直观看出,如REBALANCE,即使用了RebalancePartitioner分区策略;SHUFFLE,即使用了ShufflePartitioner分区策略。

    GlobalPartitioner: DataStream => DataStream

    GlobalPartitioner,GLOBAL分区。将记录输出到下游Operator的第一个实例。

    selectChannel实现

    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            //对每条记录,只选择下游operator的第一个Channel
            return 0;
    }

    API使用

    dataStream
        .setParallelism(2)
        // 采用GLOBAL分区策略重分区
        .global()
        .print()
        .setParallelism(1);

    ShufflePartitioner: DataStream => DataStream

    ShufflePartitionerSHUFFLE分区。将记录随机输出到下游Operator的每个实例。

    selectChannel实现

    private Random random = new Random();
    
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        //对每条记录,随机选择下游operator的某个Channel
        return random.nextInt(numberOfChannels);
    }

    API使用

    dataStream
        .setParallelism(2)
        // 采用SHUFFLE分区策略重分区
        .shuffle()
        .print()
        .setParallelism(4);

    RebalancePartitioner: DataStream => DataStream

    RebalancePartitioner,REBALANCE分区。将记录以循环的方式输出到下游Operator的每个实例。

    selectChannel实现

    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        //第一条记录,输出到下游的第一个Channel;第二条记录,输出到下游的第二个Channel...如此循环
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    API使用

    dataStream
            .setParallelism(2)
            // 采用REBALANCE分区策略重分区
            .rebalance()
            .print()
            .setParallelism(4);

    RescalePartitioner: DataStream => DataStream

    RescalePartitioner,RESCALE分区。基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。举例: 上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

    selectChannel实现

    private int nextChannelToSendTo = -1;
    
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        if (++nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }

    API示例

    dataStream
        .setParallelism(2)
        // 采用RESCALE分区策略重分区
        .rescale()
        .print()
        .setParallelism(4);

    BroadcastPartitioner: DataStream => DataStream

    BroadcastPartitioner,BROADCAST分区。广播分区将上游数据集输出到下游Operator的每个实例中。适合于大数据集Join小数据集的场景。

    selectChannel实现

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        //广播分区不支持选择Channel,因为会输出到下游每个Channel中
        throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
    }
    
    @Override
    public boolean isBroadcast() {
        //启用广播模式,此时Channel选择器会选择下游所有Channel
        return true;
    }

    API示例

    dataStream
        .setParallelism(2)
        // 采用BROADCAST分区策略重分区
        .broadcast()
        .print()
        .setParallelism(4);

    ForwardPartitioner

    ForwardPartitioner,FORWARD分区。将记录输出到下游本地的operator实例。ForwardPartitioner分区器要求上下游算子并行度一样。上下游Operator同属一个SubTasks

    selectChannel实现

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return 0;
    }

    API示例

    dataStream
        .setParallelism(2)
        // 采用FORWARD分区策略重分区
        .forward()
        .print()
        .setParallelism(2);

    KeyGroupStreamPartitioner(HASH方式):

    KeyGroupStreamPartitioner,HASH分区。将记录按Key的Hash值输出到下游Operator实例。

    selectChannel实现

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
        }
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
    }
    
    // KeyGroupRangeAssignment中的方法
    public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }
    
    // KeyGroupRangeAssignment中的方法
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }
    
    // KeyGroupRangeAssignment中的方法
    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }

    API示例

    dataStream
        .setParallelism(2)
        // 采用HASH分区策略重分区
        .keyBy((KeySelector<Tuple3<String, Integer, String>, String>) value -> value.f0)
        .print()
        .setParallelism(4);

    CustomPartitionerWrapper

    CustomPartitionerWrapper,CUSTOM分区。通过Partitioner实例的partition方法(自定义的)将记录输出到下游。

    selectChannel实现

    Partitioner<K> partitioner;
    KeySelector<T, K> keySelector;
    public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
        this.partitioner = partitioner;
        this.keySelector = keySelector;
    }
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
        }
        return partitioner.partition(key, numberOfChannels);
    }

    自定义分区器将指定的Key分到指定的分区

    // 自定义分区器,将不同的Key(用户ID)分到指定的分区
    // key: 根据key的值来分区
    // numPartitions: 下游算子并行度
    static class CustomPartitioner implements Partitioner<String> {
          @Override
          public int partition(String key, int numPartitions) {
              switch (key){
                  case "user_1":
                      return 0;
                  case "user_2":
                      return 1;
                  case "user_3":
                      return 2;
                  default:
                      return 3;
              }
          }
      }

    使用自定义分区器

    dataStream
        .setParallelism(2)
        // 采用CUSTOM分区策略重分区
        .partitionCustom(new CustomPartitioner(),0)
        .print()
        .setParallelism(4);

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13767324.html

  • 相关阅读:
    119. Pascal's Triangle II
    118. Pascal's Triangle
    112. Path Sum
    111. Minimum Depth of Binary Tree
    110. Balanced Binary Tree
    108. Convert Sorted Array to Binary Search Tree
    88. Merge Sorted Array
    83. Remove Duplicates from Sorted List
    70. Climbing Stairs
    陌陌面试经历
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13767324.html
Copyright © 2011-2022 走看看