  • Kafka 0.10.0.1 consumer: get the earliest offset of each partition from the Kafka broker cluster

    Returns: Map[TopicPartition, Long] (each partition of the topic mapped to its earliest available offset)

    Code:

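    import java.util.Properties
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition

    // kafkaPara (assumed to be a Map of settings) and topic (assumed to be a String)
    // are supplied by the caller.
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPara("bootstrap.servers").toString)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaPara("group.id").toString)
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

    val kc = new KafkaConsumer[String, String](props)

    val earliestOffsets: Map[TopicPartition, Long] =
      try {
        kc.partitionsFor(topic).asScala.map { partitionInfo =>
          val topicPartition = new TopicPartition(topic, partitionInfo.partition())
          // Assign only this partition, rewind to its start, then read the position.
          kc.assign(Seq(topicPartition).asJava)
          kc.seekToBeginning(Seq(topicPartition).asJava)
          topicPartition -> kc.position(topicPartition)
        }.toMap
      } finally {
        kc.close() // release the consumer's sockets and buffers once the offsets are read
      }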

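    Key point: the Scala code calls the Kafka Java client library, so Java collections returned by the client are converted with asScala, and Scala collections passed to it are converted with asJava.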
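
The conversion pattern can be tried without a broker. The following is a minimal sketch that uses only the standard library: `javaStylePartitions` and `javaStyleSize` are hypothetical stand-ins for Java methods such as `partitionsFor` (returns a `java.util.List`) and `assign` (accepts a `java.util.Collection`), and the offset value `0L` is a placeholder, not something read from Kafka.

```scala
import java.util.{ArrayList => JArrayList, Collection => JCollection, List => JList}
import scala.collection.JavaConverters._

object JavaInteropSketch {
  // Hypothetical stand-in for a Java API that returns a java.util.List.
  def javaStylePartitions(): JList[Integer] = {
    val list = new JArrayList[Integer]()
    list.add(0)
    list.add(1)
    list.add(2)
    list
  }

  // Hypothetical stand-in for a Java API that accepts a java.util.Collection.
  def javaStyleSize(items: JCollection[String]): Int = items.size()

  // Java -> Scala: asScala wraps the Java list so map and toMap are available.
  def partitionOffsets(): Map[Int, Long] =
    javaStylePartitions().asScala.map { p => p.intValue -> 0L }.toMap

  // Scala -> Java: asJava wraps the Seq as a java.util.List, which is a Collection.
  def countTopics(topics: Seq[String]): Int = javaStyleSize(topics.asJava)

  def main(args: Array[String]): Unit = {
    println(partitionOffsets())
    println(countTopics(Seq("topic-a", "topic-b")))
  }
}
```

Both conversions are wrappers around the underlying collection rather than copies, which is why the original code can pass `Seq(topicPartition).asJava` directly to the consumer.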

  • Original article: https://www.cnblogs.com/yjyyjy/p/10678438.html