  • Spark Streaming consuming Kafka 1.0.1: the direct approach (storing offsets in ZooKeeper) -- 2

    See the previous post: https://www.cnblogs.com/niutao/p/10547718.html

    Same logic as before, just a different wrapper: the offset handling is packaged into a reusable KafkaManager class.
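    For orientation, the offsets end up under the classic high-level-consumer layout in ZooKeeper (the path comes from ZKGroupTopicDirs.consumerOffsetDir; the group id "zk" and topic "test2" below are just the sample values from main, and the offset 42 is made up):

        /consumers/<groupId>/offsets/<topic>/<partition>   (node data = the offset)
        e.g. /consumers/zk/offsets/test2/0                 (node data = 42)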

    package offsetInZookeeper
    
    /**
      * Created by angel
      */
    
    import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
    import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils}
    import org.slf4j.LoggerFactory
    
    import scala.collection.JavaConversions._
    import scala.reflect.ClassTag
    import scala.util.Try
    /**
      * Helper class that manages the Kafka connection and the offsets stored in ZooKeeper.
      *
      * @param zkHosts     ZooKeeper connection string
      * @param kafkaParams Kafka consumer parameters
      */
    class KafkaManager(zkHosts: String, kafkaParams: Map[String, Object]) extends Serializable {
      // SLF4J logger (backed here by Logback); @transient lazy keeps it out of task serialization
      @transient private lazy val log = LoggerFactory.getLogger(getClass)
      // ZkClient and ZkConnection needed to build the ZkUtils instance (10s session/connection timeouts)
      val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(zkHosts, 10000, 10000)
      // ZkUtils handle used to read and write the offset nodes in ZooKeeper (no ZK security)
      val zkUtils = new ZkUtils(zkClient, zkConnection, false)
      /**
        * Wraps createDirectStream so that offsets stored in ZooKeeper are used when the Kafka stream is created.
        *
        * @param ssc    Spark StreamingContext
        * @param topics Kafka topics to subscribe to
        * @tparam K key type of the Kafka messages
        * @tparam V value type of the Kafka messages
        * @return the Kafka input stream
        */
      def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] = {
        val groupId = kafkaParams("group.id").toString
        val storedOffsets = readOffsets(topics, groupId)
        log.info("Stored Kafka offsets as (topic, partition, offset): {}", storedOffsets.map(off => (off._1.topic, off._1.partition(), off._2)))
        val kafkaStream = KafkaUtils.createDirectStream[K, V](ssc, PreferConsistent, ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, storedOffsets))
        kafkaStream
      }
      /**
        * Reads the consumed Kafka offsets from ZooKeeper.
        *
        * @param topics  Kafka topics
        * @param groupId Kafka consumer group id
        * @return a Map[TopicPartition, Long] with the offset of each partition of each topic;
        *         partitions without a stored offset fall back to the earliest available offset
        */
      def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = {
        val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
        val partitionMap = zkUtils.getPartitionsForTopics(topics)
        // /consumers/<groupId>/offsets/<topic>/
        partitionMap.foreach(topicPartitions => {
          val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
          topicPartitions._2.foreach(partition => {
            val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
            val tryGetKafkaOffset = Try {
              val offsetStatTuple = zkUtils.readData(offsetPath)
              if (offsetStatTuple != null) {
                log.info("Read Kafka offset: topic: {}, partition: {}, offset: {}, ZK path: {}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)
                topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong)
              }
            }
            if (tryGetKafkaOffset.isFailure) {
              // No offset stored in ZK yet: ask Kafka for the earliest available offset of this partition
              // http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
              val consumer = new KafkaConsumer[String, Object](kafkaParams)
              val partitionList = List(new TopicPartition(topicPartitions._1, partition))
              consumer.assign(partitionList)
              val minAvailableOffset = consumer.beginningOffsets(partitionList).values.head
              consumer.close()
              log.warn("No stored offset node in ZK ({}): topic: {}, partition: {}, ZK path: {}, using earliest available offset: {}", Seq[AnyRef](tryGetKafkaOffset.failed.get.getMessage, topicPartitions._1, partition.toString, offsetPath, minAvailableOffset): _*)
              topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), minAvailableOffset)
            }
          })
        })
        topicPartOffsetMap.toMap
      }
      /**
        * Persists the consumed Kafka offsets to ZooKeeper.
        *
        * @param rdd            the Kafka RDD of a batch, RDD[ConsumerRecord[K, V]]
        * @param storeEndOffset true = store the end offset of each range, false = store the start offset
        */
      def persistOffsets[K, V](rdd: RDD[ConsumerRecord[K, V]], storeEndOffset: Boolean = true): Unit = {
        val groupId = kafkaParams("group.id").toString
        val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        offsetsList.foreach(or => {
          val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic)
          val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
          val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
          zkUtils.updatePersistentPath(offsetPath, offsetVal.toString /*, JavaConversions.bufferAsJavaList(acls)*/)
          log.debug("Persisted Kafka offset: topic: {}, partition: {}, offset: {}, ZK path: {}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
        })
      }
    }
    
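    // Example driver: wires KafkaManager into a streaming job and prints each batch's offset ranges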
    object Manager {
      def main(args: Array[String]): Unit = {
        // Example arguments: 5 cdh1:9092,cdh2:9092,cdh3:9092 test2 zk cdh1:2181,cdh2:2181,cdh3:2181
        if (args.length < 5) {
          System.err.println("Usage: KafkaDirectStreamTest " +
            "<batch-duration-in-seconds> " +
            "<kafka-bootstrap-servers> " +
            "<kafka-topics> " +
            "<kafka-consumer-group-id> " +
            "<kafka-zookeeper-quorum>")
          System.exit(1)
        }
    
        val batchDuration = args(0)
        val bootstrapServers = args(1)
        val topicsSet = args(2).split(",").toSet
        val consumerGroupID = args(3)
        val zkQuorum = args(4)
        val sparkConf = new SparkConf().setAppName("Kafka-Offset-Management-Blog")
          .setMaster("local[4]")
    
    
        val sc = new SparkContext(sparkConf)
        val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))
    
        val topics = topicsSet.toArray
    
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> bootstrapServers,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> consumerGroupID,
          "auto.offset.reset" -> "latest", // only applies when no valid offset is available (readOffsets supplies one for every partition)
          "enable.auto.commit" -> (false: java.lang.Boolean) // disable auto commit; otherwise offsets could be committed before a batch is fully processed
        )
    
        lazy val kafkaManager = new KafkaManager(zkQuorum, kafkaParams)
        val inputDStream: InputDStream[ConsumerRecord[String, String]] = kafkaManager.createDirectStream(ssc, topics)
        inputDStream.foreachRDD(rdd => {
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          // Print this batch's offset ranges; the actual record processing would go here
          offsetRanges.foreach(
            offset =>
              println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset)
          )
          // Persist the end offsets to ZooKeeper only after the batch has been processed
          kafkaManager.persistOffsets(rdd)
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }
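    To run it, a spark-submit call along these lines should work. This is only a sketch: the jar name is hypothetical, offsetInZookeeper.Manager follows from the package and object above, and the argument values repeat the example from the comment in main:

        spark-submit --class offsetInZookeeper.Manager \
          kafka-offset-zk.jar \
          5 cdh1:9092,cdh2:9092,cdh3:9092 test2 zk cdh1:2181,cdh2:2181,cdh3:2181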