zoukankan      html  css  js  c++  java
  • 数据零丢失：Kafka + checkpoint

    package kafkautils

    /**
    * Created on 上午12:48 (12:48 AM).
    *
    * High-level consumer API
    *
    * Low-level consumer API (SimpleConsumer API) -- the direct stream
    * (KafkaUtils.createDirectStream) used below is built on this one.
    *
    */
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}


    object StreamingWithCheckpoint {

      /**
       * Counts Kafka messages per batch with a direct (receiver-less) stream and
       * checkpoints the streaming state to HDFS. On restart, the context is rebuilt
       * from the checkpoint instead of being created from scratch.
       *
       * @param args optional `brokers topics`, where `topics` is comma-separated.
       *             With fewer than two args, the built-in defaults are used.
       *             Args only matter when no checkpoint exists yet. On recovery,
       *             the DStream graph and its Kafka parameters come from the
       *             checkpoint.
       */
      def main(args: Array[String]): Unit = {
        // Batch interval, in seconds.
        val processingInterval = 2
        // DStream checkpoint interval as a multiple of the batch interval.
        // The Spark guide suggests 5-10x.
        val checkpointMultiplier = 8

        val (brokers, topics) = args match {
          case Array(b, t, _*) => (b, t)
          case _               => ("spark123:9092", "mytest1")
        }

        // NOTE(review): setMaster in code overrides --master from spark-submit.
        val sparkConf = new SparkConf().setAppName("ConsumerWithCheckPoint").setMaster("local[2]")

        val topicsSet = topics.split(",").map(_.trim).filter(_.nonEmpty).toSet
        require(topicsSet.nonEmpty, s"no Kafka topics given in '$topics'")

        // "smallest": start from the earliest available offset when no offset is
        // known yet (i.e. on a fresh start without a checkpoint).
        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> brokers,
          "auto.offset.reset" -> "smallest")
        val checkpointPath = "hdfs://spark123:8020/spark_checkpoint10"

        // All DStream setup must happen inside this factory. getOrCreate only
        // calls it when no checkpoint exists at checkpointPath.
        def createContext(): StreamingContext = {
          val ssc = new StreamingContext(sparkConf, Seconds(processingInterval))
          val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, topicsSet)

          ssc.checkpoint(checkpointPath)
          messages.checkpoint(Duration(checkpointMultiplier * processingInterval * 1000L))

          messages.foreachRDD { rdd =>
            // A single count job per batch, instead of isEmpty() followed by count().
            val n = rdd.count()
            if (n > 0) {
              println("################################" + n)
            }
          }
          ssc
        }

        // Recover the StreamingContext from the checkpoint if one exists;
        // otherwise build a new one.
        val context = StreamingContext.getOrCreate(checkpointPath, () => createContext())
        context.start()
        context.awaitTermination()
      }
    }

  • 相关阅读:
    Java实现 LeetCode 455 分发饼干
    Java实现 LeetCode 455 分发饼干
    Java实现 LeetCode 455 分发饼干
    Java实现 LeetCode 454 四数相加 II
    Java实现 LeetCode 454 四数相加 II
    Java实现 LeetCode 454 四数相加 II
    FFmpeg解码H264及swscale缩放详解
    linux中cat more less head tail 命令区别
    C语言字符串操作总结大全(超详细)
    如何使用eclipse进行嵌入式Linux的开发
  • 原文地址:https://www.cnblogs.com/heguoxiu/p/10149647.html
Copyright © 2011-2022 走看看