zoukankan      html  css  js  c++  java
  • Spark Streaming 整合Kafka的 Offset 管理 【数据零丢失之 MySQL管理Offset】

    写在前面:

    在使用SparkStreaming 整合 Kafka 0.8版本的时候, spark-streaming-kafka-0-8 是不提供offset的管理的。为了保证数据零丢失,我们需要自己来管理这个偏移量。

    参照:http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html

    我们是将偏移量储存在MySQL中进行管理,

    快速入门使用scalikejdbc 操作MySQL:

    1.导入依赖

          <!--scalikejdbc-config_2.11-->
          <dependency>
              <groupId>org.scalikejdbc</groupId>
              <artifactId>scalikejdbc-config_2.11</artifactId>
              <version>2.5.0</version>
          </dependency>
    

    2.resource文件中新建application.conf文件,配置如下

    # MySQL example  本地
    db.default.driver="com.mysql.jdbc.Driver"
    db.default.url="jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8"
    db.default.user="root"
    db.default.password="123456"
    

    3.Scalikejdbc Demo

    package com.csylh.logAnalysis.scalikejdbc
    
    import scalikejdbc.{DB, SQL}
    import scalikejdbc.config.DBs
    
    
    /**
      * scala中连接mysql,使用ScaLikeJdbc框架进行数据的增删改查
      */
    object ScaLikeJdbcApp {
    
      def main(args: Array[String]): Unit = {
    
        //解析application.conf的文件
        DBs.setup()
        //DBs.setupAll()
        DB.autoCommit {
            implicit session =>
            SQL("insert into people(name,age,fv) values(?,?,?)")
              .bind("留歌", 22, 88)
              .update().apply()
            }
        }
    
        def delete() = {
          DB.autoCommit {
            implicit session =>
            SQL("delete from people where name = ?")
              .bind("留歌")
              .update().apply()
          }
        }
    
    
        def update() {
          DB.autoCommit { implicit session =>
            SQL("update people set age = ? where name = ?")
              .bind(18, "留歌")
              .update().apply()
          }
        }
    
      /**
        * select查询到数据之后会产生一个rs的对象集,然后可以得到这个对象集里面的数据
        */
      def select() = {
          DB.readOnly {
            implicit session =>
            val sql = SQL("select * from people ").map(rs =>
              (rs.string("name"), rs.int("age"))
            ).toList().apply()
          }
        }
    
    }
    

    零丢失数据解决方案代码如下:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scalikejdbc.config.DBs
    
    /**
      * Description: Spark Streaming 整合Kafka的 偏移量 管理
      *     其实可以在 MySQL/zk/kafka/hbase/redis ...中存储我们的的offset偏移量数据的
      *     这里选用MySQL + scalikejdbc
      *
      * @Author: 留歌36
      * @Date: 2019/8/8 11:24
      */
    object OffsetApp {
      def main(args:Array[String]){
    
        val conf= new SparkConf().setAppName("OffsetApp").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))
    
        val kafkaParams = Map(
          "metadata.broker.list"->"192.168.1.116:9092",
          "group.id"->"liuge.group.id" ,// 消费的时候是以组为单位进行消费
          "auto.offset.reset" -> "smallest"
        )
        val topics = "test".split(",").toSet
        //  步骤一:先获取offset
        import scalikejdbc._
        /**
          * tuple 转 map
          * ().list.apply().toMap
          */
        DBs.setup()
        val fromOffsets =
          DB.readOnly {
            implicit session => {
              SQL("select * from offsets_storage ").map(rs =>
                (TopicAndPartition(rs.string("topic"), rs.int("partitions")),rs.long("offset"))
              ).list().apply()
            }
        }.toMap
    
        for (ele <- fromOffsets){
          println("读取MySQL偏移量相关数据: " + ele._1.topic + ":" + ele._1.partition +":"+ele._2)
        }
    
        // 步骤二: Direct 模式对接Kafka ,得到InputDStream
        val stream = if (fromOffsets.isEmpty){
          // 第一次进来,进行消费Kafka ==> stream
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParams,topics)
    
        }else{ // 非第一次,进行消费Kafka ==> stream
          val messageHandler = (mm:MessageAndMetadata[String,String]) => (mm.key(), mm.message())
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    
        }
        // 步骤三: 业务逻辑 + 保存offset
        stream.foreachRDD(rdd => {
    
          // TODO.. 3.1这里是你的业务逻辑代码,简单count()为例
          println("留歌本轮的统计结果:" + rdd.count())
    
    
          // 3.2这里是保存offset数据的代码
          var  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          for (o <- offsetRanges) {
            println("消费数据从多少到多少:"+s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    
            DB.autoCommit{
              implicit session => {
                SQL("replace into offsets_storage(topic,groupid,partitions,offset) values(?,?,?,?)")
                  .bind(o.topic, "liuge.group.id", o.partition, o.untilOffset).update().apply()
              }
            }
          }
        })
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    到这里,我们就能够比较好的去消费Kafka的数据了。

    有任何不对的地方,欢迎指证!谢谢~~

    KafkaProducer【java版】模拟数据

  • 相关阅读:
    左孩子右兄弟的字典树
    UVA 1401 Remember the Word
    HDOJ 4770 Lights Against Dudely
    UvaLA 3938 "Ray, Pass me the dishes!"
    UVA
    Codeforces 215A A.Sereja and Coat Rack
    Codeforces 215B B.Sereja and Suffixes
    HDU 4788 Hard Disk Drive
    HDU 2095 find your present (2)
    图的连通性问题—学习笔记
  • 原文地址:https://www.cnblogs.com/liuge36/p/12614728.html
Copyright © 2011-2022 走看看