zoukankan      html  css  js  c++  java
  • flink state专项练习 踏雪扬尘

    简介

    毫不夸张地讲,state 是 Flink 最核心的功能,个人理解它也是 Flink 比 Spark 强大百倍的最关键实现。既然它如此核心,接下来就针对它进行专项练习,以求彻底搞懂。

    案例执行

    测试代码

    /**
     * Entry point for the keyed-state exercises.
     *
     * <p>Builds a small bounded stream of {@code (key, value)} tuples, partitions it by the first
     * field and applies one of the state-demo functions per key, printing the results.
     */
    public class TestKeyedStateMain {
        public static void main(String[] args) throws Exception {
            // Program entry point
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Bounded source: key 1 -> 3, 7, 5; key 2 -> 4, 2, 5
            DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                    env.fromElements(
                            Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
                            Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),
                            Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

            // Key with a KeySelector instead of the deprecated positional keyBy(0).
            // To exercise a different state type, replace AggregatingStateContainValue with another
            // demo function AND adjust the declared output type (e.g. Tuple2<Long, Double> for the
            // window-average functions, Tuple2<Long, Long> for ReducingStateSumFunction).
            SingleOutputStreamOperator<Tuple2<Long, String>> tuple2SingleOutputStreamOperator =
                    dataStreamSource
                            .keyBy(value -> value.f0)
                            .flatMap(new AggregatingStateContainValue());
            tuple2SingleOutputStreamOperator.print();

            // Let a job failure propagate so the process terminates abnormally instead of printing
            // a stack trace and returning as if the job had succeeded.
            env.execute("TestKeyedStateMain");
        }
    }
    
    

    使用ValueState 计算窗口平均值

    /**
     * Emits the average of every {@code windowSize} consecutive values seen for a key, keeping a
     * running (count, sum) pair in {@link ValueState}.
     *
     * <p>Input is {@code (key, value)}; output is {@code (key, average)}. The per-key state is
     * cleared after each emission, so windows do not overlap. The no-argument constructor uses a
     * window of 3 elements.
     */
    public class CountWindowAverageWithValueState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {

        /** Window size used by the no-argument constructor. */
        private static final int DEFAULT_WINDOW_SIZE = 3;

        /** Number of elements per key that triggers an emission. */
        private final int windowSize;

        /** Per-key state: f0 = number of elements seen, f1 = sum of their values. */
        private ValueState<Tuple2<Long, Long>> countAndSum;

        /** Creates a function that averages every 3 elements per key. */
        public CountWindowAverageWithValueState() {
            this(DEFAULT_WINDOW_SIZE);
        }

        /**
         * Creates a function that averages every {@code windowSize} elements per key.
         *
         * @param windowSize number of elements per window; must be positive
         * @throws IllegalArgumentException if {@code windowSize} is not positive
         */
        public CountWindowAverageWithValueState(int windowSize) {
            if (windowSize <= 0) {
                throw new IllegalArgumentException("windowSize must be positive: " + windowSize);
            }
            this.windowSize = windowSize;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Tuple2<Long, Long>> average = new ValueStateDescriptor<>(
                    "average",
                    Types.TUPLE(Types.LONG, Types.LONG)
            );
            countAndSum = getRuntimeContext().getState(average);
        }

        /**
         * Adds {@code value.f1} to the key's running (count, sum) and emits the average once
         * {@code windowSize} elements have been accumulated.
         *
         * @param value incoming {@code (key, value)} element
         * @param out   collector receiving {@code (key, average)}
         * @throws Exception propagated from state access
         */
        @Override
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Double>> out) throws Exception {
            Tuple2<Long, Long> valueStateTuple = countAndSum.value();
            if (valueStateTuple == null) {
                // No state yet for this key (first element, or first element after clear()).
                valueStateTuple = new Tuple2<>(0L, 0L);
            }
            valueStateTuple.f0 += 1;
            valueStateTuple.f1 += value.f1;
            if (valueStateTuple.f0 >= windowSize) {
                double v = valueStateTuple.f1 / (double) valueStateTuple.f0;
                out.collect(Tuple2.of(value.f0, v));
                // Window complete: reset the key's state (no need to write it back first).
                countAndSum.clear();
            } else {
                countAndSum.update(valueStateTuple);
            }
        }
    }
    

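    触发效果(原文为运行截图,此处改为文字说明):把 TestKeyedStateMain 中的函数换成该类(并相应修改输出类型)后,测试数据中 key 1 的值为 3、7、5,key 2 的值为 4、2、5,每凑满 3 个元素输出一次平均值,即 (1,5.0) 和 (2,3.6666666666666665)。print 的输出会带有子任务编号前缀,不同 key 之间的输出顺序不固定。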

    使用 ListState 计算窗口平均值

    /**
     * Emits the average of every {@code windowSize} consecutive values seen for a key, buffering
     * the raw elements of the current window in {@link ListState}.
     *
     * <p>Input is {@code (key, value)}; output is {@code (key, average)}. The buffer is cleared
     * after each emission. The no-argument constructor uses a window of 3 elements.
     */
    public class CountWindowAverageWithListState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {

        /** Window size used by the no-argument constructor. */
        private static final int DEFAULT_WINDOW_SIZE = 3;

        /** Number of elements per key that triggers an emission. */
        private final int windowSize;

        /** Per-key buffer of the elements in the current window. */
        private ListState<Tuple2<Long, Long>> elementsByKey;

        /** Creates a function that averages every 3 elements per key. */
        public CountWindowAverageWithListState() {
            this(DEFAULT_WINDOW_SIZE);
        }

        /**
         * Creates a function that averages every {@code windowSize} elements per key.
         *
         * @param windowSize number of elements per window; must be positive
         * @throws IllegalArgumentException if {@code windowSize} is not positive
         */
        public CountWindowAverageWithListState(int windowSize) {
            if (windowSize <= 0) {
                throw new IllegalArgumentException("windowSize must be positive: " + windowSize);
            }
            this.windowSize = windowSize;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            ListStateDescriptor<Tuple2<Long, Long>> average = new ListStateDescriptor<>(
                    "average",
                    Types.TUPLE(Types.LONG, Types.LONG)
            );
            elementsByKey = getRuntimeContext().getListState(average);
        }

        /**
         * Appends the element to the key's buffer and emits the average of the buffered
         * {@code f1} values once the buffer holds {@code windowSize} elements.
         *
         * @param value incoming {@code (key, value)} element
         * @param out   collector receiving {@code (key, average)}
         * @throws Exception propagated from state access
         */
        @Override
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Double>> out) throws Exception {
            // ListState accepts the tuple directly.
            elementsByKey.add(value);

            // One pass over the buffer with primitive accumulators: no list copy, no boxed reduce.
            long count = 0;
            long sum = 0;
            for (Tuple2<Long, Long> element : elementsByKey.get()) {
                count++;
                sum += element.f1;
            }

            if (count >= windowSize) {
                double v = sum / (double) count;
                out.collect(Tuple2.of(value.f0, v));
                // Window complete: empty the key's buffer.
                elementsByKey.clear();
            }
        }
    }
    

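    触发效果(原文为运行截图,此处改为文字说明):与 ValueState 版本相同,对测试数据输出 (1,5.0) 和 (2,3.6666666666666665)。print 的输出会带有子任务编号前缀,不同 key 之间的输出顺序不固定。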

    使用 MapState 计算窗口平均值

    /**
     * Emits the average of every {@code windowSize} consecutive values seen for a key, buffering
     * the current window's values in {@link MapState}.
     *
     * <p>Input is {@code (key, value)}; output is {@code (key, average)}. Each buffered value is
     * stored under its position in the window ({@code "0"}, {@code "1"}, ...). The buffer is
     * cleared after each emission. The no-argument constructor uses a window of 3 elements.
     */
    public class CountWindowAverageWithMapState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {

        /** Window size used by the no-argument constructor. */
        private static final int DEFAULT_WINDOW_SIZE = 3;

        /** Number of elements per key that triggers an emission. */
        private final int windowSize;

        /** Per-key buffer: position-in-window (as a String) -> value. */
        private MapState<String, Long> mapState;

        /** Creates a function that averages every 3 elements per key. */
        public CountWindowAverageWithMapState() {
            this(DEFAULT_WINDOW_SIZE);
        }

        /**
         * Creates a function that averages every {@code windowSize} elements per key.
         *
         * @param windowSize number of elements per window; must be positive
         * @throws IllegalArgumentException if {@code windowSize} is not positive
         */
        public CountWindowAverageWithMapState(int windowSize) {
            if (windowSize <= 0) {
                throw new IllegalArgumentException("windowSize must be positive: " + windowSize);
            }
            this.windowSize = windowSize;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // Register the state
            MapStateDescriptor<String, Long> descriptor =
                    new MapStateDescriptor<String, Long>(
                            "average",                  // state name
                            String.class, Long.class);  // key and value types stored in state
            mapState = getRuntimeContext().getMapState(descriptor);
        }

        /**
         * Buffers {@code value.f1} for the current key and emits the average of the buffered values
         * once {@code windowSize} of them have been collected.
         *
         * @param value incoming {@code (key, value)} element
         * @param out   collector receiving {@code (key, average)}
         * @throws Exception propagated from state access
         */
        @Override
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Double>> out) throws Exception {
            // Count and sum what is already buffered for this key.
            long count = 0;
            long sum = 0;
            for (Long buffered : mapState.values()) {
                count++;
                sum += buffered;
            }

            // Key the new value by its position in the window: deterministic and unique until
            // clear(), unlike a random UUID (which also costs a SecureRandom draw per record).
            mapState.put(String.valueOf(count), value.f1);
            count++;
            sum += value.f1;

            if (count >= windowSize) {
                double v = sum / (double) count;
                out.collect(Tuple2.of(value.f0, v));
                // Window complete: empty the key's buffer.
                mapState.clear();
            }
        }
    }
    
    

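    触发效果(原文为运行截图,此处改为文字说明):与 ValueState、ListState 版本相同,对测试数据输出 (1,5.0) 和 (2,3.6666666666666665)。print 的输出会带有子任务编号前缀,不同 key 之间的输出顺序不固定。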

    使用 AggregatingState 实现 字符串拼装游戏

    /**
     * Builds, per key, a running description of every value seen so far using
     * {@link AggregatingState}. For key 1 receiving 3 then 5, the emitted descriptions are
     * {@code "Contains:3"} and then {@code "Contains:3 and 5"}.
     *
     * <p>Input is {@code (key, value)}; for each element the function emits
     * {@code (key, description-so-far)}.
     */
    public class AggregatingStateContainValue extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, String>> {

        /** Per-key aggregated description of the values seen so far. */
        private AggregatingState<Long, String> totalStr;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Register the state
            AggregatingStateDescriptor<Long, String, String> descriptor =
                    new AggregatingStateDescriptor<Long, String, String>(
                            "totalStr",              // state name
                            new ContainsAggregate(), // Flink AggregateFunction doing the string building
                            String.class);           // accumulator type stored in state
            totalStr = getRuntimeContext().getAggregatingState(descriptor);
        }

        /**
         * Adds {@code value.f1} to the key's description and emits the updated description.
         *
         * @param value incoming {@code (key, value)} element
         * @param out   collector receiving {@code (key, description)}
         * @throws Exception propagated from state access
         */
        @Override
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, String>> out) throws Exception {
            totalStr.add(value.f1);
            out.collect(Tuple2.of(value.f0, totalStr.get()));
        }

        /**
         * Aggregates Long values into {@code "Contains:v1 and v2 and ..."}.
         *
         * <p>Every accumulator produced here starts with {@link #PREFIX}; the empty accumulator is
         * exactly {@code PREFIX}.
         */
        private static final class ContainsAggregate implements AggregateFunction<Long, String, String> {

            /** Leading text of every accumulator; on its own it means "no values yet". */
            private static final String PREFIX = "Contains:";

            /** Returns the empty accumulator, used whenever a key has no accumulator yet. */
            @Override
            public String createAccumulator() {
                return PREFIX;
            }

            /** Appends one value, separating it from earlier values with {@code " and "}. */
            @Override
            public String add(Long value, String accumulator) {
                if (PREFIX.equals(accumulator)) {
                    return accumulator + value;
                }
                return accumulator + " and " + value;
            }

            /**
             * Combines two partial accumulators. Both carry {@link #PREFIX}, so only the value list
             * of {@code b} is appended to {@code a}; an empty side contributes nothing, which avoids
             * a duplicated prefix or a dangling {@code " and "}.
             */
            @Override
            public String merge(String a, String b) {
                String bValues = b.substring(PREFIX.length());
                if (bValues.isEmpty()) {
                    return a;
                }
                if (PREFIX.equals(a)) {
                    return b;
                }
                return a + " and " + bValues;
            }

            /** The accumulator already is the final description. */
            @Override
            public String getResult(String accumulator) {
                return accumulator;
            }
        }
    }
    
    

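    触发效果(原文为运行截图,此处改为文字说明):每来一个元素就输出一次当前拼接结果。key 1 依次输出 (1,Contains:3)、(1,Contains:3 and 7)、(1,Contains:3 and 7 and 5);key 2 依次输出 (2,Contains:4)、(2,Contains:4 and 2)、(2,Contains:4 and 2 and 5)。不同 key 之间的输出顺序不固定。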

    使用 ReducingState 实现数字累加效果

    /**
     * Emits, for every input element, the running total of the values seen so far for its key.
     *
     * <p>Input is {@code (key, value)}; output is {@code (key, runningSum)}. The total lives in a
     * {@link ReducingState} that combines values by addition.
     */
    public class ReducingStateSumFunction
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        /** Per-key running total of the {@code f1} values. */
        private ReducingState<Long> sumState;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Values are folded into the state pairwise with Long addition as they are added.
            ReduceFunction<Long> addition = Long::sum;
            ReducingStateDescriptor<Long> sumDescriptor =
                    new ReducingStateDescriptor<Long>("sum", addition, Long.class);
            sumState = getRuntimeContext().getReducingState(sumDescriptor);
        }

        /**
         * Folds {@code value.f1} into the key's total and emits the updated total.
         *
         * @param value incoming {@code (key, value)} element
         * @param out   collector receiving {@code (key, runningSum)}
         * @throws Exception propagated from state access
         */
        @Override
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
            final Long key = value.f0;
            sumState.add(value.f1);
            final Long runningTotal = sumState.get();
            out.collect(Tuple2.of(key, runningTotal));
        }
    }
    
    

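    触发效果(原文为运行截图,此处改为文字说明):每来一个元素就输出一次该 key 的累加和。key 1 依次输出 (1,3)、(1,10)、(1,15);key 2 依次输出 (2,4)、(2,6)、(2,11)。不同 key 之间的输出顺序不固定。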

    总结

    通过以上刻意练习,相信对 Flink state 的使用一定能做到得心应手。

  • 相关阅读:
    20200323 Go语言基础
    20200313 图表工具与redis使用
    20200312 CMDB的磁盘数据查询
    20200311 CMDB的表设计
    20200320 代码发布之完结
    20200319 代码发布之任务发布钩子脚本
    Ubuntu 安装 MySQL 服务
    使用U盘重装系统(删除掉系统自带的Windows 10)
    Linux & Windows 上安装 Qt
    初次使用Tampermonkey脚本管理器
  • 原文地址:https://www.cnblogs.com/yyystar/p/15708891.html
Copyright © 2011-2022 走看看