zoukankan      html  css  js  c++  java
  • Spark+Kafka的Direct方式将偏移量发送到Zookeeper实现(转)

    原文链接:Spark+Kafka的Direct方式将偏移量发送到Zookeeper实现

     Apache Spark 1.3.0引入了Direct API,利用Kafka的低层次API从Kafka集群中读取数据,并且在Spark Streaming系统里面维护偏移量相关的信息,并且通过这种方式去实现零数据丢失(zero data loss)相比使用基于Receiver的方法要高效。但是因为是Spark Streaming系统自己维护Kafka的读偏移量,而Spark Streaming系统并没有将这个消费的偏移量发送到Zookeeper中,这将导致那些基于偏移量的Kafka集群监控软件(比如:Apache Kafka监控之Kafka Web ConsoleApache Kafka监控之KafkaOffsetMonitor等)失效。本文就是基于为了解决这个问题,使得我们编写的Spark Streaming程序能够在每次接收到数据之后自动地更新Zookeeper中Kafka的偏移量。

      我们从Spark的官方文档可以知道,维护Spark内部维护Kafka便宜了信息是存储在HasOffsetRanges类的offsetRanges中,我们可以在Spark Streaming程序里面获取这些信息:

    1 val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    这样我们就可以获取所以分区消费信息,只需要遍历offsetsList,然后将这些信息发送到Zookeeper即可更新Kafka消费的偏移量。完整的代码片段如下:

    01 val messages =KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    02       messages.foreachRDD(rdd => {
    03         val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    04         val kc = new KafkaCluster(kafkaParams)
    05         for (offsets < - offsetsList) {
    06           val topicAndPartition = TopicAndPartition("iteblog", offsets.partition)
    07           val =kc.setConsumerOffsets(args(0), Map((topicAndPartition, offsets.untilOffset)))
    08           if (o.isLeft) {
    09             println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
    10           }
    11         }
    12 })

      KafkaCluster类用于建立和Kafka集群的链接相关的操作工具类,我们可以对Kafka中Topic的每个分区设置其相应的偏移量Map((topicAndPartition, offsets.untilOffset)),然后调用KafkaCluster类的setConsumerOffsets方法去更新Zookeeper里面的信息,这样我们就可以更新Kafka的偏移量,最后我们就可以通过KafkaOffsetMonitor之类软件去监控Kafka中相应Topic的消费信息,下图是KafkaOffsetMonitor的监控情况:



      从图中我们可以看到KafkaOffsetMonitor监控软件已经可以监控到Kafka相关分区的消费情况,这对监控我们整个Spark Streaming程序来非常重要,因为我们可以任意时刻了解Spark读取速度。另外,KafkaCluster工具类的完整代码如下:

    01 package org.apache.spark.streaming.kafka
    02  
    03 import kafka.api.OffsetCommitRequest
    04 import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
    05 import kafka.consumer.SimpleConsumer
    06 import org.apache.spark.SparkException
    07 import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig
    08  
    09 import scala.collection.mutable.ArrayBuffer
    10 import scala.util.Random
    11 import scala.util.control.NonFatal
    12  
    13 /**
    14  * User: 过往记忆
    15  * Date: 2015-06-02
    16  * Time: 下午23:46
    17  * bolg: http://www.iteblog.com
    18  * 本文地址:http://www.iteblog.com/archives/1381
    19  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
    20  * 过往记忆博客微信公共帐号:iteblog_hadoop
    21  */
    22  
    23 class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
    24   type Err = ArrayBuffer[Throwable]
    25  
    26   @transient private var _config: SimpleConsumerConfig = null
    27  
    28   def config: SimpleConsumerConfig = this.synchronized {
    29     if (_config == null) {
    30       _config = SimpleConsumerConfig(kafkaParams)
    31     }
    32     _config
    33   }
    34  
    35   def setConsumerOffsets(groupId: String,
    36                          offsets: Map[TopicAndPartition, Long]
    37                           ): Either[Err, Map[TopicAndPartition, Short]] = {
    38     setConsumerOffsetMetadata(groupId, offsets.map { kv =>
    39       kv._1 -> OffsetMetadataAndError(kv._2)
    40     })
    41   }
    42  
    43   def setConsumerOffsetMetadata(groupId: String,
    44                                 metadata: Map[TopicAndPartition, OffsetMetadataAndError]
    45                                  ): Either[Err, Map[TopicAndPartition, Short]] = {
    46     var result = Map[TopicAndPartition, Short]()
    47     val req = OffsetCommitRequest(groupId, metadata)
    48     val errs = new Err
    49     val topicAndPartitions = metadata.keySet
    50     withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
    51       val resp = consumer.commitOffsets(req)
    52       val respMap = resp.requestInfo
    53       val needed = topicAndPartitions.diff(result.keySet)
    54       needed.foreach { tp: TopicAndPartition =>
    55         respMap.get(tp).foreach { err: Short =>
    56           if (err == ErrorMapping.NoError) {
    57             result += tp -> err
    58           else {
    59             errs.append(ErrorMapping.exceptionFor(err))
    60           }
    61         }
    62       }
    63       if (result.keys.size == topicAndPartitions.size) {
    64         return Right(result)
    65       }
    66     }
    67     val missing = topicAndPartitions.diff(result.keySet)
    68     errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
    69     Left(errs)
    70   }
    71  
    72   private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
    73                          (fn: SimpleConsumer => Any): Unit = {
    74     brokers.foreach { hp =>
    75       var consumer: SimpleConsumer = null
    76       try {
    77         consumer = connect(hp._1, hp._2)
    78         fn(consumer)
    79       catch {
    80         case NonFatal(e) =>
    81           errs.append(e)
    82       finally {
    83         if (consumer != null) {
    84           consumer.close()
    85         }
    86       }
    87     }
    88   }
    89  
    90   def connect(host: String, port: Int): SimpleConsumer =
    91     new SimpleConsumer(host, port, config.socketTimeoutMs,
    92       config.socketReceiveBufferBytes, config.clientId)
    93 }
  • 相关阅读:
    (转)jQuery.extend 函数详解
    (转)跟我一起学JQuery插件开发教程
    (转)jQuery插件开发全解析
    (转)弹出窗口lhgDialog API文档
    (转)反射发送实战(-)InvokeMember
    (转)JQuery处理json与ajax返回JSON实例
    linux下添加用户并赋予root权限
    Linux 下配置,安装Hadoop
    Linux 下安装 jdk-7u79-linux-x64.gz,jdk1.7.0_79,jdk1.7步骤:
    linux下导入、导出mysql数据库命令
  • 原文地址:https://www.cnblogs.com/gaopeng527/p/4961443.html
Copyright © 2011-2022 走看看