zoukankan      html  css  js  c++  java
  • flink 使用processFunction函数的sideOutPut实现filter操作(java版)

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    /**
     * # _*_ coding:utf-8 _*_
     * # Author:xiaoshubiao
     * # Time : 2020/12/14 19:14
     **/
    public class processFunction_test {
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<String> localhost = executionEnvironment.socketTextStream("localhost", 1111);
            // 输入a,1这样的数据,通过ProcessFunction的sideOutPut实现filter的操作
            // 区分是否属于a
            SingleOutputStreamOperator<Tuple2<String, Integer>> map = localhost.map(
                    new MapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> map(String s) throws Exception {
                            String[] split = s.split(",");
                            return new Tuple2<>(split[0], Integer.valueOf(split[1]));
                        }
                    }
            );
    
            SingleOutputStreamOperator<String> process = map.process(
                    /*
                    *参数一:输入数据类型。比如(a,1)
                    * 参数二:输出数据类型。比如(属于a)
                    * */
                    new ProcessFunction<Tuple2<String, Integer>, String>() {
                        @Override
                        public void processElement(Tuple2<String, Integer> stringIntegerTuple2, Context context, Collector<String> collector) throws Exception {
    
                            if (stringIntegerTuple2.f0.equals("a")) {
                                // 直接返回
                                collector.collect("属于a");
                            } else {
                                // 通过上下文输出,定义输出标签和值
                                context.output(new OutputTag<String>("is_not_a"){}, "不属于a");
                            }
                        }
                    }
            );
            process.print("is_a");
            process.getSideOutput(new OutputTag<String>("is_not_a"){}).print("is_not_a");
    
            executionEnvironment.execute();
        }
    }
    /*
    * 输入      输出
    * a,1       属于a
    * b,1       不属于a
    * c,1       不属于a
    *
    *
    * */
  • 相关阅读:
    Salesforce Javascript(四) 展开语法 ...
    salesforce零基础学习(一百零二)Limitation篇之 CPU Limit
    salesforce lightning零基础学习(十七) 实现上传 Excel解析其内容
    php orm的C扩展 ycdatabase
    php ice框架
    不错的PHP扩展
    php的各种序列化对比
    ubuntu 使用tree打印树形结构
    文件写入速度对比
    服务器cli模式下的定时器
  • 原文地址:https://www.cnblogs.com/7749ha/p/14135327.html
Copyright © 2011-2022 走看看