zoukankan      html  css  js  c++  java
  • Customizing the Kafka health check in a Spring Boot app 在springboot应用中自定义kafka的健康状况检查

    Java code for the health check:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.clients.admin.DescribeClusterOptions;
    import org.apache.kafka.clients.admin.DescribeClusterResult;
    import org.apache.kafka.clients.admin.DescribeConfigsOptions;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.config.ConfigResource;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.actuate.health.AbstractHealthIndicator;
    import org.springframework.boot.actuate.health.Health;
    import org.springframework.boot.actuate.health.Status;
    import org.springframework.kafka.core.KafkaAdmin;
    import org.springframework.stereotype.Component;
    
    import java.util.Collection;
    import java.util.Collections;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Custom Spring Boot Actuator health indicator for Kafka, registered as the
     * {@code kafka} bean and visible at {@code localhost:8080/actuator/health}.
     *
     * <p>The cluster is reported {@code UP} when the number of nodes returned by
     * {@code describeCluster} is at least the controller broker's
     * {@value #REPLICATION_PROPERTY} setting, and {@code DOWN} otherwise.
     *
     * <p>Reference implementation:
     * https://github.com/spring-projects/spring-boot/blob/7cd19822c6de99e835bcaff1307f104e863da265/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java
     *
     * <p>Spring Boot GitHub address:
     * https://github.com/spring-projects/spring-boot/tree/master/spring-boot-project
     */
    @Component("kafka")
    public class MyKafkaHealthIndicator extends AbstractHealthIndicator {
        private static final Logger logger = LoggerFactory.getLogger(MyKafkaHealthIndicator.class);

        /** Broker configuration key whose value is compared against the number of available nodes. */
        static final String REPLICATION_PROPERTY = "transaction.state.log.replication.factor";

        /** Upper bound, in milliseconds, applied to every admin request made by this indicator. */
        private static final int TIMEOUT_MS = 1000;

        @Autowired
        private KafkaAdmin kafkaAdmin;

        private final DescribeClusterOptions describeOptions =
                new DescribeClusterOptions().timeoutMs(TIMEOUT_MS);

        // Without an explicit timeout describeConfigs falls back to the client's request timeout,
        // which would let a single health check block far longer than TIMEOUT_MS.
        private final DescribeConfigsOptions describeConfigsOptions =
                new DescribeConfigsOptions().timeoutMs(TIMEOUT_MS);

        /**
         * Queries the cluster through a short-lived {@link AdminClient} and fills in the health status.
         *
         * @param builder the health builder to populate with the status and cluster details
         * @throws Exception if the cluster or broker configuration cannot be read in time
         */
        @Override
        protected void doHealthCheck(Health.Builder builder) throws Exception {
            try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfigurationProperties())) {
                // One describeCluster request serves both the per-node logging and the status decision.
                DescribeClusterResult clusterResult = adminClient.describeCluster(this.describeOptions);
                Collection<Node> nodes = clusterResult.nodes().get();
                int availableNodesNum = nodes.size();

                logNodeDetails(nodes, adminClient);

                // The node currently acting as the cluster controller; may be null when not yet known.
                Node controller = clusterResult.controller().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (controller == null) {
                    builder.down()
                            .withDetail("error", "Kafka controller is not yet known")
                            .withDetail("available-nodes-number", availableNodesNum);
                    return;
                }

                String brokerId = controller.idString();
                int replicationNum = getReplicationNumber(brokerId, adminClient);
                Status status = availableNodesNum >= replicationNum ? Status.UP : Status.DOWN;
                builder.status(status)
                        .withDetail("clusterId", clusterResult.clusterId().get())
                        .withDetail("controller-node-brokerId", brokerId)
                        .withDetail("available-nodes-number", availableNodesNum);
            }
        }

        /**
         * Logs id, host, port and replication setting of every node. Failures to read a single
         * broker's configuration are logged and do not abort the health check.
         *
         * @param nodes       the nodes returned by {@code describeCluster}
         * @param adminClient the client used to read each broker's configuration
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        private void logNodeDetails(Collection<Node> nodes, AdminClient adminClient) throws InterruptedException {
            for (Node node : nodes) {
                try {
                    int replicationNum = getReplicationNumber(node.idString(), adminClient);
                    logger.info("遍历kafka节点:node.id={}, host={}, port={}, replicationNum={}", node.id(), node.host(), node.port(), replicationNum);
                } catch (InterruptedException e) {
                    // Restore the flag so callers further up the stack can observe the interruption.
                    Thread.currentThread().interrupt();
                    throw e;
                } catch (ExecutionException | RuntimeException e) {
                    logger.error("Failed to read {} from broker {}", REPLICATION_PROPERTY, node.id(), e);
                }
            }
        }

        /**
         * Reads {@value #REPLICATION_PROPERTY} from the given broker's configuration.
         *
         * @param brokerId    id of the broker whose configuration is queried
         * @param adminClient the client used to issue the {@code describeConfigs} request
         * @return the configured value parsed as an integer
         * @throws ExecutionException    if the {@code describeConfigs} request fails or times out
         * @throws InterruptedException  if the calling thread is interrupted while waiting
         * @throws IllegalStateException if the broker does not report a value for the property
         * @throws NumberFormatException if the reported value is not an integer
         */
        private int getReplicationNumber(String brokerId,
                                         AdminClient adminClient) throws ExecutionException, InterruptedException {
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
            Config brokerConfig = adminClient
                    .describeConfigs(Collections.singletonList(configResource), this.describeConfigsOptions)
                    .all()
                    .get()
                    .get(configResource);
            ConfigEntry entry = brokerConfig == null ? null : brokerConfig.get(REPLICATION_PROPERTY);
            if (entry == null || entry.value() == null) {
                throw new IllegalStateException(
                        "Broker " + brokerId + " did not report a value for " + REPLICATION_PROPERTY);
            }
            return Integer.parseInt(entry.value());
        }
    }

    Register the KafkaAdmin bean in Spring Boot:

    /**
     * Creates the {@link KafkaAdmin} bean used to reach the Kafka cluster.
     *
     * @return a {@code KafkaAdmin} configured with the bootstrap server list
     */
    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        // The value is already a comma-delimited list; splitting on "," and re-joining with ","
        // yields the same string, so it is passed through directly.
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.62:9092,192.168.2.63:9092");
        return new KafkaAdmin(configs);
    }

    Remember to add the dependency:

    <dependency>
           <groupId>org.springframework.kafka</groupId>
           <artifactId>spring-kafka</artifactId>
    </dependency>

    My Spring Boot version is 2.3.1.RELEASE.

    end.

  • 相关阅读:
    css3-响应式布局
    css3-盒模型新增属性
    css3-弹性盒模型
    阿里天池超级码力复赛
    [状态压缩dp]Leetcode5.02双周赛 每个人戴不同帽子的方案数
    算法编程题:魔塔
    [Dijkstra贪心思想妙用]真实笔试题:传送门
    2020 力扣杯!Code Your Future 春季全国编程大赛 个人赛
    经典笔试算法题之打小怪兽
    两道经典面试算法题2020-3-20(打牌,最长上升字符串拼接)
  • 原文地址:https://www.cnblogs.com/zhuwenjoyce/p/13583428.html
Copyright © 2011-2022 走看看