zoukankan      html  css  js  c++  java
  • Spark Structured Streaming 的 Continuous 模式

    Spark 的 Structured Streaming 流处理,默认用的是 micro-batch 模拟流,可以保证 exactly-once 语义,但最好只能是 100ms 级别的延迟

    为了满足更低延迟的业务需求,Spark Structured Streaming 推出了 Continuous 模式,在逻辑代码不变的情况下,只需要改变 Trigger 的配置,就可以非常方便的实现 Continuous 处理模式,可以支持 1ms 的低延迟,但只保证 at-least-once 语义


    传统的 micro-batch 代码如下

    df.writeStream 
      .format("console") 
      .trigger(processingTime='2 seconds') 
      .start()
    
    
    # 相当于 .trigger(processingTime='0 seconds')
    df.writeStream 
      .format("console") 
      .start()
    

    micro-batch 模式每隔一段时间到数据源取出所有可用数据,取出来的数据就是一个 micro-batch,然后每次都要启动 task 去处理 micro-batch,哪怕 tigger 的 processingTime 设置为 0,也只是一批数据处理完后不等待地去数据源继续取可用数据,但取到的可能还是一批数据,而且为了保证一致性语义,必须在处理一批数据后立刻记录数据源的偏移量,然后再取下一批

    由于每次都要启动 task,而且处理的都是一批数据,还要保存好偏移量再取下一批,这么一来,延时必然降不下来


    continuous 模式代码如下,可以看和 micro-batch 比只是改了 trigger 的配置

    df.writeStream
      .format("console")
      .trigger(continuous='1 second')
      .start()
    

    注意这里的 1 second 指的是每隔 1 秒记录保存一次状态,而不是说每隔 1 秒才处理数据

    continuous 不再是周期性启动 task,而是启动长期运行的 task,也不再是处理一批数据,而是不断地一个数据一个数据地处理,并且也不用每次都记录偏移,而是异步地,周期性的记录状态,这样就能实现低延迟


    Continuous 模式当前还有其他一些约束比如

    1. 只支持简单的 map,filter,select,where 等操作,join,agg 之类的还不支持
    2. SQL 不支持 aggregate,current_timestamp,current_date 等操作
    3. 只支持 Kafka Source 和 Rate Source



  • 相关阅读:
    English,The Da Vinci Code, Chapter 23
    python,meatobject
    English,The Da Vinci Code, Chapter 22
    English,The Da Vinci Code, Chapter 21
    English,The Da Vinci Code, Chapter 20
    English,The Da Vinci Code, Chapter 19
    python,xml,ELement Tree
    English,The Da Vinci Code, Chapter 18
    English,The Da Vinci Code, Chapter 17
    English,The Da Vinci Code, Chapter 16
  • 原文地址:https://www.cnblogs.com/moonlight-lin/p/14165171.html
Copyright © 2011-2022 走看看