  • Zero data loss: Kafka + ZooKeeper

    The Spark Streaming direct Kafka stream has no receiver and does not commit offsets automatically, so the application has to track them itself. The code below saves each batch's offsets to ZooKeeper (through Curator) after the batch has been processed, and on restart resumes the stream from the saved offsets, so no data is lost across restarts.

    package kafkautils

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.retry.ExponentialBackoffRetry
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

    import scala.collection.JavaConversions._

    /**
     * Manages Kafka consumer offsets in ZooKeeper so that a direct Kafka
     * stream can resume from the last stored offsets after a restart.
     */
    object KafkaZKManager extends Serializable {

      // Curator client used to read and write offsets in ZooKeeper
      val client = {
        val client = CuratorFrameworkFactory
          .builder
          .connectString("spark123:12181/kafka0.9")
          .retryPolicy(new ExponentialBackoffRetry(1000, 3))
          .namespace("mykafka")
          .build()
        client.start()
        client
      }

      // Root ZooKeeper path under which offsets are stored
      val kafkaOffsetRootPath = "/consumers/offsets"

      // Make sure the given path exists in ZooKeeper, creating parent nodes if needed
      def ensureZKPathExists(path: String): Unit = {
        if (client.checkExists().forPath(path) == null) {
          client.create().creatingParentsIfNeeded().forPath(path)
        }
      }

      // Persist the end offset of every partition in the batch to ZooKeeper,
      // under /consumers/offsets/<group>/<topic>/<partition>
      def storeOffsets(offsetsRanges: Array[OffsetRange], groupName: String): Unit = {
        for (o <- offsetsRanges) {
          val zkPath = s"$kafkaOffsetRootPath/$groupName/${o.topic}/${o.partition}"
          ensureZKPathExists(zkPath)
          // save the offset to ZooKeeper as a string, matching how getFromOffsets reads it back
          client.setData().forPath(zkPath, o.untilOffset.toString.getBytes())
        }
      }

      // Read previously stored offsets from ZooKeeper. If offsets exist there, they are
      // used as the starting position of the Kafka stream. Returns the offset map plus
      // a flag: 1 if offsets were found, 0 if none have been stored yet.
      def getFromOffsets(topic: String, groupName: String): (Map[TopicAndPartition, Long], Int) = {
        val zkTopicPath = s"$kafkaOffsetRootPath/$groupName/$topic"
        ensureZKPathExists(zkTopicPath)
        val offsets = for {
          // one child znode per partition
          p <- client.getChildren.forPath(zkTopicPath)
        } yield {
          // the znode data is the partition's offset, stored as a string
          val data = client.getData.forPath(s"$zkTopicPath/$p")
          val offset = new String(data).toLong
          println("offset:" + offset)
          (TopicAndPartition(topic, p.toInt), offset)
        }

        if (offsets.isEmpty) (offsets.toMap, 0) else (offsets.toMap, 1)
      }


      // Create a direct Kafka stream, resuming from ZooKeeper offsets when available
      def createMyDirectKafkaStream(ssc: StreamingContext, kafkaParams: Map[String, String],
                                    topic: String, groupName: String): InputDStream[(String, String)] = {
        val (fromOffsets, flag) = getFromOffsets(topic, groupName)

        var kafkaStream: InputDStream[(String, String)] = null
        if (flag == 1) {
          // Transform each Kafka message into a (topic_name, message) tuple
          val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
          println("fromOffsets:" + fromOffsets)
          kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
        } else {
          // No stored offsets yet: start from the newest or oldest offset,
          // depending on auto.offset.reset in kafkaParams
          kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic.split(",").toSet)
        }
        kafkaStream
      }


      def main(args: Array[String]): Unit = {

        val processingInterval = 2
        val brokers = "spark123:9092"
        val topic = "mytest1"
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[2]")

        // Create a direct Kafka stream against the given brokers and topic
        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> brokers,
          "auto.offset.reset" -> "smallest")

        val ssc = new StreamingContext(sparkConf, Seconds(processingInterval))

        val messages = createMyDirectKafkaStream(ssc, kafkaParams, topic, "testp")

        messages.foreachRDD((rdd, btime) => {
          if (!rdd.isEmpty()) {
            println("==========================:" + rdd.count())
            println("==========================btime:" + btime)
          }
          // Store offsets only after the batch has been processed
          storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, "testp")
        })

        ssc.start()
        ssc.awaitTermination()
      }

    }

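    The code above targets the Kafka 0.8 direct-stream API (org.apache.spark.streaming.kafka), so the project needs the spark-streaming-kafka connector and Curator on the classpath. A minimal build.sbt sketch follows; the project name and the artifact versions are assumptions for a Spark 1.6-era setup and should be matched to the actual cluster.

        // Hypothetical build.sbt; versions are assumptions, align them with your cluster
        name := "kafka-zk-offsets"

        scalaVersion := "2.10.5"  // Spark 1.6 is built against Scala 2.10 by default

        libraryDependencies ++= Seq(
          "org.apache.spark"  %% "spark-streaming"       % "1.6.3" % "provided",
          "org.apache.spark"  %% "spark-streaming-kafka" % "1.6.3",
          "org.apache.curator" % "curator-framework"     % "2.10.0"
        )

    One design note: because storeOffsets runs after the batch's output operations, a crash between processing and the ZooKeeper write replays that batch on restart. The pipeline is therefore at-least-once rather than exactly-once, so downstream writes should be idempotent if duplicates matter.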
  • Original post: https://www.cnblogs.com/heguoxiu/p/10149629.html