zoukankan      html  css  js  c++  java
  • spark streaming中维护kafka偏移量到外部介质

    spark streaming中维护kafka偏移量到外部介质

    以kafka偏移量维护到redis为例。

    redis存储格式

    使用的数据结构为string,其中key为topic:partition,value为offset

    例如bobo这个topic下有3个分区,则key-value结构如下:

    • bobo:0的偏移量为x
    • bobo:1的偏移量为y
    • bobo:2的偏移量为z

    消费时指定offset

    主要是如下两个方法:

    • createKafkaStream()创建kakfa流
    • getOffsets()从redis中获取offsets
    /**
      * kakfa参数
      */
    private val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "crpprdap25:6667,crpprdap26:6667,crpprdap27:6667",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      // 注意这里是none。
      "auto.offset.reset" -> "none",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    
    // `bobo`topic下有3个分区
    private val topicPartitions = Map[String, Int]("bobo" -> 3)
    
    // 从redis中获取offsets
    def getOffsets: Map[TopicPartition, Long] = {
      val jedis = InternalRedisClient.getResource
    
      // 设置每个分区起始的offset
      val offsets = mutable.Map[TopicPartition, Long]()
    
      topicPartitions.foreach { it =>
        val topic = it._1
        val partitions = it._2
        // 遍历分区,设置每个topic下对应partition的offset
        for (partition <- 0 until partitions) {
          val topicPartitionKey = topic + ":" + partition
          var lastOffset = 0L
          val lastSavedOffset = jedis.get(topicPartitionKey)
    
          if (null != lastSavedOffset) {
            try {
              lastOffset = lastSavedOffset.toLong
            } catch {
              case e: Exception =>
                log.error("get lastSavedOffset error", e)
                System.exit(1)
            }
          }
          log.info("from redis topic: {}, partition: {}, lastOffset: {}", topic, partition, lastOffset)
    
          // 添加
          offsets += (new TopicPartition(topic, partition) -> lastOffset)
        }
      }
    
      InternalRedisClient.returnResource(jedis)
    
      offsets.toMap
    }
    
    /**
      * 创建kakfa流
      *
      * @param ssc StreamingContext
      * @return InputDStream
      */
    def createKafkaStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
      val offsets = getOffsets
    
      // 创建kafka stream
      val stream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Assign[String, String](offsets.keys.toList, kafkaParams, offsets)
      )
      stream
    }
    

    其中:核心是通过ConsumerStrategies.Assign方法来指定topic下对应partitionoffset信息。

    更新offset到redis

    最后将offset信息维护到redis即可。

    /**
      * 消费
      *
      * @param stream InputDStream
      */
    def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
      stream.foreachRDD { rdd =>
        // 获取offset信息
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    
        // 计算相关指标,这里就统计下条数了
        val total = rdd.count()
    
        val jedis = InternalRedisClient.getResource
        val pipeline = jedis.pipelined()
        // 会阻塞redis
        pipeline.multi()
    
        // 更新相关指标
        pipeline.incrBy("totalRecords", total)
    
        // 更新offset
        offsetRanges.foreach { offsetRange =>
          log.info("save offsets, topic: {}, partition: {}, offset: {}", offsetRange.topic, offsetRange.partition, offsetRange.untilOffset)
          val topicPartitionKey = offsetRange.topic + ":" + offsetRange.partition
          pipeline.set(topicPartitionKey, offsetRange.untilOffset + "")
        }
    
        // 执行,释放
        pipeline.exec()
        pipeline.sync()
        pipeline.close()
        InternalRedisClient.returnResource(jedis)
      }
    }
    

    参考

    spark代码

    顺便贴一下自己整理的spark相关的代码。

    Github地址:spark-programming

    主要包括:

    • RDD的基本使用
    • SQL
      • jdbc(读、写)
      • hive(读、写、动态分区)
    • Streaming
      • 消费kafka(手动提交、手动维护offset)
      • 写入HBase
      • 写入Hive
  • 相关阅读:
    Construction构造函数
    映射验证
    映射设置
    条件映射
    映射前和映射后的操作
    AutoMapper 5.0-升级指南
    Bootstrap Tree View
    MiniProfiler使用笔记
    关于添加数据自定义编号格式问题
    【Postgresql】数据库函数
  • 原文地址:https://www.cnblogs.com/bener/p/10651054.html
Copyright © 2011-2022 走看看