简介
Apache Flink® — Stateful Computations over Data Streams
有状态的流是其最大的特性, 可以使用 stage 进行各种状态的保存。
flink 架构图
flink 核心概念
有状态
分布式
并行度
flink 分布式运行环境:
TaskManager -> slot -> task -> 并行度
数据传输的策略
forward strategy
- 一个 task 的输出只发送给一个 task 作为输入
- 如果两个 task 都在一个 JVM 中的话,那么就可以避免网络开销
key based strategy (key by)
- 数据需要按照某个属性(我们称为 key)进行分组(或者说分区)
- 相同 key 的数据需要传输给同一个 task,在一个 task 中进行处理
broadcast strategy
就是把数据广播到下游的所有task中
random strategy
- 数据随机的从一个 task 中传输给下一个 operator 所有的 subtask
- 保证数据能均匀的传输给所有的 subtask
Operator Chain
Operator Chain的条件:
- 数据传输策略是 forward strategy
- 在同一个 TaskManager 中运行
如下 keyBy-sum 会和 map 合并在一个 JVM线程中进行处理,也就是搞成一个task去运行里面的业务逻辑。
Flink四层图结构
Stream Graph
Job Graph
Execution Graph
Physical Execution Graph
Flink之数据源
获取source的方式
(1)基于文件
readTextFile(path)
读取文本文件,文件遵循TextInputFormat 读取规则,逐行读取并返回。
(2)基于socket
socketTextStream
从socker中读取数据,元素可以通过一个分隔符切开。
(3)基于集合
fromCollection(Collection)
通过java 的collection集合创建一个数据流,集合中的所有元素必须是相同类型的。
(4)自定义输入
addSource 可以实现读取第三方数据源的数据
系统内置提供了一批connectors,连接器会提供对应的source支持【kafka】
常见 transform 操作
union :
合并多个流,新的流会包含所有流中的数据,但是union是一个限制,就是所有合并的流类型必须是一致的
connect,conMap和conFlatMap :
和union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法
Split和Select:
根据规则把一个数据流切分为多个流
应用场景:
- 可能在实际工作中,源数据流中混合了多种类似的数据,多种类型的数据处理规则不一样,所以就可以在
根据一定的规则, - 把一个数据流切分成多个数据流,这样每个数据流就可以使用不用的处理逻辑了
state机制
此机制毫不夸张的讲 是 flink 最牛B的特性之一, 比 spark streaming的 状态管理好用 100倍。就靠这点也足够支持 flink作为最牛逼的实时计算框架。
接下来会对 state 进行非常详细的讲解
State类型
state : Flink中有两种基本类型的State:Keyed State,Operator State,他们两种都可以以两种形式存在:原
始状态(raw state)和托管状态(managed state)
托管状态:由Flink框架管理的状态,我们通常使用的就是这种。
原始状态:由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态
内容,对其内部数据结构一无所知。通常在DataStream上的状态推荐使用托管的状态,当实现一个用
户自定义的operator时,会使用到原始状态。但是我们工作中一般不常用,所以我们不考虑他。
Operator State(task级别的)
- operator state是task级别的state,说白了就是每个task对应一个state
- Kafka Connector source中的每个分区(task)都需要记录消费的topic的partition和offset等信
息。
Keyed State(针对每一个key) 可以理解 这个 stage 最顶层是一个 map, map的key 就是 keyBy的 key,故可以存储和使用具体任意一个 KEY 对应的状态
- keyed state 记录的是每个key的状态
- Keyed state托管状态有六种类型:
- ValueState
- ListState
- MapState
- ReducingState
- AggregatingState
- FoldingState
state 分类图谱:
State backend 状态存储也就是这些状态数据后端是怎么存储,以及使用啥存储的
Flink支持的StateBackend:
MemoryStateBackend
默认情况下,状态信息是存储在 TaskManager 的堆内存中的,checkpoint 的时候将状态保存到
JobManager 的堆内存中。
缺点:
只能保存数据量小的状态
状态数据有可能会丢失
优点:
开发测试很方便
FsStateBackend
状态信息存储在 TaskManager 的堆内存中的,checkpoint 的时候将状态保存到指定的文件中 (HDFS
等文件系统)
缺点:
状态大小受TaskManager内存限制(默认支持5M)
优点:
状态访问速度很快
状态信息不会丢失
用于: 生产,也可存储状态数据量大的情况
RocksDBStateBackend
状态信息存储在 RocksDB 数据库 (key-value 的数据存储服务), 最终保存在本地文件中
checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统)
缺点:
状态访问速度有所下降
优点:
可以存储超大量的状态信息
状态信息不会丢失
用于: 生产,可以存储超大量的状态信息
StateBackend配置方式
(1)单任务调整
修改当前任务代码
env.setStateBackend(new
FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】
(2)全局调整
修改flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend),
filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
checkpoint(容错)
概述:
(1)为了保证state的容错性,Flink需要对state进行checkpoint。
(2)Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个
Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩
溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常
(3)Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提:
持久化的source,它需要支持在一定时间内重放事件。这种sources的典型例子是持久化的消息队列
(比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等)
用于state的持久化存储,例如分布式文件系统(比如HDFS,S3,GFS等)
生成快照。5s
恢复快照
checkpoint配置
默认checkpoint功能是disabled的,想要使用的时候需要先启用,checkpoint开启之后,
checkPointMode有两种,Exactly-once和At-least-once,默认的checkPointMode是Exactly-once,
Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终
延迟为几毫秒)。
默认checkpoint功能是disabled的,想要使用的时候需要先启用
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
env.enableCheckpointing(1000);
// 高级选项:
// 设置模式为exactly-once (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只允许进行一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的
Checkpoint【详细解释见备注】
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCl
eanup.RETAIN_ON_CANCELLATION);
恢复数据(容错)
默认的重启策略是在 Flink 的配置文件 flink-conf.yaml 中指定。
restart-strategy 具体包括如下:
(1)固定间隔 (Fixed delay)
(2)失败率 (Failure rate)
(3)无重启 (No restart)
没有启动 CK 则肯定没有重启策略,若启用了,则默认是 Fixed delay 的重启策略, 尝试重启次数
默认值是:Integer.MAX_VALUE,重启策略可以在flink-conf.yaml中配置,表示全局的配置。也可以在
应用代码中动态指定,会覆盖全局配置。
重启策略
固定间隔 (Fixed delay)
第一种:全局配置 flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
第二种:应用代码设置
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 间隔
));
失败率 (Failure rate)
第一种:全局配置 flink-conf.yaml
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
第二种:应用代码设置
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 一个时间段内的最大失败次数
Time.of(5, TimeUnit.MINUTES), // 衡量失败次数的是时间段
Time.of(10, TimeUnit.SECONDS) // 间隔
));
无重启 (No restart)
第一种:全局配置 flink-conf.yaml
restart-strategy: none
第二种:应用代码设置
env.setRestartStrategy(RestartStrategies.noRestart());
多checkpoint
默认只保留最近成功生成的1个 checkpoint , 程序失败时从这个CK来进行恢复。但是也可以配置多个CK , 这样恢复能力更强一些。
conf/flink-conf.yam
以下配置最多需要保存CK的个数:
state.checkpoints.num-retained: 20
通过以下查看, 就能知道所有的CK 列表信息,若要恢复回退只需要指定对应的CK启动即可
hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints
Watermark
window 切分
根据每隔5秒执行最近10秒的数据,Flink划分的窗口, 左闭右开, 是按照时间间隔为最小粒度切分窗口的。
[00:00:00, 00:00:05) [00:00:05, 00:00:10)
[00:00:10, 00:00:15) [00:00:15, 00:00:20)
[00:00:20, 00:00:25) [00:00:25, 00:00:30)
[00:00:30, 00:00:35) [00:00:35, 00:00:40)
[00:00:40, 00:00:45) [00:00:45, 00:00:50)
[00:00:50, 00:00:55) [00:00:55, 00:01:00)
[00:01:00, 00:01:05) ...
Time的种类
Event Time:事件产生的时间,它通常由事件中的时间戳描述。
Ingestion time:事件(日志,数据,消息)进入Flink的时间(不考虑)
Processing Time:事件被处理时当前系统的时间
如下图所示,需求模拟, 第13秒发送 2个事件, 第 16秒发送 一个事件:
现在需求是每隔5S计算10S之内的窗口结果;
Process Time Window(有序)
下图是按照 执行时间窗口计算的示意图:最终计算出来的结果就是
(hadoop,2)
(hadoop,3)
(hadoop,1)
Process Time Window(无序)
如下图所示 原本 16S 的一个事件延迟到了 19S 才发送出去,最终结果如下;
发现按照执行时间处理的话, 在发生事件延迟到达的时候, 对结果的准确度影响非常大;
(hadoop,1)
(hadoop,3)
(hadoop,2)
使用Event Time处理无序
关键代码
//步骤一:设置时间类型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
dataStream.map(new MapFunction<String, Tuple2<String,Long>>() {
@Override
public Tuple2<String, Long> map(String line) throws Exception {
String[] fields = line.split(",");
return new Tuple2<>(fields[0],Long.valueOf(fields[1]));
}
//步骤二:获取数据里面的event Time
}).assignTimestampsAndWatermarks(new EventTimeExtractor() )
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.process(new SumProcessWindowFunction())
.print().setParallelism(1);
private static class EventTimeExtractor
implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
// 拿到每一个事件的 Event Time
@Override
public long extractTimestamp(Tuple2<String, Long> element,
long previousElementTimestamp) {
return element.f1;
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis());
}
}
如下图所示执行计算,最终结果如下:
(hadoop,1)
(hadoop,3)
(hadoop,1)
可以发现按照事件时间是比较接近真实结果的, 但是比如第一个窗口消息已经延迟了, 则不会触发计算。 最终放大招,终极解决方案就是 waterMark
使用【WaterMark】机制解决无序
如下图所示执行过程,划红线的绿色框就代表着水印, 水印左边的是真实窗口可以理解为水杯中存储的实际的水, 右边的代表水面以上部分到达杯沿, 只有杯沿 时间 大于等于 窗口结束时间时就会触发窗口计算。这也是为啥叫 watermaker的原因;
最终执行结果如下 : 可以发现完全符合预期,可以水印 把 19S延迟到达的消息还是放到 第一个窗口去计算了 。
(hadoop,2)
(hadoop,3)
(hadoop,1)
关键核心代码:
//步骤一:设置时间类型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
dataStream.map(new MapFunction<String, Tuple2<String,Long>>() {
@Override
public Tuple2<String, Long> map(String line) throws Exception {
String[] fields = line.split(",");
return new Tuple2<>(fields[0],Long.valueOf(fields[1]));
}
//步骤二:获取数据里面的event Time
}).assignTimestampsAndWatermarks(new EventTimeExtractor() )
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.process(new SumProcessWindowFunction())
.print().setParallelism(1);
private static class EventTimeExtractor
implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
// 拿到每一个事件的 Event Time
@Override
public long extractTimestamp(Tuple2<String, Long> element,
long previousElementTimestamp) {
return element.f1;
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
//window延迟5秒触发
return new Watermark(System.currentTimeMillis() - 5000);
}
}
WaterMark机制
产生的根本原因是消息乱序,延迟到达, kafka 是可能出现消息延迟的, 比如挤压、业务处理延迟、或者消费失败重试 都可以导致先处理的消息把后处理的消息延迟到达;
既然乱序肯定会产生那如何催进窗口触发计算呢? 那当然是通过 watermaker机制,通过此机制触发窗口啥时候 进行计算。
有序的流的watermarks
无序的流的watermarks
多并行度流的watermarks
一个window可能会接受到多个waterMark,我们以最小的为准。
需求-得到并打印每隔 3 秒钟统计前 3 秒内的相同的 key 的所有的事件 - 滚动窗口计算
关键代码:
//步骤一:设置时间类型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//设置waterMark产生的周期为1s
env.getConfig().setAutoWatermarkInterval(1000);
dataStream.map(new MapFunction<String, Tuple2<String,Long>>() {
@Override
public Tuple2<String, Long> map(String line) throws Exception {
String[] fields = line.split(",");
return new Tuple2<>(fields[0],Long.valueOf(fields[1]));
}
//步骤二:获取数据里面的event Time
}).assignTimestampsAndWatermarks(new EventTimeExtractor() )
.keyBy(0)
.timeWindow(Time.seconds(3))
.process(new SumProcessWindowFunction())
.print().setParallelism(1);
private static class EventTimeExtractor
implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
private long currentMaxEventTime = 0L;
private long maxOutOfOrderness = 10000; // 最大允许的乱序时间 10 秒
// 拿到每一个事件的 Event Time
@Override
public long extractTimestamp(Tuple2<String, Long> element,
long previousElementTimestamp) {
long currentElementEventTime = element.f1;
currentMaxEventTime = Math.max(currentMaxEventTime,
currentElementEventTime);
System.out.println("event = " + element
+ "|" + dateFormat.format(element.f1) // Event Time
+ "|" + dateFormat.format(currentMaxEventTime) // Max Event
Time
+ "|" +
dateFormat.format(getCurrentWatermark().getTimestamp())); // Current Watermark
return currentElementEventTime;
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
/**
* WasterMark会周期性的产生,默认就是每隔200毫秒产生一个
*
* 设置 watermark 产生的周期为 1000ms
* env.getConfig().setAutoWatermarkInterval(1000);
*/
//window延迟5秒触发
System.out.println("water mark...");
return new Watermark(currentMaxEventTime - maxOutOfOrderness);
}
}
}
总结下window 触发计算的机制如下:
也就是 watermaker 驱动着窗口的计算,并且窗口内的必须有数据,并且水印计算的时候 窗口必须按照 事件事件处理
1, watermaker 时间 >= window_end_time
2, 在 [window_start_time, window_end_time) 区间中有数据存在,注意是左闭右开的区间,而且是以 event time 来计算的
迟到太多的事件
上述 使用【WaterMark】机制解决无序 的案例中,WaterMark 要是不够长, 或者消息延迟太多, 还是可能会导致消息丢失不会在窗口中进行计算。
具体延迟太多的消息是否触发计算, 可以考虑直接丢失, 因为延迟太多意义不大; 或者采取其他手段,接下来就介绍这种情况该如何处理。
总体延迟消息的处理一共下述三种方案:
- 丢弃,这个是默认的处理方式
- allowedLateness 指定允许数据延迟的时间(我们一般) 这种方式会导致业务异常复杂不好处理。
).assignTimestampsAndWatermarks(new EventTimeExtractor() )
.keyBy(0)
.timeWindow(Time.seconds(3))
.allowedLateness(Time.seconds(2)) // 允许事件迟到 2 秒
.process(new SumProcessWindowFunction())
.print().setParallelism(1);
- sideOutputLateData 收集迟到的数据(修正实时的数据)
//步骤一:设置时间类型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//设置waterMark产生的周期为1s
env.getConfig().setAutoWatermarkInterval(1000);
// 保存迟到的,会被丢弃的数据
OutputTag<Tuple2<String, Long>> outputTag =
new OutputTag<Tuple2<String, Long>>("late-date"){};
SingleOutputStreamOperator<String> result = dataStream.map(new
MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String line) throws Exception {
String[] fields = line.split(",");
return new Tuple2<>(fields[0], Long.valueOf(fields[1]));
}
//步骤二:获取数据里面的event Time
}).assignTimestampsAndWatermarks(new EventTimeExtractor())
.keyBy(0)
.timeWindow(Time.seconds(3))
// .allowedLateness(Time.seconds(2)) // 允许事件迟到 2 秒
.sideOutputLateData(outputTag) // 保存迟到太多的数据
.process(new SumProcessWindowFunction());
DataStream<String> lateDataStream
= result.getSideOutput(outputTag).map(new
MapFunction<Tuple2<String, Long>, String>() {
@Override
public String map(Tuple2<String, Long> stringLongTuple2) throws
Exception {
return "迟到的数据:" + stringLongTuple2.toString();
}
});
// 以上迟到的消息就可以再保存到 kafka 后续再进行处理;
private static class EventTimeExtractor
implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
private long currentMaxEventTime = 0L;
private long maxOutOfOrderness = 10000; // 最大允许的乱序时间 10 秒
// 拿到每一个事件的 Event Time
@Override
public long extractTimestamp(Tuple2<String, Long> element,
long previousElementTimestamp) {
long currentElementEventTime = element.f1;
currentMaxEventTime = Math.max(currentMaxEventTime,
currentElementEventTime);
System.out.println("event = " + element
+ "|" + dateFormat.format(element.f1) // Event Time
+ "|" + dateFormat.format(currentMaxEventTime) // Max Event
Time
+ "|" +
dateFormat.format(getCurrentWatermark().getTimestamp())); // Current Watermark
return currentElementEventTime;
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
/**
* WasterMark会周期性的产生,默认就是每隔200毫秒产生一个
*
* 设置 watermark 产生的周期为 1000ms
* env.getConfig().setAutoWatermarkInterval(1000);
*/
System.out.println("water mark...");
return new Watermark(currentMaxEventTime - maxOutOfOrderness);
}
}
}
多并行度下的WaterMark 测试验证演示
结论如下:
一个window可能会接受到多个waterMark,我们以最小的为准。
其实 就是 source transform sink 等算子的并发度 大于 1 , 至少2个以上, 会导致
接下来要举例论证上述原理: 下面验证一种场景 3S 滚动窗口计算,按照事件时间处理
1、 设置实时任务并行度为2
按照如下打印执行事件:
当前线程ID、事件时间、max事件时间、水印(max事件时间 - 10S)
打印消费消息结果如下:
当前线程ID:55event = (000001,1461756883000)|19:34:43|19:34:43|19:34:33
当前线程ID:56event = (000001,1461756870000)|19:34:30|19:34:30|19:34:20
当前线程ID:56event = (000001,1461756888000)|19:34:48|19:34:48|19:34:38
会发现最终处理的窗口是 window start time : 19:34:30、window end time : 19:34:33, 此时处理的窗口中的事件是 (000001,1461756870000)|19:34:30
线程ID 56 最终水印以最大的为准,也就是 19:34:38;而此时线程 55 的水印 是 19:34:33 ; 不同线程的水印以最小的为准,所以最终的水印就是 19:34:33 会触发
【19:34:30,19:34:33) 窗口的计算,最终打印的日志也证明了猜想。
Window
Window类型
窗口通常被区分为不同的类型:
tumbling windows:滚动窗口 【没有重叠】 (time,count)
sliding windows:滑动窗口 【有重叠】(time,count)
session windows:会话窗口 (time)
global windows: 没有窗口
Window类型总结
Keyed Window 和 Non Keyed Window
Keyed Windows:
stream
.keyBy(...) 1 <- keyed versus non-keyed windows
.window(...) 1 <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] 1 <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] 1 <- optional: "output tag" (else no side
output for late data)
.reduce/aggregate/fold/apply() 1 <- required: "function"
[.getSideOutput(...)] 1 <- optional: "output tag"
Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output
for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
global window
global window + trigger 一起配合才能使用
单词每出现三次统计一次
stream.keyBy(0)
.window(GlobalWindows.create())
//如果不加这个程序是启动不起来的
.trigger(CountTrigger.of(3))
.sum(1)
.print();
以上需求实现是每三个元素到来就触发统计,但是 sum stage 会一直累加保留,那有啥办法可以每次只打印最近三个元素的和呢 ? 解决方案就是需要自定义 trigger,状态自己存储,当触发窗口计算时
对已经计算的窗口元素清除。 具体实现见下述所示。
Trigger
需求:实现每隔3个单词,计算最近的计数,计数结果不能包含之前的计数结果。
/**
* 使用Trigger 自己实现一个类似CountWindow的效果
*/
public class CountWindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStream =
env.socketTextStream("10.148.15.10", 8888);
SingleOutputStreamOperator<Tuple2<String, Integer>> stream =
dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>>
collector) throws Exception {
String[] fields = line.split(",");
for (String word : fields) {
collector.collect(Tuple2.of(word, 1));
}
}
});
WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> keyedWindow
= stream.keyBy(0)
.window(GlobalWindows.create())
.trigger(new MyCountTrigger(3));
//可以看看里面的源码,跟我们写的很像
// WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow>
keyedWindow = stream.keyBy(0)
// .window(GlobalWindows.create())
// .trigger(CountTrigger.of(3));
DataStream<Tuple2<String, Integer>> wordCounts = keyedWindow.sum(1);
wordCounts.print().setParallelism(1);
env.execute("Streaming WordCount");
}
private static class MyCountTrigger
extends Trigger<Tuple2<String, Integer>, GlobalWindow> {
// 表示指定的元素的最大的数量
private long maxCount;
// 用于存储每个 key 对应的 count 值
private ReducingStateDescriptor<Long> stateDescriptor
= new ReducingStateDescriptor<Long>("count", new
ReduceFunction<Long>() {
@Override
public Long reduce(Long aLong, Long t1) throws Exception {
return aLong + t1;
}
}, Long.class);
public MyCountTrigger(long maxCount) {
this.maxCount = maxCount;
}
/**
* 当一个元素进入到一个 window 中的时候就会调用这个方法
* @param element 元素
* @param timestamp 进来的时间
* @param window 元素所属的窗口
* @param ctx 上下文
* @return TriggerResult
* 1. TriggerResult.CONTINUE :表示对 window 不做任何处理
* 2. TriggerResult.FIRE :表示触发 window 的计算
* 3. TriggerResult.PURGE :表示清除 window 中的所有数据
* 4. TriggerResult.FIRE_AND_PURGE :表示先触发 window 计算,然后删除
window 中的数据
* @throws Exception
*/
@Override
public TriggerResult onElement(Tuple2<String, Integer> element,
long timestamp,
GlobalWindow window,
TriggerContext ctx) throws Exception {
// 拿到当前 key 对应的 count 状态值
ReducingState<Long> count =
ctx.getPartitionedState(stateDescriptor);
// count 累加 1
count.add(1L);
// 如果当前 key 的 count 值等于 maxCount
if (count.get() == maxCount) {
count.clear();
// 触发 window 计算,删除数据
return TriggerResult.FIRE_AND_PURGE;
}
// 否则,对 window 不做任何的处理
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time,
GlobalWindow window,
TriggerContext ctx) throws
Exception {
// 写基于 Processing Time 的定时器任务逻辑
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time,
GlobalWindow window,
TriggerContext ctx) throws Exception {
// 写基于 Event Time 的定时器任务逻辑
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws
Exception {
// 清除状态值
ctx.getPartitionedState(stateDescriptor).clear();
}
}
}
Evictor
需求:实现每隔2个单词,计算最近3个单词; 要求不使用现成的API 实现
实现思路:
1、 定义自定义 trigger 当收集到2个单词之后触发计算
2、自定义 trigger时只会触发计算,不会把窗口中的元素清除掉
3、自定义 Evictor 每次触发计算时,只计算最近三个。把之前的老数据清除掉
4、以上组合就实现了 每隔2个单词,计算最近三个单词的效果。
private static class MyCountEvictor
implements Evictor<Tuple2<String, Integer>, GlobalWindow> {
// window 的大小
private long windowCount;
public MyCountEvictor(long windowCount) {
this.windowCount = windowCount;
}
/**
* 在 window 计算之前删除特定的数据
* @param elements window 中所有的元素
* @param size window 中所有元素的大小
* @param window window
* @param evictorContext 上下文
*/
@Override
public void evictBefore(Iterable<TimestampedValue<Tuple2<String,
Integer>>> elements,
int size, GlobalWindow window, EvictorContext
evictorContext) {
if (size <= windowCount) {
return;
} else {
int evictorCount = 0;
Iterator<TimestampedValue<Tuple2<String, Integer>>> iterator =
elements.iterator();
while (iterator.hasNext()) {
iterator.next();
evictorCount++;
// 如果删除的数量小于当前的 window 大小减去规定的 window 的大小,就
需要删除当前的元素
if (evictorCount > size - windowCount) {
break;
} else {
iterator.remove();
}
}
}
}
/**
* 在 window 计算之后删除特定的数据
* @param elements window 中所有的元素
* @param size window 中所有元素的大小
* @param window window
* @param evictorContext 上下文
*/
@Override
public void evictAfter(Iterable<TimestampedValue<Tuple2<String,
Integer>>> elements,
4.3.4 window增量聚合
窗口中每进入一条数据,就进行一次计算,等时间到了展示最后的结果
常用的聚合算子
int size, GlobalWindow window, EvictorContext
evictorContext) {
}
}
}
window增量聚合
窗口中每进入一条数据,就进行一次计算,等时间到了展示最后的结果; 所以该算子是性能很高的, 只保留最终的状态。
常用的聚合算子
reduce(reduceFunction)
aggregate(aggregateFunction)
sum(),min(),max()
window全量聚合
等属于窗口的数据到齐,才开始进行聚合计算【可以实现对窗口内的数据进行排序等需求】
apply(windowFunction)
process(processWindowFunction)
processWindowFunction比windowFunction提供了更多的上下文信息。类似于map和RichMap的关系
window join
两个window之间可以进行join,join操作只支持三种类型的window:滚动窗口,滑动窗口,会话窗口
stream.join(otherStream) //两个流进行关联
.where(<KeySelector>) //选择第一个流的key作为关联字段
.equalTo(<KeySelector>)//选择第二个流的key作为关联字段
.window(<WindowAssigner>)//设置窗口的类型
.apply(<JoinFunction>) //对结果做操作
Tumbling Window Join
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
Sliding Window Join
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */,
Time.milliseconds(1) /* slide */))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
Session Window Join
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
Interval Join
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream
.keyBy(<KeySelector>)
.intervalJoin(greenStream.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){
@Override
public void processElement(Integer left, Integer right, Context ctx,
Collector<String> out) {
out.collect(first + "," + second);
}
});