zoukankan      html  css  js  c++  java
  • Flink实例(三十一):状态管理(二)自定义键控状态(一)ValueState

    0 简介

    ValueState[T]保存单个的值,值的类型为T。

    • get操作: ValueState.value()
    • set操作: ValueState.update(value: T)

    1 实例

    1.1 实例一

    scala version

    复制代码
    val sensorData: DataStream[SensorReading] = ...
    val keyedData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id)
    
    val alerts: DataStream[(String, Double, Double)] = keyedData
      .flatMap(new TemperatureAlertFunction(1.7))
    
    class TemperatureAlertFunction(val threshold: Double)
      extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
      private var lastTempState: ValueState[Double] = _
    
      override def open(parameters: Configuration): Unit = {
        val lastTempDescriptor = new ValueStateDescriptor[Double](
          "lastTemp", classOf[Double])
    
        lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor)
      }
    
      override def flatMap(
        reading: SensorReading,
        out: Collector[(String, Double, Double)]
      ): Unit = {
        val lastTemp = lastTempState.value()
        val tempDiff = (reading.temperature - lastTemp).abs
        if (tempDiff > threshold) {
          out.collect((reading.id, reading.temperature, tempDiff))
        }
        this.lastTempState.update(reading.temperature)
      }
    }
    复制代码

    上面例子中的FlatMapFunction只能访问当前处理的元素所包含的key所对应的状态变量。

    不同key对应的keyed state是相互隔离的。

    • 通过RuntimeContext注册StateDescriptor。StateDescriptor以状态state的名字和存储的数据类型为参数。数据类型必须指定,因为Flink需要选择合适的序列化器。
    • 在open()方法中创建state变量。注意复习之前的RichFunction相关知识。

    1.2.实例二

    package com.nx.streaming.lesson02;
    
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;
    
    /**
     *  ValueState<T> :这个状态为每一个 key 保存一个值
     *      value() 获取状态值
     *      update() 更新状态值
     *      clear() 清除状态
     *
     *      IN,输入的数据类型
     *      OUT:数据出的数据类型
     */
    public class CountAverageWithValueState
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
        /**
         * 定义一个state:keyed state
         *
         * 1. ValueState里面只能存一条数据,如果来了第二条,就会覆盖第一条。
         * 2. Tuple2<Long, Long>
         *      Long:
         *          当前的key出现多少次  count 3
         *      Long:
         *          当前的value的总和   sum
         *
         *          sum/count = avg
         *
         *
         *  如果我们想要使用这个state,首先要对state进行注册(初始化),固定的套路
         *
         *
         */
        private ValueState<Tuple2<Long, Long>> countAndSum;
    
        /**
         * 这个方法其实是一个初始化的方法,只会执行一次
         * 我们可以用来注册我们的状态
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            // 注册状态
            ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                    new ValueStateDescriptor<Tuple2<Long, Long>>(
                            "average",  // 状态的名字
                            Types.TUPLE(Types.LONG, Types.LONG)); // 状态存储的数据类型
    
            countAndSum = getRuntimeContext().getState(descriptor);
        }
    
        /**
         * 每来一条数据,都会调用这个方法
         * key相同
         * @param element
         * @param out
         * @throws Exception
         */
        @Override
        public void flatMap(Tuple2<Long, Long> element,
                            Collector<Tuple2<Long, Double>> out) throws Exception {
            // 拿到当前的 key 的状态值
            Tuple2<Long, Long> currentState = countAndSum.value();
    
            // 如果状态值还没有初始化,则初始化
            if (currentState == null) {
                currentState = Tuple2.of(0L, 0L);
            }
            // 更新状态值中的元素的个数 : count
            currentState.f0 += 1;
    
            // 更新状态值中的总值
            currentState.f1 += element.f1;
            // 更新状态
            countAndSum.update(currentState);
    
            // 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出
            if (currentState.f0 == 3) {
                double avg = (double)currentState.f1 / currentState.f0;
                // 输出 key 及其对应的平均值
                out.collect(Tuple2.of(element.f0, avg));
                //  清空状态值
                countAndSum.clear();
            }
        }
    }

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13794625.html

  • 相关阅读:
    全局变量引用与声明
    Oracle基础~dg原理
    Oracle基础~dg管理
    Oracle基础~rman克隆
    oracle基础~rman恢复篇
    oracle基础~linux整体性能优化
    oracle基础~报错汇总与解决办法
    oracle基础~用户和权限
    oracle基础~rac-asm
    oracle基础~awr报告
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13794625.html
Copyright © 2011-2022 走看看