zoukankan      html  css  js  c++  java
  • SparkStreaming直连方式读取kafka数据,使用MySQL保存偏移量

    SparkStreaming直连方式读取kafka数据,使用MySQL保存偏移量

    通过MySQL保存kafka的偏移量,完成直连方式读取数据
    使用scalikeJDBC,访问数据库。

    1. ScalikeJDBC

    ScalikeJDBC 是一款Scala 开发者使用的简洁 DB 访问类库,它是基于 SQL 的,使用者只需要关注 SQL 逻辑的编写,所有的数据库操作都交给 ScalikeJDBC。这个类库内置包含了JDBC API,并且给用户提供了简单易用并且非常灵活的 API。并且,QueryDSL(通用查询查询框架)使你的代码类型安全的并且可重复使用。我们可以在生产环境大胆地使用这款 DB 访问类库。

    2.配置文件

    //配置数据库信息
    //使用IDEA,在resources文件夹下新建文件File文件名为application.conf
    db.default.driver="com.mysql.jdbc.Driver"
    db.default.url="jdbc:mysql://hadoop01:3306/kafkaOffset?characterEncodeing=utf-8"
    db.default.user="root"
    db.default.password="root"
    

    在这里插入图片描述

    3.导入依赖的jar包

    <!--Maven依赖-->
    
    <!--通过mysql保存偏移量-->
    
            <dependency>
                <groupId>com.typesafe</groupId>
                <artifactId>config</artifactId>
                <version>1.3.1</version>
            </dependency>
            <dependency>
                <groupId>org.scalikejdbc</groupId>
                <artifactId>scalikejdbc_2.11</artifactId>
                <version>2.5.0</version>
            </dependency>
            <dependency>
                <groupId>org.scalikejdbc</groupId>
                <artifactId>scalikejdbc-core_2.11</artifactId>
                <version>2.5.0</version>
            </dependency>
            <dependency>
                <groupId>org.scalikejdbc</groupId>
                <artifactId>scalikejdbc-config_2.11</artifactId>
                <version>2.5.0</version>
            </dependency>
    
    

    4.源码测试

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.KafkaCluster.Err
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.{Duration, StreamingContext}
    import scalikejdbc.{DB, SQL}
    import scalikejdbc.config.DBs
    
    /*
    将偏移量保存到MySQL中
     */
    object SparkStreamingOffsetMySql {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("medd").setMaster("local[2]")
        val ssc = new StreamingContext(conf,Duration(5000))
        //配置一系列基本配置
        val groupid = "GPMMCC"
        val topic = "mysqlDemo"
        val brokerList = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
       // val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
        val topics = Set(topic)
        //设置kafka的参数
        val kafkaParams = Map(
          "metadata.broker.list"->brokerList,
          "group.id"->groupid,
          "auto.offset.reset"->kafka.api.OffsetRequest.SmallestTimeString
        )
        //加载配置 application.conf
        DBs.setup()
        //不需要查询zk中的offset啦,直接查询MySQL中的offset
        val fromdbOffset:Map[TopicAndPartition,Long]=
          DB.readOnly{
            implicit  session=>{
              //查询每个分组下面的所有消息
              SQL(s"select * from offset where groupId = '${groupid}'" +
               //将MySQL中的数据赋值给元组
                s"").map(m=>(TopicAndPartition(m.string("topic"),m.string("partitions").toInt),m.string("untilOffset").toLong))
                .toList().apply()
            }.toMap  //最后toMap ,应为前面的返回值已经给定
          }
    
        //创建一个DStream,用来获取数据
        var kafkaDStream : InputDStream[(String,String)] = null
    
        //从MySql中获取数据进行判断
        if(fromdbOffset.isEmpty){
          kafkaDStream = KafkaUtils.createDirectStream[String,String,StringDecoder,
            StringDecoder](ssc,kafkaParams,topics)
        }else{
          //1 不能重复消费
          //2 保证偏移量
          var checkOffset = Map[TopicAndPartition,Long]()
    
          //加载kafka的配置
          val kafkaCluster = new KafkaCluster(kafkaParams)
          //首先获得kafka中的所有的topic和partition Offset
          val earliesOffset: Either[Err, Map[TopicAndPartition, KafkaCluster.LeaderOffset]
            ] = kafkaCluster.getEarliestLeaderOffsets(fromdbOffset.keySet)
    
    
          //然后开始比较大小,用mysql中的offset和kafka中的offset进行比较
          if(earliesOffset.isRight){
            //去到需要的 大Map
            //物取值
            val tap: Map[TopicAndPartition, KafkaCluster.LeaderOffset] =
            earliesOffset.right.get
            //比较,直接进行比较大小
            val checkOffset = fromdbOffset.map(f => {
              //取kafka中的offset
              //进行比较,不需要重复消费,取最大的
              val KafkatopicOffset = tap.get(f._1).get.offset
              if (f._2 > KafkatopicOffset) {
                f
              } else {
                (f._1, KafkatopicOffset)
              }
            })
            checkOffset
    
          }
          val messageHandler=(mmd:MessageAndMetadata[String,String])=>{
            (mmd.key(),mmd.message())
          }
    
          //不是第一次启动的话 ,按照之前的偏移量取数据的偏移量
          kafkaDStream = KafkaUtils.createDirectStream[String,String,StringDecoder
            ,StringDecoder,(String,String)](ssc,kafkaParams,checkOffset
          ,messageHandler)
    
        }
    
        var offsetRanges = Array[OffsetRange]()
        kafkaDStream.foreachRDD(kafkaRDD=>{
         offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
          val map: RDD[String] = kafkaRDD.map(_._2)
          map.foreach(println)
    
          //更新偏移量
    
            DB.localTx(implicit session =>{
              //去到所有的topic partition offset
              for (o<- offsetRanges){
                /*SQL("update offset set groupId=? topic=?,partition=?," +
                  "untilsOffset=?").bind(groupid,o.topic,o.partition,o.untilOffset).update().apply()*/
                SQL("replace into offset(groupId,topic,partitions,untilOffset) values(?,?,?,?)").bind(
                  groupid,o.topic,o.partition.toString,o.untilOffset.toString
                ).update().apply()
              }
            })
    
        })
        ssc.start()
        ssc.awaitTermination()   
      }
    }
    
    
    
  • 相关阅读:
    ArchLinux and LXDE and LXDM
    如何改变X:\Users\XXX的用户名称
    Windows 7 支持4GB以上内存破解工具下载
    Linux & Vim Command Wallpaper
    The easy way to execute sudo command in Python using subprocess.Popen
    C# DateTime 精确到秒/截断毫秒部分
    制约程序员"钱途"的两大最关键因素
    Oracle基本操作
    字符串处理【Delphi版】
    java学习路线的经验之谈
  • 原文地址:https://www.cnblogs.com/aixing/p/13327349.html
Copyright © 2011-2022 走看看