zoukankan      html  css  js  c++  java
  • Kafka获取订阅某topic的所有consumer group【客户端版】

    之前写过如何用服务器端的API代码来获取订阅某topic的所有consumer group,参见这里。使用服务器端的API需要用到kafka.admin.AdminClient类,但是这个类在0.11.0.0版本已经被标记为不推荐使用了,故目前最合适的方式还是通过客户端API:org.apache.kafka.clients.admin.AdminClient。今天碰到有人问这个问题,我就尝试写了一个。使用之前你需要引入kafka client包依赖(以2.2.0版本为例)

    Maven:

    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
    </dependency>

    Gradle:

    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.2.0'

     

    下面是代码:

     1 private static List<String> getGroupsForTopic(String brokerServers, String topic) 
     2             throws ExecutionException, InterruptedException, TimeoutException {
     3         Properties props = new Properties();
     4         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServers);
     5 
     6         try (AdminClient client = AdminClient.create(props)) {
     7             List<String> allGroups = client.listConsumerGroups()
     8                     .valid()
     9                     .get(10, TimeUnit.SECONDS)
    10                     .stream()
    11                     .map(ConsumerGroupListing::groupId)
    12                     .collect(Collectors.toList());
    13 
    14             Map<String, ConsumerGroupDescription> allGroupDetails =
    15                     client.describeConsumerGroups(allGroups).all().get(10, TimeUnit.SECONDS);
    16 
    17             final List<String> filteredGroups = new ArrayList<>();
    18             allGroupDetails.entrySet().forEach(entry -> {
    19                 String groupId = entry.getKey();
    20                 ConsumerGroupDescription description = entry.getValue();
    21                 boolean topicSubscribed = description.members().stream().map(MemberDescription::assignment)
    22                         .map(MemberAssignment::topicPartitions)
    23                         .map(tps -> tps.stream().map(TopicPartition::topic).collect(Collectors.toSet()))
    24                         .anyMatch(tps -> tps.contains(topic));
    25                 if (topicSubscribed)
    26                     filteredGroups.add(groupId);
    27             });
    28             return filteredGroups;
    29         }
    30     }

    我会假设你的集群中没有配置安全认证和授权机制或者发起此AdminClient的用户是合法用户且有CLUSTER以及GROUP的DESCRIBE权限。

    另外值得注意的是,上面这个函数无法获取非运行中的consumer group,即虽然一个group订阅了某topic,但是若它所有的consumer成员都关闭的话这个函数是不会返回该group的。

  • 相关阅读:
    根据EsLint配置WebStorm格式化代码风格
    Vue中使用vant-UI实现移动端自定义省市区三级联动
    Vue中使用Element-UI实现表格跨页多选
    Vue中使用iview-UI实现切换Tab页网络请求优化
    Vue中使用iview-UI按需引入Select组件下拉框无法生效问题
    Vue中使用iview-UI表格样式修改和使用自定义模板数据渲染相关
    Vue中使用Element-UI表单验证相关问题及解决
    Vue 3.0 多页面项目之商家平台练习
    五 创建道路模型(2 道路的挖填方量计算及条件部件)
    五 创建道路模型(1 道路三要素)
  • 原文地址:https://www.cnblogs.com/huxi2b/p/10638008.html
Copyright © 2011-2022 走看看