zoukankan      html  css  js  c++  java
  • spark/kafka的集成

    Spark1.3中新增DirectStream处理Kafka的消息。使用方法如下:

    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    ssc:StreamContext
    kafkaParams:Kafka的参数,包括Kafka的Brokers等
    topicsSet:要读取的topic。

    此方法创建一个input steam,通过直接从Kafka的Brokers读取消息,而不是创建任何的receiver(即不通过kafka的高级api来读取消息)
    此stream保证Kafka的消息只会被处理一次。
      * - 无需receivers:该Steam不创建任何receiver。它直接查询Kafka的offset(偏移量),不需要通过zookeeper来保存已经消费过的offset。
    当然,现有很多kafka的监控工具都会读取Zookeeper的数据,因此,如果你想继续使用Kafka 的监控工具,需要自己实现代码来更新Zookeeper的offset。
    可以参考 org.apache.spark.streaming.kafka.HasOffsetRanges
      * - 故障恢复:启用checkpoint机制后,当Driver失败后,可以快速的从故障中恢复。
    Driver失败时,当前Kafka的读取的offset也会保存下来,恢复的时候会从此offset继续处理,保证消息不会丢失,并且读取处理一次。

    def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Set[String]
    ): InputDStream[(K, V)] = {
    //创建消息处理
    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    val kc = new KafkaCluster(kafkaParams)
    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)

    (for {
    topicPartitions <- kc.getPartitions(topics).right
    leaderOffsets <- (if (reset == Some("smallest")) {
    //根据topic和partition得到相应的leader节点,并且读取offset的最小值(最小值可能不是0)
    kc.getEarliestLeaderOffsets(topicPartitions)
    } else {
    //根据topic和partition得到相应的leader节点,并且读取offset的最大值,然后此stream只会处理kafka的新增消息,有点想tail命令
    kc.getLatestLeaderOffsets(topicPartitions)
    }).right
    } yield {
    val fromOffsets = leaderOffsets.map { case (tp, lo) =>
    (tp, lo.offset)
    }
    //根据ssc、offsets等来创建stream
    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
    ssc, kafkaParams, fromOffsets, messageHandler)
    }).fold(
    errs => throw new SparkException(errs.mkString(" ")),
    ok => ok
    )
    }

     生成的DirectKafkaInputDStream

    class DirectKafkaInputDStream[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[K]: ClassTag,
    T <: Decoder[V]: ClassTag,
    R: ClassTag](
    @transient ssc_ : StreamingContext,
    val kafkaParams: Map[String, String],
    val fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
    ) extends InputDStream[R](ssc_) with Logging {
    val maxRetries = context.sparkContext.getConf.getInt(
    "spark.streaming.kafka.maxRetries", 1)

    //创建checkpoint的数据
    protected[streaming] override val checkpointData = new DirectKafkaInputDStreamCheckpointData
    ...

    //当前kafka的偏移量,每批消息处理完毕之后,都将此offset进行更新。,此offset也会保存在checkpointData中,如果Driver失败,可以恢复,保证消息不会被遗漏。
    protected var currentOffsets = fromOffsets

    //获取kafka每个topic的每个partition的leader,并且读取每个leader已经最大的offset。
    //如果kafka的leader已经发生变更(由于机器故障灯),也可以尽快的发现
    @tailrec
    protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
    val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
    // Either.fold would confuse @tailrec, do it manually
    if (o.isLeft) {
    val err = o.left.get.toString
    if (retries <= 0) {
    throw new SparkException(err)
    } else {
    log.error(err)
    Thread.sleep(kc.config.refreshLeaderBackoffMs)
    latestLeaderOffsets(retries - 1)
    }
    } else {
    o.right.get
    }
    }
    ...

    //stream的计算,根据currentOffsets和untilOffsets生成RDD
    override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
    val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
    Some(rdd)
    }

    ...


    private[streaming]
    class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
    def batchForTime = data.asInstanceOf[mutable.HashMap[
    Time, Array[OffsetRange.OffsetRangeTuple]]]

    override def update(time: Time) {
    batchForTime.clear()
    generatedRDDs.foreach { kv =>
    val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
    batchForTime += kv._1 -> a
    }
    }

    override def cleanup(time: Time) { }

    //从失败中恢复的时候,需要重新计算generatedRDDs
    override def restore() {
    // this is assuming that the topics don't change during execution, which is true currently
    val topics = fromOffsets.keySet
    val leaders = kc.findLeaders(topics).fold(
    errs => throw new SparkException(errs.mkString(" ")),
    ok => ok
    )

    batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
    logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
    generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
    }
    }
    }

    }

  • 相关阅读:
    中国石油昆仑加油卡
    157 01 Android 零基础入门 03 Java常用工具类01 Java异常 01 异常介绍 02 异常内容简介
    156 01 Android 零基础入门 03 Java常用工具类01 Java异常 01 异常介绍 01 Java常用工具类简介
    155 01 Android 零基础入门 02 Java面向对象 07 Java多态 07 多态知识总结 01 多态总结
    154 01 Android 零基础入门 02 Java面向对象 07 Java多态 06 内部类 05 匿名内部类
    153 01 Android 零基础入门 02 Java面向对象 07 Java多态 06 内部类 04 方法内部类
    152 01 Android 零基础入门 02 Java面向对象 07 Java多态 06 内部类 03 静态内部类
    151 01 Android 零基础入门 02 Java面向对象 07 Java多态 06 内部类 02 成员内部类
    150 01 Android 零基础入门 02 Java面向对象 07 Java多态 06 内部类概述 01 内部类概述
    149 01 Android 零基础入门 02 Java面向对象 07 Java多态 05 接口(重点)07 接口的继承
  • 原文地址:https://www.cnblogs.com/luckuan/p/4479268.html
Copyright © 2011-2022 走看看