  • Spark Streaming + Kafka + ZooKeeper + HBase

    package com.bnls.test.spark

    import com.bnls.test.common.ZookeeperHelper
    import com.bnls.test.hbase.HbaseHelper
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.kafka010._
    import org.apache.log4j.{Level, Logger}
    import org.apache.kafka.clients.consumer._
    import org.json4s.jackson.JsonMethods

    import scala.util.parsing.json.JSON

    object StreamingZookeeperProApp {

      Logger.getLogger("org.apache").setLevel(Level.WARN) // reduce log verbosity

      /**
       * Creates a direct Kafka stream whose starting offsets are read from ZooKeeper.
       * flag = 1 means stored offsets were found, so the stream resumes from them;
       * flag = 0 means there are none, so it starts from scratch (earliest or latest,
       * depending on the Kafka consumer configuration).
       */
      def createMyZookeeperDirectKafkaStream(ssc: StreamingContext, kafkaParams: Map[String, String],
                                             topics: Set[String], groupName: String): InputDStream[ConsumerRecord[String, String]] = {
        println(s"kafkaParams = $kafkaParams, group = $groupName, topics = $topics")

        val (fromOffsets, flag) = ZookeeperHelper.getConSumerOffsets(kafkaParams, topics, groupName)

        if (flag == 1) {
          val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets))
          println("Resumed streaming from the offsets stored in ZooKeeper.")
          kafkaStream
        } else {
          val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
          println("Started streaming for the first time.")
          kafkaStream
        }
      }

      def main(args: Array[String]): Unit = {

        // Spark configuration
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("kafka checkpoint zookeeper")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.rdd.compress", "true")
        // .set("spark.kryo.registrator", "com.test.spark.StreamingZookeeperProApp")
        // sparkConf.registerKryoClasses(Array(classOf[KafkaClusterHelper], classOf[ZookeeperHelper], classOf[StreamingZookeeperProApp]))

        val sc = new SparkContext(sparkConf)
        val ssc = new StreamingContext(sc, Seconds(5))
        var tableName: String = null // reserved; not used below

        // Kafka configuration
        val brokers = "sz280325:9092,sz280324:9092,sz280323:9092"
        val topics1 = "mysql_kafka"
        // val topics1 = "test01"
        val topics = topics1.split(",").toSet
        val groupId = "zk_mysql_kafka" // could also be read from a config file with Typesafe ConfigFactory

        val kafkaParams = Map[String, String](
          "bootstrap.servers" -> brokers,
          "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "group.id" -> groupId,
          "enable.auto.commit" -> "false" // offsets are committed manually to ZooKeeper
          // "auto.offset.reset" -> "smallest"
        )

        // val kafkaParams1 = Map[String, String]("bootstrap.servers" -> brokers, "auto.offset.reset" -> "smallest")
        val messages = createMyZookeeperDirectKafkaStream(ssc, kafkaParams, topics, groupId)

        // case class Sensors1(author: String, id: String, title: String, rowKey: String) {
        //   override def toString = s"Sensors1(author=$author, id=$id, title=$title, rowKey=$rowKey)"
        // }

        var offsetsRanges = Array[OffsetRange]()

        messages.transform { rdd2 =>
          // Capture the Kafka offset ranges covered by this RDD.
          offsetsRanges = rdd2.asInstanceOf[HasOffsetRanges].offsetRanges
          // ZookeeperHelper.storeOffsets(rdd2.asInstanceOf[HasOffsetRanges].offsetRanges, groupId)

          /* Example: parse each value into Sensors1 with json4s and print a sliding window
          val s2 = rdd2.map { record =>
            implicit val formats = DefaultFormats
            parse(record.value, false).extract[Sensors1]
          }
          s2.window(Seconds(30), Seconds(5)).foreachRDD { rdd =>
            val collected = rdd.map(record => (record.author, record.id, record.title, record.rowKey)).collect()
            for (c <- collected) {
              println(c)
            }
          } */
          // rdd2.map(x => (x.topic(), x.value())).window(Seconds(30), Seconds(10))
          rdd2
        }.foreachRDD { rdds =>
          if (!rdds.isEmpty()) {
            rdds.foreachPartition { partitionOfRecords =>
              partitionOfRecords.foreach { data =>
                try {
                  /* Example: extract the target table name from the "head" section of the JSON value
                  val jsonS = JSON.parseFull(data.value())
                  val jsonObj = jsonS match {
                    case Some(map: Map[String, Any]) => map
                  }
                  val headObj = jsonObj("head").asInstanceOf[Map[String, String]]
                  val tableStr = headObj.getOrElse("table", "")
                  println(tableStr) */

                  // Process the record
                  println(data)
                  // Insert the record into HBase
                  HbaseHelper.insert2hbase(data.value())
                } catch {
                  case e: Exception =>
                    println("Malformed record: " + e.getMessage)
                }
              }
            }
            // Persist the new offsets to ZooKeeper once per batch, on the driver,
            // after the whole batch has been written to HBase.
            ZookeeperHelper.storeOffsets(offsetsRanges, groupId)
          } else {
            println("No data in this batch")
          }
        }




        ssc.start()
        ssc.awaitTermination()
      }
    }
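
The job writes every record value with HbaseHelper.insert2hbase, but that class is not included in the post. Below is a minimal sketch of what it could look like, assuming an HBase 1.x+ client on the classpath; the ZooKeeper quorum, table name, column family, and row-key scheme are placeholder assumptions, not the author's actual implementation.

    package com.bnls.test.hbase

    import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

    object HbaseHelper {

      // One connection per executor JVM, created lazily and reused across batches.
      private lazy val connection: Connection = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "sz280323,sz280324,sz280325") // assumed quorum
        ConnectionFactory.createConnection(conf)
      }

      // Writes one Kafka record value into HBase; table, column family and row key are placeholders.
      def insert2hbase(value: String): Unit = {
        val table = connection.getTable(TableName.valueOf("mysql_kafka"))
        try {
          val rowKey = System.currentTimeMillis().toString
          val put = new Put(Bytes.toBytes(rowKey))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("value"), Bytes.toBytes(value))
          table.put(put)
        } finally {
          table.close()
        }
      }
    }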





    /*
    // Get the smallest valid offset for each topic partition
    val kafkaParamsSmallest = Map[String, String]("metadata.broker.list" -> brokers, "auto.offset.reset" -> "smallest")
    val smallestOffsets = ZookeeperHelper.getResetOffsets(kafkaParamsSmallest, topics)
    // Get the largest (latest) valid offset for each topic partition
    val kafkaParamsLargest = Map[String, String]("metadata.broker.list" -> brokers, "auto.offset.reset" -> "largest")
    val largestOffsets = ZookeeperHelper.getResetOffsets(kafkaParamsLargest, topics)

    // Print them
    println("========Smallest offsets=============:" + smallestOffsets)
    println("========Largest offsets=============:" + largestOffsets)
    */
  • Original article: https://www.cnblogs.com/heguoxiu/p/10064301.html