  • Spark/Kafka integration

    Spark 1.3 adds DirectStream for processing Kafka messages. It is used like this:

    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

    ssc: the StreamingContext
    kafkaParams: the Kafka parameters, including the list of Kafka brokers, etc.
    topicsSet: the set of topics to read from.
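
    Putting the pieces together, a minimal end-to-end sketch (the broker address localhost:9092, the topic name test, and the 10-second batch interval are illustrative placeholders, not values from the original post):

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object DirectStreamExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("DirectStreamExample")
        val ssc = new StreamingContext(conf, Seconds(10))

        // Broker list and topic are placeholders; adjust them to your cluster.
        val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
        val topicsSet = Set("test")

        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicsSet)

        // Each record is a (key, value) pair; here we just count the values per batch.
        stream.map(_._2).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }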

    This method creates an input stream that reads messages directly from the Kafka brokers, without creating any receiver (that is, it does not go through Kafka's high-level consumer API).
    The stream guarantees that each Kafka message is processed exactly once.
      * - No receivers: this stream creates no receiver. It queries the Kafka offsets directly and does not need ZooKeeper to record which offsets have already been consumed.
    Note, however, that many existing Kafka monitoring tools read their data from ZooKeeper, so if you want to keep using such a tool you have to write your own code to update the offsets in ZooKeeper;
    see org.apache.spark.streaming.kafka.HasOffsetRanges and the sketch after this list.
      * - Failure recovery: with checkpointing enabled, the driver can recover quickly from a failure.
    When the driver fails, the Kafka offsets it had read up to are saved as well; on recovery, processing continues from those offsets, so no message is lost and each message is read and processed once.
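
    A minimal sketch of pulling the per-batch offset ranges out of the stream via HasOffsetRanges (the stream variable refers to the direct stream created above; the ZooKeeper update itself is left as a placeholder, since it depends on which ZooKeeper client you use):

    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    stream.foreachRDD { rdd =>
      // The RDD produced by a direct stream implements HasOffsetRanges.
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      offsetRanges.foreach { o =>
        // o.topic, o.partition, o.fromOffset and o.untilOffset describe what this batch read.
        // A monitoring tool that expects ZooKeeper offsets would need these values written
        // to ZooKeeper here, using whatever ZooKeeper client you already have.
        println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}")
      }
    }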

    def createDirectStream[
        K: ClassTag,
        V: ClassTag,
        KD <: Decoder[K]: ClassTag,
        VD <: Decoder[V]: ClassTag] (
        ssc: StreamingContext,
        kafkaParams: Map[String, String],
        topics: Set[String]
    ): InputDStream[(K, V)] = {
      // Message handler: turn each Kafka record into a (key, value) pair
      val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
      val kc = new KafkaCluster(kafkaParams)
      val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)

      (for {
        topicPartitions <- kc.getPartitions(topics).right
        leaderOffsets <- (if (reset == Some("smallest")) {
          // Find the leader of each topic/partition and read its smallest offset
          // (which is not necessarily 0)
          kc.getEarliestLeaderOffsets(topicPartitions)
        } else {
          // Find the leader of each topic/partition and read its largest offset;
          // the stream will then only process newly arriving messages, a bit like the tail command
          kc.getLatestLeaderOffsets(topicPartitions)
        }).right
      } yield {
        val fromOffsets = leaderOffsets.map { case (tp, lo) =>
          (tp, lo.offset)
        }
        // Create the stream from the StreamingContext, the starting offsets, and so on
        new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
          ssc, kafkaParams, fromOffsets, messageHandler)
      }).fold(
        errs => throw new SparkException(errs.mkString(" ")),
        ok => ok
      )
    }
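
    When starting from the smallest/largest offsets is not what you want, Spark 1.3 also has a createDirectStream overload that takes explicit starting offsets and a message handler. A sketch of using it (the topic/partition/offset values are made up for illustration; ssc and kafkaParams are the same as in the earlier example):

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Start from offsets you persisted yourself (the values here are illustrative).
    val fromOffsets = Map(
      TopicAndPartition("test", 0) -> 42L,
      TopicAndPartition("test", 1) -> 17L
    )

    // The message handler decides what each record looks like downstream.
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)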

     The resulting DirectKafkaInputDStream:

    class DirectKafkaInputDStream[
        K: ClassTag,
        V: ClassTag,
        U <: Decoder[K]: ClassTag,
        T <: Decoder[V]: ClassTag,
        R: ClassTag](
        @transient ssc_ : StreamingContext,
        val kafkaParams: Map[String, String],
        val fromOffsets: Map[TopicAndPartition, Long],
        messageHandler: MessageAndMetadata[K, V] => R
    ) extends InputDStream[R](ssc_) with Logging {
      val maxRetries = context.sparkContext.getConf.getInt(
        "spark.streaming.kafka.maxRetries", 1)

      // The data that gets written to the checkpoint
      protected[streaming] override val checkpointData = new DirectKafkaInputDStreamCheckpointData
      ...

      // The current Kafka offsets, updated after each batch of messages has been processed.
      // They are also saved in checkpointData, so a failed driver can recover without missing any message.
      protected var currentOffsets = fromOffsets

      // Find the leader of every partition of every topic and read each leader's largest offset.
      // If a leader has changed (e.g. because of a machine failure), this also detects it quickly.
      @tailrec
      protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
        val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
        // Either.fold would confuse @tailrec, do it manually
        if (o.isLeft) {
          val err = o.left.get.toString
          if (retries <= 0) {
            throw new SparkException(err)
          } else {
            log.error(err)
            Thread.sleep(kc.config.refreshLeaderBackoffMs)
            latestLeaderOffsets(retries - 1)
          }
        } else {
          o.right.get
        }
      }
      ...

      // The stream's computation: build a KafkaRDD for this batch from currentOffsets and untilOffsets
      override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
        val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
        val rdd = KafkaRDD[K, V, U, T, R](
          context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

        currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
        Some(rdd)
      }

      ...


      private[streaming]
      class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
        def batchForTime = data.asInstanceOf[mutable.HashMap[
          Time, Array[OffsetRange.OffsetRangeTuple]]]

        override def update(time: Time) {
          batchForTime.clear()
          generatedRDDs.foreach { kv =>
            val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
            batchForTime += kv._1 -> a
          }
        }

        override def cleanup(time: Time) { }

        // When recovering from a failure, generatedRDDs has to be rebuilt
        override def restore() {
          // this is assuming that the topics don't change during execution, which is true currently
          val topics = fromOffsets.keySet
          val leaders = kc.findLeaders(topics).fold(
            errs => throw new SparkException(errs.mkString(" ")),
            ok => ok
          )

          batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
            logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
            generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
              context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
          }
        }
      }

    }
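
    The restore() path above only runs when the application is restarted from a checkpoint. A minimal driver-side sketch of enabling that, assuming a checkpoint directory of your choosing (the HDFS path below is a placeholder):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // The checkpoint directory is a placeholder; use a reliable store such as HDFS.
    val checkpointDir = "hdfs:///tmp/direct-stream-checkpoint"

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("DirectStreamRecovery")
      val ssc = new StreamingContext(conf, Seconds(10))
      ssc.checkpoint(checkpointDir)
      // Set up the direct stream and its processing here, exactly as in the earlier examples.
      ssc
    }

    // On a clean start this calls createContext(); after a driver failure it rebuilds the
    // StreamingContext, including the saved Kafka offsets, from the checkpoint data.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()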

  • Original post: https://www.cnblogs.com/luckuan/p/4479268.html