zoukankan      html  css  js  c++  java
  • FLINK基础(135):DS算子与窗口(25)Flink DataStream 八大分区策略与自定义分区器(1)

    分区策略决定了一条数据如何发送给下游。Flink中默认提供了八大分区策略(也叫分区器)。

    本文基于Flink 1.9.0总结Flink DataStream中的八大分区策略以及手动实现一个自定义分区器。

    八大分区策略继承关系图

    八大分区策略继承关系图

    • ChannelSelector: 接口,决定将记录写入哪个Channel。有3个方法:
    1. void setup(int numberOfChannels): 初始化输出Channel的数量。
    2. int selectChannel(T record): 根据当前记录以及Channel总数,决定应将记录写入下游哪个Channel。八大分区策略的区别主要在这个方法的实现上。
    3. boolean isBroadcast(): 是否是广播模式。决定了是否将记录写入下游所有Channel
    • StreamPartitioner:抽象类,也是所有流分区器GlobalPartitioner,ShufflePartitioner,RebalancePartitioner,RescalePartitioner,BroadcastPartitioner,ForwardPartitioner,KeyGroupStreamPartitioner,CustomPartitioner的基类。

    注意:

    1. 这里以及下边提到的Channel可简单理解为下游Operator的某个实例。
    2. Flink 中改变并行度,默认RebalancePartitioner分区策略。
    3. 分区策略,可在Flink WebUI上直观看出,如REBALANCE,即使用了RebalancePartitioner分区策略;SHUFFLE,即使用了ShufflePartitioner分区策略。

    GlobalPartitioner: DataStream => DataStream

    GlobalPartitioner,GLOBAL分区。将记录输出到下游Operator的第一个实例。

    selectChannel实现

    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
            //对每条记录,只选择下游operator的第一个Channel
            return 0;
    }

    API使用

    dataStream
        .setParallelism(2)
        // 采用GLOBAL分区策略重分区
        .global()
        .print()
        .setParallelism(1);

    ShufflePartitioner: DataStream => DataStream

    ShufflePartitionerSHUFFLE分区。将记录随机输出到下游Operator的每个实例。

    selectChannel实现

    private Random random = new Random();
    
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        //对每条记录,随机选择下游operator的某个Channel
        return random.nextInt(numberOfChannels);
    }

    API使用

    dataStream
        .setParallelism(2)
        // 采用SHUFFLE分区策略重分区
        .shuffle()
        .print()
        .setParallelism(4);

    RebalancePartitioner: DataStream => DataStream

    RebalancePartitioner,REBALANCE分区。将记录以循环的方式输出到下游Operator的每个实例。

    selectChannel实现

    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        //第一条记录,输出到下游的第一个Channel;第二条记录,输出到下游的第二个Channel...如此循环
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    API使用

    dataStream
            .setParallelism(2)
            // 采用REBALANCE分区策略重分区
            .rebalance()
            .print()
            .setParallelism(4);

    RescalePartitioner: DataStream => DataStream

    RescalePartitioner,RESCALE分区。基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。举例: 上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

    selectChannel实现

    private int nextChannelToSendTo = -1;
    
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        if (++nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }

    API示例

    dataStream
        .setParallelism(2)
        // 采用RESCALE分区策略重分区
        .rescale()
        .print()
        .setParallelism(4);

    BroadcastPartitioner: DataStream => DataStream

    BroadcastPartitioner,BROADCAST分区。广播分区将上游数据集输出到下游Operator的每个实例中。适合于大数据集Join小数据集的场景。

    selectChannel实现

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        //广播分区不支持选择Channel,因为会输出到下游每个Channel中
        throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
    }
    
    @Override
    public boolean isBroadcast() {
        //启用广播模式,此时Channel选择器会选择下游所有Channel
        return true;
    }

    API示例

    dataStream
        .setParallelism(2)
        // 采用BROADCAST分区策略重分区
        .broadcast()
        .print()
        .setParallelism(4);

    ForwardPartitioner

    ForwardPartitioner,FORWARD分区。将记录输出到下游本地的operator实例。ForwardPartitioner分区器要求上下游算子并行度一样。上下游Operator同属一个SubTasks

    selectChannel实现

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return 0;
    }

    API示例

    dataStream
        .setParallelism(2)
        // 采用FORWARD分区策略重分区
        .forward()
        .print()
        .setParallelism(2);

    KeyGroupStreamPartitioner(HASH方式):

    KeyGroupStreamPartitioner,HASH分区。将记录按Key的Hash值输出到下游Operator实例。

    selectChannel实现

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
        }
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
    }
    
    // KeyGroupRangeAssignment中的方法
    public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }
    
    // KeyGroupRangeAssignment中的方法
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }
    
    // KeyGroupRangeAssignment中的方法
    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }

    API示例

    dataStream
        .setParallelism(2)
        // 采用HASH分区策略重分区
        .keyBy((KeySelector<Tuple3<String, Integer, String>, String>) value -> value.f0)
        .print()
        .setParallelism(4);

    CustomPartitionerWrapper

    CustomPartitionerWrapper,CUSTOM分区。通过Partitioner实例的partition方法(自定义的)将记录输出到下游。

    selectChannel实现

    Partitioner<K> partitioner;
    KeySelector<T, K> keySelector;
    public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
        this.partitioner = partitioner;
        this.keySelector = keySelector;
    }
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
        }
        return partitioner.partition(key, numberOfChannels);
    }

    自定义分区器将指定的Key分到指定的分区

    // 自定义分区器,将不同的Key(用户ID)分到指定的分区
    // key: 根据key的值来分区
    // numPartitions: 下游算子并行度
    static class CustomPartitioner implements Partitioner<String> {
          @Override
          public int partition(String key, int numPartitions) {
              switch (key){
                  case "user_1":
                      return 0;
                  case "user_2":
                      return 1;
                  case "user_3":
                      return 2;
                  default:
                      return 3;
              }
          }
      }

    使用自定义分区器

    dataStream
        .setParallelism(2)
        // 采用CUSTOM分区策略重分区
        .partitionCustom(new CustomPartitioner(),0)
        .print()
        .setParallelism(4);

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13767324.html

  • 相关阅读:
    vue / js使用video获取视频时长
    vuex的使用
    eclipse中没有tomcat小猫
    Windows下安装Redis服务
    postman上传文件对参数的contentType类型设置方式
    !与&&优先级的问题
    备份数据库中的某个表的数据报错Statement violates GTID consistency
    Error 'Cannot add or update a child row: a foreign key constraint fails故障解决
    解除项目与其他服务的强依赖
    常见的几种异常类型 Exception
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13767324.html
Copyright © 2011-2022 走看看