zoukankan      html  css  js  c++  java
  • Flink入门

    /*
     * CoGroup example.
     *
     * Reads text lines from two local sockets (ports 9000 and 9001), turns each line into a
     * (first token, second token) tuple, co-groups the two streams on the first tuple field
     * inside a processing-time session window with a 30-second gap, and prints one textual
     * report per window firing.
     */
    final StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, String>> dataStream1 = streamExecutionEnvironment.socketTextStream("127.0.0.1", 9000)
            .map(new MapFunction<String, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(String s) throws Exception {
                    // Each line must hold at least two space-separated tokens;
                    // a shorter line makes get(1) throw.
                    List<String> strs = Arrays.asList(s.split(" "));
                    return new Tuple2<>(strs.get(0), strs.get(1));
                }
            });

    DataStream<Tuple2<String, String>> dataStream2 = streamExecutionEnvironment.socketTextStream("127.0.0.1", 9001)
            .map(new MapFunction<String, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(String s) throws Exception {
                    // Same line format as the first stream.
                    List<String> strs = Arrays.asList(s.split(" "));
                    return new Tuple2<>(strs.get(0), strs.get(1));
                }
            });

    dataStream1.coGroup(dataStream2)
            // Key of the first stream: field f0.
            .where(new KeySelector<Tuple2<String, String>, String>() {
                @Override
                public String getKey(Tuple2<String, String> stringStringTuple2) throws Exception {
                    return stringStringTuple2.f0;
                }
            })
            // Key of the second stream: field f0.
            .equalTo(new KeySelector<Tuple2<String, String>, String>() {
                @Override
                public String getKey(Tuple2<String, String> stringStringTuple2) throws Exception {
                    return stringStringTuple2.f0;
                }
            })
            .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
            // Count-based trigger with a threshold of one element.
            .trigger(CountTrigger.of(1))
            .apply(new CoGroupFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
                @Override
                public void coGroup(Iterable<Tuple2<String, String>> first, Iterable<Tuple2<String, String>> second, Collector<String> out) throws Exception {
                    // The line breaks must be written as "\n" escapes: a Java string
                    // literal cannot span physical source lines.
                    StringBuilder stringBuilder = new StringBuilder("Data Stream1:\n");
                    for (Tuple2<String, String> item : first) {
                        stringBuilder.append(item.f0).append("<=>").append(item.f1).append("\n");
                    }

                    stringBuilder.append("Data Stream2:\n");
                    for (Tuple2<String, String> item : second) {
                        stringBuilder.append(item.f0).append("<=>").append(item.f1).append("\n");
                    }
                    out.collect(stringBuilder.toString());
                }
            }).print();

    streamExecutionEnvironment.execute();
    
    /**
     * Join example.
     * Two elements are joined when the first field of their (String, String) tuples is equal.
     */
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, String>> left = env.fromElements(new Tuple2<>("hello", "flink"));
    DataStream<Tuple2<String, String>> right = env.fromElements(new Tuple2<>("hello", "blink"));

    // Both sides are keyed on field f0, so a single selector serves where() and equalTo().
    KeySelector<Tuple2<String, String>, String> byFirstField =
            new KeySelector<Tuple2<String, String>, String>() {
                @Override
                public String getKey(Tuple2<String, String> tuple) throws Exception {
                    return tuple.f0;
                }
            };

    // Renders a matched pair as "<f1 of first><=><f1 of second>".
    JoinFunction<Tuple2<String, String>, Tuple2<String, String>, String> pairSecondFields =
            new JoinFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
                @Override
                public String join(Tuple2<String, String> first, Tuple2<String, String> second) throws Exception {
                    return first.f1 + "<=>" + second.f1;
                }
            };

    left.join(right)
            .where(byFirstField)
            .equalTo(byFirstField)
            .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
            .trigger(CountTrigger.of(1))
            .apply(pairSecondFields)
            .print();

    env.execute();
    
    // 运行结果 
    2> flink<=>blink 
    
    
  • 相关阅读:
    Python数据可视化——散点图
    [ffmpeg 扩展第三方库编译系列] 关于 mingw32 下编译libcaca
    独立python环境之virtualenv和virtualenvwrapper
    深入理解maven及应用(一):生命周期和插件
    Android页面事件挂接模拟
    第六课 Struts的视图组件
    wxWidgets笔记_1_linux环境下wxwidgets的安装与配置
    使用 gradle 在编译时动态设置 Android resValue / BuildConfig / Manifes中&lt;meta-data&gt;变量的值
    ubuntu 下安装eclipse &amp;java环境配置
    [Swift]LeetCode695. 岛屿的最大面积 | Max Area of Island
  • 原文地址:https://www.cnblogs.com/fangpengchengbupter/p/11903626.html
Copyright © 2011-2022 走看看