zoukankan      html  css  js  c++  java
  • Kafka+Spark Streaming保证exactly once语义

    在Kafka、Flink、Spark Streaming等分布式流处理系统中(Kafka本质上市流处理系统,不单是MQ),存在三种消息传递语义(Message Delivery Semantics):

    • At Least Once

    每条消息会被收到1次或多次。例如发送方S在超时时间内没有收到接收方R的通知,或者收到了R的报错,就会不断重发消息直至R传回ACK

    • At Most Once

    每条消息会被收到0次或1次。S只负责向R发送消息,R也没有任何通知机制。无论R最终是否收到,S都不会重发

    • Exactly Once

    是上面两个的综合,保证S发送的每一条消息,R都会“不重不漏”地恰好一次收到

    一个Spark Streaming程序由三步组成:输入、处理逻辑、输出

    要达到Exactly Once的理想状态,需要三步协同进行,而不是只与处理逻辑有关

    Kafka与Spark Streaming集成时有两种方法:

    • 基于receiver
    • 基于direct

    1、基于receiver

    • 基于receiver的采用kafka高级消费者API
    • 每个executor进程都会不断拉取消息,并同时保存在executor内存与HDFS上的预写日志(Write-Ahead log,WAL)
    • 当消息写入WAL后,自动更新ZK中的offset

    它可以保证At Least Once语义,但无法保证Exactly Once语义。虽然引入了WAL来保证消息不会丢失,但还有可能会出现消息已经写入WAL,但offset更新失败的情况,Kafka就会按上一次的offset重新发送消息。

    这种方式还会造成数据冗余(Kafka broker中一份数据,spark executor中一分),是吞吐量和内存利用率降低

    2、基于direct

    • 基于direct的方法采用Kafka的简单消费者API,它的流程大大简化。
    • executor不再从Kafka中连续读取消息,也消除了receiver和WAL。
    • Kafka分区与RDD分区一一对应,更可控
    • driver线程只需要每次从Kafka获得批次消息的offset range
    • 然后executor进程根据offset range去读取该批次对应的消息
    • 由于offset在kafka中能唯一确定一条消息,且在外部只能被Streaming程序本身感知到,因此消除了不一致性,达到了Exactly Once

    不过由于它采用了简单消费者API,我们需要自己来管理offset。否则一旦程序崩溃,整个流只能从earliest或者latest点恢复。

    Spark RDD之所以被称为“弹性分布式数据集”,是因为它具有不可变、可分区、可并行计算、容错的特征。一个RDD只能由稳定的数据集生成,或者从其他RDD转换(transform)得来。如果在执行RDD lineage的过程中失败,那么只要元数据不发生变化,无论重新执行多少次lineage,都会得到同样的、确定的结果

    Spark Streaming的输出一般是靠foreachRDD()算子来实现,它默认是at least once的。如果输出过程中出错,那么就会重复执行知道写入成功。为了让它符合Exactly once,可以施加两种限制之一:幂等性(idempotent write)事务性写入(transactional write)

    2.1 幂等性写入

    幂等性原来是数学里的概念,即f(f(x))=f(x)。

    幂等写入就是写入多次与写入一次的结果完全相同,可以自动将at least once转化为Exactly once。这对于自带主键或主键组的业务比较合适(如:各类日志、MySQL binlog),并且实现起来简单

    但是它要求处理逻辑是map-only的,也就是只能包含转换、过滤等操作,不能包含shuffle、聚合等操作。如果条件更严格,就只能采用事务性写入方法

    stream.foreachRDD { rdd =>
          rdd.foreachPartition { iter =>
            // make sure connection pool is set up on the executor before writing
            SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
    
            iter.foreach { case (key, msg) =>
              DB.autoCommit { implicit session =>
                // the unique key for idempotency is just the text of the message itself, for example purposes
                sql"insert into idem_data(msg) values (${msg})".update.apply
              }
            }
          }
        }

    2.2 事务性写入

    这里的事务和DBMS中的事务含义基本相同,就是对数据进行一系列访问与更新操作所组成的逻辑块。为了符合事务性的ACID特性,必须引入一个唯一ID标识当前的处理逻辑,并且将计算结果与该ID一起落盘。

    ID可以由主题、分区、时间、offset等共同组成

    事务操作可以在foreachRDD()时进行。如果数据吸入失败,或者offset吸入与当前offset range不匹配,那么这一批次数据都将失败并且回滚

    // localTx is transactional, if metric update or offset update fails, neither will be committed
        DB.localTx { implicit session =>
          // store metric data
          val metricRows = sql"""
        update txn_data set metric = metric + ${metric}
          where topic = ${osr.topic}
        """.update.apply()
          if (metricRows != 1) {
            throw new Exception("...")
          }
     
          // store offsets
          val offsetRows = sql"""
        update txn_offsets set off = ${osr.untilOffset}
          where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
        """.update.apply()
          if (offsetRows != 1) {
            throw new Exception("...")
          }
        }
  • 相关阅读:
    Webix快速跨浏览器的JavaScript UI组件
    [转]UltraISO制作U盘启动盘安装Win7/9/10系统攻略
    JavaScript中setTimeout()和setInterval()的区别
    AngularJS中文介绍
    Android Studio参考在线文章
    Android原型界面设计工具
    B-JUI(Best jQuery UI) 前端框架
    Linux Ubuntu download
    Jquery之家5个顶级Material Design框架
    bootstrap绿色大气后台模板下载[转]
  • 原文地址:https://www.cnblogs.com/hyunbar/p/13132522.html
Copyright © 2011-2022 走看看