Continuing from the previous post, today we look at how state is used in Flink. When processing a stream, handling the current record sometimes depends on the value of the previous record; this is what is called "stateful" computation.
Here is an example: imagine a company that uses its internal messaging tool (something like WeCom) for employee attendance. Suppose the tool periodically reports each employee's presence: if the status is online, the employee is considered to be at work; if it is offline, the employee is considered not working.
The reported data looks something like this:
{ "event_datetime": "2020-12-20 20:17:04.291", "employee": "mike", "event_timestamp": "1608466624291", "status": "online" }
or
{ "event_datetime": "2020-12-20 20:17:14.294", "employee": "jerry", "event_timestamp": "1608466634294", "status": "offline" }
Requirement: based on the reported data, compute in real time the accumulated duration of each employee's online and offline status. (Note: don't worry too much about whether this example makes real-world sense; it is only meant to illustrate how state works in Flink.)
Let's analyze it with a table:
| # | Employee | Report time | Reported status | Accumulated duration per status (ms) |
|---|----------|-------------|-----------------|--------------------------------------|
| 1 | jerry | 2020-12-20 15:31:48 | offline | offline: 0 (initial value for the 1st record) |
| 2 | jerry | 2020-12-20 15:31:49 | offline | offline: 1000 |
| 3 | jerry | 2020-12-20 15:31:50 | online | offline: 2000, online: 0 (initial value when online is first seen) |
| 4 | jerry | 2020-12-20 15:31:51 | online | offline: 2000, online: 1000 |
Suppose employee jerry reports four records in a row. When the 2nd record arrives, its status is unchanged compared with the 1st one (still offline), so the accumulated offline duration becomes 1000 ms (i.e. 1 second). When the 3rd record arrives, the status switches to online, meaning the offline period has just ended, so offline gets one more second and reaches 2000 ms. When the 4th record arrives, the status is still online, i.e. the online period continues, so the online duration becomes 1000 ms, and so on for every following record.
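To make this rule concrete before moving to Flink, here is a minimal, Flink-free Java sketch of the update logic (the class, method, and variable names are my own, and the timestamps are simplified to 1000 ms steps). Replaying the four records from the table reproduces the same durations.

```java
import java.util.HashMap;
import java.util.Map;

public class StatusDurationSketch {
    private Long lastTimestamp = null;   // timestamp of the previous record
    private String lastStatus = null;    // status of the previous record
    private final Map<String, Long> durations = new HashMap<>(); // accumulated time per status

    void onRecord(String status, long timestamp) {
        if (lastTimestamp == null) {
            // first record: start accumulating this status from zero
            durations.put(status, 0L);
        } else if (timestamp > lastTimestamp) {
            // the time elapsed since the previous record belongs to the *previous* status
            String owner = status.equals(lastStatus) ? status : lastStatus;
            durations.merge(owner, timestamp - lastTimestamp, Long::sum);
            durations.putIfAbsent(status, 0L); // first time this status is seen
        } else {
            return; // out-of-order record: ignore it
        }
        lastTimestamp = timestamp;
        lastStatus = status;
        System.out.println(status + " -> " + durations);
    }

    public static void main(String[] args) {
        StatusDurationSketch s = new StatusDurationSketch();
        s.onRecord("offline", 1000L); // row 1: offline=0
        s.onRecord("offline", 2000L); // row 2: offline=1000
        s.onRecord("online",  3000L); // row 3: offline=2000, online=0
        s.onRecord("online",  4000L); // row 4: offline=2000, online=1000
    }
}
```

The point to notice, which the Flink program below mirrors, is that the time elapsed between two records is always credited to the status of the earlier record.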
It is easy to see that, for each incoming record, at least three auxiliary "variables" are needed:
1. The status of the previous record (to decide whether the status has changed in the current record).
2. The report timestamp of the previous record (to compute the time difference between the current and previous records, and also to detect out-of-order data, i.e. records that were sent earlier but arrive later).
3. The current accumulated duration of each status.
In Flink, these auxiliary variables are exactly what state is for: items 1 and 2 map to ValueState, and item 3 maps to MapState.
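As a preview of the pattern used in the full program below, here is a minimal sketch of how those three pieces of state could be declared and initialized inside a RichFlatMapFunction (the class name is a placeholder and the business logic is omitted):

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.Map;

public class StatusStateFunction
        extends RichFlatMapFunction<Tuple2<String, Map<String, String>>, Tuple2<String, Long>> {

    private ValueState<String> lastStatus;          // 1. status of the previous record
    private ValueState<Long> lastTimestamp;         // 2. timestamp of the previous record
    private MapState<String, Long> statusDuration;  // 3. accumulated duration per status

    @Override
    public void open(Configuration parameters) throws Exception {
        // keyed state must be obtained from the runtime context via descriptors
        lastStatus = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastStatus", String.class));
        lastTimestamp = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastTimestamp", Long.class));
        statusDuration = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("statusDuration", String.class, Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Map<String, String>> in,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        // the accumulation logic goes here (see the full program below)
    }
}
```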
With all that groundwork laid, here is the complete program:
```java
package com.cnblogs.yjmyzz.flink.demo;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.Properties;

/**
 * @author 菩提树下的杨过(http://yjmyzz.cnblogs.com/)
 */
public class KafkaKeyedStateSample {

    private final static Gson gson = new Gson();
    private final static String SOURCE_TOPIC = "test5";
    private final static String SINK_TOPIC = "test6";
    private final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");

    public static void main(String[] args) throws Exception {

        // 1. set up the environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. define the data source
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "test-read-group-4");
        props.put("deserializer.encoding", "GB2312");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        DataStreamSource<String> text = env.addSource(new FlinkKafkaConsumer011<>(
                SOURCE_TOPIC,
                new SimpleStringSchema(),
                props));

        // 3. processing logic
        DataStream<Tuple2<String, Long>> counts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Map<String, String>>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Map<String, String>>> out) throws Exception {
                        if (StringUtils.isNullOrWhitespaceOnly(value)) {
                            return;
                        }
                        // parse the json body of the message
                        Map<String, String> map = gson.fromJson(value, new TypeToken<Map<String, String>>() {
                        }.getType());
                        String employee = map.getOrDefault("employee", "");
                        out.collect(new Tuple2<>(employee, map));
                    }
                })
                .keyBy(value -> value.f0)
                .flatMap(new RichFlatMapFunction<Tuple2<String, Map<String, String>>, Tuple2<String, Long>>() {

                    // timestamp of the most recently reported record
                    ValueState<Long> lastTimestamp = null;

                    // status of the most recently reported record
                    ValueState<String> lastStatus = null;

                    // accumulated duration of each status
                    MapState<String, Long> statusDuration = null;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<Long> lastTimestampDescriptor =
                                new ValueStateDescriptor<>("lastTimestamp", Long.class);
                        lastTimestamp = getRuntimeContext().getState(lastTimestampDescriptor);

                        ValueStateDescriptor<String> lastStatusDescriptor =
                                new ValueStateDescriptor<>("lastStatus", String.class);
                        lastStatus = getRuntimeContext().getState(lastStatusDescriptor);

                        MapStateDescriptor<String, Long> statusDurationDescriptor =
                                new MapStateDescriptor<>("statusDuration", String.class, Long.class);
                        statusDuration = getRuntimeContext().getMapState(statusDurationDescriptor);
                    }

                    @Override
                    public void flatMap(Tuple2<String, Map<String, String>> in, Collector<Tuple2<String, Long>> out) throws Exception {
                        long timestamp = Long.parseLong(in.f1.get("event_timestamp"));
                        String employee = in.f1.get("employee");
                        String empStatus = in.f1.get("status");
                        String collectEmpStatus = empStatus;
                        long duration = 0;
                        if (lastTimestamp == null || lastTimestamp.value() == null) {
                            // first record
                            duration = 0;
                        } else if (timestamp > lastTimestamp.value()) {
                            // out-of-order data is not accepted
                            if (empStatus.equalsIgnoreCase(lastStatus.value())) {
                                // status unchanged: keep accumulating the duration
                                duration = statusDuration.get(collectEmpStatus) + (timestamp - lastTimestamp.value());
                            } else {
                                // status changed: add the elapsed time to the previous status
                                collectEmpStatus = lastStatus.value();
                                duration = statusDuration.get(collectEmpStatus) + (timestamp - lastTimestamp.value());
                            }
                        } else {
                            return;
                        }

                        lastTimestamp.update(timestamp);
                        lastStatus.update(empStatus);
                        statusDuration.put(collectEmpStatus, duration);
                        if (!collectEmpStatus.equalsIgnoreCase(empStatus) && !statusDuration.contains(empStatus)) {
                            statusDuration.put(empStatus, 0L);
                        }
                        out.collect(new Tuple2<>(employee + ":" + collectEmpStatus, duration));
                    }
                })
                .keyBy(v -> v.f0);

        // 4. output the results
        counts.addSink(new FlinkKafkaProducer010<>("localhost:9092",
                SINK_TOPIC,
                (SerializationSchema<Tuple2<String, Long>>) element -> ("(" + element.f0 + "," + element.f1 + ")").getBytes()));
        counts.print();

        // execute program
        env.execute("Kafka Streaming KeyedState sample");
    }
}
```
The code looks long and a bit intimidating, but its structure is the same as the wordCount samples from the previous posts: the opening and closing parts are the usual boilerplate (read from the source at the start, write the results out at the end). The part that matters is the processing logic in step 3 (the two chained flatMap calls), which breaks down as follows:
1. The first flatMap parses the JSON body of the Kafka message. Since several employees report their status, the stream is then keyed by employee name, so that each employee's durations are computed separately in the next step.
2. Note that to use state here, a RichFlatMapFunction is required. Its first type parameter is the per-employee record produced by the keyBy in the previous step; the second is the type of the processed output.
3. Inside the RichFlatMapFunction, the three states (the auxiliary variables mentioned earlier) are declared as fields.
4. Declaring the three states is not enough: they must be initialized in the open() method.
5. The flatMap() method contains the business logic (the core of the requirement). Note that at the end of each record's processing the state values must be updated, so that when the next record arrives it can still see the "state" left behind by the previous one. Also, out.collect(...) emits the computed result downstream.
How to test:
1. First prepare the following 5 test records:
{"event_datetime":"2020-12-20 15:31:48","employee":"jerry","event_timestamp":"1608449508304","status":"offline"} {"event_datetime":"2020-12-20 15:31:49","employee":"jerry","event_timestamp":"1608449509304","status":"offline"} {"event_datetime":"2020-12-20 15:31:50","employee":"jerry","event_timestamp":"1608449510304","status":"online"} {"event_datetime":"2020-12-20 15:31:51","employee":"jerry","event_timestamp":"1608449511304","status":"online"} {"event_datetime":"2020-12-20 15:31:52","employee":"jerry","event_timestamp":"1608449512304","status":"offline"}
2. Start Kafka and open the console producer that ships with it:
```bash
./kafka-console-producer.sh --broker-list localhost:9092 --topic test5
```
3. With the sample program running, enter the records above one by one in the producer console and watch Flink's output:
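Besides the output of counts.print() in the Flink console, you can also watch what gets written to the sink topic with Kafka's console consumer (assuming the same local broker as above; test6 is the SINK_TOPIC in the sample code):

```bash
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test6
```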