  • Committing Kafka-Spark offsets to Redis (Kafka 1.0)

    Kafka version: 1.0.0

    Spark integration: spark-streaming-kafka-0-10_2.11

    The manager below stores each consumer group's offsets in Redis, reads them back when the stream is created (falling back to Kafka's beginning offsets on first run), corrects any stored offsets that have fallen outside the currently valid range, and persists the new offsets after each batch.
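    The storage scheme: offsets live in a Redis hash whose key is topic + "|" + groupId, with one field per partition and the offset as the field value. As an illustration (the offset numbers below are made up), the group "test1" reading cheng_du_gps_topic would be stored as:

        key:    cheng_du_gps_topic|test1
        fields: 0 -> 12345    (partition 0, next offset to read)
                1 -> 67890    (partition 1)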
    // Imports needed to compile this class. Logging (which provides `logger`)
    // and JedisUtil are project-local helpers that are not shown here.
    import java.util

    import kafka.utils.ZkUtils
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, NoOffsetForPartitionException}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, OffsetRange}

    import scala.collection.JavaConversions._
    import scala.collection.mutable
    import scala.reflect.ClassTag

    /**
     * Created by imp on 2019/12/21.
     */
    class KafkaManagerByRedis(zkHost: String, kafkaParams: Map[String, Object]) extends Logging {

      private val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(zkHost, 10000, 10000)
      private val zkUtils = new ZkUtils(zkClient, zkConnection, false)
      private val jedis = JedisUtil.getInstance().getJedis

      /**
       * Creates a direct stream whose starting offsets are read back from Redis.
       */
      def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] = {
        // 1: read the stored offsets
        val groupId = kafkaParams("group.id").toString
        val topic = topics(0)
        val topicPartition: Map[TopicPartition, Long] = readOffset(topic, groupId)
        KafkaUtils.createDirectStream[K, V](
          ssc,
          PreferConsistent,
          Subscribe[K, V](topics, kafkaParams, topicPartition)
        )
      }

      /**
       * Reads the stored offsets for a topic and consumer group.
       *
       * @param topic   topic name
       * @param groupId consumer group
       * @return Map[TopicPartition, Long]
       **/
      private def readOffset(topic: String, groupId: String): Map[TopicPartition, Long] = {
        val topicPartitionMap = collection.mutable.HashMap.empty[TopicPartition, Long]
        // Fetch the topic's partition info from ZooKeeper
        val topicAndPartitionMaps: mutable.Map[String, Seq[Int]] = zkUtils.getPartitionsForTopics(Seq(topic))
        val redisKey = topic + "|" + groupId
        topicAndPartitionMaps.foreach(topicPartitions => {
          topicPartitions._2.foreach(partition => {
            // Iterate over the partitions
            val map: util.Map[String, String] = jedis.hgetAll(redisKey)
            val offsetMap: mutable.Map[String, String] = mapAsScalaMap(map)
            if (offsetMap != null && offsetMap.nonEmpty) {
              logger.info("groupId:" + groupId + " read offsets from Redis")
              topicPartitionMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetMap(partition.toString).toLong)
            }
            else {
              logger.warn("First start: nothing stored in Redis yet, reading beginning offsets from Kafka")
              val consumer = new KafkaConsumer[String, Object](kafkaParams)
              val topicCollection = List(new TopicPartition(topicPartitions._1, partition))
              consumer.assign(topicCollection)
              val availableOffset: Long = consumer.beginningOffsets(topicCollection).values().head
              consumer.close()
              topicPartitionMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), availableOffset)
            }
          })
        })

        // Correct stale offsets: if current < earliest or current > latest,
        // fall back to the earliest available offset
        val earliestOffsets = getEarliestOffsets(kafkaParams, topic)
        val latestOffsets = getLatestOffsets(kafkaParams, List(topic))
        for ((k, v) <- topicPartitionMap) {
          val current = v
          val earliest = earliestOffsets(k)
          val latest = latestOffsets(k)
          if (current < earliest || current > latest) {
            topicPartitionMap.put(k, earliest)
          }
        }
        topicPartitionMap.toMap
      }


      /**
       * Gets the earliest available offset for every partition of a topic.
       *
       * @param kafkaParams consumer configuration
       * @param topic       topic name
       * @return Map[TopicPartition, Long]
       */
      private def getEarliestOffsets(kafkaParams: Map[String, Object], topic: String) = {
        val newKafkaParams = mutable.Map[String, Object]()
        newKafkaParams ++= kafkaParams
        newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        // Kafka consumer API; note: use newKafkaParams, not the original kafkaParams
        val consumer = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
        // Subscribe
        val topics = Seq[String](topic)
        consumer.subscribe(topics)
        try {
          consumer.poll(0)
        } catch {
          case e: NoOffsetForPartitionException =>
          // could collect e.partition() here and trigger an email alert
        }
        // Fetch the assigned partitions
        val topicp = consumer.assignment().toSet
        // Pause consumption
        consumer.pause(topicp)
        // Seek to the beginning of each partition
        consumer.seekToBeginning(topicp)
        val earliestOffsetMap = topicp.map(line => line -> consumer.position(line)).toMap
        consumer.unsubscribe()
        consumer.close()
        earliestOffsetMap
      }


      /**
       * Gets the latest offset for every partition of the given topics.
       *
       * @param kafkaParams consumer configuration
       * @param topic       topic names
       * @return Map[TopicPartition, Long]
       */
      private def getLatestOffsets(kafkaParams: Map[String, Object], topic: Seq[String]) = {
        val newKafkaParams = mutable.Map[String, Object]()
        newKafkaParams ++= kafkaParams
        newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

        // Kafka consumer API
        val consumer = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
        // Subscribe
        consumer.subscribe(topic)
        try {
          consumer.poll(0)
        } catch {
          case e: NoOffsetForPartitionException =>
          // could collect e.partition() here and trigger an email alert
        }
        // Fetch the assigned partitions
        val topicp = consumer.assignment().toSet
        // Pause consumption
        consumer.pause(topicp)
        // Seek to the end of each partition
        consumer.seekToEnd(topicp)
        val latestOffsetMap: Map[TopicPartition, Long] = topicp.map(line => line -> consumer.position(line)).toMap
        consumer.unsubscribe()
        consumer.close()
        latestOffsetMap
      }


      /**
       * Persists each partition's offset range to Redis.
       * storeOffset = true stores untilOffset (after the batch is processed);
       * false stores fromOffset.
       */
      def persistOffset[K, V](rdd: RDD[ConsumerRecord[K, V]], storeOffset: Boolean = true, topic: String) = {
        val groupId = kafkaParams("group.id").toString
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        offsetRanges.foreach(offsetRange => {
          val redisKey = topic + "|" + groupId
          val data = if (storeOffset) offsetRange.untilOffset else offsetRange.fromOffset
          jedis.hset(redisKey, offsetRange.partition.toString, data.toString)
          println("topic: " + offsetRange.topic + " partition: " + offsetRange.partition +
            " consumed from " + offsetRange.fromOffset + " to " + offsetRange.untilOffset +
            ", " + offsetRange.count() + " records in total")
        })
      }


    }
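    JedisUtil is a project-local helper that the listing does not show. A minimal sketch of what such a singleton could look like, assuming Jedis is on the classpath and a single Redis node at localhost:6379 (the host, port, and pool settings are placeholders):

    import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

    object JedisUtil {
      // Placeholder connection settings; adjust host/port for your environment
      private val pool = new JedisPool(new JedisPoolConfig, "localhost", 6379)
      private val instance = new JedisUtil(pool)

      def getInstance(): JedisUtil = instance
    }

    class JedisUtil(pool: JedisPool) {
      // Hands out a pooled connection; a long-running job may prefer
      // borrowing and returning a connection per batch instead
      def getJedis: Jedis = pool.getResource
    }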

    object KafkaManagerByRedis {
      def main(args: Array[String]): Unit = {
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "192.168.121.12:9092,192.168.121.12:9093",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "test1",
          "auto.offset.reset" -> "earliest",
          // Disable auto-commit; otherwise offsets may be committed before a batch
          // is fully processed, which can lose or corrupt data
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        val zkServer = ""
        val kafkama = new KafkaManagerByRedis(zkServer, kafkaParams)
        kafkama.getEarliestOffsets(kafkaParams, "cheng_du_gps_topic")
          .foreach(m => println(m._1.topic(), m._1.partition(), m._2))

        kafkama.getLatestOffsets(kafkaParams, List("cheng_du_gps_topic"))
          .foreach(m => println(m._1.topic(), m._1.partition(), m._2))
      }
    }
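    A minimal driver sketch showing how the manager is meant to be wired into a streaming job: create the stream from the Redis-stored offsets, process each batch, then persist the new offsets so a restart resumes where it left off. The app name, master, batch interval, ZooKeeper address, and topic below are placeholder assumptions:

    object GpsStreamingApp {
      def main(args: Array[String]): Unit = {
        val conf = new org.apache.spark.SparkConf().setAppName("gps-offset-demo").setMaster("local[2]")
        val ssc = new org.apache.spark.streaming.StreamingContext(conf, org.apache.spark.streaming.Seconds(5))

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "192.168.121.12:9092,192.168.121.12:9093",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "test1",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val topic = "cheng_du_gps_topic"
        // ZooKeeper address is a placeholder
        val manager = new KafkaManagerByRedis("192.168.121.12:2181", kafkaParams)

        // Starting offsets come from Redis (or Kafka's beginning offsets on first run)
        val stream = manager.createDirectStream[String, String](ssc, Seq(topic))

        stream.foreachRDD { rdd =>
          // Process the batch first...
          println("batch size: " + rdd.count())
          // ...then persist the untilOffsets
          manager.persistOffset(rdd, storeOffset = true, topic = topic)
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }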

    
    