zoukankan      html  css  js  c++  java
  • 查询订阅某topic的所有consumer group(Java API)

    在网上碰到的问题,想了下使用现有的API还是可以实现的。

    首先,需要引入Kafka服务器端代码,比如加入Kafka 1.0.0依赖:

    Maven

    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>1.0.0</version>
    </dependency>

    Gradle

    compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '1.0.0'

    然后编写获取订阅某topic的所有group的方法,代码如下:

    /**
         * get all subscribing consumer group names for a given topic
         * @param brokerListUrl localhost:9092 for instance
         * @param topic         topic name
         * @return
         */
        public static Set<String> getAllGroupsForTopic(String brokerListUrl, String topic) {
            AdminClient client = AdminClient.createSimplePlaintext(brokerListUrl);
    
            try {
                List<GroupOverview> allGroups = scala.collection.JavaConversions.seqAsJavaList(client.listAllGroupsFlattened().toSeq());
                Set<String> groups = new HashSet<>();
                for (GroupOverview overview: allGroups) {
                    String groupID = overview.groupId();
                    Map<TopicPartition, Object> offsets = scala.collection.JavaConversions.mapAsJavaMap(client.listGroupOffsets(groupID));
                    Set<TopicPartition> partitions = offsets.keySet();
                    for (TopicPartition tp: partitions) {
                        if (tp.topic().equals(topic)) {
                            groups.add(groupID);
                        }
                    }
                }
                return groups;
            } finally {
                client.close();
            }
        }  
  • 相关阅读:
    Oracle启动关闭
    Open_stack 有虚拟机端口不通的问题
    关于Oracle归档的一些操作
    电脑无法开机 接通电源后主板有红灯闪烁的问题
    Centos7+python3.6+face-recognition
    电脑无法开机的问题-主板上有红色告警灯闪烁
    关于systemctl
    Vsftp搭建 for centos7
    外星人入侵——安装Pygame
    mysql索引原理详解
  • 原文地址:https://www.cnblogs.com/huxi2b/p/7831787.html
Copyright © 2011-2022 走看看