0 简介
ListState[T]保存一个列表,列表里的元素的数据类型为T。基本操作如下:
- ListState.add(value: T)
- ListState.addAll(values: java.util.List[T])
- ListState.get()返回Iterable[T]
- ListState.update(values: java.util.List[T])
ListState需要将某些值存到一个List中(Iterable),意味着缓存的数据不只是一个而是多个值。很多情况下都可以使用,例如计算的数值要包含全天的每一个记录,此时只有将每个记录的值存成一个列表才可以计算。
1.实例
1.1 实例一
首先需要先定义一个ListState,然后再重写KeyedProcessFunction中的open方法:
private var itemState : ListState[ItemViewCount] = _ override def open(parameters: Configuration): Unit = { //命名状态变量的名字和类型 val itemStateDescription: ListStateDescriptor[ItemViewCount] = new ListStateDescriptor[ItemViewCount]("itemState", classOf[ItemViewCount]) itemState = getRuntimeContext.getListState(itemStateDescription) }
ListStateDescriptor提供了几种不同的定义方式:
两个参数分别是ListStateDescriptor的名字和typeClass
1.2 实例二
package com.nx.streaming.lesson02; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; import org.apache.flink.util.Collector; import java.util.Collections; import java.util.List; /** * ListState<T> :这个状态为每一个 key 保存集合的值 * get() 获取状态值 * add() / addAll() 更新状态值,将数据放到状态中 * clear() 清除状态 */ public class CountAverageWithListState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> { // managed keyed state /** * ValueState : 里面只能存一条元素 * ListState : 里面可以存很多数据 */ private ListState<Tuple2<Long, Long>> elementsByKey; @Override public void open(Configuration parameters) throws Exception { // 注册状态 ListStateDescriptor<Tuple2<Long, Long>> descriptor = new ListStateDescriptor<Tuple2<Long, Long>>( "average", // 状态的名字 Types.TUPLE(Types.LONG, Types.LONG)); // 状态存储的数据类型 elementsByKey = getRuntimeContext().getListState(descriptor); } @Override public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Double>> out) throws Exception { // 拿到当前的 key 的状态值 Iterable<Tuple2<Long, Long>> currentState = elementsByKey.get(); // 如果状态值还没有初始化,则初始化 if (currentState == null) { elementsByKey.addAll(Collections.emptyList()); } // 更新状态 elementsByKey.add(element); // 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出 List<Tuple2<Long, Long>> allElements = Lists.newArrayList(elementsByKey.get()); if (allElements.size() == 3) { long count = 0; long sum = 0; for (Tuple2<Long, Long> ele : allElements) { count++; sum += ele.f1; } double avg = (double) sum / count; out.collect(Tuple2.of(element.f0, avg)); // 清除状态 elementsByKey.clear(); } } }
总结
Flink提供了三种基于key/value的state接口,ListState接口适用于缓存多个值的计算。具体实现之前,因为state必须是基于key,且必须获取getRuntimeContext,state必须同时满足两个条件:
- 直接基于keyedStream或者由keyedStream转换来的windowedStream
- 必须继承RichFunction
实际实现时候,因为windowedStream在scala中不能实现RichWindowFunction,因此在main中使用flatmap间接实现了windowFunction中的功能:
val fromTransactionDataStream = watermarkTransaction .keyBy(_.code) .window(TumblingEventTimeWindows.of(Time.seconds(10))) val transaction = fromTransactionDataStream .apply(new StockTransactionApply) .keyBy(_._3) .flatMap(new TransactionStateFlatMapFunction)