  • Flink

     

    For a DataStream, you can choose among the following partitioning strategies:

    /**
         * Sets the partitioning of the {@link DataStream} so that the output elements
         * are broadcasted to every parallel instance of the next operation.
         *
         * @return The DataStream with broadcast partitioning set.
         */
        public DataStream<T> broadcast() {
            return setConnectionType(new BroadcastPartitioner<T>());
        }
    
        /**
         * Sets the partitioning of the {@link DataStream} so that the output elements
         * are shuffled uniformly randomly to the next operation.
         *
         * @return The DataStream with shuffle partitioning set.
         */
        @PublicEvolving
        public DataStream<T> shuffle() {
            return setConnectionType(new ShufflePartitioner<T>());
        }
    
        /**
         * Sets the partitioning of the {@link DataStream} so that the output elements
         * are forwarded to the local subtask of the next operation.
         *
         * @return The DataStream with forward partitioning set.
         */
        public DataStream<T> forward() {
            return setConnectionType(new ForwardPartitioner<T>());
        }
    
        /**
         * Sets the partitioning of the {@link DataStream} so that the output elements
         * are distributed evenly to instances of the next operation in a round-robin
         * fashion.
         *
         * @return The DataStream with rebalance partitioning set.
         */
        public DataStream<T> rebalance() {
            return setConnectionType(new RebalancePartitioner<T>());
        }
    
        /**
         * Sets the partitioning of the {@link DataStream} so that the output elements
         * are distributed evenly to a subset of instances of the next operation in a round-robin
         * fashion.
         *
         * <p>The subset of downstream operations to which the upstream operation sends
         * elements depends on the degree of parallelism of both the upstream and downstream operation.
         * For example, if the upstream operation has parallelism 2 and the downstream operation
         * has parallelism 4, then one upstream operation would distribute elements to two
         * downstream operations while the other upstream operation would distribute to the other
         * two downstream operations. If, on the other hand, the downstream operation has parallelism
         * 2 while the upstream operation has parallelism 4 then two upstream operations will
         * distribute to one downstream operation while the other two upstream operations will
         * distribute to the other downstream operations.
         *
         * <p>In cases where the different parallelisms are not multiples of each other one or several
         * downstream operations will have a differing number of inputs from upstream operations.
         *
         * @return The DataStream with rescale partitioning set.
         */
        @PublicEvolving
        public DataStream<T> rescale() {
            return setConnectionType(new RescalePartitioner<T>());
        }
    
        /**
         * Sets the partitioning of the {@link DataStream} so that the output values
         * all go to the first instance of the next processing operator. Use this
         * setting with care since it might cause a serious performance bottleneck
         * in the application.
         *
         * @return The DataStream with global partitioning set.
         */
        @PublicEvolving
        public DataStream<T> global() {
            return setConnectionType(new GlobalPartitioner<T>());
        }
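
The rescale Javadoc above describes how each upstream subtask is paired with a subset of the downstream subtasks. As a rough illustration, that pairing can be sketched in plain Java without Flink. `RescaleSketch` and `targetsFor` are hypothetical names, and the contiguous-block assignment is an assumption chosen to reproduce the 2-to-4 and 4-to-2 examples from the Javadoc; it is not Flink's actual wiring code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not part of Flink.
class RescaleSketch {

    /** Downstream subtask indices that upstream subtask `up` sends to. */
    static List<Integer> targetsFor(int up, int upstreamParallelism, int downstreamParallelism) {
        List<Integer> targets = new ArrayList<>();
        if (downstreamParallelism >= upstreamParallelism) {
            // Each upstream subtask owns a contiguous block of downstream subtasks.
            int start = up * downstreamParallelism / upstreamParallelism;
            int end = (up + 1) * downstreamParallelism / upstreamParallelism;
            for (int d = start; d < end; d++) {
                targets.add(d);
            }
        } else {
            // Several upstream subtasks share a single downstream subtask.
            targets.add(up * downstreamParallelism / upstreamParallelism);
        }
        return targets;
    }

    public static void main(String[] args) {
        for (int up = 0; up < 2; up++) {
            System.out.println("2 -> 4, upstream " + up + " sends to " + targetsFor(up, 2, 4));
        }
        for (int up = 0; up < 4; up++) {
            System.out.println("4 -> 2, upstream " + up + " sends to " + targetsFor(up, 4, 2));
        }
    }
}
```

With parallelism 2 to 4, upstream 0 gets downstream {0, 1} and upstream 1 gets {2, 3}; with 4 to 2, upstreams 0 and 1 share downstream 0 while upstreams 2 and 3 share downstream 1.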

     

    All of this logic is implemented by the Partitioner classes:

    BroadcastPartitioner

    public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;
    
        int[] returnArray;
        boolean set;
        int setNumber;
    
        @Override
        public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
                int numberOfOutputChannels) {
            if (set && setNumber == numberOfOutputChannels) {
                return returnArray;
            } else {
                this.returnArray = new int[numberOfOutputChannels];
                for (int i = 0; i < numberOfOutputChannels; i++) {
                    returnArray[i] = i;
                }
                set = true;
                setNumber = numberOfOutputChannels;
                return returnArray;
            }
        }
        // ... remaining members omitted
    }

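    returnArray is an int array holding the IDs of the selected channels.

    Because broadcast has to send every record to all channels, returnArray must contain every channel ID. The array is built once and reused until the number of output channels changes.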
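
The caching behaviour is easier to see in isolation. The following is a minimal sketch, assuming a stripped-down stand-in class (`BroadcastSelector` is a hypothetical name) that drops the record parameter, since broadcast never inspects the record.

```java
import java.util.Arrays;

// Hypothetical stand-in for BroadcastPartitioner; not Flink code.
class BroadcastSelector {
    private int[] returnArray;
    private int cachedFor = -1; // channel count the cached array was built for

    int[] selectChannels(int numberOfOutputChannels) {
        if (cachedFor != numberOfOutputChannels) {
            // (Re)build the list of all channel IDs only when the channel count changes.
            returnArray = new int[numberOfOutputChannels];
            for (int i = 0; i < numberOfOutputChannels; i++) {
                returnArray[i] = i;
            }
            cachedFor = numberOfOutputChannels;
        }
        return returnArray;
    }

    public static void main(String[] args) {
        BroadcastSelector selector = new BroadcastSelector();
        int[] first = selector.selectChannels(3);
        int[] second = selector.selectChannels(3);
        System.out.println(Arrays.toString(first)); // [0, 1, 2]
        System.out.println(first == second);        // true: the cached array is reused
    }
}
```

Reusing one array avoids an allocation per record, at the cost that callers must not keep or modify the returned array.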

     

    ShufflePartitioner picks a single channel at random:

    public class ShufflePartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;
    
        private Random random = new Random();
    
        private int[] returnArray = new int[1];
    
        @Override
        public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
                int numberOfOutputChannels) {
            returnArray[0] = random.nextInt(numberOfOutputChannels);
            return returnArray;
        }
        // ... remaining members omitted
    }
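
To check that random selection spreads records roughly evenly, here is a small simulation. It is a sketch under assumptions: `ShuffleSelector` and `histogram` are hypothetical names, and the `Random` is injected with a fixed seed only so that runs are repeatable.

```java
import java.util.Random;

// Hypothetical stand-in for ShufflePartitioner; not Flink code.
class ShuffleSelector {
    private final Random random;
    private final int[] returnArray = new int[1];

    ShuffleSelector(Random random) {
        this.random = random;
    }

    int[] selectChannels(int numberOfOutputChannels) {
        // nextInt(n) returns a value in [0, n), i.e. a valid channel index.
        returnArray[0] = random.nextInt(numberOfOutputChannels);
        return returnArray;
    }

    /** Counts how often each channel is chosen over `records` selections. */
    static int[] histogram(int records, int numberOfOutputChannels, long seed) {
        ShuffleSelector selector = new ShuffleSelector(new Random(seed));
        int[] counts = new int[numberOfOutputChannels];
        for (int i = 0; i < records; i++) {
            counts[selector.selectChannels(numberOfOutputChannels)[0]]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = histogram(100_000, 4, 42L);
        for (int channel = 0; channel < counts.length; channel++) {
            System.out.println("channel " + channel + ": " + counts[channel]);
        }
    }
}
```

Each channel should receive close to a quarter of the records, though unlike round-robin the split is only even on average.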

     

    ForwardPartitioner: a forward connection should have only one output channel, so it always selects the first channel (index 0).

    public class ForwardPartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;
    
        private int[] returnArray = new int[] {0};
    
        @Override
        public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
            return returnArray;
        }
        // ... remaining members omitted
    }

     

    RebalancePartitioner is plain round-robin: it cycles through the channels in order.

    public class RebalancePartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;
    
        private int[] returnArray = new int[] {-1};
    
        @Override
        public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
                int numberOfOutputChannels) {
            this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
            return this.returnArray;
        }
        // ... remaining members omitted
    }
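
The round-robin arithmetic can be traced with a short stand-alone sketch. `RebalanceSelector` and `sequence` are hypothetical names; the record parameter is omitted because the selection does not depend on the record.

```java
import java.util.Arrays;

// Hypothetical stand-in for RebalancePartitioner; not Flink code.
class RebalanceSelector {
    // Starts at -1 so that the first record lands on channel 0.
    private final int[] returnArray = new int[] {-1};

    int[] selectChannels(int numberOfOutputChannels) {
        returnArray[0] = (returnArray[0] + 1) % numberOfOutputChannels;
        return returnArray;
    }

    /** Channels chosen for the first `records` records. */
    static int[] sequence(int records, int numberOfOutputChannels) {
        RebalanceSelector selector = new RebalanceSelector();
        int[] chosen = new int[records];
        for (int i = 0; i < records; i++) {
            // Copy the value out right away: the selector reuses its one-element array.
            chosen[i] = selector.selectChannels(numberOfOutputChannels)[0];
        }
        return chosen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sequence(7, 3))); // [0, 1, 2, 0, 1, 2, 0]
    }
}
```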

     

    GlobalPartitioner always selects the first channel (index 0), no matter how many channels exist.

    public class GlobalPartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;
    
        private int[] returnArray = new int[] { 0 };
    
        @Override
        public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
                int numberOfOutputChannels) {
            return returnArray;
        }
        // ... remaining members omitted
    }

     

    In RecordWriter, emit calls selectChannels to choose the target channels and then sends the record to each of them:

        public void emit(T record) throws IOException, InterruptedException {
            for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
                sendToTarget(record, targetChannel);
            }
        }
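
To show how the selector's return value drives delivery, here is a simplified model of that loop. Everything in it is an assumption made for illustration: `EmitSketch`, its nested `ChannelSelector` interface, and `emitAll` are hypothetical, and channels are modelled as in-memory lists rather than network buffers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the emit loop; not Flink's RecordWriter.
class EmitSketch {

    /** Simplified selector: returns the indices of the channels a record goes to. */
    interface ChannelSelector<T> {
        int[] selectChannels(T record, int numberOfOutputChannels);
    }

    /** Sends each record to every channel the selector returns. */
    static <T> List<List<T>> emitAll(List<T> records, ChannelSelector<T> selector, int numChannels) {
        List<List<T>> channels = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
        for (T record : records) {
            for (int targetChannel : selector.selectChannels(record, numChannels)) {
                channels.get(targetChannel).add(record);
            }
        }
        return channels;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c");

        // Global-style selector: always channel 0.
        ChannelSelector<String> global = (record, n) -> new int[] {0};
        System.out.println(emitAll(records, global, 2)); // [[a, b, c], []]

        // Broadcast-style selector: every channel.
        ChannelSelector<String> broadcast = (record, n) -> {
            int[] all = new int[n];
            for (int i = 0; i < n; i++) {
                all[i] = i;
            }
            return all;
        };
        System.out.println(emitAll(records, broadcast, 2)); // [[a, b, c], [a, b, c]]
    }
}
```

Returning an array rather than a single index is what lets one interface cover both the single-target strategies and broadcast.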
  • Original article: https://www.cnblogs.com/fxjwind/p/6707741.html