zoukankan      html  css  js  c++  java
  • Flink状态之AggregateState

    1、主类

    package com.example.demo.flink;
    
    import com.example.demo.flink.impl.CountAverageWithAggregateState;
    import com.example.demo.flink.impl.CountAverageWithReduceState;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    
    /**
     * @program: demo
     * @description: valuestate
     * @author: yang
     * @create: 2020-12-28 15:46
     */
    public class TestKeyedAggregateStateMain {
        public static void main(String[] args) throws  Exception{
            //获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
            //StreamExecutionEnvironment.getExecutionEnvironment();
            //设置并行度
            env.setParallelism(16);
            //获取数据源
            DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                    env.fromElements(
                            Tuple2.of(1L, 3L),
                            Tuple2.of(1L, 7L),
                            Tuple2.of(2L, 4L),
                            Tuple2.of(1L, 5L),
                            Tuple2.of(2L, 2L),
                            Tuple2.of(2L, 6L));
    
    
            // 输出:
            //(1,5.0)
            //(2,4.0)
            dataStreamSource
                    .keyBy(0)
                    .flatMap(new CountAverageWithAggregateState())
                    .print();
    
    
            env.execute("TestStatefulApi");
        }
    
    }

    2、处理实现类

    package com.example.demo.flink.impl;
    
    /**
     * @program: demo
     * @description: valuestate
     * @author: yang
     * @create: 2020-12-28 16:26
     */
    
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.AggregatingState;
    import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;
    
    /**
     *  ValueState<T> :这个状态为每一个 key 保存一个值
     *      value() 获取状态值
     *      update() 更新状态值
     *      clear() 清除状态
     *
     *      IN,输入的数据类型
     *      OUT:数据出的数据类型
     */
    public class CountAverageWithAggregateState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, String>> {
    
        private AggregatingState<Long,String> aggregatingState;
    
        /**初始化*/
        @Override
        public void open(Configuration parameters) throws Exception {
            AggregatingStateDescriptor descriptor = new AggregatingStateDescriptor<Long,String,String>("AggregatingDescriptor", new AggregateFunction<Long,String,String>() {
                //变量初始化
                @Override
                public String createAccumulator() {
                    return "Contains";
                }
    
                //数据处理
                @Override
                public String add(Long value, String accumulator) {
                    return "Contains".equals(accumulator) ? accumulator + value : accumulator + "and" + value;
                }
                //返回值函数
                @Override
                public String getResult(String accumulator) {
                    return accumulator;
                }
                //好像无用.......debug并没有使用到该函数
                @Override
                public String merge(String o, String acc1) {
                    return o + "and1111" + acc1;
                }
            },String.class);
    
            aggregatingState = getRuntimeContext().getAggregatingState(descriptor);
        }
    
        @Override
        public void flatMap(Tuple2<Long, Long> ele, Collector<Tuple2<Long, String>> collector) throws Exception {
            aggregatingState.add(ele.f1);
            collector.collect(Tuple2.of(ele.f0,aggregatingState.get()));
        }
    }
  • 相关阅读:
    跟初学者学习IbatisNet第三篇
    跟初学者学习IbatisNet第二篇
    跟初学者学习IbatisNet第一篇
    读书时如何做读书笔记
    CCNET+MSBuild+SVN实现每日构建
    WebApi 通过类名获取类并实例化
    HttpClient设置代理
    dotnet core命令行启动如何支持如何找到进程
    redis 在Windows下的安装及基本操作(更新中~~~)
    Spring实现IoC的多种方式
  • 原文地址:https://www.cnblogs.com/ywjfx/p/14228589.html
Copyright © 2011-2022 走看看