zoukankan      html  css  js  c++  java
  • Spark Structured Streaming 的 Continuous 模式

    Spark 的 Structured Streaming 流处理,默认用的是 micro-batch 模拟流,可以保证 exactly-once 语义,但最好只能是 100ms 级别的延迟

    为了满足更低延迟的业务需求,Spark Structured Streaming 推出了 Continuous 模式,在逻辑代码不变的情况下,只需要改变 Trigger 的配置,就可以非常方便的实现 Continuous 处理模式,可以支持 1ms 的低延迟,但只保证 at-least-once 语义


    传统的 micro-batch 代码如下

    df.writeStream 
      .format("console") 
      .trigger(processingTime='2 seconds') 
      .start()
    
    
    # 相当于 .trigger(processingTime='0 seconds')
    df.writeStream 
      .format("console") 
      .start()
    

    micro-batch 模式每隔一段时间到数据源取出所有可用数据,取出来的数据就是一个 micro-batch,然后每次都要启动 task 去处理 micro-batch,哪怕 tigger 的 processingTime 设置为 0,也只是一批数据处理完后不等待地去数据源继续取可用数据,但取到的可能还是一批数据,而且为了保证一致性语义,必须在处理一批数据后立刻记录数据源的偏移量,然后再取下一批

    由于每次都要启动 task,而且处理的都是一批数据,还要保存好偏移量再取下一批,这么一来,延时必然降不下来


    continuous 模式代码如下,可以看和 micro-batch 比只是改了 trigger 的配置

    df.writeStream
      .format("console")
      .trigger(continuous='1 second')
      .start()
    

    注意这里的 1 second 指的是每隔 1 秒记录保存一次状态,而不是说每隔 1 秒才处理数据

    continuous 不再是周期性启动 task,而是启动长期运行的 task,也不再是处理一批数据,而是不断地一个数据一个数据地处理,并且也不用每次都记录偏移,而是异步地,周期性的记录状态,这样就能实现低延迟


    Continuous 模式当前还有其他一些约束比如

    1. 只支持简单的 map,filter,select,where 等操作,join,agg 之类的还不支持
    2. SQL 不支持 aggregate,current_timestamp,current_date 等操作
    3. 只支持 Kafka Source 和 Rate Source



  • 相关阅读:
    less中遇到的一些特殊的写法
    观察者模式
    发布订阅模式
    javascript中的函数
    关于js中this的理解和作用
    [Oracle]Oracle镜像安装及使用教程
    log4net日志配置
    过滤器
    .net core 处理 中文乱码
    Fromform
  • 原文地址:https://www.cnblogs.com/moonlight-lin/p/14165171.html
Copyright © 2011-2022 走看看