  • Zero data loss with Kafka + HBase: a Spark Streaming direct stream that checkpoints its offsets to HBase (a table-creation sketch follows the listing)

    package kafkautils

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.retry.ExponentialBackoffRetry
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Scan}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    import scala.collection.JavaConversions._
    /**
      * Created by zhoucw at 4:52 PM.
      */
    object KafkaHbaseManager {

      // Reference sketch (the post leaves this as an exercise): persist every partition's
      // untilOffset into one HBase row keyed by topic:group:batchTime, one column per partition.
      // NOTE: the "offsets" column family name is an assumption; it must exist in the offsets table.
      def saveOffsets(TOPIC_NAME: String, GROUP_ID: String, offsetRanges: Array[OffsetRange],
                      hbaseTableName: String, batchTime: org.apache.spark.streaming.Time) = {
        val hbaseConf = HBaseConfiguration.create()
        val conn = ConnectionFactory.createConnection(hbaseConf)
        val table = conn.getTable(TableName.valueOf(hbaseTableName))
        // Row key TOPIC:GROUP:batchTimeMillis, so a reverse scan finds the newest batch first
        val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(batchTime.milliseconds)
        val put = new Put(rowKey.getBytes)
        for (offset <- offsetRanges) {
          put.addColumn(Bytes.toBytes("offsets"), Bytes.toBytes(offset.partition.toString),
            Bytes.toBytes(offset.untilOffset.toString))
        }
        table.put(put)
        conn.close()
      }


      // Get the number of partitions of the topic from ZooKeeper
      // (/brokers/topics/<topic>/partitions has one child znode per partition).
      def getNumberOfPartitionsForTopicFromZK(TOPIC_NAME: String, GROUP_ID: String,
                                              zkQuorum: String, zkRootDir: String,
                                              sessTimeout: Int, connTimeOut: Int): Int = {
        val zkUrl = zkQuorum + "/" + zkRootDir
        val zkClientAndConn = CuratorFrameworkFactory.newClient(zkUrl,
          sessTimeout, connTimeOut, new ExponentialBackoffRetry(1000, 3))
        zkClientAndConn.start()
        val partitions = zkClientAndConn.getChildren.forPath(s"/brokers/topics/$TOPIC_NAME/partitions")
        val numPartitions = partitions.size()
        zkClientAndConn.close()
        numPartitions
      }

      // Read the offsets saved by the previous run from HBase and build the fromOffsets map
      // expected by KafkaUtils.createDirectStream (sketch of the part the post leaves as an exercise).
      def getLastestOffsets(TOPIC_NAME: String, GROUP_ID: String, hTableName: String,
                            zkQuorum: String, zkRootDir: String, sessTimeout: Int,
                            connTimeOut: Int): Map[TopicAndPartition, Long] = {

        val zKNumberOfPartitions = getNumberOfPartitionsForTopicFromZK(TOPIC_NAME, GROUP_ID,
          zkQuorum, zkRootDir, sessTimeout, connTimeOut)


        val hbaseConf = HBaseConfiguration.create()

        // Fetch the offsets last committed to HBase (reverse scan: newest batch row first)
        val conn = ConnectionFactory.createConnection(hbaseConf)
        val table = conn.getTable(TableName.valueOf(hTableName))
        val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(System.currentTimeMillis())
        val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
        val scan = new Scan()
        val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(stopRow.getBytes).setReversed(true))
        val result = scanner.next()

        var hbaseNumberOfPartitions = 0 // number of partitions recorded in HBase
        if (result != null) {
          // one column per partition, so the cell count is the partition count
          hbaseNumberOfPartitions = result.listCells().size()
        }

        val fromOffsets = collection.mutable.Map[TopicAndPartition, Long]()
        if (hbaseNumberOfPartitions == 0) {
          // nothing stored yet: start every partition from the beginning of the topic
          for (partition <- 0 until zKNumberOfPartitions) {
            fromOffsets += (TopicAndPartition(TOPIC_NAME, partition) -> 0L)
          }
        } else if (zKNumberOfPartitions > hbaseNumberOfPartitions) {
          // partitions were added to the topic: resume known ones from the "offsets" family
          // written by saveOffsets, start the new ones at 0
          for (partition <- 0 until hbaseNumberOfPartitions) {
            val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
              Bytes.toBytes(partition.toString)))
            fromOffsets += (TopicAndPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
          }
          for (partition <- hbaseNumberOfPartitions until zKNumberOfPartitions) {
            fromOffsets += (TopicAndPartition(TOPIC_NAME, partition) -> 0L)
          }
        } else {
          // normal restart: resume every partition from the offsets saved for the last batch
          for (partition <- 0 until hbaseNumberOfPartitions) {
            val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
              Bytes.toBytes(partition.toString)))
            fromOffsets += (TopicAndPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
          }
        }

        scanner.close()
        conn.close()
        fromOffsets.toMap
      }

      def main(args: Array[String]): Unit = {
        // getLastCommittedOffsets("mytest1", "testp", "stream_kafka_offsets", "spark123:12181", "kafka0.9", 30000, 30000)

        val processingInterval = 2
        val brokers = "spark123:9092"
        val topics = "mytest1"
        // Create context with 2 second batch interval
        val sparkConf = new SparkConf().setAppName("kafkahbase").setMaster("local[2]")
        // Create direct kafka stream with brokers and topics
        val topicsSet = topics.split(",").toSet
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
          "auto.offset.reset" -> "smallest")

        val ssc = new StreamingContext(sparkConf, Seconds(processingInterval))
        val groupId = "testp"
        val hbaseTableName = "spark_kafka_offsets"

        // Build the Kafka direct stream, resuming from the offsets stored in HBase
        //val kafkaStream = createMyDirectKafkaStream(ssc, kafkaParams, zkClient, topicsSet, "testp")
        val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
        val fromOffsets = getLastestOffsets("mytest1", groupId, hbaseTableName, "spark123:12181", "kafka0.9", 30000, 30000)

        var kafkaStream: InputDStream[(String, String)] = null
        kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

        kafkaStream.foreachRDD((rdd, btime) => {
          if (!rdd.isEmpty()) {
            println("==========================:" + rdd.count())
            println("==========================btime:" + btime)
            // Offsets are saved only after the batch has been processed, so a failure
            // before this point simply replays the batch from the last saved offsets
            saveOffsets(topics, groupId, rdd.asInstanceOf[HasOffsetRanges].offsetRanges, hbaseTableName, btime)
          }
        })

        //val offsetsRanges:Array[OffsetRange] = null

        ssc.start()
        ssc.awaitTermination()
      }
    }
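
    The job above assumes the offsets table ("spark_kafka_offsets") already exists in HBase with an
    "offsets" column family; the family name is an assumption carried through the sketches above
    (the equivalent hbase shell command would be create 'spark_kafka_offsets', 'offsets').
    A minimal one-off sketch to create it with the HBase Admin API:

    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory

    object CreateOffsetsTable {
      def main(args: Array[String]): Unit = {
        val conf = HBaseConfiguration.create()
        val conn = ConnectionFactory.createConnection(conf)
        val admin = conn.getAdmin
        // Table and column family names must match what saveOffsets/getLastestOffsets use
        val tableName = TableName.valueOf("spark_kafka_offsets")
        if (!admin.tableExists(tableName)) {
          val desc = new HTableDescriptor(tableName)
          desc.addFamily(new HColumnDescriptor("offsets"))
          admin.createTable(desc)
        }
        admin.close()
        conn.close()
      }
    }

    Because the offsets are written to HBase only after a batch has been processed, a crash in the
    middle of a batch just replays that batch from the previously saved offsets: at-least-once
    delivery with no data loss, provided the downstream HBase writes are idempotent.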
