  • Spark Streaming + Kafka: maintaining offsets



    Experiment: zero data loss for Spark Streaming in production

    The official Spark Streaming + Kafka integration guide describes three ways to store offsets:

    • Checkpoints
    • Kafka itself
    • Your own data store

    The first approach is not recommended here, since checkpointed offsets cannot be recovered once the application code changes; only the latter two approaches are covered below.

    Kafka itself

    Code walkthrough

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}

    object MyKafkaConsumer {

      def main(args: Array[String]): Unit = {

        val ssc = StreamingContestUtils.getStreamingContext(this.getClass.getSimpleName)

        val TOPIC = "spark_test2"
        val groupId = "spark_test2_group"

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "kafka01:9092,kafka02:9092,kafka03:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> groupId,
          "auto.offset.reset" -> "earliest", // on the very first start, consume from the earliest offset
          "enable.auto.commit" -> (false: java.lang.Boolean) // disable auto commit; offsets are committed manually below
        )

        val topics = Array(TOPIC)

        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent, // distribute partitions evenly across executors
          Subscribe[String, String](topics, kafkaParams)
        )

        stream.foreachRDD({ rdd =>

          println(s"-----rdd.partitions.size------------------ ${rdd.partitions.size}")

          // get the offset ranges of the current batch
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd.foreachPartition { iter =>
            val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
            println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
          }

          // TODO ... business logic

          // commit the offsets back to Kafka itself
          stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

        })
        ssc.start()
        ssc.awaitTermination()
      }

    }
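
    The code above calls a helper, StreamingContestUtils.getStreamingContext, that the original post never shows. A minimal sketch of what such a helper might look like (the master URL and batch interval below are assumptions, not values from the post):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StreamingContestUtils {
      // Builds a StreamingContext named after the calling class.
      // local[2] and the 10-second batch interval are placeholder values.
      def getStreamingContext(appName: String): StreamingContext = {
        val conf = new SparkConf()
          .setAppName(appName)
          .setMaster("local[2]")
        new StreamingContext(conf, Seconds(10))
      }
    }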

    First run of the program:
    -----rdd.partitions.size------------------ 1
    spark_test2 0 0 44
    -----rdd.partitions.size------------------ 1
    spark_test2 0 44 44
    -----rdd.partitions.size------------------ 1
    spark_test2 0 44 44

    [Spark UI screenshot]

    It shows that the first batch contains all of the data already in the Kafka topic.

    After stopping and restarting the program:
    -----rdd.partitions.size------------------ 1
    spark_test2 0 44 44
    -----rdd.partitions.size------------------ 1
    spark_test2 0 44 44
    -----rdd.partitions.size------------------ 1
    spark_test2 0 44 44

    After the restart, consumption does not start over from the beginning; it resumes from the committed offsets.

    Your own data store: MySQL

    Shared methods for maintaining offsets in MySQL:
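
    The MysqlOffsetManager below extends an OffsetManager trait that the original post does not include. A minimal sketch of what that trait presumably declares (the Redis implementation further down fits the same contract):

    import org.apache.kafka.common.TopicPartition

    // Assumed contract shared by the MySQL and Redis offset managers below.
    trait OffsetManager {
      // persist the offset range of one partition for a (topic, group)
      def storeOffsets(topic: String, groupId: String, partitionId: Int,
                       fromoffset: Long, untilOffset: Long): Unit
      // read back the last stored until-offsets, keyed by partition
      def obtainOffsets(topic: String, groupId: String): Map[TopicPartition, Long]
    }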

    import org.apache.kafka.common.TopicPartition
    import scalikejdbc.{DB, SQL}
    import scalikejdbc.config.DBs

    /**
      * Table DDL:
      *
      * create table kafka_offset(
        topic varchar(30),
        group_id varchar(30),
        partition_id int(5),
        fromOffset bigint(18),
        untilOffset bigint(18),
        primary key(topic,group_id,partition_id)
        );
      *
      */
    object MysqlOffsetManager extends OffsetManager {
      /**
        * Store the offsets of one partition.
        *
        * @param topic
        * @param groupId
        * @param partitionId
        * @param fromoffset
        * @param untilOffset
        */
      override def storeOffsets(topic: String, groupId: String, partitionId: Int, fromoffset: Long, untilOffset: Long): Unit = {

        DBs.setupAll()

        DB.autoCommit(implicit session => {
          // "replace into" upserts on the (topic, group_id, partition_id) primary key
          SQL("replace into kafka_offset values(?,?,?,?,?)")
            .bind(topic, groupId, partitionId, fromoffset, untilOffset)
            .update()
            .apply()
        })
      }

      /**
        * Obtain the stored offsets.
        *
        * @param topic
        * @param groupId
        * @return
        */
      override def obtainOffsets(topic: String, groupId: String): Map[TopicPartition, Long] = {

        DBs.setupAll()
        val offsetList = DB.readOnly(implicit session => {

          SQL("select topic,partition_id,untilOffset from kafka_offset where topic = ? and group_id = ? ")
            .bind(topic, groupId)
            .map(x => {
              (x.string("topic"), x.int("partition_id"), x.long("untilOffset"))
            }).toList()
            .apply()
        })
        offsetList.map(x => {
          new TopicPartition(topic, x._2) -> x._3
        }).toMap
      }
    }

    Business logic code:

    def main(args: Array[String]): Unit = {

        val ssc = StreamingContestUtils.getStreamingContext(this.getClass.getSimpleName)

        val TOPIC = "spark_test2"
        val groupId = "spark_test3_group"

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "kafka01:9092,kafka02:9092,kafka03:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> groupId,
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val topics = Array(TOPIC)

        // read the previously stored offsets (empty on the very first run)
        val fromOffset = MysqlOffsetManager.obtainOffsets(TOPIC, groupId)

        fromOffset.foreach(println)

        val stream = if (fromOffset.isEmpty) {
          println("consuming from the beginning...")

          KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent, // distribute partitions evenly across executors
            Subscribe[String, String](topics, kafkaParams)
          )
        } else {
          println("consuming from stored offsets...")

          KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent, // distribute partitions evenly across executors
            Subscribe[String, String](topics, kafkaParams, fromOffset)
          )
        }
        var offsetRanges: Array[OffsetRange] = Array.empty
        stream.transform(rdd => {

          // capture the offset ranges of this batch on the driver
          offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd.foreachPartition { iter =>
            val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
            //println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
          }
          rdd
        }).map(x =>
          (x.key(), x.value())
        ).foreachRDD(rdd => {
          // iterate over the offset range of each partition and upsert it into MySQL
          offsetRanges.foreach(x => {
            println(s"${x.topic},${groupId},${x.partition},${x.fromOffset},${x.untilOffset}")

            MysqlOffsetManager.storeOffsets(x.topic, groupId, x.partition, x.fromOffset, x.untilOffset)

          })
        }
        )
        ssc.start()
        ssc.awaitTermination()
      }

    Console output:
    (spark_test2-0,55)
    consuming from stored offsets...
    spark_test2,spark_test3_group,0,55,66
    spark_test2,spark_test3_group,0,66,66
    spark_test2,spark_test3_group,0,66,66

    The offsets are maintained successfully.

    Your own data store: Redis

    override def storeOffsets(topic: String, groupId: String, partition: Int, fromoffset: Long, untilOffset: Long): Unit = {

        val jedis = RedisUtils.getConnection()
        // one Redis hash per (topic, group); field = partition id, value = until-offset
        val key = topic + "_" + groupId

        jedis.hset(key, partition.toString, untilOffset.toString)

        jedis.close()

      }

      override def obtainOffsets(topic: String, groupId: String): Map[TopicPartition, Long] = {

        val jedis = RedisUtils.getConnection()
        val key = topic + "_" + groupId

        // read back every partition -> offset field of the hash
        val offsets = jedis.hgetAll(key)

        import scala.collection.JavaConversions._

        val fromOffsets = offsets.toMap.map(x => {
          new TopicPartition(topic, x._1.toInt) -> x._2.toLong
        })

        fromOffsets

      }
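
    These two methods live in a RedisOffsetManager object implementing the same OffsetManager contract, and they rely on a RedisUtils helper that the original post does not show. A minimal sketch of such a helper built on a Jedis connection pool (the host, port, and pool settings are assumptions):

    import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

    object RedisUtils {
      // placeholder connection settings; adjust to your environment
      private val pool = new JedisPool(new JedisPoolConfig(), "redis01", 6379)

      // borrow a connection from the pool; the caller's close() returns it
      def getConnection(): Jedis = pool.getResource
    }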

    Business logic code:

    val fromOffset = RedisOffsetManager.obtainOffsets(TOPIC, groupId)

        fromOffset.foreach(println)

        val stream = if (fromOffset.isEmpty) {
          println("consuming from the beginning...")

          KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent, // distribute partitions evenly across executors
            Subscribe[String, String](topics, kafkaParams)
          )
        } else {
          println("consuming from stored offsets...")

          KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent, // distribute partitions evenly across executors
            Subscribe[String, String](topics, kafkaParams, fromOffset)
          )
        }

        var offsetRanges: Array[OffsetRange] = Array.empty
        stream.transform(rdd => {

          // capture the offset ranges of this batch on the driver
          offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd.foreachPartition { iter =>
            val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
            //println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
          }
          rdd
        }).map(x =>
          (x.key(), x.value())
        ).foreachRDD(rdd => {
          // iterate over the offset range of each partition and upsert it into Redis
          offsetRanges.foreach(x => {
            println(s"${x.topic},${groupId},${x.partition},${x.fromOffset},${x.untilOffset}")

            RedisOffsetManager.storeOffsets(x.topic, groupId, x.partition, x.fromOffset, x.untilOffset)

           // MysqlOffsetManager.storeOffsets(x.topic,groupId,x.partition,x.fromOffset,x.untilOffset)

          })
        }
        )

    First run of the program:
    consuming from the beginning...
    spark_test2,spark_test4_group,0,0,66
    spark_test2,spark_test4_group,0,66,66
    spark_test2,spark_test4_group,0,66,66

    Second run, after a restart:

    consuming from stored offsets...
    spark_test2,spark_test4_group,0,66,66
    spark_test2,spark_test4_group,0,66,66
    spark_test2,spark_test4_group,0,66,66
    spark_test2,spark_test4_group,0,66,66

    Offset maintenance succeeds.

    The zero-data-loss experiment for Spark Streaming in production is a success.

    Because there is no transaction tying the Kafka consumption to the business write, the business write can still fail while the offset update succeeds, which again loses data.
    To achieve exactly-once semantics, a different approach is needed:
    for example, bundle the offset update and the business data into a single commit, as sketched below, so that they either both succeed or both fail.
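
    As a sketch of that idea, assuming the processed records also land in the same MySQL database as the offsets (result_table below is a hypothetical table, and stream, groupId, and the imports are reused from the MySQL example above), the business insert and the offset upsert can share one scalikejdbc transaction per partition:

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        DBs.setupAll() // make sure the connection pool exists on this executor
        val o = offsetRanges(TaskContext.get.partitionId)
        // one local transaction: the business rows and the offset row
        // either all commit together or all roll back together
        DB.localTx { implicit session =>
          iter.foreach { record =>
            // hypothetical result table holding the processed values
            SQL("insert into result_table(msg_key, msg_value) values(?, ?)")
              .bind(record.key(), record.value())
              .update().apply()
          }
          SQL("replace into kafka_offset values(?,?,?,?,?)")
            .bind(o.topic, groupId, o.partition, o.fromOffset, o.untilOffset)
            .update().apply()
        }
      }
    }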

    Reposted from: https://blog.csdn.net/kzw11/article/details/102978350
