zoukankan      html  css  js  c++  java
  • Customizing the Kafka health check in a Spring Boot app 在springboot应用中自定义kafka的健康状况检查

    Java code for the health check:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.DescribeClusterOptions;
    import org.apache.kafka.clients.admin.DescribeClusterResult;
    import org.apache.kafka.clients.admin.DescribeConfigsOptions;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.config.ConfigResource;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.actuate.health.AbstractHealthIndicator;
    import org.springframework.boot.actuate.health.Health;
    import org.springframework.boot.actuate.health.Status;
    import org.springframework.kafka.core.KafkaAdmin;
    import org.springframework.stereotype.Component;
    
    import java.util.Collection;
    import java.util.Collections;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Custom Kafka health indicator, registered as the Spring bean named {@code kafka}
     * (visible at e.g. {@code localhost:8080/actuator/health}).
     *
     * <p>On every check a short-lived {@link AdminClient} is created from the injected
     * {@link KafkaAdmin} configuration. The status is {@link Status#UP} when the number of
     * nodes returned by the cluster description is at least the controller broker's
     * {@code transaction.state.log.replication.factor}, and {@link Status#DOWN} otherwise.
     *
     * <p>refer docs:
     * https://github.com/spring-projects/spring-boot/blob/7cd19822c6de99e835bcaff1307f104e863da265/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java
     *
     * <p>spring boot github address:
     * https://github.com/spring-projects/spring-boot/tree/master/spring-boot-project
     */
    @Component("kafka")
    public class MyKafkaHealthIndicator extends AbstractHealthIndicator {
        private static final Logger logger = LoggerFactory.getLogger(MyKafkaHealthIndicator.class);

        /** Broker config entry whose value is compared against the number of available nodes. */
        static final String REPLICATION_PROPERTY = "transaction.state.log.replication.factor";

        /** Upper bound, in milliseconds, applied to every admin request made by one check. */
        private static final int TIMEOUT_MS = 1000;

        @Autowired
        private KafkaAdmin kafkaAdmin;

        private final DescribeClusterOptions describeOptions =
                new DescribeClusterOptions().timeoutMs(TIMEOUT_MS);

        // Without an explicit timeout a describeConfigs call falls back to the client's default
        // API timeout, which would let a slow broker stall the health endpoint.
        private final DescribeConfigsOptions describeConfigsOptions =
                new DescribeConfigsOptions().timeoutMs(TIMEOUT_MS);

        /**
         * Describes the cluster once, logs every node, and derives the health status from the
         * controller's replication factor.
         *
         * @param builder health builder that receives the status and the detail entries
         * @throws Exception if a required admin request fails, times out or is interrupted
         */
        @Override
        protected void doHealthCheck(Health.Builder builder) throws Exception {
            try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfigurationProperties())) {
                // A single describeCluster request feeds both the per-node log and the status.
                DescribeClusterResult clusterResult = adminClient.describeCluster(this.describeOptions);
                Collection<Node> nodes = clusterResult.nodes().get();

                logNodeDetails(nodes, adminClient);

                // The node currently acting as the cluster controller; the Kafka API documents
                // that this future may yield null while the controller is not yet known.
                Node controller = clusterResult.controller().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (controller == null) {
                    builder.down()
                            .withDetail("clusterId", clusterResult.clusterId().get())
                            .withDetail("available-nodes-number", nodes.size())
                            .withDetail("reason", "controller node is not known yet");
                    return;
                }

                String brokerId = controller.idString();
                int replicationNum = getReplicationNumber(brokerId, adminClient);
                int availableNodesNum = nodes.size();
                Status status = availableNodesNum >= replicationNum ? Status.UP : Status.DOWN;
                builder.status(status)
                        .withDetail("clusterId", clusterResult.clusterId().get())
                        .withDetail("controller-node-brokerId", brokerId)
                        .withDetail("available-nodes-number", availableNodesNum);
            }
        }

        /**
         * Logs id, host, port and replication factor of each node. A failure for one node is
         * logged and does not stop the remaining nodes from being reported.
         *
         * @param nodes       nodes returned by the cluster description
         * @param adminClient client used to read each node's broker config
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        private void logNodeDetails(Collection<Node> nodes, AdminClient adminClient) throws InterruptedException {
            for (Node node : nodes) {
                try {
                    int replicationNum = getReplicationNumber(node.idString(), adminClient);
                    logger.info("遍历kafka节点:node.id={}, host={}, port={}, replicationNum={}", node.id(), node.host(), node.port(), replicationNum);
                } catch (InterruptedException e) {
                    // Restore the flag and stop: an interrupt must not be swallowed as a log line.
                    Thread.currentThread().interrupt();
                    throw e;
                } catch (ExecutionException | RuntimeException e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }

        /**
         * Reads {@link #REPLICATION_PROPERTY} from the config of the given broker.
         *
         * @param brokerId    id of the broker whose config is described
         * @param adminClient client used to issue the describeConfigs request
         * @return the configured value parsed as an int
         * @throws ExecutionException   if the describeConfigs request fails or times out
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        private int getReplicationNumber(String brokerId,
                                         AdminClient adminClient) throws ExecutionException, InterruptedException {
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
            String replicationNumStr = adminClient
                    .describeConfigs(Collections.singletonList(configResource), this.describeConfigsOptions)
                    .all()
                    .get()
                    .get(configResource)
                    .get(REPLICATION_PROPERTY)
                    .value();
            return Integer.parseInt(replicationNumStr);
        }
    }

    Register the KafkaAdmin bean in Spring Boot:

    /**
     * Builds the {@link KafkaAdmin} bean whose configuration contains only the
     * bootstrap server list.
     *
     * @return a KafkaAdmin configured with the two broker addresses below
     */
    @Bean
    public KafkaAdmin admin() {
        // Splitting this list on "," and re-joining it with "," yields the very same
        // string, so the comma-delimited value is stored directly.
        final String bootstrapServers = "192.168.2.62:9092,192.168.2.63:9092";

        final Map<String, Object> adminProperties = new HashMap<>();
        adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return new KafkaAdmin(adminProperties);
    }

    Remember to add the dependency:

    <dependency>
           <groupId>org.springframework.kafka</groupId>
           <artifactId>spring-kafka</artifactId>
    </dependency>

    My springboot version is 2.3.1.RELEASE

    end.

  • 相关阅读:
    OpenGL_ES-纹理
    GCD 初步学习
    关于心理的二十五种倾向(查理·芒格)-3
    黑马day18 jquery高级特性&Ajax的load方法
    九度 1138
    FusionCharts简单教程---建立第一个FusionCharts图形
    【转】第二课.配置和初始化
    【转】Git详解之一:Git起步
    【转】1.5 起步
    【转】Cygwin的包管理器:apt-cyg
  • 原文地址:https://www.cnblogs.com/zhuwenjoyce/p/13583428.html
Copyright © 2011-2022 走看看