zoukankan      html  css  js  c++  java
  • 数据零丢失 + 仅一次消费数据【终极方案】

    
    import java.sql.{DriverManager, ResultSet}
    
    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.commons.lang3.StringUtils
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scalikejdbc.config.DBs
    
    /**
      * Zero data loss + exactly-once consumption, with Kafka offsets managed in MySQL.
      *
      * Uses the Kafka 0.8 direct API (spark-streaming-kafka-0-8_2.11).
      *
      * Per batch the job:
      *  1. writes the batch's results to MySQL idempotently (re-processing the same
      *     batch overwrites the same rows, like an upsert, instead of duplicating them);
      *  2. only afterwards stores each partition's `untilOffset` in MySQL.
      * On restart the stream resumes from the offsets stored in step 2, or from the
      * earliest offset ("smallest") when none are stored yet.
      *
      * @Author: 留歌36
      * @Date: 2019/8/15 10:40
      */
    object Offset05App {
      import java.sql.{Connection, PreparedStatement}
      import org.apache.spark.streaming.kafka.OffsetRange
      import scala.util.Try

      // JDBC settings for the result table (placeholder host/password as in the original).
      private val JdbcUrl =
        "jdbc:mysql://xxx:3306/onlineloganalysis?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
      private val JdbcUser = "root"
      private val JdbcPassword = "xx"

      // Consumer group id written next to every stored offset.
      private val GroupId = "liuge.group.id"

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Offset05App").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))

        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> "xxx:9092",
          "auto.offset.reset" -> "smallest"
        )
        val topics = "exactlyonce_topic2".split(",").toSet

        // Step 1: load the last committed offsets from MySQL.
        DBs.setup()
        val fromOffsets = loadOffsets()
        for ((tp, offset) <- fromOffsets) {
          println("读取MySQL偏移量相关数据==>topic:  " + tp.topic + "  partition:" + tp.partition + "  offset:" + offset)
        }

        // Step 2: connect to Kafka in direct mode.
        val stream =
          if (fromOffsets.isEmpty) {
            // Nothing stored yet: start according to auto.offset.reset.
            KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
          } else {
            // Resume exactly from the stored offsets.
            val messageHandler = (mm: MessageAndMetadata[String, String]) => (mm.key(), mm.message())
            KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
              ssc, kafkaParams, fromOffsets, messageHandler)
          }

        // Step 3: business logic first, then persist the offsets.
        stream.foreachRDD { rdd =>
          // The cast is only valid on the RDD produced directly by the Kafka input stream.
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

          if (!rdd.isEmpty()) {
            println(s"留歌本轮的统计结果:${rdd.count()}条数据")
          }

          // Idempotent write: replaying a batch updates the same rows instead of adding new ones.
          rdd.map(_._2).foreachPartition { partition =>
            val connection = DriverManager.getConnection(JdbcUrl, JdbcUser, JdbcPassword)
            try {
              partition.foreach { msg =>
                parseRecord(msg).foreach { case (minute, domain, traffic) =>
                  println(msg + "............")
                  upsertTraffic(connection, minute, domain, traffic)
                }
              }
            } finally {
              connection.close() // always release, even if a write fails
            }
          }

          saveOffsets(offsetRanges)
        }

        ssc.start() // start the streaming job
        try ssc.awaitTermination() // block until the job is stopped
        finally DBs.closeAll()
      }

      /**
        * Reads every stored (topic, partition) -> offset row. Requires `DBs.setup()`.
        *
        * NOTE(review): rows are not filtered by `groupid`; if several consumer groups
        * share this table, consider adding `where groupid = ?`.
        */
      private def loadOffsets(): Map[TopicAndPartition, Long] = {
        import scalikejdbc._
        DB.readOnly { implicit session =>
          SQL("select * from exactlyonce_topic2")
            .map(rs => (TopicAndPartition(rs.string("topic"), rs.int("partitions")), rs.long("offset")))
            .list()
            .apply()
        }.toMap
      }

      /**
        * Parses a `minute,domain,traffic` line.
        *
        * @return None for null/empty lines or lines without exactly 3 fields; a traffic
        *         field that is not a valid long (dirty data, surrounding spaces are
        *         trimmed first) becomes 0.
        */
      private def parseRecord(msg: String): Option[(String, String, Long)] =
        if (StringUtils.isNotEmpty(msg)) {
          msg.split(",") match {
            case Array(minute, domain, rawTraffic) =>
              Some((minute, domain, Try(rawTraffic.trim.toLong).getOrElse(0L)))
            case _ => None
          }
        } else None

      /** Prepares `sql`, runs `body` with the statement and always closes it. */
      private def withStatement[A](connection: Connection, sql: String)(body: PreparedStatement => A): A = {
        val stmt = connection.prepareStatement(sql)
        try body(stmt) finally stmt.close()
      }

      /**
        * Stores traffic for (minute, domain): inserts when the row is absent, otherwise
        * overwrites it, so writing the same record twice leaves one row.
        *
        * Values are bound as parameters because they come straight from Kafka payloads.
        * NOTE(review): select-then-insert is not atomic; with concurrent writers on the
        * same key, a unique key on (m5, domain) plus INSERT ... ON DUPLICATE KEY UPDATE
        * would be safer.
        */
      private def upsertTraffic(connection: Connection, minute: String, domain: String, traffic: Long): Unit = {
        val existing: Option[Int] =
          withStatement(connection, "select count(1) as nums from traffic_m5 where m5 = ? and domain = ?") { stmt =>
            stmt.setString(1, minute)
            stmt.setString(2, domain)
            val resultSet: ResultSet = stmt.executeQuery()
            try { if (resultSet.next()) Some(resultSet.getInt("nums")) else None }
            finally resultSet.close()
          }

        existing.foreach { nums =>
          if (nums == 0) {
            withStatement(connection, "insert into traffic_m5(m5, domain, traffic) values(?, ?, ?)") { stmt =>
              stmt.setString(1, minute)
              stmt.setString(2, domain)
              stmt.setLong(3, traffic)
              stmt.executeUpdate()
            }
          } else {
            withStatement(connection, "update traffic_m5 set traffic = ? where m5 = ? and domain = ?") { stmt =>
              stmt.setLong(1, traffic)
              stmt.setString(2, minute)
              stmt.setString(3, domain)
              stmt.executeUpdate()
            }
          }
        }
      }

      /**
        * Stores each partition's `untilOffset` so the next run resumes after this batch.
        * All partitions are written in one local transaction, so a failure cannot leave
        * offsets from two different batches in the table.
        */
      private def saveOffsets(offsetRanges: Array[OffsetRange]): Unit = {
        import scalikejdbc._
        for (o <- offsetRanges) {
          if (o.fromOffset != o.untilOffset) {
            println("消费数据从多少到多少:" + s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
          } else {
            println("!该批次没有消费到数据")
          }
        }
        DB.localTx { implicit session =>
          offsetRanges.foreach { o =>
            SQL("replace into exactlyonce_topic2(topic,groupid,partitions,offset) values(?,?,?,?)")
              .bind(o.topic, GroupId, o.partition, o.untilOffset)
              .update()
              .apply()
          }
        }
      }
    }
    
    

    补充:https://blog.csdn.net/weixin_41907511/article/details/84842815

  • 相关阅读:
    c语言指针讲解第一节初识指针
    linux的一些入门常识
    sql手注的思路
    mysql主从备份配置
    CentOS 6.5 nginx+tomcat+ssl配置
    mysql 5.7.18安装教程
    minIO分布式集群搭建+nginx负载均衡
    Linux常用命令
    使用python连接mysql数据库——pymysql模块的使用
    with与上下文管理器
  • 原文地址:https://www.cnblogs.com/liuge36/p/12614726.html
Copyright © 2011-2022 走看看