zoukankan      html  css  js  c++  java
  • Flink入门

    /* 
    *CoGroup 
    */ 
    
    final StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); 
            
            DataStream<Tuple2<String, String>> dataStream1 = streamExecutionEnvironment.socketTextStream("127.0.0.1", 9000) 
                    .map(new MapFunction<String, Tuple2<String, String>>() { 
                        @Override 
                        public Tuple2<String, String> map(String s) throws Exception { 
                            List<String> strs = Arrays.asList(s.split(" ")); 
                            return new Tuple2<>(strs.get(0), strs.get(1)); 
                        } 
                    }); 
    
            DataStream<Tuple2<String, String>> dataStream2 = streamExecutionEnvironment.socketTextStream("127.0.0.1", 9001) 
                    .map(new MapFunction<String, Tuple2<String, String>>() { 
                        @Override 
                        public Tuple2<String, String> map(String s) throws Exception { 
                            List<String> strs = Arrays.asList(s.split(" ")); 
                            return new Tuple2<>(strs.get(0), strs.get(1)); 
                        } 
                    }); 
    
            dataStream1.coGroup(dataStream2) 
                    .where(new KeySelector<Tuple2<String, String>, String>() { 
                        @Override 
                        public String getKey(Tuple2<String, String> stringStringTuple2) throws Exception { 
                            return stringStringTuple2.f0; 
                        } 
                    }) 
                    .equalTo(new KeySelector<Tuple2<String, String>, String>() { 
                        @Override 
                        public String getKey(Tuple2<String, String> stringStringTuple2) throws Exception { 
                            return stringStringTuple2.f0; 
                        } 
                    }) 
                    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))) 
                    .trigger(CountTrigger.of(1)) 
                    .apply(new CoGroupFunction<Tuple2<String, String>, Tuple2<String, String>, String>() { 
                        @Override 
                        public void coGroup(Iterable<Tuple2<String, String>> first, Iterable<Tuple2<String, String>> second, Collector<String> out) throws Exception { 
                            StringBuilder stringBuilder = new StringBuilder("Data Stream1: 
    "); 
                            first.forEach(item -> stringBuilder.append(item.f0 + "<=>" + item.f1 + "
    ")); 
    
                            stringBuilder.append("Data Stream2: 
    "); 
                            second.forEach(item -> stringBuilder.append(item.f0 + "<=>" + item.f1 + "
    ")); 
                            out.collect(stringBuilder.toString()); 
                        } 
                    }).print(); 
    
            streamExecutionEnvironment.execute(); 
    
    /** 
    * Join 
    * Join条件为两个流中的数据((String, String))的第一个元素相同 
    */ 
    final StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); 
    
            DataStream<Tuple2<String, String>> dataStream1 = streamExecutionEnvironment.fromElements(new Tuple2<>("hello", "flink")); 
    
            DataStream<Tuple2<String, String>> dataStream2 = streamExecutionEnvironment.fromElements(new Tuple2<>("hello", "blink")); 
    
            dataStream1.join(dataStream2) 
                    .where(new KeySelector<Tuple2<String, String>, String>() { 
                        @Override 
                        public String getKey(Tuple2<String, String> stringStringTuple2) throws Exception { 
                            return stringStringTuple2.f0; 
                        } 
                    }) 
                    .equalTo(new KeySelector<Tuple2<String, String>, String>() { 
                        @Override 
                        public String getKey(Tuple2<String, String> stringStringTuple2) throws Exception { 
                            return stringStringTuple2.f0; 
                        } 
                    }) 
                    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))) 
                    .trigger(CountTrigger.of(1)) 
                    .apply(new JoinFunction<Tuple2<String, String>, Tuple2<String, String>, String>() { 
                        @Override 
                        public String join(Tuple2<String, String> first, Tuple2<String, String> second) throws Exception { 
                            return first.f1 + "<=>" + second.f1; 
                        } 
                    }).print(); 
            streamExecutionEnvironment.execute(); 
    
    // 运行结果 
    2> flink<=>blink 
    
    
  • 相关阅读:
    【题解】Red-Blue Graph Codeforces 1288F 上下界费用流
    【题解】The Magician HDU 6565 大模拟
    HAOI2018游记
    【题解】【THUSC 2016】成绩单 LOJ 2292 区间dp
    【题解】【雅礼集训 2017 Day5】远行 LOJ 6038 LCT
    【题解】Catering World Finals 2015 上下界费用流
    《无问西东...》
    为了世界的和平~一起上caioj~~~!
    新征程~起航!
    bzoj4240: 有趣的家庭菜园(树状数组+贪心思想)
  • 原文地址:https://www.cnblogs.com/fangpengchengbupter/p/11903626.html
Copyright © 2011-2022 走看看