zoukankan      html  css  js  c++  java
  • Flink Uid设计剖析

    一个flink job通常由一个或多个source operators、一些处理计算的operators、和一个或者多个sink oper组成。每个operators在一个或者多个task中并行运行,并且使用不同的类型的state。

    如果operators应用于key steam,它可以有零个、一个或者多个“key state”,它的作用是从每一条记录中提取出它的key值,可以将它理解为处理了一个分布式的map。

    下图显示了应用程序“ MyApp”,该应用程序由称为“ Src”,“ Proc”和“ Snk”的三个运算符组成。 Src具有一个操作员状态(os1),Proc具有一个操作员状态(os2)和两个键控状态(ks1,ks2),而Snk是无状态的。

    Application: MyApp

    MyApp的checkpoint 或者 savepoint都是由所有的状态数据组成,这些数据的结构可以让每个任务从checkpoint 或者savepoint恢复。在使用批处理作业保存恢复点的时候,其实际就是把每个任务的state映射到一个数据集或者一个表上,可以认为这个保存点其实就是一个数据库。每个operators(由其UID唯一标识)都代表了一个名称空间。operators的每个操作state都是有一个列映射到名称空间的专有表,改列包含所有任务的state数据。operators的所有的key states都会映射到单个表,该表的由一个k-v列组成。改列每一个key值对应一个列。如下图:

    Database: MyApp

    该图显示了Src的operators state值如何映射到一个表,该表具有一列五行,跨Src的所有并行任务的每个列表条目为一行。 运算符“ Proc”的运算符状态os2类似地映射到单个表。 键状态ks1和ks2被组合到具有三列的单个表中,一列用于键,一列用于ks1,一列用于ks2。 键控表为两个键控state的每个不同键保持一行。 由于运算符“ Snk”没有任何状态,因此其名称空间为空。

    所以我们可以总结savepoint和database的关系如下:

    • 一个savepoint是一个数据库
    • operators是其uid命名的namespace
    • 每一个operator state是一个单独的表
    • operator state中每一个元素代表一行
    • 每个keyed state表都有一个键列,该键列映射operator的键值
    • 每个注册状态代表表中的单个列
    • 表中的每一行都映射到一个键

    为所有的operators去设置UID

    如上所述,Flink将operators state映射到operators时,使用的是uid,这对于savepoint至关重要。默认情况下,uid是通过遍历JobGraph并hash特定operators属性来生成运算符uid。尽管对于使用者来说很方便,但是他也非常的脆弱,因为对于JobGraph的更改会导致新的UUID,为了建立稳定的映射时,我们必须setUid提供稳定的uid。

    问题:

    如果使用在FLink上层建立一层解析器,通过类似于SQL的简单的语法提供给未学习过的FLink的用户,这个时候导致解析器去映射成实际的FLink的uid并不能很稳定的去设置,会导致上个savepoint的operators即使部分相同,也会导致大部分的丢失,所以我们需要去了解UID怎么去设置的,并去做修改。


    让我们看看flink是如何去设置UID的

    可以通过运行这一段demo去debug发现:

    StreamGraph streamGraph = env.getStreamGraph(jobName);
    JobGraph jobGraph = streamGraph.getJobGraph();
    

    如果用户有设置uid则设置为用户的uid

    DEBUG main org.apache.flink.streaming.api.graph.StreamGraphHasherV2 StreamGraphHasherV2.java:188: User defined hash 'LogRiverEvent,d1494b6528fcd7578379234466c9feef' for node 'Source: LogRiverEvent-1' {id: 1, parallelism: 4, user function: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011}
      java.lang.Thread.getStackTrace(Thread.java:1559)
      org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateNodeHash(StreamGraphHasherV2.java:185)
      org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:112)
      org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
      org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:94)
      org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:737)
      org.apache.flink.optimizer.plan.StreamingPlan.getJobGraph(StreamingPlan.java:40)
      cn.yottabyte.pipe.flink.source.LogRiverSourceTest.main(LogRiverSourceTest.java:136)
      
      
      sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      java.lang.reflect.Method.invoke(Method.java:498)
      org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604)
      org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:466)
      org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
      org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
      org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
      org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
      org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1008)
      org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1081)
      org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
      org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1081)
    

    没有uid则会自动生成一个uid

    2020-04-15 15:37:07,248 DEBUG main org.apache.flink.streaming.api.graph.StreamGraphHasherV2 StreamGraphHasherV2.java:264: Generated hash '10c893aa189fa29399b8c379dbfeca05' for node 'ignore_null_event-3' {id: 3, parallelism: 4, user function: cn.yottabyte.pipe.flink.source.LogRiverSourceTest$1}
      java.lang.Thread.getStackTrace(Thread.java:1559)
      org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateDeterministicHash(StreamGraphHasherV2.java:260)
      org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateNodeHash(StreamGraphHasherV2.java:166)
      org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:112)
      org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
      org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:94)
      org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:737)
      org.apache.flink.optimizer.plan.StreamingPlan.getJobGraph(StreamingPlan.java:40)
      
      
      cn.yottabyte.pipe.flink.source.LogRiverSourceTest.main(LogRiverSourceTest.java:136)
      sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      java.lang.reflect.Method.invoke(Method.java:498)
      org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604)
      org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:466)
      org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
      org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
      org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
      org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
      org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1008)
      org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1081)
      org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
      org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1081)
    

    实际生成uid代码为:flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java

    再去了解代码层面:

    Hasher hasher = hashFunction.newHasher();
    			byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled, streamGraph);
    
    private byte[] generateDeterministicHash(
    			StreamNode node,
    			Hasher hasher,
    			Map<Integer, byte[]> hashes,
    			boolean isChainingEnabled,
    			StreamGraph streamGraph) {
    		……
    		for (StreamEdge outEdge : node.getOutEdges()) {
    				generateNodeLocalHash(hasher, hashes.size());
    		}
    		……
    		byte[] hash = hasher.hash().asBytes();
    		……
    		for (StreamEdge inEdge : node.getInEdges()) {
    			byte[] otherHash = hashes.get(inEdge.getSourceId());
    			for (int j = 0; j < hash.length; j++) {
    				hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
    			}
    		}
    		……
    }
    

    最终发现最后生成的hashcode与当前的id,out个数,int的hash有关。

    实际输出一个uid为例:

    如图的flink DAG:

    img

    Node:Filter-4, #3. in edges#ea632d67b7d595e5b851708ae9ad79d6#ignore_null_event-3.
    

    解释:

    • 第一个"#"后面的id是算hash用的id,也就是遍历时该Node的id。
    • in edges表示前向依赖的Node,后面的#接这个node的hash值,再#后面是该node的name
    • out edges表示后续的Node,后面的#接这个node的hash值,再#后面是该node的name

    整个链路如下:

    • Node:Source: LogRiverEvent-1, #0.
    • Node:extractTimestamp-2, #1. in edges#bc764cd8ddf7a0cff126f51c16239658#Source: LogRiverEvent-1.
    • Node:ignore_null_event-3, #2. in edges#0a448493b4782967b150582570326227#extractTimestamp-2.

    #并不是算完一条再算另一条

    • Node:Filter-4, #3. in edges#ea632d67b7d595e5b851708ae9ad79d6#ignore_null_event-3.

    • Node:Filter-11, #4. in edges#ea632d67b7d595e5b851708ae9ad79d6#ignore_null_event-3.

    • Node:Map-5, #5. in edges#6d2677a0ecc3fd8df0b72ec675edf8f4#Filter-4.

    • Node:Map-12, #6. in edges#5af0c26a4e78bae94addfaa227a406c1#Filter-11.

    • Node:Flat Map-6, #7. in edges#f66b9c09d172b1c19fff9288e5d53f49#Map-5.

    • Node:Flat Map-13, #8. in edges#227ab9be2ca77a534fd0e93a93e67e8b#Map-12.

    • Node:Window(TumblingEventTimeWindows(600000), EventTimeTrigger, ReduceFunction$1, PassThroughWindowFunction)-8, #9. in edges#b15a823f9f5f129a46d82a43a93f3613#Flat Map-6.

    • Node:Window(TumblingEventTimeWindows(600000), EventTimeTrigger, AggregateFunction$1, ProcessWindowFunction$1)-15, #10. in edges#d6b993c9e4e18029649e604955b50858#Flat Map-13.

    • Node:Window(TumblingEventTimeWindows(1000), EventTimeTrigger, AggregateFunction$1, ProcessWindowFunction$1)-10, #11. in edges#ce95ba768fcf87e260ab71a244cbfdb9#Window(TumblingEventTimeWindows(600000), EventTimeTrigger, ReduceFunction$1, PassThroughWindowFunction)-8.

    • Node:Map-16, #12. in edges#0ca94585fe097875e38fc49c1eb9af32#Window(TumblingEventTimeWindows(600000), EventTimeTrigger, AggregateFunction$1, ProcessWindowFunction$1)-15.

    • Node:Window(TumblingEventTimeWindows(1000), EventTimeTrigger, ProcessWindowFunction$1)-18, #13. in edges#0929bdfbc119b2e3d9f65c52d507beb4#Map-16.

    • Node:Sink: Unnamed-20, #14. in edges#9dc9fccc5cacedb2297bb9e1d19bbe5d#Window(TumblingEventTimeWindows(1000), EventTimeTrigger, AggregateFunction$1, ProcessWindowFunction$1)-10. in edges#8b25afa3fd1186f5175e9605d9e9cf94#Window(TumblingEventTimeWindows(1000), EventTimeTrigger, ProcessWindowFunction$1)-18.

    了解到上面UID的设计情况以后,如果我们有需求进行更改,可以根据自己需求去在用户层面去设计UID,避免重复计算。

  • 相关阅读:
    如何最大限度提高.NET的性能
    Webserivce简单安全验证
    一些NLP相关的JD,作参考
    拼多多的故事
    storm的一些相关文章
    这篇文章不错,仔细读读,码农晋升为技术管理者后,痛并快乐着的纠结内心
    protobuf的反射机制
    如何清理Docker占用的磁盘空间?
    经典面试题:浏览器是怎样解析CSS的?
    代码编辑器横评:为什么 VS Code 能拔得头筹
  • 原文地址:https://www.cnblogs.com/yankang/p/13859820.html
Copyright © 2011-2022 走看看