zoukankan      html  css  js  c++  java
  • Flink Application Development DataStream API Execution Mode (Batch/Streaming)- Flink应用程序开发DataStream API执行模式(批/流)


    翻译出处 DataStream API Execution Mode (Batch/Streaming)
    DataStream API支持不同的运行时执行模式,您可以根据用例的要求和job特征从中选择运行模式。

    DataStream API有“经典”执行行为,我们称之为 STREAMING执行模式。这应用于需要连续增量处理且无限期保持在线状态的无限制jobs。

    另外,有一个批处理式执行模式,我们称为BATCH 执行模式。这以一种类似于批处理框架(如MapReduce)的方式执行作业。这应用于您具有已知固定输入且不会连续运行的有边界作业。

    Apache Flink的流和批处理的统一方法意味着,无论配置的执行模式如何,在有界输入上执行的DataStream应用程序都将产生相同的 final结果。重要的是要注意final在这里意味着什么:以STREAMING模式执行的作业可能会产生增量更新(请想象数据库中的upserts),而BATCH作业最后只会产生一个最终结果。如果正确解释,最终结果将是相同的,但获得结果的方式可能会有所不同。

    通过启用BATCH执行,我们允许Flink应用其他优化,只有当我们知道输入是有限的时,我们才能做这些优化。例如,除了允许更多有效的任务调度和故障恢复行为的不同的shuffle实现之外,还可以使用不同的join/aggregation 策略。我们将在下面介绍执行行为的一些细节。

    什么时候可以/应该使用BATCH执行模式?

    该BATCH执行模式只能被用于那些有边界的 Jobs/Flink 程序。有界性是data source的一个属性,它告诉我们执行之前是否知道来自该源的所有输入,或者是否新数据可能无限期地显示。反过来,如果作业的所有源均有界,则该作业是有​​界的,否则为无界。

    另一方面,STREAMING执行模式可用于有界作业和无界作业。

    根据经验,程序有界时应使用BATCH执行模式,因为这样做会更有效。当程序不受限制时,您必须使用STREAMING 执行模式,因为只有这种模式足够通用,才能处理连续的数据流。

    一个明显的异常情况是当您想使用有边界job来启动某个作业状态,然后在一个无边界作业中使用该状态。例如,通过使用STREAMING模式运行有边界job,获取保存点,然后在无界作业上还原该保存点。这是一个非常特殊的用例,当我们允许将保存点作为BATCH执行作业的附加输出时,该用例可能很快就会过时。

    另一种可能使用STREAMING模式运行有边界job的情况是,编写针对最终将以无边界sources运行的代码测试。在这种情况下,为了进行测试,使用有界的sources会更加自然。

    配置BATCH执行模式

    可以通过execution.runtime-mode设置来配置执行模式。有三个可能的值:

    • STREAMING:经典的DataStream执行模式(默认)
    • BATCH:在DataStream API上以批处理方式执行
    • AUTOMATIC:让系统根据源的有界性来决定

    可以通过的命令行参数进行配置bin/flink run ...,也可以在创建/配置时以编程方式进行配置StreamExecutionEnvironment。

    通过命令行配置执行模式的方法如下:

    $ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
    

    此示例说明如何在代码中配置执行模式:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    

    注意: 建议用户不要在程序中设置运行时模式,而应在提交应用程序时使用命令行来设置运行时模式。保持应用程序代码的自由配置可提供更大的灵活性,因为可以在任何执行模式下执行同一应用程序。

    执行行为

    本节概述了BATCH 执行模式的执行行为,并将其与STREAMING执行模式进行了对比。有关更多详细信息,请参阅引入此功能的FLIP: FLIP-134FLIP-140

    任务调度和网络随机shuffle

    Flink作业由在数据流图中连接在一起的不同operations组成。系统决定如在不同进程/机器(TaskManagers)上如何调度这些操作符的执行,以及如何在它们之进行数据混洗(发送)。

    可以使用链接(chaining)功能将多个operations/operators链接在一起 。被Flink其视为一个调度单位的一组或多个(链式的)运算符称为一个任务task。通常,术语“子任务subtask”用于指代在多个TaskManager上并行运行任务的各个实例,但是我们在此仅使用术语“任务task”。

    在BATCH与 STREAMING执行模式上任务调度和网络shuffle 的运行方式不同。主要是因为我们知道输入数据在BATCH执行模式下有界,这使得Flink可以使用更有效的数据结构和算法。

    我们将使用此示例来说明二者在任务调度和网络传输中的差异:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    DataStreamSource<String> source = env.fromElements(...);
    
    source.name("source")
    	.map(...).name("map1")
    	.map(...).name("map2")
    	.rebalance()
    	.map(...).name("map3")
    	.map(...).name("map4")
    	.keyBy((value) -> value)
    	.map(...).name("map5")
    	.map(...).name("map6")
    	.sinkTo(...).name("sink");
    

    1对1连接模式的operations,例如 map(),flatMap()或filter()只可以直接转发数据到下一个operation,这允许这些operations被链接在一起。这意味着通常情况下Flink不会在它们之间插入网络随机播放。

    另一方面,诸如keyBy()或rebalance()之类的操作要求在不同并行任务实例之间对数据进行shuffle。这会引起网络shuffle。

    对于上面的示例,Flink将会分组operations为以下任务:

    • 任务1: source,map1和map2
    • 任务2 :map3、map4
    • 任务3: map5,map6和sink
      在任务1和2之间以及任务2和3之间我们有网络shuffle。这是该工作的直观表示:

    流执行模式

    在STREAMING执行模式下,所有任务都必须一直在线/正在运行。这使Flink可以立即通过整个pipeline处理新记录,这是我们进行连续和低延迟流处理所需要的。这也意味着分配给job的TaskManager需要同时具有足够的资源来运行所有任务。

    网络shuffle是pipelined,这意味着记录会立即发送到下游任务,并在网络层上进行一些缓冲。同样,这是必需的,因为在处理连续的数据流时,没有自然点(在时间方面)可以在任务(或任务pipeline)之间落地(原文materialized)数据。这与BATCH执行模式相反,在执行模式中,可以落地中间结果,如下所述。

    批处理执行模式

    在BATCH执行模式下,一个作业的任务可以分为几个阶段,一个接一个地执行。我们这样做是因为输入是有界的,因此Flink可以在继续进行下一阶段(原文stage)之前完全处理pipeline的一个阶段。在上面的示例中,该作业将具有三个阶段,分别对应于由shuffle barriers分隔的三个任务。

    就像上面STREAMING模式中解释的那样,分阶段处理需要Flink将任务的中间结果落地到一些非临时存储中,而不是立即将记录发送给下游任务,该存储允许下游任务在上游任务已经脱机后读取它们。这将增加处理的等待时间,但具带来了其他有趣的属性。首先,这使Flink可以在发生故障时回溯到最新的可用结果,而不是重新启动整个作业。另一个副作用是,BATCH作业可以在较少的资源(就TaskManager上的可用的slots而言)上执行,因为系统可以依次执行任务。

    只要下游任务没有消耗中间结果,TaskManager就会保留它们。(从技术上讲,它们将一直保留到消费的pipelined regions产生它们的输出为止。)之后,它们将被保留尽可能长的时间,以便在发生故障时允许上面提到的回溯到较早的中间结果。

    状态后端/状态

    在STREAMING模式下,Flink使用StateBackend来控制状态的存储方式以及检查点的工作方式。

    在BATCH模式下,已配置的状态后端将被忽略。相反,keyed operation 的输入按键(使用排序)分组,然后我们依次处理一个键的所有记录。这允许同时仅保留仅仅一个键的状态。当转到下一个键时,给定键的状态将被丢弃。

    有关此背景信息,请参见FLIP-140

    处理顺序

    记录在operators或用户定义的函数(UDF)中的处理顺序在BATCH和STREAMING 执行中有所不同。

    在STREAMING模式下,用户定义的函数不应对传入记录的顺序进行任何假设。数据一到达就被处理。

    在BATCH执行模式下,有些操作Flink保证顺序。排序可能是特定任务计划,网络随机播放和状态后端(见上文)的副作用,也可能是系统有意识的选择。

    我们可以区分三种通用的输入类型:

    • 广播输入:来自广播流的输入(另请参见广播状态
    • 常规输入:既不广播也不输入keyed输入
    • keyed输入:来自Keyed Stream

    消耗多种输入类型的Functions或Operators将按以下顺序处理它们:

    • 首先处理广播输入
    • 常规输入将其次被处理
    • keyed 输入最后处理
      对于从多个常规或广播输入消费的functions--例如CoProcessFunction--Flink有权以任何顺序处理该任何类型输入的数据。

    对于从多个keyed输入消费的functions--例如KeyedCoProcessFunction--Flink在继续下一个key之前,会处理单个键所有输入记录。

    Event Time/水印(原文watermark)

    在支持event time方面,Flink的流运行时基于悲观的假设,即事件可能乱序,即带有时间戳 t 的事件可能在带有timestamp t+1 的事件之后发生。因此,系统永远无法确保对于一个时间戳T的元素将来不会再有时间戳t < T的元素出现。为了在使系统实用的同时分摊这种无序对最终结果的影响,Flink在STREAMING模式下使用了一种称为水印的启发式方法。带时间戳T的水印表示其后没有带时间戳t < T的元素。

    在BATCH模式中,事先知道输入数据集的情况下,无需进行启发式操作,因为至少可以按时间戳对元素进行排序,以便按时间顺序对其进行处理。对于熟悉流的读者而言,BATCH我们可以假设“完美的水印”。

    鉴于以上情况,在BATCH模式下,我们在与每个键相关的输入末尾只需要一个MAX_WATERMARK,或者如果输入流不是keyd时则在输入末尾 。基于此方案,所有已注册的计时器将在时间结束时触发并且用户定义的WatermarkAssigners或WatermarkGenerators被忽略。指定一个WatermarkStrategy仍然很重要,因为它的TimestampAssigner仍将用于为记录指定时间戳。

    处理时间

    处理时间是处理记录的计算机上的挂钟时间,在特定的情况下该记录正在处理。基于此定义,我们看到基于处理时间的计算结果不可重现。这是因为处理两次的同一条记录将具有两个不同的时间戳。

    尽管如此,在STREAMING模式下使用处理时间还是有用的。原因与以下事实有关:流传输管道通常实时地摄取其无限制的输入,因此事件时间与处理时间之间存在相关性。此外,由于上述原因,在事件STREAMING模式1h下的时间通常可能几乎1h在处理时间或挂钟时间中。因此,可以使用处理时间来进行早期(不完整)点火,从而提供有关预期结果的提示。

    在输入数据为静态且事先已知的批处理环境中,不存在这种关联。鉴于此,在BATCH模式下,我们允许用户请求当前处理时间并注册处理时间计时器,但是,与事件时间一样,所有计时器都将在输入结束时触发。

    从概念上讲,我们可以想象在执行作业期间处理时间不会增加,而在处理完所有输入后我们会快进到时间的尽头。

    故障恢复

    在STREAMING执行模式下,Flink使用检查点进行故障恢复。请查看检查点文档,以获取有关的上手文档以及配置方法。关于状态快照的容错,还有一个更入门的内容,它在更高层次上解释了这些概念。

    故障恢复检查点的特点之一是,如果发生故障,Flink将从检查点重新启动所有正在运行的任务。这可能比我们在BATCH模式(如下所述)下必须执行的操作要昂贵得多,这是如果在您的作业允许的情况下应该使用BATCH执行模式的原因之一。

    在BATCH执行模式下,Flink将尝试回溯到以前的处理阶段,在回溯之前,中间结果仍然可用。潜在地,只有失败的任务(或其图中的祖先任务)才必须重新启动,与从检查点重新启动所有任务相比,这可以提高处理效率和作业的总体处理时间。

    重要注意事项

    与经典的STREAMING执行模式相比,在BATCH模式下,某些事情可能无法按预期工作。有些功能会稍有不同,而其他功能则不受支持。

    批处理模式下的行为改变:

    • 诸如 reduce()或sum()之类的“滚动”操作会为 以STREAMING 模式到达的每个新记录发出增量更新。在BATCH模式下,这些操作不是“滚动”的。它们仅发出最终结果。

    在批处理模式下不受支持的:

    自定义操作符应谨慎实现,否则可能会出现不当行为。有关更多详细信息,请参见下面的其他说明。

    检查点

    正如上面对检查点所解释的,批处理程序故障恢复不使用检查点。

    重要的是,要记住因为没有检查站,某些功能如CheckpointListener,并且因此,Kafka 的EXACTLY_ONCE模式或StreamingFileSink的 OnCheckpointRollingPolicy将无法正常工作。如果您需要在工作于BATCH模式下的事务接收器,请 确保它使用了FLIP-143中建议的Unified Sink API 。

    您仍然可以使用所有状态原语,只是用于故障恢复的机制会有所不同。

    编写自定义运算符

    注意: 自定义运算符是Apache Flink的高级用法模式。对于大多数用例,请考虑改用(keyed)处理函数。

    在编写自定义运算符时,请记住对BATCH执行模式所做的假设,这一点很重要。否则,在BATCH模式下正常工作的操作符可能会在STREAMING模式下产生错误的结果。操作符永远不会局限于特定的key,这意味着操作符可以看到Flink试图利用的BATCH处理的某些属性 。

    首先,您不应该在操作符中缓存最后看到的水印。在BATCH模式下,我们逐项处理记录。结果,水印将在每个key之间从切换MAX_VALUE到MIN_VALUE。您不应该假定水印在操作符中将一直递增。出于相同的原因,计时器将首先在每个key中按键顺序触发,然后按时间戳顺序触发。此外,不支持手动更改键的操作。

  • 相关阅读:
    视图集
    子类视图
    Mixin扩展类
    GenericAPIView
    APIView
    ModelSerializer使用
    序列化和反序列化
    合并购物车
    pyplot基本绘制
    STL sort “invalid operator <”
  • 原文地址:https://www.cnblogs.com/qlxm/p/14453539.html
Copyright © 2011-2022 走看看