Flink Uid设计剖析
首先我们先了解一下flink 是如何将应用状态映射到数据集的
一个flink job通常由一个或多个source operators、一些处理计算的operators、和一个或者多个sink oper组成。每个operators在一个或者多个task中并行运行,并且使用不同的类型的state。
如果operators应用于key steam,它可以有零个、一个或者多个“key state”,它的作用是从每一条记录中提取出它的key值,可以将它理解为处理了一个分布式的map。
下图显示了应用程序“ MyApp”,该应用程序由称为“ Src”,“ Proc”和“ Snk”的三个运算符组成。 Src具有一个操作员状态(os1),Proc具有一个操作员状态(os2)和两个键控状态(ks1,ks2),而Snk是无状态的。
MyApp的checkpoint 或者 savepoint都是由所有的状态数据组成,这些数据的结构可以让每个任务从checkpoint 或者savepoint恢复。在使用批处理作业保存恢复点的时候,其实际就是把每个任务的state映射到一个数据集或者一个表上,可以认为这个保存点其实就是一个数据库。每个operators(由其UID唯一标识)都代表了一个名称空间。operators的每个操作state都是有一个列映射到名称空间的专有表,改列包含所有任务的state数据。operators的所有的key states都会映射到单个表,该表的由一个k-v列组成。改列每一个key值对应一个列。如下图:
该图显示了Src的operators state值如何映射到一个表,该表具有一列五行,跨Src的所有并行任务的每个列表条目为一行。 运算符“ Proc”的运算符状态os2类似地映射到单个表。 键状态ks1和ks2被组合到具有三列的单个表中,一列用于键,一列用于ks1,一列用于ks2。 键控表为两个键控state的每个不同键保持一行。 由于运算符“ Snk”没有任何状态,因此其名称空间为空。
所以我们可以总结savepoint和database的关系如下:
- 一个savepoint是一个数据库
- operators是其uid命名的namespace
- 每一个operator state是一个单独的表
- operator state中每一个元素代表一行
- 每个keyed state表都有一个键列,该键列映射operator的键值
- 每个注册状态代表表中的单个列
- 表中的每一行都映射到一个键
为所有的operators去设置UID
如上所述,Flink将operators state映射到operators时,使用的是uid,这对于savepoint至关重要。默认情况下,uid是通过遍历JobGraph并hash特定operators属性来生成运算符uid。尽管对于使用者来说很方便,但是他也非常的脆弱,因为对于JobGraph的更改会导致新的UUID,为了建立稳定的映射时,我们必须setUid提供稳定的uid。
问题:
如果使用在FLink上层建立一层解析器,通过类似于SQL的简单的语法提供给未学习过的FLink的用户,这个时候导致解析器去映射成实际的FLink的uid并不能很稳定的去设置,会导致上个savepoint的operators即使部分相同,也会导致大部分的丢失,所以我们需要去了解UID怎么去设置的,并去做修改。
让我们看看flink是如何去设置UID的
可以通过运行这一段demo去debug发现:
StreamGraph streamGraph = env.getStreamGraph(jobName);
JobGraph jobGraph = streamGraph.getJobGraph();
如果用户有设置uid则设置为用户的uid
DEBUG main org.apache.flink.streaming.api.graph.StreamGraphHasherV2 StreamGraphHasherV2.java:188: User defined hash 'LogRiverEvent,d1494b6528fcd7578379234466c9feef' for node 'Source: LogRiverEvent-1' {id: 1, parallelism: 4, user function: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011}
java.lang.Thread.getStackTrace(Thread.java:1559)
org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateNodeHash(StreamGraphHasherV2.java:185)
org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:112)
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:94)
org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:737)
org.apache.flink.optimizer.plan.StreamingPlan.getJobGraph(StreamingPlan.java:40)
cn.yottabyte.pipe.flink.source.LogRiverSourceTest.main(LogRiverSourceTest.java:136)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604)
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:466)
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1008)
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1081)
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1081)
没有uid则会自动生成一个uid
2020-04-15 15:37:07,248 DEBUG main org.apache.flink.streaming.api.graph.StreamGraphHasherV2 StreamGraphHasherV2.java:264: Generated hash '10c893aa189fa29399b8c379dbfeca05' for node 'ignore_null_event-3' {id: 3, parallelism: 4, user function: cn.yottabyte.pipe.flink.source.LogRiverSourceTest$1}
java.lang.Thread.getStackTrace(Thread.java:1559)
org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateDeterministicHash(StreamGraphHasherV2.java:260)
org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateNodeHash(StreamGraphHasherV2.java:166)
org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:112)
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:94)
org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:737)
org.apache.flink.optimizer.plan.StreamingPlan.getJobGraph(StreamingPlan.java:40)
cn.yottabyte.pipe.flink.source.LogRiverSourceTest.main(LogRiverSourceTest.java:136)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604)
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:466)
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1008)
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1081)
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1081)
实际生成uid代码为:flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
再去了解代码层面:
Hasher hasher = hashFunction.newHasher();
byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled, streamGraph);
private byte[] generateDeterministicHash(
StreamNode node,
Hasher hasher,
Map<Integer, byte[]> hashes,
boolean isChainingEnabled,
StreamGraph streamGraph) {
……
for (StreamEdge outEdge : node.getOutEdges()) {
generateNodeLocalHash(hasher, hashes.size());
}
……
byte[] hash = hasher.hash().asBytes();
……
for (StreamEdge inEdge : node.getInEdges()) {
byte[] otherHash = hashes.get(inEdge.getSourceId());
for (int j = 0; j < hash.length; j++) {
hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
}
}
……
}
最终发现最后生成的hashcode与当前的id,out个数,int的hash有关。
实际输出一个uid为例:
如图的flink DAG:
Node:Filter-4, #3. in edges#ea632d67b7d595e5b851708ae9ad79d6#ignore_null_event-3.
解释:
- 第一个"#"后面的id是算hash用的id,也就是遍历时该Node的id。
- in edges表示前向依赖的Node,后面的#接这个node的hash值,再#后面是该node的name
- out edges表示后续的Node,后面的#接这个node的hash值,再#后面是该node的name
整个链路如下:
- Node:Source: LogRiverEvent-1, #0.
- Node:extractTimestamp-2, #1. in edges#bc764cd8ddf7a0cff126f51c16239658#Source: LogRiverEvent-1.
- Node:ignore_null_event-3, #2. in edges#0a448493b4782967b150582570326227#extractTimestamp-2.
#并不是算完一条再算另一条
-
Node:Filter-4, #3. in edges#ea632d67b7d595e5b851708ae9ad79d6#ignore_null_event-3.
-
Node:Filter-11, #4. in edges#ea632d67b7d595e5b851708ae9ad79d6#ignore_null_event-3.
-
Node:Map-5, #5. in edges#6d2677a0ecc3fd8df0b72ec675edf8f4#Filter-4.
-
Node:Map-12, #6. in edges#5af0c26a4e78bae94addfaa227a406c1#Filter-11.
-
Node:Flat Map-6, #7. in edges#f66b9c09d172b1c19fff9288e5d53f49#Map-5.
-
Node:Flat Map-13, #8. in edges#227ab9be2ca77a534fd0e93a93e67e8b#Map-12.
-
Node:Window(TumblingEventTimeWindows(600000), EventTimeTrigger, ReduceFunction$1, PassThroughWindowFunction)-8, #9. in edges#b15a823f9f5f129a46d82a43a93f3613#Flat Map-6.
-
Node:Window(TumblingEventTimeWindows(600000), EventTimeTrigger, AggregateFunction$1, ProcessWindowFunction$1)-15, #10. in edges#d6b993c9e4e18029649e604955b50858#Flat Map-13.
-
Node:Window(TumblingEventTimeWindows(1000), EventTimeTrigger, AggregateFunction$1, ProcessWindowFunction$1)-10, #11. in edges#ce95ba768fcf87e260ab71a244cbfdb9#Window(TumblingEventTimeWindows(600000), EventTimeTrigger, ReduceFunction$1, PassThroughWindowFunction)-8.
-
Node:Map-16, #12. in edges#0ca94585fe097875e38fc49c1eb9af32#Window(TumblingEventTimeWindows(600000), EventTimeTrigger, AggregateFunction$1, ProcessWindowFunction$1)-15.
-
Node:Window(TumblingEventTimeWindows(1000), EventTimeTrigger, ProcessWindowFunction$1)-18, #13. in edges#0929bdfbc119b2e3d9f65c52d507beb4#Map-16.
-
Node:Sink: Unnamed-20, #14. in edges#9dc9fccc5cacedb2297bb9e1d19bbe5d#Window(TumblingEventTimeWindows(1000), EventTimeTrigger, AggregateFunction$1, ProcessWindowFunction$1)-10. in edges#8b25afa3fd1186f5175e9605d9e9cf94#Window(TumblingEventTimeWindows(1000), EventTimeTrigger, ProcessWindowFunction$1)-18.
了解到上面UID的设计情况以后,如果我们有需求进行更改,可以根据自己需求去在用户层面去设计UID,避免重复计算。