zoukankan      html  css  js  c++  java
  • Flink(三)【核心编程】

    和其他所有的计算框架一样,flink也有一些基础的开发步骤以及基础,核心的API,当前Java版本,从开发步骤的角度来讲,主要分为四大部分

    一.Environment

    批处理

    // 批处理环境
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    

    流处理

    // 流式数据处理环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    

    二.Source

    从集合读取数据

    import java.util.Arrays;
    import java.util.List;
    
    /**
     * @description:
     * @author: HaoWu
     * @create: 2020年09月16日
     */
    public class Flink01_Source_Collection {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            List<Integer> list = Arrays.asList(1, 2, 3, 4);
            // 1.Source:读取数据
            DataStreamSource<Integer> listDS = env.fromCollection(list);
            // 2.打印
            listDS.print();
            // 3.执行
            env.execute();
        }
    }
    

    从文件读取数据

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    /**
     * @description:
     * @author: HaoWu
     * @create: 2020年09月16日
     */
    public class Flink01_Source_File {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 1.Source:读取数据
            DataStreamSource<String> fileDS = env.readTextFile("D:\SoftWare\idea-2019.2.3\wordspace\13_flinkdemo\input");
            // 2.打印
            fileDS.print();
            // 3.执行
            env.execute();
        }
    }
    

    从kakfa读取数据(常用)

    官网:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html

    依赖

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    

    注意:根据官网说明的flink和kafka版本的适配关系,选用对应得依赖。

    java代码

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
    import java.util.Properties;
    
    /**
     * @description: 从kafka读取数据
     * @author: HaoWu
     * @create: 2020年09月16日
     */
    public class Flink03_Source_Kafka {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 1.Source:读取数据
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "hadoop102:9092");
            props.setProperty("group.id", "consumer-group");
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("auto.offset.reset", "latest");
    
            DataStreamSource<String> kafkaDS = env.addSource(
                    new FlinkKafkaConsumer011<>(
                            "flink-test",
                            new SimpleStringSchema(),
                            props));
            // 2.打印
            kafkaDS.print();
            // 3.执行
            env.execute();
        }
    }
    

    自定义数据源

    可以用来造数据,测试程序

    pojo

    public class WaterSensor {
    
        private String id;
        private Long ts;
        private Integer vc;
        ......
    }
    

    Java代码

    import com.flink.bean.WaterSensor;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import java.util.Random;
    
    /**
     * @description: 自定义数据源
     * @author: HaoWu
     * @create: 2020年09月16日
     */
    public class Flink04_Source_CustomMySource {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 1.Source:读取数据
            DataStreamSource<WaterSensor> mySourceDS = env.addSource(new MySourceFuntion());
            // 2.打印
            mySourceDS.print();
            // 3.执行
            env.execute();
        }
    
        /**
         * 自定义数据源
         * 1. 实现 SourceFunction,指定输出的类型
         * 2. 重写 两个方法:  run() ,cancel()
         */
        private static class MySourceFuntion implements SourceFunction<WaterSensor> {
            // 定义一个标志位,控制数据的产生
            private Boolean flag = true;
    
            @Override
            public void run(SourceContext<WaterSensor> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    //构建bean对象
                    WaterSensor waterSensor = new WaterSensor(
                            "sensor" + random.nextInt(3),
                            System.currentTimeMillis(),
                            random.nextInt(10) + 40
                    );
                    Thread.sleep(2000);
                    ctx.collect(waterSensor);
                }
            }
    
            @Override
            public void cancel() {
                this.flag = false;
            }
        }
    }
    

    三.Transform

    map

    映射:将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素

    参数:Scala匿名函数或MapFunction

    返回:DataStream

    需求:将以下日志转为WaterSensor实体类

    sensor-data.log

    sensor_1,1549044122,1
    sensor_1,1549044123,2
    sensor_1,1549044124,3
    sensor_2,1549044125,4
    

    WaterSensor.java

    public class WaterSensor {
    
        private String id;
        private Long ts;
        private Integer vc;
        ......
    }
    

    Java代码

    import com.flink.bean.WaterSensor;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    /**
     * @description:
     * @author: HaoWu
     * @create: 2020年09月16日
     */
    public class Flink05_Transform_Map {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 1.Source:读取数据
            DataStreamSource<String> fileDS = env.readTextFile("D:\SoftWare\idea-2019.2.3\wordspace\13_flinkdemo\input\sensor-data.log");
            // 2.Transform: Map转换为实体类
            SingleOutputStreamOperator<WaterSensor> mapDS = fileDS.map(new MyMapFunction());
            // 3.打印
            mapDS.print();
            // 4.执行
            env.execute();
        }
    
        /**
         * 实现MapFunction,指定输入的类型,返回的类型
         * 重写 map方法
         */
        private static class MyMapFunction implements MapFunction<String, WaterSensor> {
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] data = s.split(",");
                return new WaterSensor(
                        data[0],
                        Long.valueOf(data[1]),
                        Integer.valueOf(data[2])
    
                );
            }
        }
    }
    

    Rich版本函数

    Flink函数类都有其Rich版本,它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。也有意味着提供了更多的,更丰富的功能。例如:RichMapFunction

    import com.flink.bean.WaterSensor;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    /**
     * @description:
     * @author: HaoWu
     * @create: 2020年09月16日
     */
    public class Flink06_Transform_RichMap {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 全局并行度设为 1
            env.setParallelism(1);
            // 1.Source:读取数据
            DataStreamSource<String> fileDS = env.readTextFile("D:\SoftWare\idea-2019.2.3\wordspace\13_flinkdemo\input\sensor-data.log");
            // 2.Transform : RichMap实现转换
            SingleOutputStreamOperator<WaterSensor> richMapDS = fileDS.map(new MyRichMap());
            // 3.打印
            richMapDS.print();
            // 4.执行
            env.execute();
        }
    
        /**
         * 继承 RichMapFunction,指定输入的类型,返回的类型
         * 提供了 open()和 close() 生命周期管理方法
         * 能够获取 运行时上下文对象 =》 可以获取 状态、任务信息 等环境信息
         */
        private static class MyRichMap extends RichMapFunction<String, WaterSensor> {
            @Override
            public void open(Configuration parameters) throws Exception {
                System.out.println("open~~~");
            }
    
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] data = s.split(",");
                RuntimeContext context = getRuntimeContext();
                String taskName = context.getTaskName();
                return new WaterSensor(
                        "taskName:"+taskName+"||"+data[0],
                        Long.valueOf(data[1]),
                        Integer.valueOf(data[2])
                );
            }
    
            @Override
            public void close() throws Exception {
                System.out.println("open~~~");
            }
        }
    }
    

    Rich Function有一个生命周期的概念。典型的生命周期方法有:

    1. open()方法是rich function的初始化方法,当一个算子例如map或者filter被调 用之前open()会被调用

    2. close()方法是生命周期中的最后一个调用的方法,做一些清理工作

    3. getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行 的并行度,任务的名字,以及state状态

    flatMap

    扁平映射:将数据流中的整体拆分成一个一个的个体使用,消费一个元素并产生零到多个元素

    参数:Scala匿名函数或FlatMapFunction

    返回:DataStream

    需求:将集合中的集合数据乘以10.

    list = {{1,2,3,4},{6,7,8,9}}
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * @description:
     * @author: HaoWu
     * @create: 2020年09月16日
     */
    public class Flink07_Transform_flatMap {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 1.Source:读取数据
            DataStreamSource<List<Integer>> listDS = env.fromCollection(
                    Arrays.asList(
                            Arrays.asList(1, 2, 3, 4),
                            Arrays.asList(5, 6, 7, 8)
                    ));
            // 2.Transform: 扁平化
            SingleOutputStreamOperator<Integer> flaMapDS = listDS.flatMap(
                    new FlatMapFunction<List<Integer>, Integer>() {
                        @Override
                        public void flatMap(List<Integer> integers, Collector<Integer> collector) throws Exception {
                            for (Integer integer : integers) {
                                collector.collect(integer * 10);
                            }
                        }
                    }
            );
            // 3.打印
            flaMapDS.print();
            // 4.执行
            env.execute();
        }
    }
    

    keyBy

    分流:根据指定的Key的hashcode将元素发送到不同的分区,相同的Key会被分到一个分区(这里分区指的就是下游算子多个并行节点的其中一个)。keyBy()是通过哈希来分区的

    参数:Scala匿名函数或POJO属性或元组索引,不能使用数组

    注意

    1.建议实现 KeySelector的方式指定分组的key。位置索引 或 字段名称 ,返回 Key的类型,无法确定返回 Tuple,使用麻烦

    2.和spark处理不一样,Spark是直接聚合成list集合(key,List(value1,value2...)),Flink是给元素打标签,相同标签是同一组

    返回:KeyedStream

    java代码

    import com.flink.bean.WaterSensor;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    /**
     * @description:
     * @author: HaoWu
     * @create: 2020年09月16日
     */
    public class Flink08_Transform_KeyBy {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
            // 1.Source:读取数据
            DataStreamSource<String> fileDS = env.readTextFile("D:\SoftWare\idea-2019.2.3\wordspace\13_flinkdemo\input\sensor-data.log");
            fileDS.print("File:");
            // 2.Transform: Map转换为实体类
            SingleOutputStreamOperator<WaterSensor> waterSensorDS = fileDS.map(
                    new MapFunction<String, WaterSensor>() {
                        @Override
                        public WaterSensor map(String s) throws Exception {
                            String[] data = s.split(",");
                            return new WaterSensor(
                                    data[0],
                                    Long.valueOf(data[1]),
                                    Integer.valueOf(data[2])
                            );
                        }
                    }
            );
    
            // 3.分组
            // 通过 位置索引 或 字段名称 ,返回 Key的类型,无法确定,所以会返回 Tuple,后续使用key的时候,很麻烦
            // 通过 明确的指定 key 的方式, 获取到的 key就是具体的类型 => 实现 KeySelector 或 lambda
            KeyedStream<WaterSensor, String> waterSensorGroupDS = waterSensorDS.keyBy(
                    new KeySelector<WaterSensor, String>() {
                        @Override
                        public String getKey(WaterSensor waterSensor) throws Exception {
                            return waterSensor.getId();
                        }
                    }
            );
            // 4.打印
            waterSensorGroupDS.print("KeyBy");
            // 5.执行
            env.execute();
        }
    }
    

    注意

    通过 位置索引 或 字段名称 ,返回 Key的类型,无法确定,所以会返回 Tuple,后续使用key的时候,很麻烦
    通过 明确的指定 key 的方式, 获取到的 key就是具体的类型 => 实现 KeySelector 或 lambda

    filter

    过滤:根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃

    参数:Scala匿名函数或FilterFunction

    返回:DataStream

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import java.util.Arrays;
    
    /**
     * @description:
     * @author: HaoWu
     * @create: 2020年09月16日
     */
    public class Flink09_Transform_Filter {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 1.Source:读取数据
            DataStreamSource<Integer> listDS = env.fromCollection(
                    Arrays.asList(1, 2, 3, 4, 5, 6, 7));
            // 2.过滤:只保留偶数
            SingleOutputStreamOperator<Integer> filterDS = listDS.filter(
                    new FilterFunction<Integer>() {
                        @Override
                        public boolean filter(Integer integer) throws Exception {
                            return integer % 2 == 0;
                        }
                    }
            );
            // 2.打印
            filterDS.print();
            // 3.执行
            env.execute();
        }
    }
    

    shuffle

    打乱重组(洗牌):将数据按照均匀分布打散到下游

    参数:无

    返回:DataStream

    split + select

    split:根据数据特征给数据打标签; select:根据标签取出目标数据

    import com.flink.bean.WaterSensor;
    import org.apache.flink.streaming.api.collector.selector.OutputSelector;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.datastream.SplitStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import java.util.Arrays;
    
    /**
     * @description: TODO Split + Select
     * @author: HaoWu
     * @create: 2020年09月18日
     */
    public class Flink11_Tranform_Split {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 全局并行度设为 1
            env.setParallelism(2);
            // 1.Source:读取数据
            SingleOutputStreamOperator<WaterSensor> sensorDS = env.readTextFile("13_flinkdemo/input/sensor-data.log")
                    .map(new Flink05_Transform_Map.MyMapFunction());
            SplitStream<WaterSensor> splitDS = sensorDS
                    // TODO Split: 水位低于 50 正常,水位 [50,80) 警告, 水位高于 80 告警
                    .split(
                            new OutputSelector<WaterSensor>() {
                                @Override
                                public Iterable<String> select(WaterSensor value) {
                                    if (value.getVc() < 40) {
                                        return Arrays.asList("normal");
                                    } else if (value.getVc() < 60) {
                                        return Arrays.asList("warn");
                                    } else {
                                        return Arrays.asList("alert");
                                    }
                                }
                            });
            //取出 normal
            splitDS.select("normal").print("normal");
            //取出 warn
            splitDS.select("warn").print("warn");
            //取出 alert
            splitDS.select("alert").print("alert");
    
            // 3.执行
            env.execute();
        }
    }
    

    connect

    可以不同类型流连接,同床异梦; 只能两条流,数据分开处理

    将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。

    Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

    java代码

    import com.flink.bean.WaterSensor;
    import org.apache.flink.streaming.api.datastream.ConnectedStreams;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoMapFunction;
    import java.util.Arrays;
    
    /**
     * @description: TODO Connect:可以不同类型流连接,同床异梦;  只能两条流,数据分开处理
     *
     * @author: HaoWu
     * @create: 2020年09月18日
     */
    public class Flink12_Tranform_Connect {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 全局并行度设为 1
            env.setParallelism(2);
            // 第1条流
            SingleOutputStreamOperator<WaterSensor> sensorDS = env.readTextFile("13_flinkdemo/input/sensor-data.log")
                    .map(new Flink05_Transform_Map.MyMapFunction());
            // 第2条流
            DataStreamSource<Integer> listDS = env.fromCollection(Arrays.asList(1, 2, 3, 4, 2, 5, 6, 7));
    
            // TODO Connect
            ConnectedStreams<WaterSensor, Integer> connectDS = sensorDS.connect(listDS);
            // 做map转换 ,实现CoMapFunction
            SingleOutputStreamOperator<Object> resultDS = connectDS.map(new CoMapFunction<WaterSensor, Integer, Object>() {
                @Override
                public Object map1(WaterSensor value) throws Exception {
                    return value.toString();
                }
    
                @Override
                public Object map2(Integer value) throws Exception {
                    return value + 10;
                }
            });
            // 打印
            resultDS.print();
    
            // 3.执行
            env.execute();
        }
    }
    

    union

    对两个或者两个以上的流合为一体,流类型要相同,返回一个DataStream包含所有元素。

    /**
     * @description: TODO:1.流类型必须相同,合为一体 ;2.可以连接多个流,数据一起处理
     *
     * @author: HaoWu
     * @create: 2020年09月18日
     */
    public class Flink13_Tranform_Union {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 全局并行度设为 1
            env.setParallelism(2);
            // 第1条流
            DataStreamSource<Integer> listDS1 = env.fromCollection(Arrays.asList(1, 2, 3, 4));
            // 第2条流
            DataStreamSource<Integer> listDS2 = env.fromCollection(Arrays.asList( 5, 6, 7,8));
            // 第3条流
            DataStreamSource<Integer> listDS3 = env.fromCollection(Arrays.asList( 5, 6, 7,8));
    
            // TODO Union
            DataStream<Integer> resultDS = listDS1.union(listDS2, listDS3);
            // 打印
            resultDS.print();
            // 3.执行
            env.execute();
        }
    }
    

    connect 和 union 的区别

    1)  union之前两个流的类型必须是一样,connect可以不一样
    2)  connect只能操作两个流,union可以操作多个。
    

    四.Operator

    keyBy对数据进行分流后,可以对数据进行相应的统计分析

    滚动聚合算子(Rolling Aggregation): sum、max、min

    KeyedStream的每一个支流做聚合, 然后将聚合结果合并成1个DataStream

    对每个key分组后,组内的聚合,然后将聚合结果返回。

    注意:非key和非聚合的字段,对分组和聚合不影响

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    /**
     * @description: TODO 对KeyStream每个流单独聚合,将聚合结果合并,返回DataSream
     * @author: HaoWu
     * @create: 2020年09月18日
     */
    public class Flink14_Tranform_RollingAgg {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 1.读取数据
            env.socketTextStream("hadoop102", 9999)
                    // 转换:string -> Tuple3(id,ts,vc)
                    .map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
                        @Override
                        public Tuple3<String, Long, Integer> map(String s) throws Exception {
                            String[] datas = s.split(",");
                            return new Tuple3.of(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                        }
                    })
                    // 按照id分组
                    .keyBy(tuple3 -> tuple3.f0)
                    //求和:sum
                    //最大值:max
                    //求最小值
                    .min(2).print(); // 打印
            
            // 2.执行
            env.execute();
        }
    }
    

    测试

    输入数据

    [root@hadoop102 ~]$ nc -lk 9999
    sensor_1,1549044122,10
    sensor_1,1549044122,5     
    sensor_2,1549044123,20                  
    sensor_1,1549044126,50
    sensor_1,1549044128,70
    sensor_1,1549044126111,2
    

    实时输出

    5> (sensor_1,1549044122,10)
    5> (sensor_1,1549044122,5)
    2> (sensor_2,1549044123,20)
    5> (sensor_1,1549044122,5)
    5> (sensor_1,1549044122,5)
    5> (sensor_1,1549044122,2)
    

    reduce

    分组后,对组内的数据两两做归约操作

    注意:每组的第一个元素不进行reduce操作,直接输出,当第二个元素到来才会归约。

    /**
     * @description: TODO 一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果
     * @author: HaoWu
     * @create: 2020年09月18日
     */
    public class Flink15_Tranform_Reduce {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 1.读取数据
            env.socketTextStream("hadoop102", 9999)
                    // 转换:string -> Tuple3(id,ts,vc)
                    .map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
                        @Override
                        public Tuple3<String, Long, Integer> map(String s) throws Exception {
                            String[] datas = s.split(",");
                            return Tuple3.of(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                        }
                    })
                    // 按照id分组
                    .keyBy(tuple3 -> tuple3.f0)
                    // 分组内聚合
                    .reduce(new ReduceFunction<Tuple3<String, Long, Integer>>() {
                                @Override
                                public Tuple3<String, Long, Integer> reduce(Tuple3<String, Long, Integer> t0, 
                                                                            Tuple3<String, Long, Integer> t1) throws Exception {
                                    return Tuple3.of(t0.f0, 123L, t0.f2 + t1.f2);
                                }
                            }
    
                    ).print();
    
            // 2.执行
            env.execute();
        }
    }
    

    process

    keyBy进行分流处理后,如果想要处理过程中获取环境相关信息,可以采用process算子自定义实现

    /**
     * @description: TODO keyBy进行分流处理后,如果想要处理过程中获取环境相关信息
     * @author: HaoWu
     * @create: 2020年09月18日
     */
    public class Flink16_Tranform_Process {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 1.读取数据
            env.socketTextStream("hadoop102", 9999)
                    // 转换:string -> Tuple3(id,ts,vc)
                    .map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
                        @Override
                        public Tuple3<String, Long, Integer> map(String s) throws Exception {
                            String[] datas = s.split(",");
                            return Tuple3.of(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                        }
                    })
                    // 按照id分组
                    .keyBy(tuple3 -> tuple3.f0)
                    // 获取环境信息
                    .process(
                            new KeyedProcessFunction<String, Tuple3<String, Long, Integer>, String>() {
                                @Override
                                public void processElement(Tuple3<String, Long, Integer> value, Context ctx, Collector<String> out) throws Exception {
                                    String currentKey = ctx.getCurrentKey();
                                    Long timestamp = ctx.timestamp();
                                    out.collect("当前数据的key:"+currentKey+",当前的时间戳:"+timestamp+",数据:"+value.toString());
                                }
                            }
                    ).print();//打印
            // 2.执行
            env.execute();
        }
    }
    

    测试

    输入数据

    sensor_1,1549044122,5
    sensor_2,1549044123,20
    

    输出数据

    5> 当前数据的key:sensor_1,当前的时间戳:null,数据:(sensor_1,1549044122,5)
    2> 当前数据的key:sensor_2,当前的时间戳:null,数据:(sensor_2,1549044123,20)
    

    五.Sink

    Kafka Sink

    官网:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html

    将处理完的数据发送到Kafka消息队列中

    依赖

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    

    注意:根据官网说明的flink和kafka版本的适配关系,选用对应得依赖。

    java代码

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    
    /**
     * @description: 输出到 Kafka
     * @author: HaoWu
     * @create: 2020年09月18日
     */
    public class Flink17_Sink_Kafka {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 1.读取socket流
            env.socketTextStream("hadoop102", 9999)
                    //写入kafka
                    .addSink(new FlinkKafkaProducer011(
                            "hadoop102:9092,hadoop103:9092,hadoop104:9092",
                            "flink-test2",
                            new SimpleStringSchema())
                    );
            // 2.执行
            env.execute();
        }
    }
    

    Redis Sink

    处理完的数据发送到Redis缓存数据库

    依赖

            <!-- redis依赖 -->
            <dependency>
                <groupId>org.apache.bahir</groupId>
                <artifactId>flink-connector-redis_2.11</artifactId>
                <version>1.0</version>
            </dependency>
    

    java代码

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
    
    /**
     * @description: 输出到 Redis
     * @author: HaoWu
     * @create: 2020年09月18日
     */
    public class Flink18_Sink_Redis {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 1.读取socket流
            env.socketTextStream("hadoop102", 9999)
                    //写入Redis
                    .addSink(new RedisSink<String>(
                            new FlinkJedisPoolConfig.Builder()
                                    .setHost("hadoop102")
                                    .setPort(6379)
                                    .build(),
                            new RedisMapper<String>() {
                                // redis 的命令: key是最外层的 key
                                @Override
                                public RedisCommandDescription getCommandDescription() {
                                    return new RedisCommandDescription(RedisCommand.HSET, "sensor0421");
                                }
    
                                // Hash类型:这个指定的是 hash 的key
                                @Override
                                public String getKeyFromData(String data) {
                                    String[] datas = data.split(",");
                                    return datas[1];
                                }
    
                                // Hash类型:这个指定的是 hash 的 value
                                @Override
                                public String getValueFromData(String data) {
                                    String[] datas = data.split(",");
                                    return datas[2];
                                }
                            }
                    ));
            // 2.执行
            env.execute();
        }
    }
    

    ElasticSearch Sink

    依赖

      <!-- ES依赖 -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
          <version>1.10.0</version>
      </dependency>
    

    java代码

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @description: 输出到 ES
     * @author: HaoWu
     * @create: 2020年09月18日
     */
    public class Flink19_Sink_ES {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 1.读取socket流
            DataStreamSource<String> socktDS = env.readTextFile("13_flinkdemo/input/sensor-data.log");
            // 2.写入ES
            // 2.1创建ES Sink
            ElasticsearchSink<String> esBuildSink = new ElasticsearchSink.Builder<>(
                    Arrays.asList(
                            new HttpHost("hadoop102", 9200),
                            new HttpHost("hadoop103", 9200),
                            new HttpHost("hadoop104", 9200)
                    ),
                    new ElasticsearchSinkFunction<String>() {
                        @Override
                        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                            // 将数据放在Map中
                            Map<String, String> dataMap = new HashMap<>();
                            dataMap.put("data", element);
                            // 创建 IndexRequest =》 指定index,指定type,指定source
                            IndexRequest indexRequest = Requests.indexRequest("sensor").type("_doc").source(dataMap);
                            // 添加到 RequestIndexer
                            indexer.add(indexRequest);
                        }
                    }
            ).build();
            //写入ES
            socktDS.addSink(esBuildSink);
            // 2.执行
            env.execute();
        }
    }
    

    自定义Sink(写mysql)

    如果Flink没有提供给我们可以直接使用的连接器,那我们如果想将数据存储到我们自己的存储设备中,怎么办?没事,Flink提供了自定义Sink,你自己决定如何进行存储。

    依赖

            <!-- mysql连接 -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.27</version>
            </dependency>
    

    java代码

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    
    /**
     * @description: 自定义Sink输出到Mysql
     * @author: HaoWu
     * @create: 2020年09月18日
     */
    public class Flink20_Sink_CustomMySQLSink {
        public static void main(String[] args) throws Exception {
            // 0.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 1.读取数据
            DataStreamSource<String> inputDS = env.socketTextStream("hadoop102", 9999);
    
            // 2.写入Mysql
            inputDS.addSink(new RichSinkFunction<String>() {
    
                                private Connection conn = null;
                                private PreparedStatement pstmt = null;
    
                                @Override
                                public void open(Configuration parameters) throws Exception {
                                    conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test", "root", "root");
                                    pstmt = conn.prepareStatement("INSERT INTO sensor VALUES (?,?,?)");
                                }
    
                                /**
                                 * 关流
                                 * @throws Exception
                                 */
                                @Override
                                public void close() throws Exception {
                                    pstmt.close();
                                    conn.close();
                                }
    
                                /**
                                 * 填充占位符
                                 * @param value 输入数据
                                 * @param context 上下文
                                 * @throws Exception
                                 */
                                @Override
                                public void invoke(String value, Context context) throws Exception {
                                    String[] datas = value.split(",");
                                    pstmt.setString(1, datas[0]);
                                    pstmt.setLong(2, Long.valueOf(datas[1]));
                                    pstmt.setInt(3, Integer.valueOf(datas[2]));
                                    pstmt.execute();
                                }
                            }
            );
            // 3.执行
            env.execute();
        }
    }
    
  • 相关阅读:
    Version
    Windows Server Protocols (WSPP)
    Tomcat启动问题jvm访问拒绝的解决方法
    vue3中使用draggable插件实现元素的拖拽,排序,克隆
    消息队列的思考
    jenkins_ssh
    jenkins_构建配置
    minikube清理sh
    stream源码导读
    源码rabbit_3_消息链路追踪
  • 原文地址:https://www.cnblogs.com/wh984763176/p/13693422.html
Copyright © 2011-2022 走看看