zoukankan      html  css  js  c++  java
  • Kafka Java API获取非compacted topic总消息数

    目前Kafka并没有提供直接的工具来帮助我们获取某个topic的当前总消息数,需要我们自行写程序来实现。下列代码可以实现这一功能,特此记录一下:

    /**
         * 获取某个topic的当前消息数
         * Java 8+ only
         *
         * @param topic
         * @param brokerList
         * @return
         */
        public static long totalMessageCount(String topic, String brokerList) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokerList);
            props.put("group.id", "test-group");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                List<TopicPartition> tps = Optional.ofNullable(consumer.partitionsFor(topic))
                        .orElse(Collections.emptyList())
                        .stream()
                        .map(info -> new TopicPartition(info.topic(), info.partition()))
                        .collect(Collectors.toList());
                Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(tps);
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
    
                return tps.stream().mapToLong(tp -> endOffsets.get(tp) - beginOffsets.get(tp)).sum();
            }
        }
    

      

  • 相关阅读:
    struct--file_operations
    接触到的一些数据结构: LIST_ENTRY, TAILQ
    Kernel Mode, User Mode
    可运行的代码
    写一篇Hook Driver.
    Chromium学习笔记
    Resources for Browser/Webkit/GPU
    Win7/8, convert dynamic disk volume to basic volume.
    System and Device power management.
    错误记录1----dom4j解析xml调试编码错误
  • 原文地址:https://www.cnblogs.com/huxi2b/p/9530072.html
Copyright © 2011-2022 走看看