  • Spark Streaming consuming Kafka 1.0.1: the direct approach (storing offsets in ZooKeeper)

    Versions:

    Kafka: 1.0.1

    Spark: 2.1.0

    Note: servlet version conflicts can show up at runtime, so when the Maven dependencies are declared in the pom, the conflicting servlet-api artifact needs to be excluded.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>kafkaDirect</groupId>
        <artifactId>kafkaDirect</artifactId>
        <version>1.0-SNAPSHOT</version>
        <repositories>
            <repository>
                <id>cloudera-releases</id>
                <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
                <releases>
                    <enabled>true</enabled>
                </releases>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </repository>
        </repositories>

        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>2.1.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>javax.servlet</groupId>
                        <artifactId>servlet-api</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!--
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka_2.11</artifactId>
                <version>2.1.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>javax.servlet</groupId>
                        <artifactId>servlet-api</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.1.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>javax.servlet</groupId>
                        <artifactId>servlet-api</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.11.8</version>
                <exclusions>
                    <exclusion>
                        <groupId>javax.servlet</groupId>
                        <artifactId>servlet-api</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>1.0.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>javax.servlet</groupId>
                        <artifactId>servlet-api</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>

            <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>1.2.0-cdh5.14.0</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-common</artifactId>
                <version>1.2.0-cdh5.14.0</version>
            </dependency>

        </dependencies>
    </project>

    Code:

    Because ZooKeeper is used to store the offsets, any framework that can watch ZooKeeper can monitor the current Kafka consumption progress,

    for example KafkaOffsetMonitor:

    https://github.com/quantifind/KafkaOffsetMonitor/releases
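
    As a quick illustration of what such tools read, the sketch below fetches a single committed offset from its znode with the same ZkUtils API used in the code further down. It is only a minimal, assumed example: the quorum, group id "zk", topic "test2" and partition 0 are placeholder values taken from the example arguments later in this post.

    import kafka.utils.ZkUtils

    object OffsetPathDemo {
      def main(args: Array[String]): Unit = {
        // assumed ZooKeeper quorum; adjust to your environment
        val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection("cdh1:2181,cdh2:2181,cdh3:2181", 10000, 10000)
        val zkUtils = new ZkUtils(zkClient, zkConnection, false)
        // /consumers/[groupId]/offsets/[topic]/[partitionId] holds the committed offset as a plain string
        val path = "/consumers/zk/offsets/test2/0"
        val (offset, _) = zkUtils.readDataMaybeNull(path)
        println(s"$path -> ${offset.getOrElse("<no offset committed yet>")}")
        zkUtils.close()
      }
    }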

    A few points worth noting:

    1. Offset storage path in ZooKeeper: /consumers/[groupId]/offsets/[topic]/[partitionId]

    2. Reading offsets simply means fetching the values stored under that ZooKeeper path:

    def readOffsets(
                     topics: Seq[String],
                     groupId: String,
                     zkUtils: ZkUtils
                   ): Map[TopicPartition, Long] = {

      val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]

      val partitionMap = zkUtils.getPartitionsForTopics(topics)

      // /consumers/<groupId>/offsets/<topic>/
      partitionMap.foreach(topicPartitions => {

        val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
        // iterate over every partition of this topic
        topicPartitions._2.foreach(partition => {

          val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
          try {
            val offsetStatTuple = zkUtils.readData(offsetPath)
            if (offsetStatTuple != null) {
              topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong)
            }
          } catch {
            case e: Exception =>
              // no offset node exists yet for this partition: start from offset 0
              println(s"retrieving offset failed: ${e.getMessage}, topic: ${topicPartitions._1}, partition: $partition, node path: $offsetPath")
              topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L)
          }

        })

      })

      topicPartOffsetMap.toMap

    }

    3. Committing offsets simply means writing them back to that ZooKeeper path:

    def persistOffsets(
                        offsets: Seq[OffsetRange],
                        groupId: String,
                        storeEndOffset: Boolean = true,
                        zkUtils: ZkUtils
                      ): Unit = {

      offsets.foreach(or => {
        val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic)
        val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
        val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
        println(s"persisting offset: topic: ${or.topic}, partition: ${or.partition}, offset: $offsetVal, node path: $offsetPath")
        // an ACL list can be passed as an optional third argument
        zkUtils.updatePersistentPath(offsetPath, offsetVal.toString)
      })

    }

    Full code:

    package offsetInZookeeper

    import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
    import org.I0Itec.zkclient.ZkClient
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.{Assign, Subscribe}
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, OffsetRange}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.zookeeper.ZooDefs
    import org.apache.zookeeper.data.ACL

    import scala.collection.JavaConversions
    import scala.collection.mutable.ListBuffer

    /**
      * Created by angel
      */
    object KafkaOffsetInZookeeper {
      def main(args: Array[String]): Unit = {
        // example arguments: 5 cdh1:9092,cdh2:9092,cdh3:9092 test2 zk cdh1:2181,cdh2:2181,cdh3:2181
        if (args.length < 5) {
          System.err.println("Usage: KafkaDirectStreamTest " +
            "<batch-duration-in-seconds> " +
            "<kafka-bootstrap-servers> " +
            "<kafka-topics> " +
            "<kafka-consumer-group-id> " +
            "<kafka-zookeeper-quorum>")
          System.exit(1)
        }

        val batchDuration = args(0)
        val bootstrapServers = args(1).toString
        val topicsSet = args(2).toString.split(",").toSet
        val consumerGroupID = args(3)
        val zkQuorum = args(4)
        val sparkConf = new SparkConf().setAppName("Kafka-Offset-Management-Blog")
          .setMaster("local[4]") // local master for testing on a workstation; remove when submitting to a cluster
        val sc = new SparkContext(sparkConf)
        val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))
        val topics = topicsSet.toArray
        val topic = topics(0)
        //  /consumers/[groupId]/offsets/[topic]/[partitionId]
        val zkKafkaRootDir = zkQuorum + "/consumers/" + consumerGroupID + "/offsets/" + topic
        val zkSessionTimeOut = 10000
        val zkConnectionTimeOut = 10000
        val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkKafkaRootDir, zkSessionTimeOut, zkConnectionTimeOut)
        val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> bootstrapServers,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> consumerGroupID,
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        // fetch the previously committed offsets from ZooKeeper
        val fromOffsets: Map[TopicPartition, Long] = readOffsets(topics, consumerGroupID, zkUtils)

        // alternative: assign explicit partitions and starting offsets
        //    val inputDStream = KafkaUtils.createDirectStream[String, String](
        //      ssc,
        //      PreferConsistent,
        //      Assign[String, String](fromOffsets.keys, kafkaParams, fromOffsets)
        //    )

        // alternative: Subscribe overload taking offsets as ju.Map[TopicPartition, jl.Long]
        //    val inputDStream = KafkaUtils.createDirectStream[String, String](
        //      ssc,
        //      PreferConsistent,
        //      Subscribe[String, String](topics, kafkaParams, fromOffsets)
        //    )

        // create the direct stream starting from the offsets read above
        val inputDStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets))

        // process each batch, then write the offsets back to ZooKeeper
        // false: persist each batch's fromOffset (the last batch is replayed after a restart); true: persist untilOffset
        var storeEndOffset: Boolean = false
        inputDStream.foreachRDD((rdd, batchTime) => {

          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          offsetRanges.foreach(
            offset =>
              println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset)
          )
          val newRDD = rdd.map(message => processMessage(message))
          //      newRDD.count()
          persistOffsets(offsetRanges, consumerGroupID, storeEndOffset, zkUtils)
        })

        //    println("Number of messages processed " + inputDStream.count())
        ssc.start()
        ssc.awaitTermination()

      }

      /*
        Create a dummy process that simply returns the message as is.
       */
      def processMessage(message: ConsumerRecord[String, String]): ConsumerRecord[String, String] = {
        message
      }

      def readOffsets(
                       topics: Seq[String],
                       groupId: String,
                       zkUtils: ZkUtils
                     ): Map[TopicPartition, Long] = {

        val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]

        val partitionMap = zkUtils.getPartitionsForTopics(topics)

        // /consumers/<groupId>/offsets/<topic>/
        partitionMap.foreach(topicPartitions => {

          val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
          // iterate over every partition of this topic
          topicPartitions._2.foreach(partition => {

            val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
            try {
              val offsetStatTuple = zkUtils.readData(offsetPath)
              if (offsetStatTuple != null) {
                topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong)
              }
            } catch {
              case e: Exception =>
                // no offset node exists yet for this partition: start from offset 0
                println(s"retrieving offset failed: ${e.getMessage}, topic: ${topicPartitions._1}, partition: $partition, node path: $offsetPath")
                topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L)
            }

          })

        })

        topicPartOffsetMap.toMap

      }

      def persistOffsets(
                          offsets: Seq[OffsetRange],
                          groupId: String,
                          storeEndOffset: Boolean = true,
                          zkUtils: ZkUtils
                        ): Unit = {

        offsets.foreach(or => {
          val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic)
          val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
          val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
          println(s"persisting offset: topic: ${or.topic}, partition: ${or.partition}, offset: $offsetVal, node path: $offsetPath")
          // an ACL list can be passed as an optional third argument
          zkUtils.updatePersistentPath(offsetPath, offsetVal.toString)
        })

      }

    }

    Second version of the code:

    package offsetInZookeeper
    
    /**
      * Created by angel
      */
    import java.lang.Object
    
    import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
    import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils}
    import org.slf4j.LoggerFactory
    
    import scala.collection.JavaConversions._
    import scala.reflect.ClassTag
    import scala.util.Try
    /**
      * Helper class for the Kafka connection and offset management.
      *
      * @param zkHosts     ZooKeeper quorum address
      * @param kafkaParams Kafka consumer parameters
      */
    class KafkaManager(zkHosts: String, kafkaParams: Map[String, Object]) extends Serializable {
      // Logback logger, accessed through the slf4j facade
      @transient private lazy val log = LoggerFactory.getLogger(getClass)
      // ZkClient and ZkConnection needed to build the ZkUtils instance
      val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(zkHosts, 10000, 10000)
      // ZkUtils instance used to access ZooKeeper
      val zkUtils = new ZkUtils(zkClient, zkConnection, false)
      /**
        * Wraps createDirectStream with Kafka offset support and creates the Kafka streaming input.
        *
        * @param ssc    Spark Streaming context
        * @param topics Kafka topics
        * @tparam K Kafka message key type
        * @tparam V Kafka message value type
        * @return Kafka input DStream
        */
      def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] = {
        val groupId = kafkaParams("group.id").toString
        val storedOffsets = readOffsets(topics, groupId)
        log.info("Kafka offset summary (topic, partition, offset): {}", storedOffsets.map(off => (off._1.topic, off._1.partition(), off._2)))
        val kafkaStream = KafkaUtils.createDirectStream[K, V](ssc, PreferConsistent, ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, storedOffsets))
        kafkaStream
      }
      /**
        * Reads the Kafka offsets for the given topics from ZooKeeper.
        *
        * @param topics  Kafka topics
        * @param groupId Kafka group id
        * @return a Map[TopicPartition, Long] with the committed offset of every partition of every topic;
        *         partitions without a committed offset fall back to the earliest available offset
        */
      def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = {
        val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
        val partitionMap = zkUtils.getPartitionsForTopics(topics)
        // /consumers/<groupId>/offsets/<topic>/
        partitionMap.foreach(topicPartitions => {
          val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
          topicPartitions._2.foreach(partition => {
            val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
            val tryGetKafkaOffset = Try {
              val offsetStatTuple = zkUtils.readData(offsetPath)
              if (offsetStatTuple != null) {
                log.info("Read Kafka offset: topic: {}, partition: {}, offset: {}, ZK node path: {}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)
                topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong)
              }
            }
            if(tryGetKafkaOffset.isFailure){
              //http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
              val consumer = new KafkaConsumer[String, Object](kafkaParams)
              val partitionList = List(new TopicPartition(topicPartitions._1, partition))
              consumer.assign(partitionList)
              val minAvailableOffset = consumer.beginningOffsets(partitionList).values.head
              consumer.close()
              log.warn("No previous ZooKeeper offset node ({}), topic: {}, partition: {}, node path: {}, falling back to earliest available offset: {}", Seq[AnyRef](tryGetKafkaOffset.failed.get.getMessage, topicPartitions._1, partition.toString, offsetPath, minAvailableOffset): _*)
              topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), minAvailableOffset)
            }
          })
        })
        topicPartOffsetMap.toMap
      }
      /**
        * Persists the consumed Kafka offsets to ZooKeeper.
        *
        * @param rdd            the Kafka RDD produced by Spark Streaming, RDD[ConsumerRecord[K, V]]
        * @param storeEndOffset true to store each range's end offset, false to store its start offset
        */
      def persistOffsets[K, V](rdd: RDD[ConsumerRecord[K, V]], storeEndOffset: Boolean = true): Unit = {
        val groupId = kafkaParams("group.id").toString
        val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        offsetsList.foreach(or => {
          val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic)
          val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
          val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
          zkUtils.updatePersistentPath(offsetPath, offsetVal.toString) // an ACL list can be passed as an optional third argument
          log.debug("Persisted Kafka offset: topic: {}, partition: {}, offset: {}, ZK node path: {}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
        })
      }
    
    
    }
    
    object Manager{
      def main(args: Array[String]): Unit = {
        //5 cdh1:9092,cdh2:9092,cdh3:9092 test2 zk cdh1:2181,cdh2:2181,cdh3:2181
        if (args.length < 5) {
          System.err.println("Usage: KafkaDirectStreamTest " +
            "<batch-duration-in-seconds> " +
            "<kafka-bootstrap-servers> " +
            "<kafka-topics> " +
            "<kafka-consumer-group-id> " +
            "<kafka-zookeeper-quorum>")
          System.exit(1)
        }
    
        val batchDuration = args(0)
        val bootstrapServers = args(1).toString
        val topicsSet = args(2).toString.split(",").toSet
        val consumerGroupID = args(3)
        val zkQuorum = args(4)
        val sparkConf = new SparkConf().setAppName("Kafka-Offset-Management-Blog")
          .setMaster("local[4]")
    
    
        val sc = new SparkContext(sparkConf)
        val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))
    
        val topics = topicsSet.toArray
    
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> bootstrapServers,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> consumerGroupID,
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean) //禁用自动提交Offset,否则可能没正常消费完就提交了,造成数据错误
        )
    
        lazy val kafkaManager = new KafkaManager(zkQuorum , kafkaParams)
        val inputDStream: InputDStream[ConsumerRecord[String, String]] = kafkaManager.createDirectStream(ssc , topics)
        inputDStream.foreachRDD(rdd => {
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          offsetRanges.foreach(
            offset =>
              println(offset.topic, offset.partition, offset.fromOffset,offset.untilOffset)
          )
          kafkaManager.persistOffsets(rdd)
        })
        ssc.start()
        ssc.awaitTermination()
    
    
    
    
      }
    
    
    
    }
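
    After the job has run a few batches, the offsets it committed can be read back through the same KafkaManager, which is a quick way to verify that they really landed in ZooKeeper. This is only a minimal sketch under assumed values (the same quorum, group id "zk" and topic "test2" as the example arguments above; the bootstrap servers are only needed when a partition has no committed offset yet), placed in the same file so it reuses the imports above:

    object OffsetCheck {
      def main(args: Array[String]): Unit = {
        val zkQuorum = "cdh1:2181,cdh2:2181,cdh3:2181" // assumed quorum
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "cdh1:9092,cdh2:9092,cdh3:9092", // used only for the earliest-offset fallback
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "zk"
        )
        val manager = new KafkaManager(zkQuorum, kafkaParams)
        // prints (topic, partition, committed offset) for every partition of the topic
        manager.readOffsets(Seq("test2"), "zk").foreach { case (tp, offset) =>
          println(s"${tp.topic} ${tp.partition()} $offset")
        }
      }
    }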