  • Spark createDirectStream: Maintaining Kafka Offsets Yourself (Scala)

    With the createDirectStream approach, Spark Streaming does not store consumer offsets in ZooKeeper for you, so the application has to maintain them itself; persisting the offsets after each batch is what lets the job resume from the point of interruption instead of losing or reprocessing data after a restart.
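
    The core of the approach fits in a few lines (a sketch against the Spark 1.x kafka-0-8 API; `ssc`, `kafkaParams`, and the topic/offset values are illustrative assumptions defined elsewhere): start the stream from explicitly supplied offsets, and after each batch read back the ranges that were consumed so they can be persisted.

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    // Resume from offsets previously saved by this application (placeholder values)
    val fromOffsets = Map(TopicAndPartition("demo-topic", 0) -> 42L)
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

    stream.foreachRDD { rdd =>
      // offsetRanges describes exactly what this batch consumed, per partition
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // persist (topic, partition, untilOffset) once the batch has been processed
    }

    The KafkaManager class below wraps exactly this pattern and adds handling for stale or missing offsets.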

    KafkaManager.scala
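
    Note: in the Spark 1.x spark-streaming-kafka artifact this code targets, KafkaCluster is declared private[spark]; a common workaround, assumed here, is to place KafkaManager in the same package so the class is visible to it:

    package org.apache.spark.streaming.kafka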

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.Decoder
    import org.apache.spark.SparkException
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils}
    import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
    
    import scala.reflect.ClassTag
    
    /**
      * Created by knowpigxia on 15-8-5.
      */
    class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {
    
      private val kc = new KafkaCluster(kafkaParams)
    
      /**
        * Create a direct stream, resuming from the offsets saved in ZooKeeper.
        * @param ssc         streaming context
        * @param kafkaParams Kafka configuration (must contain group.id)
        * @param topics      topics to consume
        * @tparam K  message key type
        * @tparam V  message value type
        * @tparam KD key decoder type
        * @tparam VD value decoder type
        * @return input stream of (key, value) pairs
        */
      def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
          ssc: StreamingContext,
          kafkaParams: Map[String, String],
          topics: Set[String]): InputDStream[(K, V)] = {
        val groupId = kafkaParams.get("group.id").get
        // 在zookeeper上读取offsets前先根据实际情况更新offsets
        setOrUpdateOffsets(topics, groupId)
    
        //从zookeeper上读取offset开始消费message
        val messages = {
          val partitionsE = kc.getPartitions(topics)
          if (partitionsE.isLeft)
            throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
          val partitions = partitionsE.right.get
          val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
          if (consumerOffsetsE.isLeft)
            throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
          val consumerOffsets = consumerOffsetsE.right.get
          KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
            ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
        }
        messages
      }
    
      /**
        * Before creating the stream, reconcile the saved consumer offsets with
        * what is actually available on the brokers.
        * @param topics  topics to check
        * @param groupId consumer group id
        */
      private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
        topics.foreach(topic => {
          var hasConsumed = true
          val partitionsE = kc.getPartitions(Set(topic))
          if (partitionsE.isLeft)
            throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
          val partitions = partitionsE.right.get
          val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
          if (consumerOffsetsE.isLeft) hasConsumed = false
          if (hasConsumed) { // this group has consumed the topic before
            /**
              * If the streaming job throws kafka.common.OffsetOutOfRangeException,
              * the offsets saved in ZooKeeper are stale: Kafka's retention policy
              * has already deleted the log segments containing them. In that case,
              * compare the saved consumerOffsets with earliestLeaderOffsets; any
              * consumer offset smaller than the earliest leader offset is stale
              * and must be reset to earliestLeaderOffsets.
              */
            val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
            if (earliestLeaderOffsetsE.isLeft)
              throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
            val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
            val consumerOffsets = consumerOffsetsE.right.get
    
            // Possibly only some partitions have stale offsets, so update just those to earliestLeaderOffsets
            var offsets: Map[TopicAndPartition, Long] = Map()
            consumerOffsets.foreach({ case (tp, n) =>
              val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
              if (n < earliestLeaderOffset) {
                println(s"consumer group:$groupId, topic:${tp.topic}, partition:${tp.partition} " +
                  s"offsets are stale, resetting to $earliestLeaderOffset")
                offsets += (tp -> earliestLeaderOffset)
              }
            })
            if (offsets.nonEmpty) {
              kc.setConsumerOffsets(groupId, offsets)
            }
          } else { // this group has never consumed the topic
            val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
            var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
            if (reset == Some("smallest")) {
              val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
              if (leaderOffsetsE.isLeft)
                throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
              leaderOffsets = leaderOffsetsE.right.get
            } else {
              val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
              if (leaderOffsetsE.isLeft)
                throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
              leaderOffsets = leaderOffsetsE.right.get
            }
            val offsets = leaderOffsets.map {
              case (tp, offset) => (tp, offset.offset)
            }
            kc.setConsumerOffsets(groupId, offsets)
          }
        })
      }
    
      /**
        * Write the offsets consumed by a batch back to ZooKeeper.
        * @param rdd the batch RDD produced by the direct stream
        */
      def updateZKOffsets(rdd: RDD[(String, String)]) : Unit = {
        val groupId = kafkaParams.get("group.id").get
        val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    
        for (offsets <- offsetsList) {
          val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
          val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
          if (o.isLeft) {
            println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
          }
        }
      }
    }
    

  In the main program:

    def initKafkaParams = Map[String, String](
      "metadata.broker.list" -> Constants.KAFKA_BROKERS,
      "group.id" -> Constants.KAFKA_CONSUMER_GROUP,
      "fetch.message.max.bytes" -> "20971520", // 20 MB max fetch size
      "auto.offset.reset" -> "smallest"
    )
    
    // Kafka parameters
    val kafkaParams = initKafkaParams
    val manager = new KafkaManager(kafkaParams)
    val messageDstream = manager.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))

    // Commit offsets only after each batch has been processed
    messageDstream.foreachRDD { rdd =>
      // ... process rdd ...
      manager.updateZKOffsets(rdd)
    }
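
    Putting it all together, a minimal self-contained driver might look like the sketch below. The broker list, group id, topic name, and batch interval are illustrative assumptions, not values from the original post:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object DirectStreamApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("direct-stream-demo")
        val ssc = new StreamingContext(conf, Seconds(10))

        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> "broker1:9092,broker2:9092", // assumed brokers
          "group.id" -> "demo-group",
          "auto.offset.reset" -> "smallest"
        )
        val manager = new KafkaManager(kafkaParams)
        val messages = manager.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Set("demo-topic"))

        messages.foreachRDD { rdd =>
          if (!rdd.isEmpty()) {
            // Process first, commit offsets second: a crash before the commit
            // replays the batch, giving at-least-once semantics.
            println(s"processed ${rdd.map(_._2).count()} messages")
            manager.updateZKOffsets(rdd)
          }
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }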
    

      

  • Original post: https://www.cnblogs.com/zhangtianyuan/p/8483082.html