zoukankan      html  css  js  c++  java
  • Spark Streaming消费Kafka Direct方式数据零丢失实现

    使用场景

    Spark Streaming实时消费kafka数据的时候,程序停止或者Kafka节点挂掉会导致数据丢失,Spark Streaming也没有设置CheckPoint(据说比较鸡肋,虽然可以保存Direct方式的offset,但是可能会导致频繁写HDFS占用IO),所以每次出现问题的时候,重启程序,而程序的消费方式是Direct,所以在程序down掉的这段时间Kafka上的数据是消费不到的,虽然可以设置offset为smallest,但是会导致重复消费,重新overwrite hive上的数据,但是不允许重复消费的场景就不能这样做。

    原理阐述

    在Spark Streaming中消费 Kafka 数据的时候,有两种方式分别是 :

    1.基于 Receiver-based 的 createStream 方法。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。本文对此方式不研究,有兴趣的可以自己实现,个人不喜欢这个方式。KafkaUtils.createStream

    2.Direct Approach (No Receivers) 方式的 createDirectStream 方法,但是第二种使用方式中  kafka 的 offset 是保存在 checkpoint 中的,如果程序重启的话,会丢失一部分数据,我使用的是这种方式。KafkaUtils.createDirectStream。本文将用代码说明如何将 kafka 中的 offset 保存到 zookeeper 中,以及如何从 zookeeper 中读取已存在的 offset。

    代码

    废话不说,直接贴代码。

    
    
      import kafka.common.TopicAndPartition
      import kafka.message.MessageAndMetadata
      import kafka.serializer.StringDecoder
      import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
      import org.I0Itec.zkclient.ZkClient
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      import org.apache.spark.streaming.dstream.InputDStream
      import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

       val conf: Conf = new config.Conf("test-util.conf") val zkHost = conf.getString("kafka.zookeeper.connect") val brokerList=conf.getString("kafka.metadata.broker.list") val zkClient = new ZkClient(zkHost) val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerList, "zookeeper.connect" -> zkHost, "group.id" -> "testid") var kafkaStream: InputDStream[(String, String)] = null var offsetRanges = Array[OffsetRange]() val sc=SparkUtil.createSparkContext("test") val ssc=new StreamingContext(sc,Seconds(5)) val topic="TEST_TOPIC" val topicDirs = new ZKGroupTopicDirs("TEST_TOPIC_spark_streaming_testid", topic) //创建一个 ZKGroupTopicDirs 对象,对保存 val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}") //查询该路径下是否字节点(默认有字节点为我们自己保存不同 partition 时生成的) var fromOffsets: Map[TopicAndPartition, Long] = Map() //如果 zookeeper 中有保存 offset,我们会利用这个 offset 作为 kafkaStream 的起始位置 if (children > 0) { //如果保存过 offset,这里更好的做法,还应该和 kafka 上最小的 offset 做对比,不然会报 OutOfRange 的错误 for (i <- 0 until children) { val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}") val tp = TopicAndPartition(topic, i) fromOffsets += (tp -> partitionOffset.toLong) //将不同 partition 对应的 offset 增加到 fromOffsets 中 } val messageHandler = (mmd : MessageAndMetadata[String, String]) => (mmd.topic, mmd.message()) //这个会将 kafka 的消息进行 transform,最终 kafak 的数据都会变成 (topic_name, message) 这样的 tuple kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler) } else { kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("TEST_TOPIC")) //如果未保存,根据 kafkaParam 的配置使用最新或者最旧的 offset } kafkaStream.transform{rdd=> offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //得到该 rdd 对应 kafka 的消息的 offset rdd }.map(_._2).foreachRDD(rdd=>{ for (o <- offsetRanges) { val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}" ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString) //将该 partition 的 offset 保存到 zookeeper } rdd.foreach(s=>println(s)) }) ssc.start() ssc.awaitTermination()

    总结

    楼主实现了保存一个topic的offset到zk,但是如果Spark Streaming同时消费多个topic的方式及topicSet里有多个topic,楼主还没有想到解决办法,欢迎指正。

  • 相关阅读:
    Powershell数据处理
    Powershell About Active Directory Group Membership of a domain user
    Powershell About Active Directory Server
    Oracle Schema Objects——Tables——TableStorage
    Oracle Schema Objects——Tables——TableType
    English Grammar
    Oracle Database Documentation
    Oracle Schema Objects——Tables——Oracle Data Types
    Oracle Schema Objects——Tables——Overview of Tables
    What is Grammar?
  • 原文地址:https://www.cnblogs.com/ChouYarn/p/6235823.html
Copyright © 2011-2022 走看看