zoukankan      html  css  js  c++  java
  • Flink实例(三十二):状态管理(三)自定义键控状态(二)ListState

    0 简介

    ListState[T]保存一个列表,列表里的元素的数据类型为T。基本操作如下:

    • ListState.add(value: T)
    • ListState.addAll(values: java.util.List[T])
    • ListState.get()返回Iterable[T]
    • ListState.update(values: java.util.List[T])

      ListState需要将某些值存到一个List中(Iterable),意味着缓存的数据不只是一个而是多个值。很多情况下都可以使用,例如计算的数值要包含全天的每一个记录,此时只有将每个记录的值存成一个列表才可以计算。

    1.实例

    1.1 实例一

    首先需要先定义一个ListState,然后再重写KeyedProcessFunction中的open方法:

        private var itemState : ListState[ItemViewCount] = _
    
        override def open(parameters: Configuration): Unit = {
    
          //命名状态变量的名字和类型
          val itemStateDescription: ListStateDescriptor[ItemViewCount] = new ListStateDescriptor[ItemViewCount]("itemState", classOf[ItemViewCount])
          itemState = getRuntimeContext.getListState(itemStateDescription)
        }

    ListStateDescriptor提供了几种不同的定义方式:

    两个参数分别是ListStateDescriptor的名字和typeClass

    1.2 实例二

    package com.nx.streaming.lesson02;
    
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
    import org.apache.flink.util.Collector;
    
    import java.util.Collections;
    import java.util.List;
    
    /**
     *  ListState<T> :这个状态为每一个 key 保存集合的值
     *      get() 获取状态值
     *      add() / addAll() 更新状态值,将数据放到状态中
     *      clear() 清除状态
     */
    public class CountAverageWithListState
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
        // managed keyed state
        /**
         * ValueState : 里面只能存一条元素
         * ListState : 里面可以存很多数据
         */
        private ListState<Tuple2<Long, Long>> elementsByKey;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            // 注册状态
            ListStateDescriptor<Tuple2<Long, Long>> descriptor =
                    new ListStateDescriptor<Tuple2<Long, Long>>(
                            "average",  // 状态的名字
                            Types.TUPLE(Types.LONG, Types.LONG)); // 状态存储的数据类型
            elementsByKey = getRuntimeContext().getListState(descriptor);
        }
    
        @Override
        public void flatMap(Tuple2<Long, Long> element,
                            Collector<Tuple2<Long, Double>> out) throws Exception {
            // 拿到当前的 key 的状态值
            Iterable<Tuple2<Long, Long>> currentState = elementsByKey.get();
    
            // 如果状态值还没有初始化,则初始化
            if (currentState == null) {
                elementsByKey.addAll(Collections.emptyList());
            }
    
            // 更新状态
            elementsByKey.add(element);
    
    
            // 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出
            List<Tuple2<Long, Long>> allElements = Lists.newArrayList(elementsByKey.get());
    
            if (allElements.size() == 3) {
                long count = 0;
                long sum = 0;
                for (Tuple2<Long, Long> ele : allElements) {
                    count++;
                    sum += ele.f1;
                }
                double avg = (double) sum / count;
                out.collect(Tuple2.of(element.f0, avg));
                // 清除状态
                elementsByKey.clear();
            }
        }
    }

    总结

    Flink提供了三种基于key/value的state接口,ListState接口适用于缓存多个值的计算。具体实现之前,因为state必须是基于key,且必须获取getRuntimeContext,state必须同时满足两个条件:

    • 直接基于keyedStream或者由keyedStream转换来的windowedStream
    • 必须继承RichFunction

    实际实现时候,因为windowedStream在scala中不能实现RichWindowFunction,因此在main中使用flatmap间接实现了windowFunction中的功能:

    val fromTransactionDataStream = watermarkTransaction
          .keyBy(_.code)
          .window(TumblingEventTimeWindows.of(Time.seconds(10)))
          
    val transaction = fromTransactionDataStream
          .apply(new StockTransactionApply)
          .keyBy(_._3)
          .flatMap(new TransactionStateFlatMapFunction)

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13794687.html

  • 相关阅读:
    PHP openssl_encrypt函数安全漏洞
    WordPress Organizer插件安全漏洞
    WordPress Poll插件多个SQL注入和安全绕过漏洞
    Redis 重写任意文件漏洞
    WordPress Events Manager插件多个跨站脚本漏洞
    项目建议书编写总结
    www.beihua.edu.cn计划摘录
    《易经》对中华文化的影响
    网站调研资料记录
    积分和排名
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13794687.html
Copyright © 2011-2022 走看看