  • zookeeper

    package com.bnls.test.common

    import kafka.common.TopicAndPartition
    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.retry.ExponentialBackoffRetry
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.OffsetRange
    import org.slf4j.LoggerFactory

    import scala.collection.JavaConversions._
    import scala.collection.mutable


    object ZookeeperHelper {

      val LOG = LoggerFactory.getLogger(ZookeeperHelper.getClass)

      // ZooKeeper connection string: hosts and ports of the ensemble
      val zk_connectstring = "10.60.81.168:2181,10.60.81.167:2181,10.60.81.166:2181"
      // ZooKeeper namespace (chroot) for this application
      val zk_namespace = "mykafka"
      // Root path under which consumer offsets are stored
      val Globe_kafkaOffsetPath = "/kafka/offsets"

      // ZK client: Curator with exponential backoff, at most 3 retries
      val zkClient = {
        val client = CuratorFrameworkFactory.builder
          .connectString(zk_connectstring)
          .retryPolicy(new ExponentialBackoffRetry(1000, 3))
          .namespace(zk_namespace)
          .build()
        client.start()
        client
      }

      // Ensure the given ZK path exists; create it (with any missing parents) if it does not
      def ensureZKPathExists(path: String) = {
        if (zkClient.checkExists().forPath(path) == null) {
          zkClient.create().creatingParentsIfNeeded().forPath(path)
        }
      }

      // Save the new offsets, one ZK node per topic/partition
      def storeOffsets(offsetRange: Array[OffsetRange], groupName: String) = {

        for (o <- offsetRange) {
          val zkPath = s"${Globe_kafkaOffsetPath}/${groupName}/${o.topic}/${o.partition}"

          // First write creates the partition's node; later writes update its offset
          println("--- writing offset to ZK --- Topic: " + o.topic + ", Partition: " + o.partition + ", Offset: " + o.untilOffset)
          ensureZKPathExists(zkPath)
          zkClient.setData().forPath(zkPath, o.untilOffset.toString.getBytes())
        }
        println("new offsets saved successfully")
      }

      def getZKOffset(kafkaParam: Map[String, String], topic: Set[String], groupName: String): Map[TopicAndPartition, Long] = {

        // Kafka 0.8 vs 0.10 API difference: 0.10 uses TopicPartition, 0.8 uses TopicAndPartition
        var offsets: Map[TopicAndPartition, Long] = Map()

        val topic1 = topic.head

        // Read the offsets saved in ZK as the starting point of the DStream.
        // If the path does not exist yet, create it and start the DStream from 0.
        val zkTopicPath = s"${Globe_kafkaOffsetPath}/${groupName}/${topic1}"
        ensureZKPathExists(zkTopicPath)

        // The topic node's children are its partitions
        val childrens = zkClient.getChildren().forPath(zkTopicPath)

        // Read each partition node's data (the stored offset) and convert it to Long
        for (p <- childrens) {
          val offsetData = zkClient.getData().forPath(s"$zkTopicPath/$p")
          val offSet = new String(offsetData).toLong
          offsets += TopicAndPartition(topic1, p.toInt) -> offSet
        }
        offsets
      }

      def getResetOffsets(kafkaParam: Map[String, String], topics: Set[String]): Map[TopicAndPartition, Long] = {
        // KafkaClusterHelper is a copy of Spark's (private) KafkaCluster for the Kafka 0.8 API
        val cluster = new KafkaClusterHelper(kafkaParam)

        var offsets: Map[TopicAndPartition, Long] = Map()

        // "auto.offset.reset" selects the earliest ("smallest") or latest ("largest") offsets
        val reset = kafkaParam.get("auto.offset.reset").map(x => x.toString.toLowerCase())
        val topicAndPartitions: Set[TopicAndPartition] = cluster.getPartitions(topics).right.get

        if (reset == Some("smallest")) {
          val leaderOffsets = cluster.getEarliestLeaderOffsets(topicAndPartitions).right.get
          topicAndPartitions.foreach(tp => {
            offsets += tp -> leaderOffsets(tp).offset
          })
        } else if (reset == Some("largest")) {
          val leaderOffsets = cluster.getLatestLeaderOffsets(topicAndPartitions).right.get
          topicAndPartitions.foreach(tp => {
            offsets += tp -> leaderOffsets(tp).offset
          })
        }
        offsets
      }

      // Reconcile the offsets saved in ZK with the earliest/latest offsets on the brokers
      // and return a safe starting point; the Int flag is 0 when no offsets were found, else 1
      def getConSumerOffsets(kafkaParam: Map[String, String], topicSet1: Set[String], groupName: String): (Map[TopicPartition, Long], Int) = {

        val brokers = kafkaParam("bootstrap.servers")

        val kafkaSmallestParams = Map[String, String]("metadata.broker.list" -> brokers, "auto.offset.reset" -> "smallest")
        val kafkaLargestParams = Map[String, String]("metadata.broker.list" -> brokers, "auto.offset.reset" -> "largest")

        var offSets: mutable.Buffer[(TopicPartition, Long)] = mutable.Buffer()

        val smallOffsets = getResetOffsets(kafkaSmallestParams, topicSet1)
        val largestOffsets = getResetOffsets(kafkaLargestParams, topicSet1)
        val consumerOffsets = getZKOffset(kafkaParam, topicSet1, groupName) // offsets read from external storage (ZK)

        smallOffsets.foreach({
          case (tp, sOffset) => {
            val cOffset = if (!consumerOffsets.contains(tp)) 0L else consumerOffsets(tp)
            val lOffset = largestOffsets(tp)
            if (sOffset > cOffset) {
              // The saved offset has already been deleted by retention: start from the earliest
              offSets.append((new TopicPartition(tp.topic, tp.partition), sOffset))
            } else if (cOffset > lOffset) {
              // The saved offset is beyond the latest available offset: clamp to the latest
              offSets.append((new TopicPartition(tp.topic, tp.partition), lOffset))
            } else {
              // The saved offset is within the valid range: resume from it
              offSets.append((new TopicPartition(tp.topic, tp.partition), cOffset))
            }
          }
        })
        if (offSets.isEmpty) (offSets.toMap, 0) else (offSets.toMap, 1)
      }


    }
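
    Below is a minimal usage sketch (not from the original post) showing how this helper could be wired into a Spark Streaming job with the spark-streaming-kafka-0-10 direct stream. The group name, topic, and broker address are placeholder values; it assumes only the standard KafkaUtils.createDirectStream / ConsumerStrategies.Subscribe / HasOffsetRanges API.

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._

    object ZookeeperHelperDemo {
      def main(args: Array[String]): Unit = {
        val groupName = "my-group"        // placeholder consumer group
        val topics = Set("my-topic")      // placeholder topic
        val brokers = "10.60.81.168:9092" // placeholder broker list

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> brokers,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> groupName,
          // offsets are persisted to ZK by storeOffsets, not auto-committed to Kafka
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val ssc = new StreamingContext(new SparkConf().setAppName("zk-offset-demo"), Seconds(5))

        // Restore offsets from ZK, clamped to the brokers' valid range
        val (fromOffsets, flag) = ZookeeperHelper.getConSumerOffsets(
          Map("bootstrap.servers" -> brokers), topics, groupName)

        // Start from the restored offsets if any were found; otherwise let
        // the consumer's auto.offset.reset policy decide
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          if (flag == 1) ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
          else ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

        stream.foreachRDD { rdd =>
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          // ... process the batch here ...
          // then persist the batch's end offsets back to ZK
          ZookeeperHelper.storeOffsets(offsetRanges, groupName)
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }

    Note that getZKOffset only reads the first topic in the set (topic.head), so the helper effectively supports a single topic per call; the sketch therefore passes exactly one topic.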
  • Original article: https://www.cnblogs.com/heguoxiu/p/10064292.html