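Scenario: an input data source has to be split by some criterion, for example splitting user access logs by the visitor's geographic location.
Ways to split a stream
Depending on the scenario, there are generally three ways to split a stream.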
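Splitting with filter
Apply one filter per target stream to the same source. Every filter receives the whole stream and keeps only the records that match its condition.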
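import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterSplitDemo { // class name is arbitrary

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample access log records: (city, url)
        List<Tuple2<String, String>> data = new ArrayList<>();
        data.add(new Tuple2<>("shanghai", "url-1"));
        data.add(new Tuple2<>("shanghai", "url-3"));
        data.add(new Tuple2<>("hangzhou", "url-2"));
        DataStreamSource<Tuple2<String, String>> source = env.fromCollection(data);

        // One filter per target stream; each filter sees every record of source
        SingleOutputStreamOperator<Tuple2<String, String>> shanghaiStream =
                source.filter(value -> value.f0.equals("shanghai"));
        SingleOutputStreamOperator<Tuple2<String, String>> hangzhouStream =
                source.filter(value -> value.f0.equals("hangzhou"));

        // shanghaiStream.print();
        hangzhouStream.print();
        env.execute();
    }
}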
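To make the cost of the filter approach concrete, here is a plain-Java model with no Flink involved (the class `FilterSplitModel` and its methods are hypothetical illustrations, not Flink API). Each "branch" scans the whole input, just as each `filter` operator above receives every record of `source`. With two branches over three records the predicate runs six times; in general N branches mean N predicate evaluations per record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of filter-based splitting (hypothetical, not Flink code).
// Every branch walks the entire input, like every filter operator in the job.
class FilterSplitModel {
    // Total number of predicate evaluations across all branches.
    static int evaluations = 0;

    // Same (city, url) records as the Flink example.
    static List<String[]> sampleData() {
        return Arrays.asList(
                new String[]{"shanghai", "url-1"},
                new String[]{"shanghai", "url-3"},
                new String[]{"hangzhou", "url-2"});
    }

    // One "filter branch": tests every row and keeps the matching ones.
    static List<String[]> filter(List<String[]> input, Predicate<String[]> predicate) {
        List<String[]> kept = new ArrayList<>();
        for (String[] row : input) {
            evaluations++;
            if (predicate.test(row)) {
                kept.add(row);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String[]> source = sampleData();
        List<String[]> shanghai = filter(source, r -> r[0].equals("shanghai"));
        List<String[]> hangzhou = filter(source, r -> r[0].equals("hangzhou"));
        // Two branches over three rows: 6 predicate evaluations in total.
        System.out.println(shanghai.size() + " " + hangzhou.size() + " " + evaluations);
    }
}
```

Running `main` prints `2 1 6`. The filter approach is the simplest to write, but the repeated full scans are why it scales poorly as the number of target streams grows.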
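Splitting with split
The split operator attaches one or more tag names to each record through an OutputSelector; select then picks out the sub-stream for a given tag.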
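import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// split / OutputSelector / SplitStream exist only up to Flink 1.11
public class SplitDemo { // class name is arbitrary

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        List<Tuple2<String, String>> data = new ArrayList<>();
        data.add(new Tuple2<>("shanghai", "url-1"));
        data.add(new Tuple2<>("shanghai", "url-3"));
        data.add(new Tuple2<>("hangzhou", "url-2"));
        DataStreamSource<Tuple2<String, String>> source = env.fromCollection(data);

        // The OutputSelector returns the tag name(s) each record is routed to
        SplitStream<Tuple2<String, String>> splitStream = source.split(new OutputSelector<Tuple2<String, String>>() {
            @Override
            public Iterable<String> select(Tuple2<String, String> value) {
                List<String> tags = new ArrayList<>();
                if (value.f0.equals("shanghai")) {
                    tags.add("shanghaiStream");
                } else {
                    tags.add("hangzhouStream");
                }
                return tags;
            }
        });

        // select(...) returns the sub-stream carrying the given tag
        // splitStream.select("shanghaiStream").print();
        splitStream.select("hangzhouStream").print();
        env.execute();
    }
}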
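Note that a stream produced by split cannot be split again. If you call split on the shanghaiStream or hangzhouStream selected above, the following exception is thrown. split was deprecated for this reason and removed in Flink 1.12.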
Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
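One detail of split worth noting: `OutputSelector.select` returns an `Iterable<String>`, so a single record can carry several tags and appear in several selected streams. The following plain-Java model illustrates that routing rule without Flink; `SplitSelectModel` and the extra `"all"` tag are hypothetical and only mimic the selector contract.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of split/select routing (hypothetical, not Flink code).
// The selector plays the role of OutputSelector.select: it returns the tag
// names for a row, and the row is appended to every bucket it is tagged with.
class SplitSelectModel {

    // Same (city, url) records as the Flink example.
    static List<String[]> sampleData() {
        return Arrays.asList(
                new String[]{"shanghai", "url-1"},
                new String[]{"shanghai", "url-3"},
                new String[]{"hangzhou", "url-2"});
    }

    // Hypothetical selector: tag by city, and additionally tag every row "all".
    static List<String> cityAndAll(String[] row) {
        return Arrays.asList(row[0] + "Stream", "all");
    }

    // Routes each row to all buckets named by the selector.
    static Map<String, List<String[]>> split(List<String[]> input,
                                             Function<String[], List<String>> selector) {
        Map<String, List<String[]>> buckets = new HashMap<>();
        for (String[] row : input) {
            for (String tag : selector.apply(row)) {
                buckets.computeIfAbsent(tag, k -> new ArrayList<>()).add(row);
            }
        }
        return buckets;
    }

    public static void main(String[] args) {
        Map<String, List<String[]>> buckets = split(sampleData(), SplitSelectModel::cityAndAll);
        System.out.println(buckets.get("shanghaiStream").size()); // 2
        System.out.println(buckets.get("hangzhouStream").size()); // 1
        System.out.println(buckets.get("all").size());            // 3
    }
}
```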
Splitting with side outputs
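Side outputs are the splitting method Flink recommends; the exception above itself says "Please use side-outputs". Using side outputs takes the following steps:
- Define an OutputTag for each output stream.
- Emit records to a side output with ctx.output(tag, value) inside one of the functions that provide a Context, namely:
  - ProcessFunction
  - KeyedProcessFunction
  - CoProcessFunction
  - KeyedCoProcessFunction
  - ProcessWindowFunction
  - ProcessAllWindowFunction
- Call getSideOutput(tag) on the resulting operator to obtain each side-output stream.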
Example: splitting with side outputs in a ProcessFunction:
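import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputDemo { // class name is arbitrary

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        List<Tuple2<String, String>> data = new ArrayList<>();
        data.add(new Tuple2<>("shanghai", "url-1"));
        data.add(new Tuple2<>("shanghai", "url-3"));
        data.add(new Tuple2<>("hangzhou", "url-2"));
        DataStreamSource<Tuple2<String, String>> source = env.fromCollection(data);

        // Define the OutputTags; the type parameter is the type of the records emitted to that side output.
        // The trailing {} creates an anonymous subclass so Flink can capture the generic type.
        final OutputTag<Tuple2<String, String>> shanghaiStream = new OutputTag<Tuple2<String, String>>("shanghaiStream") {};
        final OutputTag<Tuple2<String, String>> hangzhouStream = new OutputTag<Tuple2<String, String>>("hangzhouStream") {};

        // Split the stream in a ProcessFunction
        SingleOutputStreamOperator<Tuple2<String, String>> processStream = source.process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() {
            @Override
            public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                if (value.f0.equals("shanghai")) {
                    ctx.output(shanghaiStream, value);
                } else {
                    ctx.output(hangzhouStream, value);
                }
            }
        });

        // Get the corresponding side-output streams
        DataStream<Tuple2<String, String>> shanghaiSideOutput = processStream.getSideOutput(shanghaiStream);
        DataStream<Tuple2<String, String>> hangzhouSideOutput = processStream.getSideOutput(hangzhouStream);

        // Print
        shanghaiSideOutput.print();
        // hangzhouSideOutput.print();
        env.execute();
    }
}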
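For comparison with the filter approach, the same routing can be modelled in plain Java (no Flink; `SideOutputModel`, `Tag` and `Context` here are hypothetical stand-ins for `OutputTag` and the `ProcessFunction` context, not the real classes). Each record is inspected once and appended to the output named by its tag, so three records mean three calls regardless of how many side outputs exist.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of side-output routing (hypothetical, not Flink code).
// Tag<T> stands in for OutputTag<T>; Context.output stands in for ctx.output.
class SideOutputModel {

    // A typed, named handle for one side output.
    static final class Tag<T> {
        final String id;
        Tag(String id) { this.id = id; }
    }

    // Collects values per tag id during a single pass over the input.
    static final class Context {
        private final Map<String, List<Object>> outputs = new HashMap<>();

        <T> void output(Tag<T> tag, T value) {
            outputs.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
        }

        @SuppressWarnings("unchecked")
        <T> List<T> getSideOutput(Tag<T> tag) {
            List<Object> values = outputs.getOrDefault(tag.id, new ArrayList<>());
            return (List<T>) (List<?>) values;
        }
    }

    static final Tag<String[]> SHANGHAI = new Tag<>("shanghaiStream");
    static final Tag<String[]> HANGZHOU = new Tag<>("hangzhouStream");

    // Number of times the per-record logic ran.
    static int calls = 0;

    // Same (city, url) records as the Flink example.
    static List<String[]> sampleData() {
        return Arrays.asList(
                new String[]{"shanghai", "url-1"},
                new String[]{"shanghai", "url-3"},
                new String[]{"hangzhou", "url-2"});
    }

    // Mirrors processElement: each record is inspected once and routed by tag.
    static Context process(List<String[]> input) {
        Context ctx = new Context();
        for (String[] value : input) {
            calls++;
            if (value[0].equals("shanghai")) {
                ctx.output(SHANGHAI, value);
            } else {
                ctx.output(HANGZHOU, value);
            }
        }
        return ctx;
    }

    public static void main(String[] args) {
        Context ctx = process(sampleData());
        System.out.println(ctx.getSideOutput(SHANGHAI).size() + " "
                + ctx.getSideOutput(HANGZHOU).size() + " " + calls);
    }
}
```

Running `main` prints `2 1 3`: one pass over the data, versus six predicate evaluations in the two-filter model.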
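Note: unlike split, streams produced with side outputs can be split again. getSideOutput returns an ordinary DataStream, so you can call process on it and emit further side outputs.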
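The idea of splitting a split result again can be sketched in plain Java (no Flink; `NestedSplitModel` and `route` are hypothetical). `route` stands in for one `process` step that sends each record to the output named by `tagOf`; because its result is an ordinary collection, it can be routed again, much as a `DataStream` from `getSideOutput` can be fed into another `process` call. Splitting the Shanghai records a second time by URL is an assumed criterion chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of two-level splitting (hypothetical, not Flink code).
class NestedSplitModel {

    // Same (city, url) records as the Flink example.
    static List<String[]> sampleData() {
        return Arrays.asList(
                new String[]{"shanghai", "url-1"},
                new String[]{"shanghai", "url-3"},
                new String[]{"hangzhou", "url-2"});
    }

    // One split step: each row goes to the output named by tagOf(row).
    // LinkedHashMap keeps outputs in first-seen order.
    static Map<String, List<String[]>> route(List<String[]> input,
                                             Function<String[], String> tagOf) {
        Map<String, List<String[]>> outputs = new LinkedHashMap<>();
        for (String[] row : input) {
            outputs.computeIfAbsent(tagOf.apply(row), k -> new ArrayList<>()).add(row);
        }
        return outputs;
    }

    public static void main(String[] args) {
        // First split: by city.
        Map<String, List<String[]>> byCity = route(sampleData(), row -> row[0]);
        // Second split on one of the results: the Shanghai rows by url.
        Map<String, List<String[]>> shanghaiByUrl = route(byCity.get("shanghai"), row -> row[1]);
        System.out.println(shanghaiByUrl.keySet()); // [url-1, url-3]
    }
}
```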