环境:
kafka 0.10
spark 2.1.0
zookeeper 3.4.5-cdh5.14.0
公司阿里云测试机,十月一放假前,没有在继续消费,假期过后回来再使用spark streaming消费某个消费组下的kafka时报错如下:
As I regularly kill the servers running Kafka and the producers feeding it (yes, just for fun), things sometimes go a bit crazy, not entirely sure why but I got the error: kafka.common.OffsetOutOfRangeError: FetchResponse(topic='my_messages', partition=0, error=1, highwaterMark=-1, messages=) To fix it I added the “seek” setting: consumer.seek(0,2)
出现问题的原因:
kafka会定时清理日志
当我们的任务开始的时候,如果之前消费过某个topic,那么这个topic会在zk上设置offset,我们一般会去获取这个offset来继续从上次结束的地方继续消费,但是kafka定时清理日志的功能,比如定时一天一清理,那么如果你的offset是前天消费的offset,那么这个时候你再去消费,自然而然的你的offset肯定已经不在有效范围内,所以就报OffsetOutOfRangeException了
解决:
需要在发现zk_offset<earliest_offset>
时矫正zk_offset为合法值
前期完整代码
https://www.cnblogs.com/niutao/p/10547831.html
改正后的关键代码:
/**
* 获取最小offset
* Returns the earliest (lowest) available offsets, taking new partitions into account.
*
* @param kafkaParams kafka客户端配置
* @param topics 获取获取offset的topic
*/
def getEarliestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
val newKafkaParams = mutable.Map[String, Object]()
newKafkaParams ++= kafkaParams
newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
consumer.subscribe(topics)
val notOffsetTopicPartition = mutable.Set[TopicPartition]()
try {
consumer.poll(0)
} catch {
case ex: NoOffsetForPartitionException =>
log.warn(s"consumer topic partition offset not found:${ex.partition()}")
notOffsetTopicPartition.add(ex.partition())
}
val parts = consumer.assignment().toSet
consumer.pause(parts)
consumer.seekToBeginning(parts)
consumer.pause(parts)
val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
consumer.unsubscribe()
consumer.close()
offsets
}
/** * 获取最大offset * Returns the latest (highest) available offsets, taking new partitions into account. * * @param kafkaParams kafka客户端配置 * @param topics 需要获取offset的topic **/ def getLatestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = { val newKafkaParams = mutable.Map[String, Object]() newKafkaParams ++= kafkaParams newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams) consumer.subscribe(topics) val notOffsetTopicPartition = mutable.Set[TopicPartition]() try { consumer.poll(0) } catch { case ex: NoOffsetForPartitionException => log.warn(s"consumer topic partition offset not found:${ex.partition()}") notOffsetTopicPartition.add(ex.partition()) } val parts = consumer.assignment().toSet consumer.pause(parts) consumer.seekToEnd(parts) val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap consumer.unsubscribe() consumer.close() offsets }
val earliestOffsets = getEarliestOffsets(kafkaParams , topics) val latestOffsets = getLatestOffsets(kafkaParams , topics) for((k,v) <- topicPartOffsetMap.toMap){ val current = v val earliest = earliestOffsets.get(k).get val latest = latestOffsets.get(k).get if (current > latest || current < earliest) { log.warn("矫正offset: " + current +" -> "+ earliest); topicPartOffsetMap.put(k , earliest) } }
完整代码,拿去直接用就可以了
import kafka.utils.{ZKGroupTopicDirs, ZkUtils} import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer} import org.apache.spark.rdd.RDD import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils} import org.slf4j.LoggerFactory import scala.collection.JavaConversions._ import scala.reflect.ClassTag import scala.util.Try import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer, NoOffsetForPartitionException} import org.apache.kafka.common.TopicPartition import org.apache.zookeeper.data.Stat import scala.collection.JavaConversions._ import scala.collection.mutable /** * Kafka的连接和Offset管理工具类 * * @param zkHosts Zookeeper地址 * @param kafkaParams Kafka启动参数 */ class KafkaManager(zkHosts: String, kafkaParams: Map[String, Object]) extends Serializable { //Logback日志对象,使用slf4j框架 @transient private lazy val log = LoggerFactory.getLogger(getClass) //建立ZkUtils对象所需的参数 val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(zkHosts, 10000, 10000) // zkClient.setZkSerializer(new MyZkSerializer()) //ZkUtils对象,用于访问Zookeeper val zkUtils = new ZkUtils(zkClient, zkConnection, false) /** * 获取最小offset * Returns the earliest (lowest) available offsets, taking new partitions into account. * * @param kafkaParams kafka客户端配置 * @param topics 获取获取offset的topic */ def getEarliestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = { val newKafkaParams = mutable.Map[String, Object]() newKafkaParams ++= kafkaParams newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams) consumer.subscribe(topics) val notOffsetTopicPartition = mutable.Set[TopicPartition]() try { consumer.poll(0) } catch { case ex: NoOffsetForPartitionException => log.warn(s"consumer topic partition offset not found:${ex.partition()}") notOffsetTopicPartition.add(ex.partition()) } val parts = consumer.assignment().toSet consumer.pause(parts) consumer.seekToBeginning(parts) consumer.pause(parts) val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap consumer.unsubscribe() consumer.close() offsets } /** * 获取最大offset * Returns the latest (highest) available offsets, taking new partitions into account. * * @param kafkaParams kafka客户端配置 * @param topics 需要获取offset的topic **/ def getLatestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = { val newKafkaParams = mutable.Map[String, Object]() newKafkaParams ++= kafkaParams newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams) consumer.subscribe(topics) val notOffsetTopicPartition = mutable.Set[TopicPartition]() try { consumer.poll(0) } catch { case ex: NoOffsetForPartitionException => log.warn(s"consumer topic partition offset not found:${ex.partition()}") notOffsetTopicPartition.add(ex.partition()) } val parts = consumer.assignment().toSet consumer.pause(parts) consumer.seekToEnd(parts) val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap consumer.unsubscribe() consumer.close() offsets } /** * 获取消费者当前offset * * @param consumer 消费者 * @param partitions topic分区 * @return */ def getCurrentOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = { partitions.map(tp => tp -> consumer.position(tp)).toMap } /** * 从Zookeeper读取Kafka消息队列的Offset * * @param topics Kafka话题 * @param groupId Kafka Group ID * @return 返回一个Map[TopicPartition, Long],记录每个话题每个Partition上的offset,如果还没消费,则offset为0 */ def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = { val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long] val partitionMap = zkUtils.getPartitionsForTopics(topics) // /consumers/<groupId>/offsets/<topic>/ partitionMap.foreach(topicPartitions => { val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1) topicPartitions._2.foreach(partition => { val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition val tryGetKafkaOffset = Try { val offsetStatTuple = zkUtils.readData(offsetPath) if (offsetStatTuple != null) { log.info("查询Kafka消息偏移量详情: 话题:{}, 分区:{}, 偏移量:{}, ZK节点路径:{}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*) topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong) } } if(tryGetKafkaOffset.isFailure){ //http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html val consumer = new KafkaConsumer[String, Object](kafkaParams) val partitionList = List(new TopicPartition(topicPartitions._1, partition)) consumer.assign(partitionList) val minAvailableOffset = consumer.beginningOffsets(partitionList).values.head consumer.close() log.warn("查询Kafka消息偏移量详情: 没有上一次的ZK节点:{}, 话题:{}, 分区:{}, ZK节点路径:{}, 使用最小可用偏移量:{}", Seq[AnyRef](tryGetKafkaOffset.failed.get.getMessage, topicPartitions._1, partition.toString, offsetPath, minAvailableOffset): _*) topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), minAvailableOffset) } }) }) //TODO 解决kafka中数据还没来得及消费,数据就已经丢失或者过期了######################### //Offsets out of range with no configured reset policy for partition //获取EarliestOffsets val earliestOffsets = getEarliestOffsets(kafkaParams , topics) val latestOffsets = getLatestOffsets(kafkaParams , topics) for((k,v) <- topicPartOffsetMap.toMap){ val current = v val earliest = earliestOffsets.get(k).get val latest = latestOffsets.get(k).get if (current > latest || current < earliest) { log.warn("矫正offset: " + current +" -> "+ earliest); topicPartOffsetMap.put(k , earliest) } } topicPartOffsetMap.toMap } //######################################################### /** * 包装createDirectStream方法,支持Kafka Offset,用于创建Kafka Streaming流 * * @param ssc Spark Streaming Context * @param topics Kafka话题 * @tparam K Kafka消息Key类型 * @tparam V Kafka消息Value类型 * @return Kafka Streaming流 */ def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] = { val groupId = kafkaParams("group.id").toString //TODO val storedOffsets: Map[TopicPartition, Long] = readOffsets(topics, groupId) // val storedOffsets: Map[TopicPartition, Long] = getCurrentOffset(kafkaParams , topics) log.info("Kafka消息偏移量汇总(格式:(话题,分区号,偏移量)):{}", storedOffsets.map(off => (off._1.topic, off._1.partition(), off._2))) val kafkaStream = KafkaUtils.createDirectStream[K, V](ssc, PreferConsistent, ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, storedOffsets)) kafkaStream } /** * 保存Kafka消息队列消费的Offset * * @param rdd SparkStreaming的Kafka RDD,RDD[ConsumerRecord[K, V] * @param storeEndOffset true=保存结束offset, false=保存起始offset */ def persistOffsets[K, V](rdd: RDD[ConsumerRecord[K, V]], storeEndOffset: Boolean = true): Unit = { val groupId = kafkaParams("group.id").toString val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges offsetsList.foreach(or => { val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic) val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset zkUtils.updatePersistentPath(zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition, offsetVal + "" /*, JavaConversions.bufferAsJavaList(acls)*/) log.debug("保存Kafka消息偏移量详情: 话题:{}, 分区:{}, 偏移量:{}, ZK节点路径:{}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*) }) } }