  • Fixing kafka.common.OffsetOutOfRangeException when consuming again after Kafka's periodic log cleanup

    Environment:

    kafka  0.10

    spark  2.1.0

    zookeeper  3.4.5-cdh5.14.0

    On our company's Alibaba Cloud test machine, nothing was consumed over the October 1st holiday. Back from the holiday, resuming consumption of a topic under the existing consumer group with Spark Streaming failed with kafka.common.OffsetOutOfRangeException. The symptom is the same one described in this quoted note:

    As I regularly kill the servers running Kafka and the producers feeding it (yes, just for fun), things sometimes go a bit crazy, not entirely sure why but I got the error:
    
    kafka.common.OffsetOutOfRangeError: FetchResponse(topic='my_messages', partition=0, error=1, highwaterMark=-1, messages=)
    To fix it I added the “seek” setting:
    
    consumer.seek(0,2)
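    The quoted fix appears to use the old Python client, where seek(0, 2) roughly means "seek to the end of the log". With the 0.10 Java/Scala consumer used in the rest of this post, the equivalent jump to a known-valid position is seekToBeginning / seekToEnd. A minimal sketch, assuming consumer is a KafkaConsumer that has already subscribed to its topics:

    // Sketch only: move an already-subscribed consumer to a valid position.
    consumer.poll(0)                                       // join the group and get a partition assignment
    consumer.seekToEnd(consumer.assignment())              // jump to the latest offsets
    // or: consumer.seekToBeginning(consumer.assignment()) // jump to the earliest offsets still on disk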

    Cause:

    Kafka periodically deletes old log segments (log retention).

    When a job starts, if the topic has been consumed before, the consumer group's offsets are stored in ZooKeeper, and we normally read them back so the job can resume from where the previous run stopped. But Kafka keeps deleting old data; say segments are cleaned up once a day (e.g. via log.retention.hours). If the stored offset was committed the day before yesterday, the messages it points to may already have been deleted, so the offset falls outside the valid range and the consumer throws OffsetOutOfRangeException.
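    To make the failure concrete, here is a minimal standalone sketch that checks whether an offset read back from ZooKeeper is still inside the broker's retained range. The topic, partition, connection settings and the stored offset are made-up values, and it assumes a client that provides beginningOffsets/endOffsets (0.10.1+, as used further below):

    import scala.collection.JavaConversions._
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    val kafkaParams = Map[String, Object](                    // placeholder connection settings
      "bootstrap.servers" -> "node1:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
      "group.id" -> "my_group")
    val tp = new TopicPartition("my_topic", 0)                // made-up topic/partition
    val consumer = new KafkaConsumer[String, Array[Byte]](kafkaParams)
    consumer.assign(List(tp))
    val earliest = consumer.beginningOffsets(List(tp)).get(tp).longValue() // smallest offset still on disk
    val latest = consumer.endOffsets(List(tp)).get(tp).longValue()         // offset after the last message
    val zkOffset = 12345L                                                  // example value read from ZooKeeper
    val stillValid = zkOffset >= earliest && zkOffset <= latest            // false once retention has passed zkOffset
    consumer.close()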

    Fix:

    When the offset read from ZooKeeper falls outside the currently valid range (zk_offset < earliest_offset, or zk_offset > latest_offset), correct zk_offset to a valid value.

    The earlier, complete version of the code is here:

    https://www.cnblogs.com/niutao/p/10547831.html

    The key code after the fix:

      /**
        * Returns the earliest (lowest) available offsets, taking new partitions into account.
        *
        * @param kafkaParams Kafka client configuration
        * @param topics      topics whose offsets should be fetched
        */
      def getEarliestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
        val newKafkaParams = mutable.Map[String, Object]()
        newKafkaParams ++= kafkaParams
        newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
        consumer.subscribe(topics)
        val notOffsetTopicPartition = mutable.Set[TopicPartition]()
        try {
          consumer.poll(0)
        } catch {
          case ex: NoOffsetForPartitionException =>
            log.warn(s"consumer topic partition offset not found:${ex.partition()}")
            notOffsetTopicPartition.add(ex.partition())
        }
        val parts = consumer.assignment().toSet
        consumer.pause(parts)
        consumer.seekToBeginning(parts)
        consumer.pause(parts)
        val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
        consumer.unsubscribe()
        consumer.close()
        offsets
      }
      /**
        * Returns the latest (highest) available offsets, taking new partitions into account.
        *
        * @param kafkaParams Kafka client configuration
        * @param topics      topics whose offsets should be fetched
        **/
      def getLatestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
        val newKafkaParams = mutable.Map[String, Object]()
        newKafkaParams ++= kafkaParams
        newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
        val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
        consumer.subscribe(topics)
        val notOffsetTopicPartition = mutable.Set[TopicPartition]()
        try {
          consumer.poll(0)
        } catch {
          case ex: NoOffsetForPartitionException =>
            log.warn(s"consumer topic partition offset not found:${ex.partition()}")
            notOffsetTopicPartition.add(ex.partition())
        }
        val parts = consumer.assignment().toSet
        consumer.pause(parts)
        consumer.seekToEnd(parts)
        val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
        consumer.unsubscribe()
        consumer.close()
        offsets
      }
    val earliestOffsets = getEarliestOffsets(kafkaParams, topics)
    val latestOffsets = getLatestOffsets(kafkaParams, topics)
    for ((k, v) <- topicPartOffsetMap.toMap) {
      val current = v
      val earliest = earliestOffsets.get(k).get
      val latest = latestOffsets.get(k).get
      if (current > latest || current < earliest) {
        log.warn("Correcting offset: " + current + " -> " + earliest)
        topicPartOffsetMap.put(k, earliest)
      }
    }

    The complete code; it can be used directly:

    import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
    import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, KafkaConsumer, NoOffsetForPartitionException}
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils}
    import org.apache.zookeeper.data.Stat
    import org.slf4j.LoggerFactory
    
    import scala.collection.JavaConversions._
    import scala.collection.mutable
    import scala.reflect.ClassTag
    import scala.util.Try
    /**
      * Utility class for Kafka connection and offset management
      *
      * @param zkHosts     ZooKeeper address
      * @param kafkaParams Kafka consumer parameters
      */
    class KafkaManager(zkHosts: String, kafkaParams: Map[String, Object]) extends Serializable {
      // Logback logger, accessed through the slf4j facade
      @transient private lazy val log = LoggerFactory.getLogger(getClass)
      // Objects needed to build the ZkUtils instance
      val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(zkHosts, 10000, 10000)
      //  zkClient.setZkSerializer(new MyZkSerializer())
      // ZkUtils instance, used to access ZooKeeper
      val zkUtils = new ZkUtils(zkClient, zkConnection, false)
    
    
      /**
        * Returns the earliest (lowest) available offsets, taking new partitions into account.
        *
        * @param kafkaParams Kafka client configuration
        * @param topics      topics whose offsets should be fetched
        */
      def getEarliestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
        val newKafkaParams = mutable.Map[String, Object]()
        newKafkaParams ++= kafkaParams
        newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
        consumer.subscribe(topics)
        val notOffsetTopicPartition = mutable.Set[TopicPartition]()
        try {
          consumer.poll(0)
        } catch {
          case ex: NoOffsetForPartitionException =>
            log.warn(s"consumer topic partition offset not found:${ex.partition()}")
            notOffsetTopicPartition.add(ex.partition())
        }
        val parts = consumer.assignment().toSet
        consumer.pause(parts)
        consumer.seekToBeginning(parts)
        consumer.pause(parts)
        val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
        consumer.unsubscribe()
        consumer.close()
        offsets
      }
    
      /**
        * Returns the latest (highest) available offsets, taking new partitions into account.
        *
        * @param kafkaParams Kafka client configuration
        * @param topics      topics whose offsets should be fetched
        **/
      def getLatestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
        val newKafkaParams = mutable.Map[String, Object]()
        newKafkaParams ++= kafkaParams
        newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
        val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
        consumer.subscribe(topics)
        val notOffsetTopicPartition = mutable.Set[TopicPartition]()
        try {
          consumer.poll(0)
        } catch {
          case ex: NoOffsetForPartitionException =>
            log.warn(s"consumer topic partition offset not found:${ex.partition()}")
            notOffsetTopicPartition.add(ex.partition())
        }
        val parts = consumer.assignment().toSet
        consumer.pause(parts)
        consumer.seekToEnd(parts)
        val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
        consumer.unsubscribe()
        consumer.close()
        offsets
      }
    
      /**
        * Get the consumer's current offsets
        *
        * @param consumer   the Kafka consumer
        * @param partitions topic partitions
        * @return
        */
      def getCurrentOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
        partitions.map(tp => tp -> consumer.position(tp)).toMap
      }
      /**
        * Read the Kafka offsets for a consumer group from ZooKeeper
        *
        * @param topics  Kafka topics
        * @param groupId Kafka group id
        * @return a Map[TopicPartition, Long] with the offset of every partition of every topic; for partitions
        *         that have never been consumed, the smallest available offset is used
        */
      def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = {
        val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
        val partitionMap = zkUtils.getPartitionsForTopics(topics)
        // /consumers/<groupId>/offsets/<topic>/
        partitionMap.foreach(topicPartitions => {
          val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
          topicPartitions._2.foreach(partition => {
            val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
            val tryGetKafkaOffset = Try {
              val offsetStatTuple = zkUtils.readData(offsetPath)
              if (offsetStatTuple != null) {
                log.info("Read Kafka offset: topic:{}, partition:{}, offset:{}, ZK path:{}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)
                topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong)
              }
            }
            if(tryGetKafkaOffset.isFailure){
              //http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
              val consumer = new KafkaConsumer[String, Object](kafkaParams)
              val partitionList = List(new TopicPartition(topicPartitions._1, partition))
              consumer.assign(partitionList)
              val minAvailableOffset = consumer.beginningOffsets(partitionList).values.head
              consumer.close()
              log.warn("No previous offset node in ZK ({}): topic:{}, partition:{}, ZK path:{}, falling back to the smallest available offset:{}", Seq[AnyRef](tryGetKafkaOffset.failed.get.getMessage, topicPartitions._1, partition.toString, offsetPath, minAvailableOffset): _*)
              topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), minAvailableOffset)
            }
          })
        })
        //TODO  handle data that was deleted or expired before it could be consumed #########################
        //Offsets out of range with no configured reset policy for partition
        //Get the earliest offsets
        val earliestOffsets = getEarliestOffsets(kafkaParams , topics)
        val latestOffsets = getLatestOffsets(kafkaParams , topics)
        for((k,v) <- topicPartOffsetMap.toMap){
          val current = v
          val earliest = earliestOffsets.get(k).get
          val latest = latestOffsets.get(k).get
          if (current > latest || current < earliest) {
            log.warn("Correcting offset: " + current + " -> " + earliest)
            topicPartOffsetMap.put(k , earliest)
          }
        }
    
        topicPartOffsetMap.toMap
      }
    
      //#########################################################
      /**
        * Wraps the createDirectStream method with Kafka offset support; used to create the Kafka input stream
        *
        * @param ssc    Spark Streaming Context
        * @param topics Kafka topics
        * @tparam K Kafka message key type
        * @tparam V Kafka message value type
        * @return Kafka input stream
        */
      def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] = {
        val groupId = kafkaParams("group.id").toString
        //TODO
        val storedOffsets: Map[TopicPartition, Long] = readOffsets(topics, groupId)
        //    val storedOffsets: Map[TopicPartition, Long] = getCurrentOffset(kafkaParams , topics)
        log.info("Kafka offsets summary (format: (topic, partition, offset)): {}", storedOffsets.map(off => (off._1.topic, off._1.partition(), off._2)))
        val kafkaStream = KafkaUtils.createDirectStream[K, V](ssc, PreferConsistent, ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, storedOffsets))
        kafkaStream
      }
    
      /**
        * Persist the offsets consumed from the Kafka message queue
        *
        * @param rdd            the Kafka RDD from Spark Streaming, RDD[ConsumerRecord[K, V]]
        * @param storeEndOffset true = store the end offset of each range, false = store the start offset
        */
      def persistOffsets[K, V](rdd: RDD[ConsumerRecord[K, V]], storeEndOffset: Boolean = true): Unit = {
        val groupId = kafkaParams("group.id").toString
        val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        offsetsList.foreach(or => {
          val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic)
          val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
          val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
          zkUtils.updatePersistentPath(offsetPath, offsetVal + "" /*, JavaConversions.bufferAsJavaList(acls)*/)
          log.debug("Persisted Kafka offset: topic:{}, partition:{}, offset:{}, ZK path:{}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
        })
      }
    
    
    }
    Kafka offset management code
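
    A minimal usage sketch of the class above in a Spark Streaming job (broker addresses, topic, group id and batch interval are placeholder values, not the original cluster's settings): create the stream with createDirectStream, process each batch, and only then write the batch's end offsets back to ZooKeeper with persistOffsets:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object KafkaManagerDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("KafkaManagerDemo").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))

        // Placeholder connection settings
        val zkHosts = "node1:2181,node2:2181,node3:2181"
        val topics = Seq("my_topic")
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "node1:9092,node2:9092,node3:9092",
          "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "group.id" -> "my_group",
          "auto.offset.reset" -> "earliest",
          "enable.auto.commit" -> (false: java.lang.Boolean) // offsets live in ZooKeeper, not in Kafka's own commits
        )

        val kafkaManager = new KafkaManager(zkHosts, kafkaParams)
        val stream = kafkaManager.createDirectStream[String, String](ssc, topics)

        stream.foreachRDD { rdd =>
          // Process the batch first ...
          rdd.foreach(record => println(s"${record.topic()}-${record.partition()}@${record.offset()}: ${record.value()}"))
          // ... then persist the end offsets of this batch back to ZooKeeper.
          kafkaManager.persistOffsets(rdd)
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }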
  • Original article: https://www.cnblogs.com/niutao/p/11654679.html