For a DataStream, the following partitioning strategies can be chosen:
```java
// Excerpt: partitioning methods of DataStream<T>

/**
 * Sets the partitioning of the {@link DataStream} so that the output elements
 * are broadcasted to every parallel instance of the next operation.
 *
 * @return The DataStream with broadcast partitioning set.
 */
public DataStream<T> broadcast() {
    return setConnectionType(new BroadcastPartitioner<T>());
}

/**
 * Sets the partitioning of the {@link DataStream} so that the output elements
 * are shuffled uniformly randomly to the next operation.
 *
 * @return The DataStream with shuffle partitioning set.
 */
@PublicEvolving
public DataStream<T> shuffle() {
    return setConnectionType(new ShufflePartitioner<T>());
}

/**
 * Sets the partitioning of the {@link DataStream} so that the output elements
 * are forwarded to the local subtask of the next operation.
 *
 * @return The DataStream with forward partitioning set.
 */
public DataStream<T> forward() {
    return setConnectionType(new ForwardPartitioner<T>());
}

/**
 * Sets the partitioning of the {@link DataStream} so that the output elements
 * are distributed evenly to instances of the next operation in a round-robin
 * fashion.
 *
 * @return The DataStream with rebalance partitioning set.
 */
public DataStream<T> rebalance() {
    return setConnectionType(new RebalancePartitioner<T>());
}

/**
 * Sets the partitioning of the {@link DataStream} so that the output elements
 * are distributed evenly to a subset of instances of the next operation in a round-robin
 * fashion.
 *
 * <p>The subset of downstream operations to which the upstream operation sends
 * elements depends on the degree of parallelism of both the upstream and downstream operation.
 * For example, if the upstream operation has parallelism 2 and the downstream operation
 * has parallelism 4, then one upstream operation would distribute elements to two
 * downstream operations while the other upstream operation would distribute to the other
 * two downstream operations. If, on the other hand, the downstream operation has parallelism
 * 2 while the upstream operation has parallelism 4 then two upstream operations will
 * distribute to one downstream operation while the other two upstream operations will
 * distribute to the other downstream operation.
 *
 * <p>In cases where the different parallelisms are not multiples of each other one or several
 * downstream operations will have a differing number of inputs from upstream operations.
 *
 * @return The DataStream with rescale partitioning set.
 */
@PublicEvolving
public DataStream<T> rescale() {
    return setConnectionType(new RescalePartitioner<T>());
}

/**
 * Sets the partitioning of the {@link DataStream} so that the output values
 * all go to the first instance of the next processing operator. Use this
 * setting with care since it might cause a serious performance bottleneck
 * in the application.
 *
 * @return The DataStream with global partitioning set.
 */
@PublicEvolving
public DataStream<T> global() {
    return setConnectionType(new GlobalPartitioner<T>());
}
```
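The rescale javadoc describes which downstream instances each upstream instance feeds. As a rough illustration, the standalone sketch below computes one plausible assignment, assuming downstream instances are split into contiguous, as-even-as-possible groups. The class `RescaleWiringSketch` is hypothetical and is not Flink's actual job-graph wiring code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: which downstream instances each upstream instance
// sends to under rescale, assuming contiguous, near-even grouping.
class RescaleWiringSketch {

    // Returns, for each upstream index u, the list of downstream indices it feeds.
    static List<List<Integer>> targets(int upstream, int downstream) {
        List<List<Integer>> result = new ArrayList<>();
        for (int u = 0; u < upstream; u++) {
            result.add(new ArrayList<>());
        }
        if (upstream <= downstream) {
            // Fan-out: each downstream instance d is fed by exactly one upstream instance.
            for (int d = 0; d < downstream; d++) {
                result.get(d * upstream / downstream).add(d);
            }
        } else {
            // Fan-in: each upstream instance u feeds exactly one downstream instance.
            for (int u = 0; u < upstream; u++) {
                result.get(u).add(u * downstream / upstream);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(targets(2, 4)); // [[0, 1], [2, 3]]
        System.out.println(targets(4, 2)); // [[0], [0], [1], [1]]
        System.out.println(targets(3, 2)); // [[0], [0], [1]]
    }
}
```

With upstream parallelism 3 and downstream parallelism 2, downstream instance 0 receives from two upstream instances and instance 1 from only one, which is the uneven case the javadoc mentions.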
The actual logic is implemented by the Partitioner classes:
BroadcastPartitioner
```java
public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    int[] returnArray;  // cached ids of all output channels
    boolean set;        // whether returnArray has been built yet
    int setNumber;      // channel count that returnArray was built for

    @Override
    public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
            int numberOfOutputChannels) {
        if (set && setNumber == numberOfOutputChannels) {
            // Channel count unchanged: reuse the cached array.
            return returnArray;
        } else {
            // (Re)build [0, 1, ..., numberOfOutputChannels - 1].
            this.returnArray = new int[numberOfOutputChannels];
            for (int i = 0; i < numberOfOutputChannels; i++) {
                returnArray[i] = i;
            }
            set = true;
            setNumber = numberOfOutputChannels;
            return returnArray;
        }
    }
}
```
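int[] returnArray: the array of selected channel ids.
Broadcast must send every record to all channels, so returnArray contains every channel id. The array is cached and rebuilt only when the number of output channels changes.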
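The same caching idea can be tried outside Flink with a standalone sketch. `BroadcastSelectorSketch` is a hypothetical name, and the record is a plain `T` instead of `SerializationDelegate<StreamRecord<T>>` so the code runs without Flink on the classpath.

```java
import java.util.Arrays;

// Hypothetical standalone version of the broadcast selection logic:
// every record goes to every channel, and the id array is cached until
// the number of output channels changes.
class BroadcastSelectorSketch<T> {
    private int[] returnArray;
    private int setNumber = -1; // -1 means "not built yet"

    int[] selectChannels(T record, int numberOfOutputChannels) {
        if (returnArray == null || setNumber != numberOfOutputChannels) {
            returnArray = new int[numberOfOutputChannels];
            for (int i = 0; i < numberOfOutputChannels; i++) {
                returnArray[i] = i;
            }
            setNumber = numberOfOutputChannels;
        }
        return returnArray;
    }

    public static void main(String[] args) {
        BroadcastSelectorSketch<String> s = new BroadcastSelectorSketch<>();
        System.out.println(Arrays.toString(s.selectChannels("a", 3))); // [0, 1, 2]
        System.out.println(Arrays.toString(s.selectChannels("b", 5))); // [0, 1, 2, 3, 4]
    }
}
```

Caching avoids allocating a new array per record, which matters because selectChannels is called once for every emitted element.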
ShufflePartitioner: picks one channel uniformly at random for each record.
```java
public class ShufflePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private Random random = new Random();

    // Reused one-element result array: exactly one target channel per record.
    private int[] returnArray = new int[1];

    @Override
    public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
            int numberOfOutputChannels) {
        // Uniformly random channel in [0, numberOfOutputChannels).
        returnArray[0] = random.nextInt(numberOfOutputChannels);
        return returnArray;
    }
}
```
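A standalone sketch of the shuffle selection, runnable without Flink, that counts how often each channel is hit. `ShuffleSelectorSketch` is hypothetical; unlike the partitioner above, which uses `new Random()`, it takes a seed only so runs are reproducible.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical standalone version of random channel selection.
class ShuffleSelectorSketch<T> {
    private final Random random;
    private final int[] returnArray = new int[1]; // reused across calls

    ShuffleSelectorSketch(long seed) {
        this.random = new Random(seed); // seeded only for reproducibility
    }

    int[] selectChannels(T record, int numberOfOutputChannels) {
        returnArray[0] = random.nextInt(numberOfOutputChannels);
        return returnArray;
    }

    public static void main(String[] args) {
        ShuffleSelectorSketch<String> s = new ShuffleSelectorSketch<>(42L);
        int[] counts = new int[4];
        for (int i = 0; i < 10_000; i++) {
            counts[s.selectChannels("x", 4)[0]]++;
        }
        // Each count should land close to 2500.
        System.out.println(Arrays.toString(counts));
    }
}
```

Because returnArray is reused, a caller has to consume the result before calling selectChannels again; the emit loop in RecordWriter does exactly that.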
ForwardPartitioner: with forward there should be only one output channel, so it simply picks the first channel (index 0).
```java
public class ForwardPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    // Always channel 0: the single channel to the local downstream subtask.
    private int[] returnArray = new int[] {0};

    @Override
    public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
            int numberOfOutputChannels) {
        return returnArray;
    }
}
```
RebalancePartitioner: plain round-robin, cycling through the channels in order.
```java
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    // Starts at -1 so that the first record goes to channel 0.
    private int[] returnArray = new int[] {-1};

    @Override
    public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
            int numberOfOutputChannels) {
        // Advance to the next channel, wrapping around at numberOfOutputChannels.
        this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
        return this.returnArray;
    }
}
```
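To see the order round-robin produces, here is a standalone sketch with the same counter logic, stripped of Flink types. The class name `RebalanceSelectorSketch` is hypothetical.

```java
// Hypothetical standalone version of round-robin channel selection.
class RebalanceSelectorSketch<T> {
    // Counter starts at -1, so the first call yields channel 0.
    private final int[] returnArray = new int[] {-1};

    int[] selectChannels(T record, int numberOfOutputChannels) {
        returnArray[0] = (returnArray[0] + 1) % numberOfOutputChannels;
        return returnArray;
    }

    public static void main(String[] args) {
        RebalanceSelectorSketch<String> s = new RebalanceSelectorSketch<>();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 7; i++) {
            sb.append(s.selectChannels("x", 3)[0]).append(' ');
        }
        System.out.println(sb.toString().trim()); // 0 1 2 0 1 2 0
    }
}
```

Since the counter is kept modulo the channel count, it never grows past numberOfOutputChannels - 1 and cannot overflow, no matter how many records pass through.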
GlobalPartitioner: always picks the first channel (index 0).
```java
public class GlobalPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    // Always channel 0, i.e. the first instance of the next operator.
    private int[] returnArray = new int[] { 0 };

    @Override
    public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
            int numberOfOutputChannels) {
        return returnArray;
    }
}
```
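ForwardPartitioner and GlobalPartitioner contain the same selection code (always index 0), so the difference has to come from how the channels are wired. To my understanding, Flink connects forward (and rescale) edges pointwise and the other partitioners all-to-all, so channel 0 means "my own downstream subtask" for forward but "downstream subtask 0" for global. The sketch below models only that wiring difference; `ChannelZeroSketch` and its methods are hypothetical, and forward is assumed to have equal upstream and downstream parallelism.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: each upstream subtask owns a list of output channels,
// where channel index k maps to a downstream subtask id.
class ChannelZeroSketch {

    // Pointwise wiring, equal parallelism: upstream i has one channel, to downstream i.
    static List<Integer> forwardChannels(int upstreamIndex) {
        List<Integer> channels = new ArrayList<>();
        channels.add(upstreamIndex);
        return channels;
    }

    // All-to-all wiring: every upstream subtask has a channel to every downstream subtask.
    static List<Integer> allToAllChannels(int upstreamIndex, int downstreamParallelism) {
        List<Integer> channels = new ArrayList<>();
        for (int d = 0; d < downstreamParallelism; d++) {
            channels.add(d);
        }
        return channels;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        for (int u = 0; u < parallelism; u++) {
            // Both partitioners select channel index 0.
            int forwardTarget = forwardChannels(u).get(0);
            int globalTarget = allToAllChannels(u, parallelism).get(0);
            System.out.println("upstream " + u + ": forward -> " + forwardTarget
                    + ", global -> " + globalTarget);
        }
    }
}
```

Under this model, forward keeps every record on its own "lane", while global funnels all upstream subtasks into downstream subtask 0, which is why the global() javadoc warns about a performance bottleneck.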
In RecordWriter, emit calls selectChannels to pick the target channels and then sends the record to each of them:
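```java
public void emit(T record) throws IOException, InterruptedException {
    // Ask the partitioner (channel selector) which channels get this record,
    // then write the record to each of them.
    for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
        sendToTarget(record, targetChannel);
    }
}
```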
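The emit loop can be exercised end to end with a small in-memory model. In this sketch, `SimpleChannelSelector` and `RecordWriterSketch` are hypothetical simplifications: records are plain `T` values and each "channel" is just a list that collects what it receives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical, simplified selector contract (plain record type instead of
// Flink's SerializationDelegate<StreamRecord<T>>).
interface SimpleChannelSelector<T> {
    int[] selectChannels(T record, int numberOfOutputChannels);
}

// Sketch of the emit loop: ask the selector for target channels, then
// "send" the record to each one. Channels are modeled as in-memory lists.
class RecordWriterSketch<T> {
    private final SimpleChannelSelector<T> channelSelector;
    private final List<List<T>> channels = new ArrayList<>();

    RecordWriterSketch(SimpleChannelSelector<T> channelSelector, int numChannels) {
        this.channelSelector = channelSelector;
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    void emit(T record) {
        for (int targetChannel : channelSelector.selectChannels(record, channels.size())) {
            sendToTarget(record, targetChannel);
        }
    }

    private void sendToTarget(T record, int targetChannel) {
        channels.get(targetChannel).add(record);
    }

    List<List<T>> channels() {
        return channels;
    }

    public static void main(String[] args) {
        // Broadcast-style selector: return every channel id.
        RecordWriterSketch<String> broadcast = new RecordWriterSketch<String>(
                (r, n) -> IntStream.range(0, n).toArray(), 3);
        broadcast.emit("a");
        System.out.println(broadcast.channels()); // [[a], [a], [a]]

        // Round-robin selector, keeping its counter in a one-element array.
        int[] next = {-1};
        RecordWriterSketch<String> rebalance = new RecordWriterSketch<String>(
                (r, n) -> {
                    next[0] = (next[0] + 1) % n;
                    return new int[] {next[0]};
                }, 2);
        for (String s : new String[] {"a", "b", "c"}) {
            rebalance.emit(s);
        }
        System.out.println(rebalance.channels()); // [[a, c], [b]]
    }
}
```

Keeping the selector behind a single method means the writer never needs to know which strategy is in use; swapping the partitioner changes only the returned channel ids.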