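  • Splitting Streams with Flink Side Outputs

    Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/side_output.html#side-outputs

    Scenario: an input source has to be split according to some criterion, for example splitting user access logs by the visitor's geographic location.

    Ways to split a stream

    Depending on the scenario, there are generally three ways to split a stream.

    Splitting with Filter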

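        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.streaming.api.datastream.DataStreamSource;
        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        import java.util.ArrayList;
        import java.util.List;

        // The class name is illustrative; the original snippet showed only main().
        public class FilterSplitExample {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                List<Tuple2<String, String>> data = new ArrayList<>();
                data.add(new Tuple2<>("shanghai", "url-1"));
                data.add(new Tuple2<>("shanghai", "url-3"));
                data.add(new Tuple2<>("hangzhou", "url-2"));
                DataStreamSource<Tuple2<String, String>> source = env.fromCollection(data);

                // Each filter sees every record and keeps only the matching city.
                SingleOutputStreamOperator<Tuple2<String, String>> shanghaiStream = source.filter(value -> value.f0.equals("shanghai"));
                SingleOutputStreamOperator<Tuple2<String, String>> hangzhouStream = source.filter(value -> value.f0.equals("hangzhou"));

                // shanghaiStream.print();
                hangzhouStream.print();
                env.execute();
            }
        }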
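
    One property of the filter approach is easy to miss: every branch reads the whole input and applies its own predicate, so each record is tested once per branch, and a record that matches no predicate is dropped silently. The sketch below models that behaviour in plain Java. It does not use any Flink classes; the class name FilterSplitSketch, the counter, and the extra "beijing" record are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of filter-based splitting. No Flink classes are used.
class FilterSplitSketch {

    // Number of predicate evaluations performed across all branches.
    static int evaluations = 0;

    // One branch: reads the whole input and keeps records whose city matches.
    static List<String[]> branch(List<String[]> input, String city) {
        List<String[]> kept = new ArrayList<>();
        for (String[] record : input) {
            evaluations++;
            if (record[0].equals(city)) {
                kept.add(record);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String[]> input = Arrays.asList(
                new String[] {"shanghai", "url-1"},
                new String[] {"shanghai", "url-3"},
                new String[] {"hangzhou", "url-2"},
                new String[] {"beijing", "url-4"}); // hypothetical record matching no branch

        List<String[]> shanghai = branch(input, "shanghai");
        List<String[]> hangzhou = branch(input, "hangzhou");

        System.out.println("shanghai records: " + shanghai.size());   // 2
        System.out.println("hangzhou records: " + hangzhou.size());   // 1
        System.out.println("predicate evaluations: " + evaluations);  // 8 = 4 records x 2 branches
    }
}
```

    With two branches the extra work is small, but it grows with the number of branches, and the "beijing" record ends up in neither output.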

    Splitting with Split

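        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.streaming.api.collector.selector.OutputSelector;
        import org.apache.flink.streaming.api.datastream.DataStreamSource;
        import org.apache.flink.streaming.api.datastream.SplitStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        import java.util.ArrayList;
        import java.util.List;

        // The class name is illustrative; the original snippet showed only main().
        public class SplitExample {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                List<Tuple2<String, String>> data = new ArrayList<>();
                data.add(new Tuple2<>("shanghai", "url-1"));
                data.add(new Tuple2<>("shanghai", "url-3"));
                data.add(new Tuple2<>("hangzhou", "url-2"));
                DataStreamSource<Tuple2<String, String>> source = env.fromCollection(data);

                // split() is a deprecated API: the selector returns the names of the outputs for each record.
                SplitStream<Tuple2<String, String>> splitStream = source.split(new OutputSelector<Tuple2<String, String>>() {
                    @Override
                    public Iterable<String> select(Tuple2<String, String> value) {
                        List<String> tags = new ArrayList<>();
                        if (value.f0.equals("shanghai")) {
                            tags.add("shanghaiStream");
                        } else {
                            tags.add("hangzhouStream");
                        }
                        return tags;
                    }
                });

                // Pick one of the named outputs.
                // splitStream.select("shanghaiStream").print();
                splitStream.select("hangzhouStream").print();

                env.execute();
            }
        }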

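    Note that a stream produced by the split operator cannot be split a second time. If split is called again on the shanghaiStream or hangzhouStream obtained above, the console shows the following exception: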

    Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.

    Splitting with Side Outputs

    Side outputs are the newest and most recommended way to split a stream in Flink. Using them takes the following steps:

    1. Define an OutputTag for each output.

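    2. Emit records to a tag from one of the functions that support side outputs. The Flink 1.10 documentation linked above lists ProcessFunction, KeyedProcessFunction, CoProcessFunction, KeyedCoProcessFunction, ProcessWindowFunction, and ProcessAllWindowFunction.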

    Example: side outputs with a ProcessFunction:

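        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.datastream.DataStreamSource;
        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.streaming.api.functions.ProcessFunction;
        import org.apache.flink.util.Collector;
        import org.apache.flink.util.OutputTag;

        import java.util.ArrayList;
        import java.util.List;

        // The class name is illustrative; the original snippet showed only main().
        public class SideOutputExample {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                List<Tuple2<String, String>> data = new ArrayList<>();
                data.add(new Tuple2<>("shanghai", "url-1"));
                data.add(new Tuple2<>("shanghai", "url-3"));
                data.add(new Tuple2<>("hangzhou", "url-2"));
                DataStreamSource<Tuple2<String, String>> source = env.fromCollection(data);

                // Define the OutputTags; the type parameter is the type of the records emitted to that tag.
                // The trailing {} creates an anonymous subclass so the generic type is retained.
                final OutputTag<Tuple2<String, String>> shanghaiStream = new OutputTag<Tuple2<String, String>>("shanghaiStream") {};
                final OutputTag<Tuple2<String, String>> hangzhouStream = new OutputTag<Tuple2<String, String>>("hangzhouStream") {};

                // Route each record to a side output inside a ProcessFunction.
                SingleOutputStreamOperator<Tuple2<String, String>> processStream = source.process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                    @Override
                    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                        // Nothing is written to `out`, so the main output of processStream stays empty.
                        if (value.f0.equals("shanghai")) {
                            ctx.output(shanghaiStream, value);
                        } else {
                            ctx.output(hangzhouStream, value);
                        }
                    }
                });

                // Retrieve the side-output streams by tag.
                DataStream<Tuple2<String, String>> shanghaiSideOutput = processStream.getSideOutput(shanghaiStream);
                DataStream<Tuple2<String, String>> hangzhouSideOutput = processStream.getSideOutput(hangzhouStream);

                // Print one of them.
                shanghaiSideOutput.print();
                // hangzhouSideOutput.print();
                env.execute();
            }
        }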

    Note: unlike split, a stream obtained from a side output can be split again, as many times as needed.
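
    In Flink terms, a second split would mean calling process(...) on the DataStream returned by getSideOutput(...) and emitting to new OutputTags from that second ProcessFunction. The data flow can be sketched without a Flink dependency, as below. This is only a plain-Java model of two routing passes, not Flink API: the class TwoLevelSplitSketch, the route helper, and the choice of the URL as the second-level key are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of tag-based routing applied twice. No Flink classes are used.
class TwoLevelSplitSketch {

    // Routes each record to exactly one tagged list, in a single pass over the input.
    static Map<String, List<String[]>> route(List<String[]> input,
                                             Function<String[], String> tagOf) {
        Map<String, List<String[]>> outputs = new LinkedHashMap<>();
        for (String[] record : input) {
            outputs.computeIfAbsent(tagOf.apply(record), tag -> new ArrayList<>()).add(record);
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<String[]> input = new ArrayList<>();
        input.add(new String[] {"shanghai", "url-1"});
        input.add(new String[] {"shanghai", "url-3"});
        input.add(new String[] {"hangzhou", "url-2"});

        // First split: by city, mirroring the shanghaiStream / hangzhouStream tags.
        Map<String, List<String[]>> byCity = route(input, r -> r[0] + "Stream");

        // Second split: the shanghai output is routed again, this time by URL.
        Map<String, List<String[]>> shanghaiByUrl =
                route(byCity.get("shanghaiStream"), r -> r[1]);

        System.out.println(byCity.keySet());        // [shanghaiStream, hangzhouStream]
        System.out.println(shanghaiByUrl.keySet()); // [url-1, url-3]
    }
}
```

    Each pass visits a record once and sends it to exactly one tagged output, which is why the result of one pass can feed the next without restriction.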

  • Original article: https://www.cnblogs.com/zz-ksw/p/12974467.html