zoukankan      html  css  js  c++  java
  • Spark Streaming 整合Kafka的 Offset 管理 【数据零丢失之 MySQL管理Offset】

    写在前面:

    在使用SparkStreaming 整合 Kafka 0.8版本的时候, spark-streaming-kafka-0-8 是不提供offset的管理的。为了保证数据零丢失,我们需要自己来管理这个偏移量。

    参照:http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html

    我们是将偏移量储存在MySQL中进行管理,

    快速入门使用scalikejdbc 操作MySQL:

    1.导入依赖

          <!--scalikejdbc-config_2.11-->
          <dependency>
              <groupId>org.scalikejdbc</groupId>
              <artifactId>scalikejdbc-config_2.11</artifactId>
              <version>2.5.0</version>
          </dependency>
    

    2.resource文件中新建application.conf文件,配置如下

    # MySQL example  本地
    db.default.driver="com.mysql.jdbc.Driver"
    db.default.url="jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8"
    db.default.user="root"
    db.default.password="123456"
    

    3.Scalikejdbc Demo

    package com.csylh.logAnalysis.scalikejdbc
    
    import scalikejdbc.{DB, SQL}
    import scalikejdbc.config.DBs
    
    
    /**
      * scala中连接mysql,使用ScaLikeJdbc框架进行数据的增删改查
      */
    object ScaLikeJdbcApp {
    
      def main(args: Array[String]): Unit = {
    
        //解析application.conf的文件
        DBs.setup()
        //DBs.setupAll()
        DB.autoCommit {
            implicit session =>
            SQL("insert into people(name,age,fv) values(?,?,?)")
              .bind("留歌", 22, 88)
              .update().apply()
            }
        }
    
        def delete() = {
          DB.autoCommit {
            implicit session =>
            SQL("delete from people where name = ?")
              .bind("留歌")
              .update().apply()
          }
        }
    
    
        def update() {
          DB.autoCommit { implicit session =>
            SQL("update people set age = ? where name = ?")
              .bind(18, "留歌")
              .update().apply()
          }
        }
    
      /**
        * select查询到数据之后会产生一个rs的对象集,然后可以得到这个对象集里面的数据
        */
      def select() = {
          DB.readOnly {
            implicit session =>
            val sql = SQL("select * from people ").map(rs =>
              (rs.string("name"), rs.int("age"))
            ).toList().apply()
          }
        }
    
    }
    

    零丢失数据解决方案代码如下:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scalikejdbc.config.DBs
    
    /**
      * Description: Spark Streaming 整合Kafka的 偏移量 管理
      *     其实可以在 MySQL/zk/kafka/hbase/redis ...中存储我们的的offset偏移量数据的
      *     这里选用MySQL + scalikejdbc
      *
      * @Author: 留歌36
      * @Date: 2019/8/8 11:24
      */
    object OffsetApp {
      def main(args:Array[String]){
    
        val conf= new SparkConf().setAppName("OffsetApp").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))
    
        val kafkaParams = Map(
          "metadata.broker.list"->"192.168.1.116:9092",
          "group.id"->"liuge.group.id" ,// 消费的时候是以组为单位进行消费
          "auto.offset.reset" -> "smallest"
        )
        val topics = "test".split(",").toSet
        //  步骤一:先获取offset
        import scalikejdbc._
        /**
          * tuple 转 map
          * ().list.apply().toMap
          */
        DBs.setup()
        val fromOffsets =
          DB.readOnly {
            implicit session => {
              SQL("select * from offsets_storage ").map(rs =>
                (TopicAndPartition(rs.string("topic"), rs.int("partitions")),rs.long("offset"))
              ).list().apply()
            }
        }.toMap
    
        for (ele <- fromOffsets){
          println("读取MySQL偏移量相关数据: " + ele._1.topic + ":" + ele._1.partition +":"+ele._2)
        }
    
        // 步骤二: Direct 模式对接Kafka ,得到InputDStream
        val stream = if (fromOffsets.isEmpty){
          // 第一次进来,进行消费Kafka ==> stream
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParams,topics)
    
        }else{ // 非第一次,进行消费Kafka ==> stream
          val messageHandler = (mm:MessageAndMetadata[String,String]) => (mm.key(), mm.message())
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    
        }
        // 步骤三: 业务逻辑 + 保存offset
        stream.foreachRDD(rdd => {
    
          // TODO.. 3.1这里是你的业务逻辑代码,简单count()为例
          println("留歌本轮的统计结果:" + rdd.count())
    
    
          // 3.2这里是保存offset数据的代码
          var  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          for (o <- offsetRanges) {
            println("消费数据从多少到多少:"+s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    
            DB.autoCommit{
              implicit session => {
                SQL("replace into offsets_storage(topic,groupid,partitions,offset) values(?,?,?,?)")
                  .bind(o.topic, "liuge.group.id", o.partition, o.untilOffset).update().apply()
              }
            }
          }
        })
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    到这里,我们就能够比较好的去消费Kafka的数据了。

    有任何不对的地方,欢迎指证!谢谢~~

    KafkaProducer【java版】模拟数据

  • 相关阅读:
    洛谷P1508 Likecloud-吃、吃、吃 [2017年4月计划 动态规划10]
    洛谷P1855 榨取kkksc03 [2017年4月计划 动态规划 09]
    洛谷P1164 小A点菜 [2017年4月计划 动态规划08]
    洛谷P1244 [NOI2000] 青蛙过河 [2017年4月计划 动态规划07]
    洛谷P1757 通天之分组背包 [2017年4月计划 动态规划06]
    洛谷P1877 [HAOI2012]音量调节 [2017年4月计划 动态规划05]
    洛谷P1474 [USACO 2.3]货币系统 Money Systems [2017年4月计划 动态规划04]
    洛谷P1832 A+B Problem(再升级) [2017年4月计划 动态规划03]
    洛谷P1968 美元汇率[2017年4月计划 动态规划02]
    洛谷P2347 砝码称重 [2017年4月计划 动态规划01]
  • 原文地址:https://www.cnblogs.com/liuge36/p/12614728.html
Copyright © 2011-2022 走看看