zoukankan      html  css  js  c++  java
  • 160728、Spark Streaming kafka 实现数据零丢失的几种方式

    定义

     问题开始之前先解释下流处理中的一些概念:

    • At most once - 每条数据最多被处理一次(0次或1次)

    • At least once - 每条数据最少被处理一次 (1次或更多)

    • Exactly once - 每条数据只会被处理一次(没有数据会丢失,并且没有数据会被多次处理)

    High Level API

     

    如果不做容错,将会带来数据丢失
    因为receiver一直在接收数据,在其没有处理的时候(已通知zk数据接收到),executor突然挂掉(或是driver挂掉通知executor关闭),缓存在其中的数据就会丢失。


    因为这个问题,Spark1.2开始加入了WAL(Write ahead log)
    开启 WAL,将receiver获取数据的存储级别修改为StorageLevel.MEMORY_AND_DISK_SER

    val conf = new SparkConf()
    conf.set("spark.streaming.receiver.writeAheadLog.enable","true")
    val sc= new SparkContext(conf)
    val ssc = new StreamingContext(sc,Seconds(5)) ssc.checkpoint("walDir")
    val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

    开启WAL后,依旧存在数据丢失问题
    即使按官方说的设置了WAL,依旧会有数据丢失,这是为什么?因为在任务中断时receiver也被强行终止了,将会造成数据丢失,提示如下:

    ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
    WARN BlockGenerator: Cannot stop BlockGenerator as its not in the Active state [state = StoppedAll]
    WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer queue interrupted.

    在Streaming程序的最后添加代码,只有在确认所有receiver都关闭的情况下才终止程序。

    sys.addShutdownHook({
      ssc.stop(true,true)})

    调用的方法为:

    def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit

    WAL带来的问题

    WAL实现的是At-least-once语义。
    如果在写入到外部存储的数据还没有将offset更新到zookeeper就挂掉,这些数据将会被反复消费。同时,降低了程序的吞吐量。

    Kafka Direct API

    Kafka direct API 的运行方式,将不再使用receiver来读取数据,也不用使用WAL机制。


    同时保证了exactly-once语义,不会在WAL中消费重复数据。不过需要自己完成将offset写入zk的过程,在官方文档中都有相应介绍。
    例如如下的调用方式:

    messages.foreachRDD(rdd=>{   val message = rdd.map(_._2)  //对数据进行一些操作
       message.map(method)//更新zk上的offset (自己实现)
       updateZKOffsets(rdd)
    })
  • 相关阅读:
    java面向对象4-多态
    机器学习降维--SVD奇异值分解
    hive中的null
    熵(二)-交叉熵与相对熵
    指数家族-Beta分布
    指数族函数
    java面向对象3-继承(继承、抽象类、抽象接口)
    网页自动刷新
    spring +hibernate 启动优化【转】
    svn is already locked解决方案
  • 原文地址:https://www.cnblogs.com/zrbfree/p/5714351.html
Copyright © 2011-2022 走看看