  • Spark createDirectStream: maintaining Kafka offsets (Scala)

    With the createDirectStream approach you have to maintain the Kafka offsets yourself, so that the job can resume consuming from where it left off after an interruption.
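
    For reference, here is a minimal sketch of the underlying Spark 1.x direct-stream API (spark-streaming-kafka, Kafka 0.8) that the wrapper class below builds on. The stream starts from an explicit fromOffsets map instead of relying on stored state; ssc, kafkaParams and the topic/offset values are hypothetical placeholders.

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    // Start each partition from an explicitly supplied offset.
    val fromOffsets: Map[TopicAndPartition, Long] = Map(TopicAndPartition("my-topic", 0) -> 0L)

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

    // After each batch, the consumed ranges are exposed on the RDD and can be persisted anywhere.
    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(o => println(s"${o.topic}/${o.partition}: ${o.fromOffset} -> ${o.untilOffset}"))
    }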

    KafkaManager.scala

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.Decoder
    import org.apache.spark.SparkException
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils}
    import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset

    import scala.reflect.ClassTag
    
    /**
      * Created by knowpigxia on 15-8-5.
      */
    class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {
    
      private val kc = new KafkaCluster(kafkaParams)
    
      /**
        * Create the direct input stream, resuming from the offsets stored in ZooKeeper.
        * @param ssc streaming context
        * @param kafkaParams Kafka configuration (must contain group.id)
        * @param topics topics to consume
        * @tparam K key type
        * @tparam V value type
        * @tparam KD key decoder
        * @tparam VD value decoder
        * @return input stream of (key, value) pairs
        */
      def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
          ssc: StreamingContext,
          kafkaParams: Map[String, String],
          topics: Set[String]): InputDStream[(K, V)] = {
        val groupId = kafkaParams.get("group.id").get
        // Before reading offsets from ZooKeeper, reconcile them with the actual state of the cluster
        setOrUpdateOffsets(topics, groupId)

        // Read the offsets from ZooKeeper and start consuming from there
        val messages = {
          val partitionsE = kc.getPartitions(topics)
          if (partitionsE.isLeft)
            throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
          val partitions = partitionsE.right.get
          val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
          if (consumerOffsetsE.isLeft)
            throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
          val consumerOffsets = consumerOffsetsE.right.get
          KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
            ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
        }
        messages
      }
    
      /**
        * Before creating the stream, reconcile the consumer offsets in ZooKeeper with what is actually available in Kafka.
        * @param topics topics to consume
        * @param groupId consumer group id
        */
      private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
        topics.foreach(topic => {
          var hasConsumed = true
          val partitionsE = kc.getPartitions(Set(topic))
          if (partitionsE.isLeft)
            throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
          val partitions = partitionsE.right.get
          val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
          if (consumerOffsetsE.isLeft) hasConsumed = false
          if (hasConsumed) { // the group has consumed this topic before
            /**
              * If the streaming job throws kafka.common.OffsetOutOfRangeException,
              * the offsets stored in ZooKeeper are stale: Kafka's retention policy has already
              * deleted the log segments that contained them.
              * To handle this, compare the consumerOffsets in ZooKeeper with the earliestLeaderOffsets;
              * if a consumerOffset is smaller than the corresponding earliestLeaderOffset it is stale,
              * and it is reset to the earliestLeaderOffset.
              */
            val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
            if (earliestLeaderOffsetsE.isLeft)
              throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
            val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
            val consumerOffsets = consumerOffsetsE.right.get
    
            // Only some partitions may have stale consumerOffsets, so reset only those partitions to earliestLeaderOffsets
            var offsets: Map[TopicAndPartition, Long] = Map()
            consumerOffsets.foreach({ case (tp, n) =>
              val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
              if (n < earliestLeaderOffset) {
                println("consumer group:" + groupId + ", topic:" + tp.topic + ", partition:" + tp.partition +
                  " offset is stale, resetting to " + earliestLeaderOffset)
                offsets += (tp -> earliestLeaderOffset)
              }
            })
            if (offsets.nonEmpty) {
              kc.setConsumerOffsets(groupId, offsets)
            }
          } else { // the group has never consumed this topic
            val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
            var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
            if (reset == Some("smallest")) {
              val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
              if (leaderOffsetsE.isLeft)
                throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
              leaderOffsets = leaderOffsetsE.right.get
            } else {
              val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
              if (leaderOffsetsE.isLeft)
                throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
              leaderOffsets = leaderOffsetsE.right.get
            }
            val offsets = leaderOffsets.map {
              case (tp, offset) => (tp, offset.offset)
            }
            kc.setConsumerOffsets(groupId, offsets)
          }
        })
      }
    
      /**
        * Update the consumed offsets in ZooKeeper after a batch has been processed.
        * @param rdd the RDD produced by the direct stream for the current batch
        */
      def updateZKOffsets(rdd: RDD[(String, String)]) : Unit = {
        val groupId = kafkaParams.get("group.id").get
        val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    
        for (offsets <- offsetsList) {
          val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
          val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
          if (o.isLeft) {
            println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
          }
        }
      }
    }
    

    In the main program:

    def initKafkaParams = {
      Map[String, String](
        "metadata.broker.list" -> Constants.KAFKA_BROKERS,
        "group.id" -> Constants.KAFKA_CONSUMER_GROUP,
        "fetch.message.max.bytes" -> "20971520",
        "auto.offset.reset" -> "smallest"
      )
    }
    
    // Kafka parameters
    val kafkaParams = initKafkaParams
    val manager = new KafkaManager(kafkaParams)
    val messageDstream = manager.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))

    // Commit the offsets to ZooKeeper (rdd is the batch RDD, typically inside foreachRDD)
    manager.updateZKOffsets(rdd)
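
    Putting the pieces together, here is a sketch of how the driver might wire this up. The batch interval, topic name and per-batch processing are hypothetical placeholders; Constants and initKafkaParams come from the snippet above. Because updateZKOffsets casts the RDD to HasOffsetRanges, it must be called on the RDD obtained directly from the stream, before any shuffle.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("DirectKafkaWithZkOffsets")
    val ssc = new StreamingContext(conf, Seconds(10))   // assumed batch interval

    val kafkaParams = initKafkaParams
    val manager = new KafkaManager(kafkaParams)
    val topic = "my-topic"                               // hypothetical topic name

    val messageDstream =
      manager.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))

    messageDstream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        rdd.map(_._2).count()                            // placeholder for real per-batch processing
      }
      // Commit offsets to ZooKeeper only after the batch has been processed
      manager.updateZKOffsets(rdd)
    }

    ssc.start()
    ssc.awaitTermination()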
    

      

  • Original post: https://www.cnblogs.com/zhangtianyuan/p/8483082.html