zoukankan      html  css  js  c++  java
  • Spark Streaming消费Kafka Direct保存offset到Redis,实现数据零丢失和exactly once

    一、概述

       上次写这篇文章文章的时候,Spark还是1.x,kafka还是0.8x版本,转眼间spark到了2.x,kafka也到了2.x,存储offset的方式也发生了改变,笔者根据上篇文章和网上文章,将offset存储到Redis,既保证了并发也保证了数据不丢失,经过测试,有效。

    二、使用场景

    Spark Streaming实时消费kafka数据的时候,程序停止或者Kafka节点挂掉会导致数据丢失,Spark Streaming也没有设置CheckPoint(据说比较鸡肋,虽然可以保存Direct方式的offset,但是可能会导致频繁写HDFS占用IO),所以每次出现问题的时候,重启程序,而程序的消费方式是Direct,所以在程序down掉的这段时间Kafka上的数据是消费不到的,虽然可以设置offset为smallest,但是会导致重复消费,重新overwrite hive上的数据,但是不允许重复消费的场景就不能这样做。

    三、原理阐述

    在Spark Streaming中消费 Kafka 数据的时候,有两种方式分别是 :

    1.基于 Receiver-based 的 createStream 方法。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。本文对此方式不研究,有兴趣的可以自己实现,个人不喜欢这个方式。KafkaUtils.createStream

    2.Direct Approach (No Receivers) 方式的 createDirectStream 方法,但是第二种使用方式中  kafka 的 offset 是保存在 checkpoint 中的,如果程序重启的话,会丢失一部分数据,我使用的是这种方式。KafkaUtils.createDirectStream。本文将用代码说明如何将 kafka 中的 offset 保存到 Redis 中,以及如何从 Redis 中读取已存在的 offset。参数auto.offset.reset为latest的时候程序才会读取redis的offset。

    四、实现代码

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010._
    
    import scala.collection.JavaConverters._
    import scala.util.Try
    
    /**
      * Created by chouyarn of BI on 2018/8/21
      */
    object KafkaUtilsRedis {
      /**
        * 根据groupId保存offset
        * @param ranges
        * @param groupId
        */
      def storeOffset(ranges: Array[OffsetRange], groupId: String): Unit = {
        for (o <- ranges) {
          val key = s"bi_kafka_offset_${groupId}_${o.topic}_${o.partition}"
          val value = o.untilOffset
          JedisUtil.set(key, value.toString)
        }
      }
    
      /**
        * 根据topic,groupid获取offset
        * @param topics
        * @param groupId
        * @return
        */
      def getOffset(topics: Array[String], groupId: String): (Map[TopicPartition, Long], Int) = {
        val fromOffSets = scala.collection.mutable.Map[TopicPartition, Long]()
    
        topics.foreach(topic => {
          val keys = JedisUtil.getKeys(s"bi_kafka_offset_${groupId}_${topic}*")
          if (!keys.isEmpty) {
            keys.asScala.foreach(key => {
              val offset = JedisUtil.get(key)
              val partition = Try(key.split(s"bi_kafka_offset_${groupId}_${topic}_").apply(1)).getOrElse("0")
              fromOffSets.put(new TopicPartition(topic, partition.toInt), offset.toLong)
            })
          }
        })
        if (fromOffSets.isEmpty) {
          (fromOffSets.toMap, 0)
        } else {
          (fromOffSets.toMap, 1)
        }
      }
    
      /**
        * 创建InputDStream,如果auto.offset.reset为latest则从redis读取
        * @param ssc
        * @param topic
        * @param kafkaParams
        * @return
        */
      def createStreamingContextRedis(ssc: StreamingContext, topic: Array[String],
                                      kafkaParams: Map[String, Object]): InputDStream[ConsumerRecord[String, String]] = {
        var kafkaStreams: InputDStream[ConsumerRecord[String, String]] = null
        val groupId = kafkaParams.get("group.id").get
        val (fromOffSet, flag) = getOffset(topic, groupId.toString)
        val offsetReset = kafkaParams.get("auto.offset.reset").get
        if (flag == 1 && offsetReset.equals("latest")) {
          kafkaStreams = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe(topic, kafkaParams, fromOffSet))
        } else {
          kafkaStreams = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe(topic, kafkaParams))
        }
        kafkaStreams
      }
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("offSet Redis").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(60))
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "localhost:9092",
          "group.id" -> "binlog.test.rpt_test_1min",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean),
          "session.timeout.ms" -> "20000",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer]
        )
        val topic = Array("binlog.test.rpt_test", "binlog.test.hbase_test", "binlog.test.offset_test")
        val groupId = "binlog.test.rpt_test_1min"
        val lines = createStreamingContextRedis(ssc, topic, kafkaParams)
        lines.foreachRDD(rdds => {
          if (!rdds.isEmpty()) {
            println("##################:" + rdds.count())
          }
          storeOffset(rdds.asInstanceOf[HasOffsetRanges].offsetRanges, groupId)
        })
    
        ssc.start()
        ssc.awaitTermination()
      }
    }

    五、JedisUtil代码

    import java.util
    
    import com.typesafe.config.ConfigFactory
    import org.apache.kafka.common.serialization.StringDeserializer
    import redis.clients.jedis.{HostAndPort, JedisCluster, JedisPool, JedisPoolConfig}
    
    object JedisUtil {
      private val config = ConfigFactory.load("realtime-etl.conf")
    
      private val redisHosts: String = config.getString("redis.server")
      private val port: Int = config.getInt("redis.port")
    
      private val hostAndPortsSet: java.util.Set[HostAndPort] = new util.HashSet[HostAndPort]()
      redisHosts.split(",").foreach(host => {
        hostAndPortsSet.add(new HostAndPort(host, port))
      })
    
    
      private val jedisConf: JedisPoolConfig = new JedisPoolConfig()
      jedisConf.setMaxTotal(5000)
      jedisConf.setMaxWaitMillis(50000)
      jedisConf.setMaxIdle(300)
      jedisConf.setTestOnBorrow(true)
      jedisConf.setTestOnReturn(true)
      jedisConf.setTestWhileIdle(true)
      jedisConf.setMinEvictableIdleTimeMillis(60000l)
      jedisConf.setTimeBetweenEvictionRunsMillis(3000l)
      jedisConf.setNumTestsPerEvictionRun(-1)
    
      lazy val redis = new JedisCluster(hostAndPortsSet, jedisConf)
    
      def get(key: String): String = {
        try {
          redis.get(key)
        } catch {
          case e: Exception => e.printStackTrace()
            null
        }
      }
    
      def set(key: String, value: String) = {
        try {
          redis.set(key, value)
        } catch {
          case e: Exception => {
            e.printStackTrace()
          }
        }
      }
    
    
      def hmset(key: String, map: java.util.Map[String, String]): Unit = {
        //    val redis=pool.getResource
        try {
          redis.hmset(key, map)
        }catch {
          case e:Exception => e.printStackTrace()
        }
      }
    
      def hset(key: String, field: String, value: String): Unit = {
        //    val redis=pool.getResource
        try {
          redis.hset(key, field, value)
        } catch {
          case e: Exception => {
            e.printStackTrace()
          }
        }
      }
    
      def hget(key: String, field: String): String = {
        try {
          redis.hget(key, field)
        }catch {
          case e:Exception => e.printStackTrace()
            null
        }
      }
    
      def hgetAll(key: String): java.util.Map[String, String] = {
        try {
          redis.hgetAll(key)
        } catch {
          case e: Exception => e.printStackTrace()
            null
        }
      }
    }

    六、总结

    根据不同的groupid来保存不同的offset,支持多个topic

    七、exactly once方案

    准确的说也不是严格的方案,要根据实际的业务场景来配合。

    现在的方案是保存rdd的最后一个offset,我们可以考虑在处理完一个消息之后就更新offset,保存offset和业务处理做成一个事务,当出现Exception的时候,都进行回退,或者将出现问题的offset和消息发送到另一个kafka或者保存到数据库,另行处理错误的消息。代码demo如下

    val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(batchTime))
        val messages = KafkaOffsetUtils.createStreamingContextRedis(ssc, topic, kafkaParams)
        messages.foreachRDD(rdd => {
          rdd.foreach(msg => {
            val value = msg.value()
            try{
              //TODO 事务操作
              KafkaOffsetUtils.storeOffset(msg.topic(),msg.partition(),broadCastGroupId.value,msg.offset())
              println(value)
            }catch {
              case e:Exception => {
                e.printStackTrace()
                //TODO 出错幂等回滚
              }
            }
          })
        })
        ssc.start()
        ssc.awaitTermination()
  • 相关阅读:
    IOS中CocoaPods安装与使用
    解决 CocoaPods管理第三方库报错的问题
    Github上的600多个iOS开源类库
    VFL 可视化格式语言自动布局基础
    (转)WWDC2014之App Extensions学习笔记
    (转)iOS并发编程笔记,包含GCD,Operation Queues,Run Loops,如何在后台绘制UI,后台I/O处理,最佳安全实践避免互斥锁死锁优先级反转等,以及如何使用GCD监视进程文件文件夹,并发测试的方案等
    (转)轻松学习Objective-C消息转发
    IOS 获取IDFA以及判断是否越狱
    iOS 获取Wifi的SSID及MAC地址
    (转)黑幕背后的Autorelease
  • 原文地址:https://www.cnblogs.com/ChouYarn/p/9512102.html
Copyright © 2011-2022 走看看