zoukankan      html  css  js  c++  java
  • Flink常用的API

    Source

    基于集合

    1610677034977

    /**
     * Demonstrates collection-based sources: {@code fromCollection} and {@code fromElements}.
     *
     * <p>The job pins parallelism to 2. Without that call, the default parallelism equals the
     * number of CPU threads (16 on the author's 8-core / 16-thread machine). The two sample
     * outputs at the bottom show both cases: default parallelism first, parallelism 2 second.
     *
     * @author WGR
     * @create 2021/9/3 -- 13:38
     */
    public class SourceTest1 {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);

            // Bounded stream of sensor readings built from an in-memory list
            // (sensor_6 and sensor_7 appear twice on purpose).
            DataStream<SensorReading> sensorStream = env.fromCollection(Arrays.asList(
                    new SensorReading("sensor_1", 1547718199L, 35.8),
                    new SensorReading("sensor_2", 1547718201L, 15.4),
                    new SensorReading("sensor_3", 1547718202L, 6.7),
                    new SensorReading("sensor_4", 1547718205L, 8.1),
                    new SensorReading("sensor_5", 1547718199L, 3.8),
                    new SensorReading("sensor_6", 1547718201L, 15.4),
                    new SensorReading("sensor_7", 1547718202L, 63.7),
                    new SensorReading("sensor_6", 1547718201L, 15.4),
                    new SensorReading("sensor_7", 1547718202L, 63.7),
                    new SensorReading("sensor_8", 1547718205L, 312.1)));

            // Bounded stream built directly from varargs elements.
            DataStream<Integer> numberStream =
                    env.fromElements(1, 2, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 21, 22, 32);

            // The print label prefixes each line; "N>" is the 1-based index of the printing subtask.
            sensorStream.print("data");
            numberStream.print("int");

            env.execute();

            // Sample output with the default parallelism (16 subtasks, setParallelism not called):
    //        int:15> 1
    //        int:4> 7
    //        int:15> 22
    //        int:8> 11
    //        int:9> 12
    //        int:14> 21
    //        int:7> 10
    //        data:13> SensorReading{id='sensor_5', timestamp=1547718199, temperature=3.8}
    //        data:12> SensorReading{id='sensor_4', timestamp=1547718205, temperature=8.1}
    //        int:2> 5
    //        int:16> 2
    //        int:3> 6
    //        int:6> 9
    //        int:5> 8
    //        int:16> 32
    //        data:1> SensorReading{id='sensor_7', timestamp=1547718202, temperature=63.7}
    //        int:1> 4
    //        data:10> SensorReading{id='sensor_2', timestamp=1547718201, temperature=15.4}
    //        data:11> SensorReading{id='sensor_3', timestamp=1547718202, temperature=6.7}
    //        data:9> SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
    //        data:16> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
    //        data:2> SensorReading{id='sensor_8', timestamp=1547718205, temperature=312.1}
    //        int:11> 14
    //        int:13> 16
    //        data:14> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
    //        int:10> 13
    //        int:12> 15
    //        data:15> SensorReading{id='sensor_7', timestamp=1547718202, temperature=63.7}

            // Sample output with setParallelism(2), as configured above:
    //int:2> 2
    //data:2> SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
    //data:1> SensorReading{id='sensor_2', timestamp=1547718201, temperature=15.4}
    //data:2> SensorReading{id='sensor_3', timestamp=1547718202, temperature=6.7}
    //int:1> 1
    //data:1> SensorReading{id='sensor_4', timestamp=1547718205, temperature=8.1}
    //data:2> SensorReading{id='sensor_5', timestamp=1547718199, temperature=3.8}
    //int:2> 5
    //int:2> 7
    //int:2> 9
    //int:1> 4
    //int:2> 11
    //int:1> 6
    //int:2> 13
    //int:1> 8
    //int:2> 15
    //int:1> 10
    //data:1> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
    //int:2> 21
    //data:1> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
    //data:2> SensorReading{id='sensor_7', timestamp=1547718202, temperature=63.7}
    //int:1> 12
    //int:1> 14
    //data:2> SensorReading{id='sensor_7', timestamp=1547718202, temperature=63.7}
    //int:2> 32
    //data:1> SensorReading{id='sensor_8', timestamp=1547718205, temperature=312.1}
    //int:1> 16
    //int:1> 22
        }
    }
    

    基于文件

    1610677361516

    /**
     * @author WGR
     * @create 2021/9/8 -- 10:06
     */
    public class SourceTest2 {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> dataStreamSource = env.readTextFile("D:\IdeaSpace\flink-sgg\src\main\resources\sensor.txt");
            dataStreamSource.print();
            env.execute();
    //        13> sensor_1,1547718212,37.1
    //        1> sensor_1,1547718199,35.8
    //        7> sensor_10,1547718205,38.1
    //        9> sensor_1,1547718207,36.3
    //        5> sensor_7,1547718202,6.7
    //        11> sensor_1,1547718209,32.8
    //        3> sensor_6,1547718201,15.4
        }
    }
    
    

    基于Socket

    1610677810363

    /**
     * Demonstrates a socket source: a streaming word count over text lines received on a socket.
     *
     * @author WGR
     * @create 2021/9/8 -- 10:16
     */
    public class SourceTest3 {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Each text line received from the socket becomes one String element.
            DataStream<String> dataStream = env.socketTextStream("192.168.1.180", 9998);

            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    // Split on runs of whitespace and skip empty tokens. Splitting on a single " "
                    // would emit ("", 1) for repeated or leading spaces and count it as a word.
                    for (String word : value.split("\\s+")) {
                        if (!word.isEmpty()) {
                            collector.collect(Tuple2.of(word, 1));
                        }
                    }
                }
            });

            // Group by the word (f0) and keep a running sum of the counts (f1).
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);

            result.print();
            env.execute();
    //        5> (hello,1)
    //        9> (world,1)
    //        3> (java,1)
    //        5> (hello,2)
    //        9> (world,2)
    //        5> (hello,3)
    //        1> (kafka,1)
    //        5> (hello,4)
    //        13> (flink,1)
        }
    }
    

    image-20210908102630689

    基于kafka

    /**
     * Demonstrates a Kafka source that reads the {@code dalianpai} topic as plain strings and
     * prints each record.
     *
     * @author WGR
     * @create 2021/9/8 -- 10:44
     */
    public class SourceTest4 {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "192.168.1.146:9092");
            kafkaProps.setProperty("group.id", "group.demo");
            kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kafkaProps.setProperty("auto.offset.reset", "latest");
            // Starts a background thread that checks Kafka's partitions every 5 s,
            // enabling dynamic partition discovery.
            kafkaProps.setProperty("flink.partition-discovery.interval-millis", "5000");
            // Auto-commit offsets (to Kafka's default offsets topic; once checkpointing is enabled,
            // offsets are stored with each checkpoint as well as in that topic).
            kafkaProps.setProperty("enable.auto.commit", "true");
            // Auto-commit interval in milliseconds.
            kafkaProps.setProperty("auto.commit.interval.ms", "2000");

            FlinkKafkaConsumer011<String> consumer =
                    new FlinkKafkaConsumer011<String>("dalianpai", new SimpleStringSchema(), kafkaProps);
            DataStream<String> kafkaStream = env.addSource(consumer);

            kafkaStream.print();
            env.execute();
        }
    }
    
    image-20210908135351725

    基于自定义Source

    1610678751806

    /**
     * Demonstrates a user-defined source ({@link SourceFunction}) that simulates temperature sensors.
     *
     * @author WGR
     * @create 2021/9/8 -- 11:02
     */
    public class SourceTest5 {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // Read from the custom, never-ending sensor source (not from a file).
            DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());

            // Print every generated reading.
            dataStream.print();

            env.execute();
        }

        /**
         * Unbounded source simulating 10 sensors. Each round emits one reading per sensor, then
         * sleeps about one second, until {@link #cancel()} is called.
         */
        public static class MySensorSource implements SourceFunction<SensorReading> {

            /** Number of simulated sensors (ids sensor_1 .. sensor_10). */
            private static final int SENSOR_COUNT = 10;

            /**
             * Controls the emit loop in {@link #run}. {@link #cancel()} is normally called from a
             * different thread than the one executing {@code run()}, so the flag must be volatile
             * for the emit loop to be guaranteed to see the change and stop.
             */
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<SensorReading> ctx) throws Exception {
                Random random = new Random();

                // Initial temperature per sensor: normally distributed around 60 with std dev 20.
                HashMap<String, Double> sensorTempMap = new HashMap<>();
                for (int i = 0; i < SENSOR_COUNT; i++) {
                    sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
                }

                while (running) {
                    for (String sensorId : sensorTempMap.keySet()) {
                        // Random walk: add a standard-normal step to the sensor's current temperature.
                        // Replacing the value of an existing key is not a structural change,
                        // so it is safe while iterating keySet().
                        double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                        sensorTempMap.put(sensorId, newTemp);
                        ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newTemp));
                    }
                    // Throttle output to roughly one round per second.
                    Thread.sleep(1000L);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }
    }
    
    

    Transformation

    map/flatMap/Filter

    map:将函数作用在集合中的每一个元素上,并返回作用后的结果

    image-20210908164129895

    flatMap:将集合中的每个元素变成一个或多个元素,并返回扁平化之后的结果

    image-20210908164339308

    filter:按照指定的条件对集合中的元素进行过滤,过滤出返回true/符合条件的元素

    image-20210908164320837
    /**
     * Demonstrates flatMap, filter and map chained into a streaming word count over a socket.
     *
     * @author WGR
     * @create 2021/9/8 -- 14:10
     */
    public class TransformTest1_Base {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> lines = env.socketTextStream("192.168.1.180", 9998);

            // flatMap: one line -> zero or more words. Split on runs of whitespace and skip empty
            // tokens, so repeated or leading spaces do not produce "" words.
            DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String s, Collector<String> collector) throws Exception {
                    for (String str : s.split("\\s+")) {
                        if (!str.isEmpty()) {
                            collector.collect(str);
                        }
                    }
                }
            });

            // filter: keep only elements for which the predicate returns true; "TMD" is dropped.
            DataStream<String> filtered = words.filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String value) throws Exception {
                    return !"TMD".equals(value);
                }
            });

            // map: one word -> exactly one (word, 1) pair.
            DataStream<Tuple2<String, Integer>> wordAndOne = filtered.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    return Tuple2.of(value, 1);
                }
            });

            // Group by word and keep a running count.
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);

            result.print();
            env.execute();

        }

    }
    
    

    image-20210908144618967

    image-20210908144011783

    KeyBy

    image-20210908144900745

    DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的。

    /**
     * @author WGR
     * @create 2021/9/8 -- 14:50
     */
    public class TransformTest2 {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
    
            // 从文件读取数据
            DataStream<String> inputStream = env.readTextFile("D:\IdeaSpace\flink-sgg\src\main\resources\sensor.txt");
    
            DataStream<SensorReading> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            } );
    
            // 分组
            KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
            KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(data -> data.getId());
    
            DataStream<Long> dataStream1 = env.fromElements(1L, 34L, 4L, 657L, 23L);
            KeyedStream<Long, Integer> keyedStream2 = dataStream1.keyBy(new KeySelector<Long, Integer>() {
                @Override
                public Integer getKey(Long value) throws Exception {
                    return value.intValue() % 2;
                }
            });
            keyedStream.print("key");
            keyedStream1.print("key1");
            keyedStream2.sum(0).print("key2");
            env.execute();
    
    //        key2:3> 1
    //        key2:3> 34
    //        key2:3> 38
    //        key2:3> 658
    //        key2:3> 681
    //        key:2> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
    //        key:4> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
    //        key1:3> SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
    //        key1:4> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
    //        key1:2> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
    //        key:3> SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
    //        key1:3> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
    //        key:3> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
    //        key1:3> SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
    //        key:3> SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
    //        key1:3> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
    //        key:3> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
    //        key1:3> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
    //        key:3> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
    
        }
    }
    

    滚动聚合算子(Rolling Aggregation)

    /**
     * @author WGR
     * @create 2021/9/8 -- 15:02
     */
    public class TransformTest3 {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
    
            // 从文件读取数据
            DataStream<String> inputStream = env.readTextFile("D:\IdeaSpace\flink-sgg\src\main\resources\sensor.txt");
    
            DataStream<SensorReading> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            } );
    
            // 分组
            KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
    
            // 滚动聚合,取当前最大的温度值
            DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");
    
            resultStream.print("result");
    
            env.execute();
    
    //        result:2> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
    //        result:4> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
    //        result:3> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
    //        result:3> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
    //        result:3> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
    //        result:3> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
    //        result:3> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
        }
    }
    

    Reduce

    reduce:对集合中的元素进行聚合

    image-20210908164408052
    /**
     * @author WGR
     * @create 2021/9/8 -- 15:14
     */
    public class TransformTest4 {
    
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 从文件读取数据
            DataStream<String> inputStream = env.readTextFile("D:\IdeaSpace\flink-sgg\src\main\resources\sensor.txt");
    
            // 转换成SensorReading类型
            DataStream<SensorReading> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            });
    
            // 分组
            KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
    
            // reduce聚合,取最大的温度值,以及当前最新的时间戳
            SingleOutputStreamOperator<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
                @Override
                public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                    return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));
                }
            });
    
            resultStream.print();
            env.execute();
    
    //        SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
    //        SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
    //        SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
    //        SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
    //        SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
    //        SensorReading{id='sensor_1', timestamp=1547718209, temperature=36.3}
    //        SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
        }
    }
    

    Split 和 Select /Connect 和 CoMap /Union

    Split就是将一个流分成多个流

    Select就是获取分流后对应的数据

    注意:split/select 已被标记为过期(deprecated),并在较新版本(Flink 1.12 起)中被移除;新版本中请改用侧输出流(Side Output)实现分流。下面的示例基于仍保留该 API 的旧版本。

    union:

    union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。每个输入流内部的元素按到达顺序输出,但不同输入流之间的元素顺序不作保证,且合并时不去重。

    image-20210908164458589

    connect:

    connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:

    connect只能连接两个数据流,union可以连接多个数据流。

    connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。

    两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。

    image-20210908164630763
    /**
     * @author WGR
     * @create 2021/9/8 -- 15:24
     */
    public class TransformTest5 {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 从文件读取数据
            DataStream<String> inputStream = env.readTextFile("D:\IdeaSpace\flink-sgg\src\main\resources\sensor.txt");
    
            // 转换成SensorReading
            DataStream<SensorReading> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            } );
    
            //进行分流操作
            SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
    
                @Override
                public Iterable<String> select(SensorReading value) {
                    return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
                }
            });
    
            DataStream<SensorReading> highTempStream = splitStream.select("high");
            DataStream<SensorReading> lowTempStream = splitStream.select("low");
            DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
    
            highTempStream.print("high");
            lowTempStream.print("low");
            allTempStream.print("all");
    
            // 2. 合流 connect,将高温流转换成二元组类型,与低温流连接合并之后,输出状态信息
            DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
                @Override
                public Tuple2<String, Double> map(SensorReading value) throws Exception {
                        return new Tuple2<>(value.getId(), value.getTemperature());
                    }
            });
    
            ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);
    
            DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
    
                @Override
                public Object map1(Tuple2<String, Double> value) throws Exception {
                    return new Tuple3<>(value.f0, value.f1, "high temp warning");
                }
    
                @Override
                public Object map2(SensorReading value) throws Exception {
                    return new Tuple2<>(value.getId(), "normal");
                }
            });
    
            resultStream.print("connect");
            DataStream<SensorReading> union = highTempStream.union(lowTempStream, allTempStream);
            union.print("union");
    
            env.execute();
    
    //        high> SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
    //        all> SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
    //        low> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
    //        all> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
    //        low> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
    //        all> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
    //        high> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
    //        all> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
    //        high> SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
    //        all> SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
    //        high> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
    //        all> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
    //        high> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
    //        all> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
    //        union> SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
    //        union> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
    //        union> SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
    //        union> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
    //        union> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
    //        union> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
    //        union> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
    //        union> SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
    //        union> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
    //        union> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
    //        union> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
    //        union> SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
    //        union> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
    //        union> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
    //        connect> (sensor_1,35.8,high temp warning)
    //        connect> (sensor_6,normal)
    //        connect> (sensor_10,38.1,high temp warning)
    //        connect> (sensor_7,normal)
    //        connect> (sensor_1,36.3,high temp warning)
    //        connect> (sensor_1,32.8,high temp warning)
    //        connect> (sensor_1,37.1,high temp warning)
    
    
        }
    }
    

    rebalance重平衡分区

    类似于 Spark 中的 repartition,可以把数据重新均匀地分发到下游各个并行实例上,从而缓解因上游分区不均造成的数据倾斜(但无法解决 keyBy 之后由热点 key 导致的倾斜)。Flink 同样会遇到数据倾斜,比如当前有大约 10 亿条数据需要处理,在处理过程中可能会发生如图所示的状况:出现了数据倾斜,其他 3 台机器执行完毕后也要等待机器 1 执行完毕,整个任务才算完成;

    image-20210908164716854

    所以在实际的工作中,出现这种情况比较好的解决方案就是rebalance(内部使用round robin方法将数据均匀打散)

    image-20210908164756964
    /**
     * Compares how many elements each parallel subtask processes with and without
     * {@code rebalance()}, which redistributes records round-robin across downstream subtasks.
     *
     * @author WGR
     * @create 2021/9/8 -- 16:52
     */
    public class TransformTest6 {
        public static void main(String[] args) throws Exception {
            // 0. Environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // 1. Source: the numbers 1..100, then drop 1..10. The remaining records may end up
            //    unevenly spread across the parallel subtasks, i.e. skewed.
            DataStream<Long> longDS = env.generateSequence(1L, 100L);
            DataStream<Long> filterDS = longDS.filter(num -> num > 10);

            // 2. Transformation: tag every record with the index of the subtask that mapped it,
            //    then count records per subtask index.
            // Without rebalance the counts follow the (possibly skewed) upstream distribution.
            SingleOutputStreamOperator<Tuple2<Integer, Integer>> result1 = filterDS
                    .map(new SubtaskIndexTagger())
                    .keyBy(t -> t.f0)
                    .sum(1);

            // With rebalance the records are dealt out round-robin, evening out the counts.
            SingleOutputStreamOperator<Tuple2<Integer, Integer>> result2 = filterDS
                    .rebalance()
                    .map(new SubtaskIndexTagger())
                    .keyBy(t -> t.f0)
                    .sum(1);

            // 3. Sink
            result1.print("result1");
            result2.print("result2");

            // 4. Execute
            env.execute();
        }

        /** Maps each record to (index of the processing subtask, 1) so counts per subtask can be summed. */
        public static class SubtaskIndexTagger extends RichMapFunction<Long, Tuple2<Integer, Integer>> {
            @Override
            public Tuple2<Integer, Integer> map(Long value) throws Exception {
                int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
                return Tuple2.of(subtaskIndex, 1);
            }
        }
    }
    

    自定义富函数

    “富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
    ⚫ RichMapFunction
    ⚫ RichFlatMapFunction
    ⚫ RichFilterFunction
    ⚫ …
    Rich Function 有一个生命周期的概念。 典型的生命周期方法有:
    ⚫ open()方法是 rich function 的初始化方法,在算子(例如 map 或 filter)开始处理数据之前,open()会在每个并行实例上各被调用一次。
    ⚫ close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
    ⚫ getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态

    /**
     * @author WGR
     * @create 2021/9/8 -- 17:12
     */
    public class TransformTest7 {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
    
            // 从文件读取数据
            DataStream<String> inputStream = env.readTextFile("D:\IdeaSpace\flink-sgg\src\main\resources\sensor.txt");
    
            // 转换成SensorReading类型
            DataStream<SensorReading> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            });
    
            DataStream<Tuple2<String, Integer>> resultStream = dataStream.map( new MyMapper() );
    
            resultStream.print();
    
            env.execute();
    
    //        open
    //        open
    //        open
    //         open
    //        1> (sensor_1,0)
    //        4> (sensor_7,3)
    //        2> (sensor_1,1)
    //        3> (sensor_1,2)
    //        1> (sensor_1,0)
    //        4> (sensor_10,3)
    //        close
    //        close
    //        3> (sensor_6,2)
    //        close
    //        close
        }
    
        // 实现自定义富函数类
        public static class MyMapper extends RichMapFunction<SensorReading, Tuple2<String, Integer>> {
            @Override
            public Tuple2<String, Integer> map(SensorReading value) throws Exception {
    //            getRuntimeContext().getState();
                return new Tuple2<>(value.getId(), getRuntimeContext().getIndexOfThisSubtask());
            }
    
            @Override
            public void open(Configuration parameters) throws Exception {
                // 初始化工作,一般是定义状态,或者建立数据库连接
                System.out.println("open");
            }
    
            @Override
            public void close() throws Exception {
                // 一般是关闭连接和清空状态的收尾操作
                System.out.println("close");
            }
        }
    }
    

    其他分区

    image-20210908171922429
    /**
     * @author WGR
     * @create 2021/9/8 -- 17:21
     */
    public class TransformTest8 {
    
        public static void main(String[] args) throws Exception {
            //TODO 0.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
            //TODO 1.source
            DataStream<String> linesDS = env.readTextFile("D:\IdeaSpace\flink-sgg\src\main\resources\hello.txt");
            SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] words = value.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            });
    
            //TODO 2.transformation
            DataStream<Tuple2<String, Integer>> result1 = tupleDS.global();
            DataStream<Tuple2<String, Integer>> result2 = tupleDS.broadcast();
            DataStream<Tuple2<String, Integer>> result3 = tupleDS.forward();
            DataStream<Tuple2<String, Integer>> result4 = tupleDS.shuffle();
            DataStream<Tuple2<String, Integer>> result5 = tupleDS.rebalance();
            DataStream<Tuple2<String, Integer>> result6 = tupleDS.rescale();
            DataStream<Tuple2<String, Integer>> result7 = tupleDS.partitionCustom(new MyPartitioner(), t -> t.f0);
    
    
            //TODO 3.sink
            result1.print("global");
            result2.print("broadcast");
            result3.print("forward");
            result4.print("shuffle");
            result5.print("rebalance");
            result6.print("rescale");
            result7.print("partitionCustom");
    
    
            //TODO 4.execute
            env.execute();
    
    //        forward:2> (and,1)
    //        forward:3> (hello,1)
    //        forward:1> (how,1)
    //        forward:4> (hello,1)
    //        forward:3> (world,1)
    //        forward:2> (you,1)
    //        forward:1> (are,1)
    //        forward:4> (spark,1)
    //        forward:1> (you,1)
    //        forward:3> (hello,1)
    //        broadcast:4> (and,1)
    //        global:1> (and,1)
    //        broadcast:2> (and,1)
    //        partitionCustom:1> (and,1)
    //        rebalance:1> (and,1)
    //        shuffle:3> (and,1)
    //        rebalance:2> (you,1)
    //        rescale:2> (and,1)
    //        broadcast:1> (and,1)
    //        broadcast:3> (and,1)
    //        broadcast:4> (you,1)
    //        broadcast:2> (you,1)
    //        forward:4> (hello,1)
    //        broadcast:3> (you,1)
    //        partitionCustom:1> (you,1)
    //        broadcast:1> (you,1)
    //        forward:3> (flink,1)
    //        forward:1> (fine,1)
    //        forward:4> (scala,1)
    //        rescale:2> (you,1)
    //        global:1> (you,1)
    //        shuffle:3> (you,1)
    //        forward:1> (thank,1)
    //        broadcast:1> (hello,1)
    //        broadcast:2> (hello,1)
    //        broadcast:2> (spark,1)
    //        rebalance:2> (flink,1)
    //        broadcast:1> (spark,1)
    //        rescale:4> (hello,1)
    //        shuffle:3> (hello,1)
    //        broadcast:4> (hello,1)
    //        broadcast:3> (hello,1)
    //        forward:1> (you,1)
    //        broadcast:3> (spark,1)
    //        broadcast:4> (spark,1)
    //        shuffle:3> (world,1)
    //        partitionCustom:1> (hello,1)
    //        rescale:4> (spark,1)
    //        rebalance:4> (world,1)
    //        broadcast:1> (hello,1)
    //        rebalance:4> (scala,1)
    //        rebalance:2> (spark,1)
    //        shuffle:2> (thank,1)
    //        rebalance:4> (how,1)
    //        rebalance:1> (hello,1)
    //        rebalance:1> (hello,1)
    //        rebalance:1> (are,1)
    //        global:1> (hello,1)
    //        rebalance:3> (hello,1)
    //        broadcast:2> (hello,1)
    //        rescale:3> (hello,1)
    //        shuffle:4> (hello,1)
    //        shuffle:4> (flink,1)
    //        rebalance:3> (hello,1)
    //        rescale:3> (world,1)
    //        broadcast:2> (scala,1)
    //        global:1> (spark,1)
    //        broadcast:2> (hello,1)
    //        rebalance:1> (you,1)
    //        rebalance:2> (you,1)
    //        rebalance:3> (fine,1)
    //        rescale:1> (how,1)
    //        rescale:1> (are,1)
    //        rescale:1> (you,1)
    //        rescale:1> (fine,1)
    //        rescale:1> (thank,1)
    //        rescale:1> (you,1)
    //        rebalance:4> (thank,1)
    //        broadcast:1> (scala,1)
    //        broadcast:1> (hello,1)
    //        broadcast:1> (world,1)
    //        broadcast:1> (hello,1)
    //        broadcast:1> (flink,1)
    //        rescale:4> (hello,1)
    //        broadcast:1> (how,1)
    //        broadcast:1> (are,1)
    //        broadcast:1> (you,1)
    //        broadcast:1> (fine,1)
    //        broadcast:1> (thank,1)
    //        broadcast:1> (you,1)
    //        partitionCustom:1> (world,1)
    //        partitionCustom:1> (hello,1)
    //        partitionCustom:1> (flink,1)
    //        partitionCustom:1> (hello,1)
    //        partitionCustom:1> (spark,1)
    //        partitionCustom:1> (hello,1)
    //        partitionCustom:1> (scala,1)
    //        shuffle:3> (hello,1)
    //        broadcast:3> (hello,1)
    //        partitionCustom:1> (how,1)
    //        partitionCustom:1> (are,1)
    //        partitionCustom:1> (you,1)
    //        partitionCustom:1> (fine,1)
    //        partitionCustom:1> (thank,1)
    //        partitionCustom:1> (you,1)
    //        broadcast:4> (hello,1)
    //        broadcast:4> (scala,1)
    //        shuffle:3> (spark,1)
    //        broadcast:3> (scala,1)
    //        rescale:4> (scala,1)
    //        shuffle:3> (how,1)
    //        shuffle:3> (are,1)
    //        broadcast:2> (world,1)
    //        broadcast:2> (hello,1)
    //        global:1> (hello,1)
    //        rescale:3> (hello,1)
    //        shuffle:4> (hello,1)
    //        rescale:3> (flink,1)
    //        shuffle:4> (scala,1)
    //        global:1> (scala,1)
    //        global:1> (hello,1)
    //        global:1> (world,1)
    //        global:1> (hello,1)
    //        global:1> (flink,1)
    //        global:1> (how,1)
    //        global:1> (are,1)
    //        broadcast:3> (hello,1)
    //        broadcast:2> (flink,1)
    //        broadcast:4> (hello,1)
    //        broadcast:3> (world,1)
    //        broadcast:3> (hello,1)
    //        global:1> (you,1)
    //        shuffle:4> (you,1)
    //        global:1> (fine,1)
    //        broadcast:3> (flink,1)
    //        broadcast:4> (world,1)
    //        broadcast:2> (how,1)
    //        broadcast:4> (hello,1)
    //        global:1> (thank,1)
    //        shuffle:4> (fine,1)
    //        shuffle:4> (you,1)
    //        global:1> (you,1)
    //        broadcast:2> (are,1)
    //        broadcast:2> (you,1)
    //        broadcast:2> (fine,1)
    //        broadcast:2> (thank,1)
    //        broadcast:2> (you,1)
    //        broadcast:4> (flink,1)
    //        broadcast:3> (how,1)
    //        broadcast:3> (are,1)
    //        broadcast:3> (you,1)
    //        broadcast:3> (fine,1)
    //        broadcast:3> (thank,1)
    //        broadcast:3> (you,1)
    //        broadcast:4> (how,1)
    //        broadcast:4> (are,1)
    //        broadcast:4> (you,1)
    //        broadcast:4> (fine,1)
    //        broadcast:4> (thank,1)
    //        broadcast:4> (you,1)
    
        }
    
        /**
         * Custom partitioner that ignores both the key and the partition count and always selects
         * partition 0, so every record is routed to the same downstream subtask.
         */
        public static class MyPartitioner implements Partitioner<String> {
            /** The only partition index this partitioner ever returns. */
            private static final int ONLY_PARTITION = 0;

            @Override
            public int partition(String key, int numPartitions) {
                return ONLY_PARTITION;
            }
        }
    }
    

    Kafka

    /**
     * Reads comma-separated sensor lines from the Kafka topic {@code dalianpai}, parses each line
     * into a {@link SensorReading}, and writes the reading's {@code toString()} form to the Kafka
     * topic {@code wgr}.
     *
     * @author WGR
     * @create 2021/9/9 -- 14:50
     */
    public class SinkTest1_Kafka {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // Single broker list shared by the consumer and the producer.
            String brokers = "192.168.1.146:9092";

            // Use setProperty (not Hashtable#put) so only String keys/values are stored.
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", brokers);
            properties.setProperty("group.id", "group.demo");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("auto.offset.reset", "latest");
            // Starts a background thread that checks the Kafka partitions every 5 s (dynamic partition discovery).
            properties.setProperty("flink.partition-discovery.interval-millis", "5000");
            // Auto-commit offsets (to Kafka's default offsets topic; once checkpointing is enabled,
            // offsets are stored with the checkpoint as well as in that topic).
            properties.setProperty("enable.auto.commit", "true");
            // Auto-commit interval in milliseconds.
            properties.setProperty("auto.commit.interval.ms", "2000");

            // Read raw lines from the Kafka topic "dalianpai".
            DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer011<String>("dalianpai", new SimpleStringSchema(), properties));

            // Parse "id,timestamp,temperature" into a SensorReading and emit its string form.
            // NOTE(review): a line with fewer than three fields or non-numeric values throws and fails the job.
            DataStream<String> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2])).toString();
            });

            dataStream.addSink(new FlinkKafkaProducer011<String>(brokers, "wgr", new SimpleStringSchema()));

            env.execute();
        }
    }
    

    image-20210909145513345

    image-20210909145529412

    Redis

    /**
     * @author WGR
     * @create 2021/9/9 -- 14:59
     */
    public class SinkTest2_Redis {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 从文件读取数据
            DataStream<String> inputStream = env.readTextFile("D:\IdeaSpace\flink-sgg\src\main\resources\sensor.txt");
    
            // 转换成SensorReading
            DataStream<SensorReading> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            } );
    
            FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("192.168.1.146").setPort(6379).build();
    
            dataStream.addSink(new RedisSink<>(config,new MyRedisMapper()));
    
            env.execute();
    
        }
    
        // 自定义RedisMapper
        public static class MyRedisMapper implements RedisMapper<SensorReading> {
    
            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.HSET,"sensor_temp");
            }
    
            @Override
            public String getKeyFromData(SensorReading data) {
                return data.getId();
            }
    
            @Override
            public String getValueFromData(SensorReading data) {
                return data.getTemperature().toString();
            }
        }
    }
    
    

    image-20210909150600924

    Elasticsearch

    /**
     * @author WGR
     * @create 2021/9/9 -- 15:38
     */
    public class SinkTest3_Es {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 从文件读取数据
            DataStream<String> inputStream = env.readTextFile("D:\IdeaSpace\flink-sgg\src\main\resources\sensor.txt");
    
            // 转换成SensorReading
            DataStream<SensorReading> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            } );
    
            // 定义es的连接配置
            ArrayList<HttpHost> httpHosts = new ArrayList<>();
            httpHosts.add(new HttpHost("192.168.1.146", 9200));
    
            dataStream.addSink(new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());
    
            env.execute();
    
    
        }
    
        public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading>{
    
            @Override
            public void process(SensorReading sensorReading, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                HashMap<String, String> dataSource = new HashMap<>();
                dataSource.put("id",sensorReading.getId());
                dataSource.put("temp",sensorReading.getTemperature().toString());
                dataSource.put("ts",sensorReading.getTimestamp().toString());
    
                IndexRequest indexRequest = Requests.indexRequest().index("sensor").type("sensor").source(dataSource);
    
                requestIndexer.add(indexRequest);
            }
        }
    }
    

    image-20210909162315755

    Mysql

    /**
     * Generates simulated sensor readings and upserts each sensor's latest temperature into the
     * MySQL table {@code sensor_temp} (columns {@code id}, {@code temp}).
     *
     * @author WGR
     * @create 2021/9/9 -- 16:25
     */
    public class SinkTest4_Jdbc {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());

            dataStream.addSink(new MyJdbcSink());

            env.execute();
        }

        /**
         * Custom SinkFunction that writes each reading to {@code sensor_temp}: it first runs an UPDATE
         * by id and falls back to an INSERT when no row was updated.
         */
        public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
            // Connection and prepared statements, created once per sink instance in open().
            Connection connection = null;
            PreparedStatement insertStmt = null;
            PreparedStatement updateStmt = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                // NOTE(review): URL and credentials are hard-coded for the demo; externalize them for real use.
                connection = DriverManager.getConnection("jdbc:mysql://192.168.1.146:3306/test", "root", "root");
                insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
                updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
            }

            // Called once per record: try the UPDATE first and INSERT only if no row matched the id.
            @Override
            public void invoke(SensorReading value, Context context) throws Exception {
                updateStmt.setDouble(1, value.getTemperature());
                updateStmt.setString(2, value.getId());
                int updatedRows = updateStmt.executeUpdate();
                if (updatedRows == 0) {
                    insertStmt.setString(1, value.getId());
                    insertStmt.setDouble(2, value.getTemperature());
                    insertStmt.executeUpdate();
                }
            }

            @Override
            public void close() throws Exception {
                // open() may have failed part-way, leaving some of these null. Close whatever was
                // created, and keep closing the remaining resources even if one close() throws.
                try {
                    if (insertStmt != null) {
                        insertStmt.close();
                    }
                } finally {
                    try {
                        if (updateStmt != null) {
                            updateStmt.close();
                        }
                    } finally {
                        if (connection != null) {
                            connection.close();
                        }
                    }
                }
            }
        }

        /**
         * Custom SourceFunction that simulates 10 sensors: each starts at a random temperature around
         * 60 and drifts by a Gaussian step; one reading per sensor is emitted roughly every second
         * until the source is cancelled.
         */
        public static class MySensorSource implements SourceFunction<SensorReading> {
            // Flag controlling data generation. cancel() is called from a different thread than run(),
            // so it must be volatile for run() to reliably observe the change and stop.
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<SensorReading> ctx) throws Exception {
                Random random = new Random();

                // Initial temperatures for the 10 sensors.
                HashMap<String, Double> sensorTempMap = new HashMap<>();
                for (int i = 0; i < 10; i++) {
                    sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
                }

                while (running) {
                    for (String sensorId : sensorTempMap.keySet()) {
                        // Random fluctuation around the current temperature.
                        Double newtemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                        sensorTempMap.put(sensorId, newtemp);
                        ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newtemp));
                    }
                    // Throttle the output rate.
                    Thread.sleep(1000L);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }
    }
    
    image-20210909163600782
  • 相关阅读:
    (转).NET Compact Framework使用P/Invoke服务
    C#编码好习惯
    有些东西必须时刻放在心上!
    我是这样的人吗?是!!!!!!!!!
    经济学家张五常教大家四招读书的方法 (转)
    #在宏中的某些用法(转)
    牛人太强了,我该怎么努力呀?
    利用增强限制条件来求解问题
    努力呀!即将面临的deadline
    volume visualization research时刻记在心的要点
  • 原文地址:https://www.cnblogs.com/dalianpai/p/15247670.html
Copyright © 2011-2022 走看看