  • Spark Streaming consuming Kafka 0.8 via the direct approach (storing offsets in ZooKeeper)

    In production, offsets are maintained manually to keep them safe and to prevent data loss when jobs restart.

    Version: Kafka 0.8

    Points to note:

    1. Read the per-partition offsets recorded in ZooKeeper.

    2. Fetch the actual earliest and latest offsets from the brokers.

    3. Compare the two: if the offset recorded in ZooKeeper falls within the actual range, use the ZooKeeper offset.
    4. Otherwise, fall back to the broker's earliest offset (a minimal sketch of this rule follows).
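
    In code, the comparison in steps 3 and 4 amounts to clamping the stored offset into the range the broker actually holds. A minimal sketch of that rule (the helper name reconcileOffset is illustrative, not part of the class below):

    // Decide which offset one partition should start from.
    def reconcileOffset(zkOffset: Long, earliestOffset: Long, latestOffset: Long): Long =
      if (zkOffset >= earliestOffset && zkOffset <= latestOffset) zkOffset // still valid: resume from it
      else earliestOffset // expired or out of range (e.g. removed by retention): restart from earliest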

    KafkaHelper:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
    import org.I0Itec.zkclient.ZkClient
    import org.apache.spark.SparkException
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.{KafkaCluster, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.kafka.KafkaCluster.Err
    
    /**
      * KafkaHelper provides two public methods: one creates a direct-mode DStream,
      * the other updates the consumer offsets stored in ZooKeeper.
      * @param kafkaParams Kafka configuration parameters
      * @param zkQuorum ZooKeeper connection string
      * @param group consumer group
      * @param topic topic to consume
      */
    class KafkaHelper(kafkaParams: Map[String, String], zkQuorum: String, group: String, topic: String) extends Serializable {
    
      private val kc = new KafkaCluster(kafkaParams)
      private val zkClient = new ZkClient(zkQuorum)
      private val topics = Set(topic)
    
      /**
        * Returns the ZooKeeper path under which this group's offsets for the topic are stored,
        * i.e. /consumers/<group>/offsets/<topic>.
        * @return
        */
      private def getZkPath(): String = {
        val topicDirs = new ZKGroupTopicDirs(group, topic)
        topicDirs.consumerOffsetDir
      }
    
      /**
        * Computes the offsets to start consuming from.
        * @param children number of partitions recorded in ZooKeeper
        * @param zkPath ZooKeeper path of this topic's offset data
        * @param earliestLeaderOffsets actual earliest offsets on the brokers
        * @param latestLeaderOffsets actual latest offsets on the brokers
        * @return
        */
      private def getOffsets(children: Int, zkPath: String, earliestLeaderOffsets: Map[TopicAndPartition, KafkaCluster.LeaderOffset], latestLeaderOffsets: Map[TopicAndPartition, KafkaCluster.LeaderOffset]): Map[TopicAndPartition, Long] = {
        var fromOffsets: Map[TopicAndPartition, Long] = Map()
        for (i <- 0 until children) {
          // offset recorded in ZooKeeper for partition i
          val zkOffset = zkClient.readData[String](s"${zkPath}/${i}").toLong
          val tp = TopicAndPartition(topic, i)
          // actual earliest and latest offsets on the broker
          val earliestOffset: Long = earliestLeaderOffsets(tp).offset
          val latestOffset: Long = latestLeaderOffsets(tp).offset
          // If the offset recorded in ZooKeeper falls within the actual range, use it;
          // otherwise fall back to the broker's earliest offset.
          if (zkOffset >= earliestOffset && zkOffset <= latestOffset) {
            fromOffsets += (tp -> zkOffset)
          } else {
            fromOffsets += (tp -> earliestOffset)
          }
        }
        fromOffsets
      }
    
      /**
        * Creates the direct-mode DStream.
        * @param ssc
        * @return
        */
      def createDirectStream(ssc: StreamingContext): InputDStream[(String, String)] = {
        //---------------------- fetch the actual offsets from the brokers ------------------------
        val partitionsE: Either[Err, Set[TopicAndPartition]] = kc.getPartitions(topics)
        if (partitionsE.isLeft)
          throw new SparkException(s"get kafka partitions failed: ${partitionsE.left.get}")
        val partitions = partitionsE.right.get
        val earliestLeaderOffsetsE: Either[Err, Map[TopicAndPartition, KafkaCluster.LeaderOffset]] = kc.getEarliestLeaderOffsets(partitions)
        if (earliestLeaderOffsetsE.isLeft)
          throw new SparkException(s"get kafka earliestLeaderOffsets failed: ${earliestLeaderOffsetsE.left.get}")
        val earliestLeaderOffsets: Map[TopicAndPartition, KafkaCluster.LeaderOffset] = earliestLeaderOffsetsE.right.get
        val latestLeaderOffsetsE: Either[Err, Map[TopicAndPartition, KafkaCluster.LeaderOffset]] = kc.getLatestLeaderOffsets(partitions)
        if (latestLeaderOffsetsE.isLeft)
          throw new SparkException(s"get kafka latestLeaderOffsets failed: ${latestLeaderOffsetsE.left.get}")
        val latestLeaderOffsets: Map[TopicAndPartition, KafkaCluster.LeaderOffset] = latestLeaderOffsetsE.right.get
        //---------------------- create the kafkaStream -------------------------------------------
        val zkPath: String = getZkPath()
        val children = zkClient.countChildren(zkPath)
        // Offsets recorded in ZooKeeper tell us whether this group has consumed the topic before.
        if (children > 0) {
          // Consumed before: resume from the reconciled offsets.
          val fromOffsets: Map[TopicAndPartition, Long] = getOffsets(children, zkPath, earliestLeaderOffsets, latestLeaderOffsets)
          // Note: with this handler the first tuple element is the topic, not the message key.
          val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
            ssc, kafkaParams, fromOffsets, messageHandler)
        } else {
          // Never consumed: start from the position given by auto.offset.reset in kafkaParams.
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        }
      }
    
      /**
        * Persists the consumed offsets back to ZooKeeper.
        * @param offsetRanges
        */
      def updateZkOffsets(offsetRanges: Array[OffsetRange]) = {
        val zkPath: String = getZkPath()
        for (o <- offsetRanges) {
          val newZkPath = s"${zkPath}/${o.partition}"
          // Save this partition's offset to ZooKeeper. Note: untilOffset (the end of the
          // processed range), not fromOffset, otherwise the last batch is replayed on every restart.
          ZkUtils.updatePersistentPath(zkClient, newZkPath, o.untilOffset.toString)
        }
      }
    }
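
    After updateZkOffsets runs, each partition's offset sits under /consumers/<group>/offsets/<topic>/<partition>, the layout that ZKGroupTopicDirs.consumerOffsetDir resolves to. A quick way to verify is to read a stored value back with the same ZkClient API the class already uses; a sketch (the connection string and path match the driver below and are otherwise illustrative):

    import org.I0Itec.zkclient.ZkClient

    object CheckOffsets {
      def main(args: Array[String]): Unit = {
        val zkClient = new ZkClient("hadoop01:2181,hadoop02:2181,hadoop03:2181")
        // layout: /consumers/<group>/offsets/<topic>/<partition>
        val offset = zkClient.readData[String]("/consumers/CustomerContacts/offsets/CustomerContacts/0")
        println(s"stored offset for partition 0: $offset")
        zkClient.close()
      }
    }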
    

      

    Driver class:

    package CC
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    // KafkaHelper (defined above) is assumed to live in the same package, CC, so no import is needed
    
    /**
      * Created by angel;
      */
    object TestKafkaHelper {
      def main(args: Array[String]): Unit = {
    
    
        val Array(timeInterval,brokerList,zkQuorum,topic,group) = Array(
          "2"
          , "hadoop01:9092,hadoop02:9092,hadoop03:9092"
          , "hadoop01:2181,hadoop02:2181,hadoop03:2181"
          , "CustomerContacts"
          , "CustomerContacts"
        )
    
        val conf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[2]")
        val ssc = new StreamingContext(conf,Seconds(timeInterval.toInt))
    
    // Kafka configuration parameters
        val kafkaParams = Map(
          "metadata.broker.list" -> brokerList,
          "group.id" -> group,
          "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
        )
    
    // note the constructor order: group before topic
    val kafkaHelper = new KafkaHelper(kafkaParams, zkQuorum, group, topic)
    
        val kafkaStream: InputDStream[(String, String)] = kafkaHelper.createDirectStream(ssc)
    
        var offsetRanges = Array[OffsetRange]()
    
    kafkaStream.transform( rdd => {
      // transform runs on the driver for each batch, so the offset ranges can be captured here
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }).map( msg => msg._2)
      .foreachRDD( rdd => {
        rdd.foreachPartition( partition => {
          partition.foreach( record => {
            // process each record here
            println(record)
          })
        })
        // commit offsets only after the batch is processed (at-least-once semantics)
        kafkaHelper.updateZkOffsets(offsetRanges)
      })
    
        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
      }
    
    }
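
    The kafka.*, KafkaUtils, and KafkaCluster classes above come from the old Kafka 0.8 integration module. A build.sbt sketch of the dependencies this code compiles against (the versions are assumptions; any Spark line that ships spark-streaming-kafka-0-8 and exposes KafkaCluster publicly should work):

    // build.sbt (assumed versions)
    scalaVersion := "2.11.12"
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming" % "2.2.0",
      "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0" // pulls in kafka 0.8.2.x and zkclient
    )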
    

      

  • Original post: https://www.cnblogs.com/niutao/p/10547594.html