zoukankan      html  css  js  c++  java
  • Flink实例(三十三):状态管理(四)自定义键控状态(三)MapState

    0 简介

    MapState[K, V]保存Key-Value对。

    • MapState.get(key: K)
    • MapState.put(key: K, value: V)
    • MapState.contains(key: K)
    • MapState.remove(key: K)

    1 实例

    1.1 实例一:

      去重计算应该是数据分析业务里面常见的指标计算,例如网站一天的访问用户数、广告的点击用户数等等,离线计算是一个全量、一次性计算的过程通常可以通过distinct的方式得到去重结果,而实时计算是一种增量、长期计算过程,我们在面对不同的场景,例如数据量的大小、计算结果精准度要求等可以使用不同的方案。

      此篇介绍如何通过编码方式实现精确去重,以一个实际场景为例:计算每个广告每小时的点击用户数,广告点击日志包含:广告位ID、用户设备ID(idfa/imei/cookie)、点击时间。

    实现步骤分析:

    • 为了当天的数据可重现,这里选择事件时间也就是广告点击时间作为每小时的窗口期划分
    • 数据分组使用广告位ID+点击事件所属的小时
    • 选择processFunction来实现,一个状态用来保存数据、另外一个状态用来保存对应的数据量
    • 计算完成之后的数据清理,按照时间进度注册定时器清理

    广告数据

    case class AdData(id:Int,devId:String,time:Long)

    分组数据

    case class AdKey(id:Int,time:Long)

    主流程

    val env=StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
    val kafkaConfig=new Properties()
    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")
    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1")
    
    val consumer=new FlinkKafkaConsumer[String]("topic1",new SimpleStringSchema,kafkaConfig)
    val ds=env.addSource(consumer)
            .map(x=>{
                    val s=x.split(",")
                    AdData(s(0).toInt,s(1),s(2).toLong)
                    }
                ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[AdData](Time.minutes(1)){
                    override def extractTimestamp(element: AdData): Long = element.time
                })
          .keyBy(x=>{
                val endTime= 
                TimeWindow.getWindowStartWithOffset(x.time, 0, Time.hours(1).toMilliseconds)+Time.hours(1).toMilliseconds
                AdKey(x.id,endTime)
          })

      指定时间时间属性,这里设置允许1min的延时,可根据实际情况调整;
      时间的转换选择TimeWindow.getWindowStartWithOffset Flink在处理window中自带的方法,使用起来很方便,第一个参数 表示数据时间,第二个参数offset偏移量,默认为0,正常窗口划分都是整点方式,例如从0开始划分,这个offset就是相对于0的偏移量,第三个参数表示窗口大小,得到的结果是数据时间所属窗口的开始时间,这里加上了窗口大小,使用结束时间与广告位ID作为分组的Key。

    去重逻辑
      自定义Distinct1ProcessFunction 继承了KeyedProcessFunction, 方便起见使用输出类型使用Void,这里直接使用打印控制台方式查看结果,在实际中可输出到下游做一个批量的处理然后在输出;
      定义两个状态:MapState,key表示devId, value表示一个随意的值只是为了标识,该状态表示一个广告位在某个小时的设备数据,如果我们使用rocksdb作为statebackend, 那么会将mapstate中key作为rocksdb中key的一部分,mapstate中value作为rocksdb中的value, rocksdb中value 大小是有上限的,这种方式可以减少rocksdb value的大小;

      另外一个ValueState,存储当前MapState的数据量,是由于mapstate只能通过迭代方式获得数据量大小,每次获取都需要进行迭代,这种方式可以避免每次迭代。

    class Distinct1ProcessFunction extends KeyedProcessFunction[AdKey, AdData, Void] {
    
      var devIdState: MapState[String, Int] = _
      var devIdStateDesc: MapStateDescriptor[String, Int] = _
      var countState: ValueState[Long] = _
      var countStateDesc: ValueStateDescriptor[Long] = _
      
      override def open(parameters: Configuration): Unit = {
        devIdStateDesc = new MapStateDescriptor[String, Int]("devIdState", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int]))
        devIdState = getRuntimeContext.getMapState(devIdStateDesc)
        countStateDesc = new ValueStateDescriptor[Long]("countState", TypeInformation.of(classOf[Long]))
        countState = getRuntimeContext.getState(countStateDesc)
      }
    
      override def processElement(value: AdData, ctx: KeyedProcessFunction[AdKey, AdData, Void]#Context, out: Collector[Void]): Unit = {
        val currW=ctx.timerService().currentWatermark()
        if(ctx.getCurrentKey.time+1<=currW) {
            println("late data:" + value)
            return
          }
    
        val devId = value.devId
        devIdState.get(devId) match {
          case 1 => {
            //表示已经存在
          }
    
          case _ => {
            //表示不存在
            devIdState.put(devId, 1)
            val c = countState.value()
            countState.update(c + 1)
            //还需要注册一个定时器
            ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)
          }
        }
        println(countState.value())
      }
    
      override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[AdKey, AdData, Void]#OnTimerContext, out: Collector[Void]): Unit = {
        println(timestamp + " exec clean~~~")
        println(countState.value())
        devIdState.clear()
        countState.clear()
      }
    }

    数据清理通过注册定时器方式ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)表示当watermark大于该小时结束时间+1就会执行清理动作,调用onTimer方法。

    在处理逻辑里面加了

    val currW=ctx.timerService().currentWatermark()
    if(ctx.getCurrentKey.time+1<=currW){
            println("late data:" + value)
            return
      }

    主要考虑可能会存在滞后的数据比较严重,会影响之前的计算结果,做了一个类似window机制里面的一个延时判断,将延时的数据过滤掉,也可以使用OutputTag 单独处理。

     1.2 实例二:

      我们知道电商平台会将用户与商品的交互行为收集记录下来,行为数据主要包括几个字段:userId、itemId、categoryId、behavior和timestamp。其中userId和itemId分别代表用户和商品的唯一ID,categoryId为商品类目ID,behavior表示用户的行为类型,包括点击(pv)、购买(buy)、加购物车(cart)、喜欢(fav)等,timestamp记录行为发生时间。本文采用阿里巴巴提供的一个淘宝用户行为数据集,为了精简需要,只节选了部分数据。下面的代码使用MapState[String, Int]记录某个用户某种行为出现的次数。这里读取了数据集文件,模拟了一个淘宝用户行为数据流。

    /**
      * 用户行为
      * categoryId为商品类目ID
      * behavior包括点击(pv)、购买(buy)、加购物车(cart)、喜欢(fav)
      * */
    case class UserBehavior(userId: Long,
                              itemId: Long,
                              categoryId: Int,
                              behavior: String,
                              timestamp: Long)
    
    class MapStateFunction extends RichFlatMapFunction[UserBehavior, (Long, String, Int)] {
    
      // 指向MapState的句柄
      private var behaviorMapState: MapState[String, Int] = _
    
      override def open(parameters: Configuration): Unit = {
        // 创建StateDescriptor
        val behaviorMapStateDescriptor = new MapStateDescriptor[String, Int]("behaviorMap", classOf[String], classOf[Int])
        // 通过StateDescriptor获取运行时上下文中的状态
        behaviorMapState = getRuntimeContext.getMapState(behaviorMapStateDescriptor)
      }
    
      override def flatMap(input: UserBehavior, collector: Collector[(Long, String, Int)]): Unit = {
        var behaviorCnt = 1
        // behavior有可能为pv、cart、fav、buy等
        // 判断状态中是否有该behavior
        if (behaviorMapState.contains(input.behavior)) {
          behaviorCnt = behaviorMapState.get(input.behavior) + 1
        }
        // 更新状态
        behaviorMapState.put(input.behavior, behaviorCnt)
        collector.collect((input.userId, input.behavior, behaviorCnt))
      }
    }
    
    def main(args: Array[String]): Unit = {
    
      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      env.setParallelism(8)
    
      // 获取数据源
      val sourceStream: DataStream[UserBehavior] = env
      .addSource(new UserBehaviorSource("state/UserBehavior-50.csv")).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[UserBehavior]() {
        override def extractAscendingTimestamp(userBehavior: UserBehavior): Long = {
          // 原始数据单位为秒,乘以1000转换成毫秒
          userBehavior.timestamp * 1000
        }
      }                                                                                            )
    
      // 生成一个KeyedStream
      val keyedStream =  sourceStream.keyBy(user => user.userId)
    
      // 在KeyedStream上进行flatMap
      val behaviorCountStream = keyedStream.flatMap(new MapStateFunction)
    
      behaviorCountStream.print()
    
      env.execute("state example")
    }
    
    class UserBehaviorSource(path: String) extends RichSourceFunction[UserBehavior] {
    
      var isRunning: Boolean = true
      // 输入源
      var streamSource: InputStream = _
    
      override def run(sourceContext: SourceContext[UserBehavior]): Unit = {
        // 从项目的resources目录获取输入
        streamSource = MapStateExample.getClass.getClassLoader.getResourceAsStream(path)
        val lines: Iterator[String] = scala.io.Source.fromInputStream(streamSource).getLines
        while (isRunning && lines.hasNext) {
          val line = lines.next()
          val itemStrArr = line.split(",")
          val userBehavior = UserBehavior(itemStrArr(0).toLong, itemStrArr(1).toLong, itemStrArr(2).toInt, itemStrArr(3), itemStrArr(4).toLong)
          sourceContext.collect(userBehavior)
        }
      }
    
      override def cancel(): Unit = {
        streamSource.close()
        isRunning = false
      }
    }

      Keyed State是针对KeyedStream的状态,必须先对一个DataStream进行keyBy操作。在本例中,我们对用户ID进行了keyBy,那么用户ID为1的行为数据共享同一状态数据,以此类推,每个用户ID的行为数据共享自己的状态数据。

      之后,我们需要实现Rich类函数,比如RichFlatMapFunction,或者KeyedProcessFunction等函数类。这些算子函数类都是RichFunction的一种实现,他们都有运行时上下文RuntimeContextRuntimeContext包含了状态数据。 在实现这些算子函数类时,一般是在open方法中声明状态。open是算子的初始化方法,它在实际处理函数之前调用。

      具体到状态的使用,我们首先要注册一个StateDescriptor。从名字中可以看出,StateDescriptor是状态的一种描述,它描述了状态的名字和状态的数据结构。状态的名字可以用来区分不同的状态,一个算子内可以有多个不同的状态,每个状态的StateDescriptor需要设置不同的名字。同时,我们也需要指定状态的具体数据结构,指定具体的数据结构非常重要,因为Flink要对其进行序列化和反序列化,以便进行Checkpoint和必要的恢复。数据结构的类型和序列化机制可以参考我之前的文章:Flink进阶教程:数据类型和序列化机制简介

      在本例中,我们使用val behaviorMapStateDescriptor = new MapStateDescriptor[String, Int]("behaviorMap", classOf[String], classOf[Int])注册了一个MapStateStateDescriptor,key为某种行为,如pv、buy等,数据类型为String,value为该行为出现的次数,数据类型为Int。此外,每种类型的状态都有对应的StateDescriptor,比如MapStateDescriptor对应MapStateValueStateDescriptor对应ValueState

      接着我们通过StateDescriptorRuntimeContext中获取状态句柄。本例中对应的代码为:behaviorMapState = getRuntimeContext.getMapState(behaviorMapStateDescriptor)。状态句柄并不存储状态,它只是Flink提供的一种访问状态的接口,状态数据实际存储在State Backend中。

    使用和更新状态发生在实际的处理函数上,比如RichFlatMapFunction中的flatMap方法,在实现自己的业务逻辑时访问和修改状态,比如通过get方法获取状态。

     1.3 实例三

    package com.nx.streaming.lesson02;
    
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
    import org.apache.flink.util.Collector;
    
    import java.util.List;
    import java.util.UUID;
    
    /**
     *  MapState<K, V> :这个状态为每一个 key 保存一个 Map 集合
     *      put() 将对应的 key 的键值对放到状态中
     *      values() 拿到 MapState 中所有的 value
     *      clear() 清除状态
     */
    public class CountAverageWithMapState
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
        // managed keyed state
        //1. MapState :key 是一个唯一的值,value 是接收到的相同的 key 对应的 value 的值
        /**
         * MapState:
         *      Map集合的特点,相同key,会覆盖数据。
         */
        private MapState<String, Long> mapState;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            // 注册状态
            MapStateDescriptor<String, Long> descriptor =
                    new MapStateDescriptor<String, Long>(
                            "average",  // 状态的名字
                            String.class, Long.class); // 状态存储的数据类型
            mapState = getRuntimeContext().getMapState(descriptor);
        }
    
        /**
         *
         * @param element
         * @param out
         * @throws Exception
         */
        @Override
        public void flatMap(Tuple2<Long, Long> element,
                            Collector<Tuple2<Long, Double>> out) throws Exception {
    
            mapState.put(UUID.randomUUID().toString(), element.f1);
    
            // 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出
            List<Long> allElements = Lists.newArrayList(mapState.values());
    
            if (allElements.size() == 3) {
                long count = 0;
                long sum = 0;
                for (Long ele : allElements) {
                    count++;
                    sum += ele;
                }
                double avg = (double) sum / count;
                //
                out.collect(Tuple2.of(element.f0, avg));
                // 清除状态
                mapState.clear();
    
            }
        }
    }

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13794811.html

  • 相关阅读:
    迭代器和生成器
    New test
    MySQL笔记整理
    Python基础数据类型
    Python基础
    Python入门知识
    Linux / MacOS 下Redis 安装、配置和连接
    NuGet的使用心得
    简单工厂模式和策略模式的区别与结合
    NuGet的使用和服务搭建
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13794811.html
Copyright © 2011-2022 走看看