  • Resetting Kafka consumer offsets after a job exception

    1. Business scenario

    How do you guarantee data quality when a Kafka consumer task throws an exception? In past experience, to achieve exactly-once processing, a MySQL table is used to record the partition and offset at the point where the program failed. When the task restarts, it queries that MySQL table for failure records; if any exist, it takes the offset of the corresponding partition from the table and resets the consumer's position. The example below uses a Kafka consumer to read data from Kafka, run it through ETL, and write it to HBase.
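
    For reference, here is a minimal sketch of what the MySQL bookkeeping table could look like. The column names (type, topics, partitions, offsets, create_date, update_date) are taken from the INSERT statement in the code that follows; the unique key over (type, topics, partitions) is an assumption made here so that the ON DUPLICATE KEY UPDATE clause upserts one row per partition, since the original DDL is not part of the article.

    import java.sql.DriverManager

    object CreateOffsetTable {
      def main(args: Array[String]): Unit = {
        // Assumed layout of the offset bookkeeping table; the real DDL is not shown in the article.
        val ddl =
          """CREATE TABLE IF NOT EXISTS shop.kafka_topic_info (
            |  type        TINYINT      NOT NULL,
            |  topics      VARCHAR(128) NOT NULL,
            |  partitions  INT          NOT NULL,
            |  offsets     BIGINT       NOT NULL,
            |  create_date DATETIME     NOT NULL,
            |  update_date DATETIME     NOT NULL,
            |  UNIQUE KEY uk_type_topic_partition (type, topics, partitions)
            |) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
          """.stripMargin

        // Hypothetical JDBC URL and credentials; the original project goes through its C3p0Pools helper instead.
        val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/shop", "user", "password")
        try {
          conn.createStatement().execute(ddl)
        } finally {
          conn.close()
        }
      }
    }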

    2. Code in practice

    package scala.com.qsq.report.consumer

    import java.sql.ResultSet
    import java.text.SimpleDateFormat
    import java.util
    import java.util.{Date, Properties}
    import com.qsq.config.LoadConfig
    import com.qsq.utils.hbase.HbaseClientObj
    import com.qsq.utils.jdbc.C3p0Pools
    import com.qsq.utils.JsonUtils
    import com.qsq.utils.constant.Constants
    import kafka.common.{OffsetAndMetadata, TopicAndPartition}
    import kafka.consumer._

    import scala.collection.mutable.ArrayBuffer

    object MyConsumer {

      def main(args: Array[String]): Unit = {
        val HBASE_A_RT_CREDIT = "bee:a_user"

        val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

        // Kafka properties for the old high-level (ZooKeeper-based) consumer
        val props = new Properties()
        val zk = LoadConfig.getProperties(Constants.START_ENV_REALTIME, "kafka.zookeeper.quorum")
        props.put("zookeeper.connect", zk)
        props.put("group.id", "call_group")
        props.put("auto.offset.reset", "largest")
        props.put("fetch.message.max.bytes", "50000000")
        props.put("replica.fetch.max.bytes", "50000000")
        val config = new ConsumerConfig(props)

        // Create the consumer connector
        val consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config)
        val hashMap = new util.HashMap[TopicAndPartition, OffsetAndMetadata]()
        val conn = C3p0Pools.getConnection()

        // Look up offsets recorded in MySQL by a previous failed run
        val res: ResultSet = C3p0Pools.query(conn, """ SELECT * FROM shop.kafka_topic_info WHERE topics = ? AND type = 1 """, Array("u-rall"))

        // Collect any offsets saved when the last run failed
        while ( res.next() ) {
          println("restoring topic: " + res.getString("topics"))
          println("restoring partition: " + res.getInt("partitions"))
          println("restoring offset: " + res.getLong("offsets"))
          hashMap.put(TopicAndPartition(res.getString("topics"), res.getInt("partitions")), OffsetAndMetadata(res.getLong("offsets")))
        }
        conn.close()

        if (!hashMap.isEmpty) {
          // Reset the consumer group's offsets before any streams are created
          println("restoring offsets ---------------------- ")
          consumer.commitOffsets( hashMap, true )
        }

        registerShutdownHook()
        // Start 3 consumer threads
        run(3)

        def run(numThread: Int) = {

          println("run----------------------")
          val topicMap = new util.HashMap[String, Integer]()
          topicMap.put("u-rall", numThread)
          val decoder = new kafka.serializer.StringDecoder(null)
          val topicStreams = consumer.createMessageStreams(topicMap, decoder, decoder)
          val consumerStreams = topicStreams.values().iterator()
          while (consumerStreams.hasNext) {
            val streams: util.List[KafkaStream[String, String]] = consumerStreams.next()
            (0 until streams.size()).foreach(i => {
              val stream = streams.get(i).iterator
              new Thread(new Runnable {
                override def run(): Unit = {
                  while (stream.hasNext()) {
                    val mam = stream.next
                    val message: String = mam.message()

                    try {
                      if (message.nonEmpty) {

                        // ETL: parse the JSON message and write the fields to HBase
                        val jsonMsgObj = JsonUtils.getObjectFromJson(message)
                        val id = jsonMsgObj.getOrDefault("id", "").toString
                        val identity = jsonMsgObj.getOrDefault("identity", "").toString
                        val dataMsg = ArrayBuffer[(String, AnyRef)]()
                        dataMsg += (("id", id))
                        dataMsg += (("identity", identity))
                        dataMsg += (("data", message))
                        dataMsg += (("create_time", dateFormat.format(new Date())))

                        HbaseClientObj.getInstance().init(HBASE_A_RT_CREDIT)
                        HbaseClientObj.getInstance().put(id, "cf", dataMsg)

                        // Record the message's position
                        val partition: Int = mam.partition
                        println("partition = " + partition + "  time: " + dateFormat.format(new Date()))
                        val offset: Long = mam.offset
                        println("offset = " + offset + "  time: " + dateFormat.format(new Date()))
                        val topic: String = mam.topic
                        println("topic = " + topic + "  time: " + dateFormat.format(new Date()))

                        try {
                          // Save the latest partition/offset to MySQL so a restart can resume from here
                          C3p0Pools.execute(
                            """
                              |INSERT INTO shop.kafka_topic_info
                              |( type, topics, partitions, offsets, create_date, update_date )
                              |VALUES
                              |( '1', ?, ?, ?, NOW(), NOW() )
                              |ON DUPLICATE KEY UPDATE partitions = VALUES(partitions), offsets = VALUES(offsets), update_date = NOW()
                            """.stripMargin, Array(topic, partition, offset))
                        } catch {
                          case e: Exception =>
                            println( s"failed to save offset to mysql ${e}" )
                        }

                      }
                    } catch {
                      case e: Exception =>
                        e.printStackTrace()
                        println(s"failed to consume message ${e}")

                    }
                  }
                }
              }).start()
            })
          }
        }

        def release(): Unit = {
          try {
            println("release consumer...")
            consumer.shutdown()
          } catch {
            case e: Exception => println(s"failed to release consumer ${e}")
          }
        }

        def registerShutdownHook(): Unit = {
          Runtime.getRuntime.addShutdownHook(new Thread() {
            override def run(): Unit = {
              release()
            }
          })
        }

        Thread.sleep(10000)
      }
    }

    3. Summary

    The advantage of using the Kafka consumer directly is that it is fairly lightweight: as long as the data volume is under control, it uses few resources. Recording the failed offsets in MySQL does add some system overhead, but it makes the data more reliable and allows consumption to resume from any chosen offset, which is convenient and flexible.
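
    The code above uses the old high-level (ZooKeeper-based) consumer, where the reset is done by committing the saved offsets before any streams are created. For comparison only, the following is a minimal sketch of the same "resume from an arbitrary offset" idea on the newer org.apache.kafka.clients.consumer.KafkaConsumer API (assuming Kafka 2.x and Scala 2.12+); the broker address, topic, partition, and offset here are placeholders that would in practice be read from the MySQL table.

    import java.time.Duration
    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    object SeekFromSavedOffset {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "broker1:9092") // placeholder broker address
        props.put("group.id", "call_group")
        props.put("enable.auto.commit", "false")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

        val consumer = new KafkaConsumer[String, String](props)
        // In practice the partition and offset would come from shop.kafka_topic_info
        val tp = new TopicPartition("u-rall", 0)
        consumer.assign(Collections.singletonList(tp))
        consumer.seek(tp, 12345L) // resume exactly where the failed run stopped

        val records = consumer.poll(Duration.ofMillis(1000))
        records.forEach(r => println(s"partition=${r.partition()} offset=${r.offset()} value=${r.value()}"))
        consumer.close()
      }
    }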
