zoukankan      html  css  js  c++  java
  • 数据零丢失kafka + hbase

    package kafkautils

    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import kafka.utils.ZkUtils
    import kafkautils.KafkaZKManager.{getFromOffsets, storeOffsets}
    import org.I0Itec.zkclient.ZkClient
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Scan}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
    import kafka.ZookeeperHelper.client
    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.I0Itec.zkclient.ZkClient
    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.retry.ExponentialBackoffRetry
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

    import scala.collection.JavaConversions._
    /**
      * Created by zhoucw.
      *
      * Stores the Kafka offsets of a Spark Streaming direct stream in HBase and
      * restores them on start-up, so that a restarted job resumes right after the
      * last batch whose offsets were recorded.
      *
      * HBase layout used by this object:
      *  - row key: `<topic>:<groupId>:<batch time in ms>`
      *  - column family `offsets` (the table must be created with this family)
      *  - one cell per partition: qualifier = partition id, value = untilOffset,
      *    both written as decimal strings
      */
    object KafkaHbaseManager {

      /** Column family that holds one offset cell per Kafka partition. */
      private val OffsetsColumnFamily: Array[Byte] = Bytes.toBytes("offsets")

      /**
        * Persists the end offsets of one processed batch as a single HBase row.
        *
        * Only ranges belonging to `TOPIC_NAME` are written, because qualifiers are
        * bare partition ids and ranges of another topic would overwrite each other.
        * Nothing is written when no range matches (HBase rejects an empty `Put`).
        *
        * @param TOPIC_NAME     topic whose offsets are stored
        * @param GROUP_ID       consumer group, part of the row key
        * @param offsetRanges   ranges of the batch, as obtained from `HasOffsetRanges`
        * @param hbaseTableName HBase table holding the offsets
        * @param batchTime      batch time, its milliseconds form the row-key suffix
        */
      def saveOffsets(TOPIC_NAME: String, GROUP_ID: String, offsetRanges: Array[OffsetRange],
                      hbaseTableName: String, batchTime: org.apache.spark.streaming.Time): Unit = {
        val topicRanges = offsetRanges.filter(_.topic == TOPIC_NAME)
        if (topicRanges.nonEmpty) {
          val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
          try {
            val table = conn.getTable(TableName.valueOf(hbaseTableName))
            try {
              val put = new Put(Bytes.toBytes(s"$TOPIC_NAME:$GROUP_ID:${batchTime.milliseconds}"))
              topicRanges.foreach { range =>
                put.addColumn(
                  OffsetsColumnFamily,
                  Bytes.toBytes(range.partition.toString),
                  Bytes.toBytes(range.untilOffset.toString))
              }
              table.put(put)
            } finally {
              table.close()
            }
          } finally {
            conn.close()
          }
        }
      }

      /**
        * Reads the number of partitions of a topic from ZooKeeper by counting the
        * children of `/brokers/topics/<topic>/partitions` under the Kafka chroot.
        *
        * @param TOPIC_NAME  topic to look up
        * @param GROUP_ID    unused; kept so existing callers keep compiling
        * @param zkQuorum    ZooKeeper `host:port[,host:port...]` list
        * @param zkRootDir   Kafka chroot inside ZooKeeper; may be empty or start with `/`
        * @param sessTimeout ZooKeeper session timeout in ms
        * @param connTimeOut ZooKeeper connection timeout in ms
        * @return the partition count, or 0 when the topic node does not exist
        */
      def getNumberOfPartitionsForTopicFromZK(TOPIC_NAME: String, GROUP_ID: String,
                                              zkQuorum: String, zkRootDir: String,
                                              sessTimeout: Int, connTimeOut: Int): Int = {
        val zkClient = new ZkClient(zkConnectString(zkQuorum, zkRootDir), sessTimeout, connTimeOut)
        try {
          // countChildren only lists child names, so no serializer is needed,
          // and it yields 0 instead of throwing when the node is missing.
          zkClient.countChildren(s"/brokers/topics/$TOPIC_NAME/partitions")
        } finally {
          zkClient.close()
        }
      }

      /**
        * Builds the starting offsets for a direct stream from the newest row stored
        * in HBase for this topic and group.
        *
        * Every partition known to ZooKeeper or present in the stored row is returned.
        * A partition without a stored offset (first run, or a partition added to the
        * topic later) starts at offset 0. NOTE(review): offset 0 may already have been
        * removed by Kafka retention; confirm this is acceptable for the deployment.
        *
        * @param TOPIC_NAME  topic to consume
        * @param GROUP_ID    consumer group, part of the row key
        * @param hTableName  HBase table holding the offsets
        * @param zkQuorum    ZooKeeper `host:port[,host:port...]` list
        * @param zkRootDir   Kafka chroot inside ZooKeeper
        * @param sessTimeout ZooKeeper session timeout in ms
        * @param connTimeOut ZooKeeper connection timeout in ms
        * @return starting offset per partition; empty when no partition is known at all
        */
      def getLastestOffsets(TOPIC_NAME: String, GROUP_ID: String, hTableName: String,
                            zkQuorum: String, zkRootDir: String,
                            sessTimeout: Int, connTimeOut: Int): Map[TopicAndPartition, Long] = {
        val zkPartitions =
          getNumberOfPartitionsForTopicFromZK(TOPIC_NAME, GROUP_ID, zkQuorum, zkRootDir, sessTimeout, connTimeOut)
        val stored = readLastStoredOffsets(TOPIC_NAME, GROUP_ID, hTableName, zkPartitions)

        // Cover partitions seen by either side, so neither a stale row nor a
        // temporarily unreadable ZooKeeper node silently drops a partition.
        val storedPartitions = if (stored.isEmpty) 0 else stored.keys.max + 1
        val partitionCount = math.max(zkPartitions, storedPartitions)

        (0 until partitionCount).map { partition =>
          TopicAndPartition(TOPIC_NAME, partition) -> stored.getOrElse(partition, 0L)
        }.toMap
      }

      /** Joins quorum and chroot into a ZooKeeper connect string, tolerating an empty chroot. */
      private def zkConnectString(zkQuorum: String, zkRootDir: String): String =
        if (zkRootDir == null || zkRootDir.isEmpty) zkQuorum
        else s"$zkQuorum/${zkRootDir.stripPrefix("/")}"

      /**
        * Returns partition -> offset from the newest stored row, or an empty map when
        * no row exists. `minPartitions` widens the probed partition range beyond the
        * number of cells found in the row.
        */
      private def readLastStoredOffsets(topic: String, groupId: String, tableName: String,
                                        minPartitions: Int): Map[Int, Long] = {
        val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
        try {
          val table = conn.getTable(TableName.valueOf(tableName))
          try {
            val keyPrefix = s"$topic:$groupId:"
            // Reversed scan from "now" down to 0: the first row returned is the newest
            // batch. Relies on the millisecond suffixes having the same digit count.
            val scan = new Scan()
              .setStartRow(Bytes.toBytes(keyPrefix + System.currentTimeMillis()))
              .setStopRow(Bytes.toBytes(s"${keyPrefix}0"))
              .setReversed(true)
            val scanner = table.getScanner(scan)
            try {
              Option(scanner.next()).fold(Map.empty[Int, Long]) { row =>
                (0 until math.max(minPartitions, row.size())).flatMap { partition =>
                  Option(row.getValue(OffsetsColumnFamily, Bytes.toBytes(partition.toString)))
                    .map(value => partition -> Bytes.toString(value).toLong)
                }.toMap
              }
            } finally {
              scanner.close()
            }
          } finally {
            table.close()
          }
        } finally {
          conn.close()
        }
      }

      /**
        * Demo driver: resumes a direct stream from the offsets stored in HBase and
        * stores the offsets of every non-empty batch after it has been processed.
        */
      def main(args: Array[String]): Unit = {
        val processingInterval = 2
        val brokers = "spark123:9092"
        val topic = "mytest1"
        val groupId = "testp"
        val hbaseTableName = "spark_kafka_offsets"
        val zkQuorum = "spark123:12181"
        val zkRootDir = "kafka0.9"

        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> brokers,
          "auto.offset.reset" -> "smallest")

        // Resolve the starting offsets before any Spark resources are created.
        val fromOffsets = getLastestOffsets(topic, groupId, hbaseTableName, zkQuorum, zkRootDir, 30000, 30000)
        // An empty map would create a stream that never consumes anything.
        require(fromOffsets.nonEmpty,
          s"No partitions found for topic '$topic' in ZooKeeper ($zkQuorum/$zkRootDir) or HBase table '$hbaseTableName'")

        // Create context with 2 second batch interval
        val sparkConf = new SparkConf().setAppName("kafkahbase").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(processingInterval))

        val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
        val kafkaStream: InputDStream[(String, String)] =
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
            ssc, kafkaParams, fromOffsets, messageHandler)

        kafkaStream.foreachRDD((rdd, btime) => {
          if (!rdd.isEmpty()) {
            // The cast only works on the RDD handed out by the direct stream itself.
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            println("==========================:" + rdd.count())
            println("==========================btime:" + btime)
            // Offsets are stored only after the batch work above has finished.
            saveOffsets(topic, groupId, offsetRanges, hbaseTableName, btime)
          }
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }

  • 相关阅读:
    [分治FFT]「CTSC2018」青蕈领主
    [边分治+线段树合并]「CTSC2018」暴力写挂
    [模板]MTT
    [模板]NTT
    [矩阵求逆+二分图匹配]BZOJ 3168 [Heoi2013]钙铁锌硒维生素
    [BZOJ1925][SDOI2010]地精部落(DP)
    [BZOJ1047][HAOI2007]理想的正方形(RMQ+DP)
    [POJ3630]Phone List (Tire)
    [POJ1193][NOI1999]内存分配(链表+模拟)
    [POJ2823]Sliding Window 滑动窗口(单调队列)
  • 原文地址:https://www.cnblogs.com/heguoxiu/p/10149620.html
Copyright © 2011-2022 走看看