zoukankan      html  css  js  c++  java
  • Customizing the Kafka health check in a Spring Boot app 在 Spring Boot 应用中自定义 Kafka 的健康状况检查

    Java code of the health check:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.clients.admin.DescribeClusterOptions;
    import org.apache.kafka.clients.admin.DescribeClusterResult;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.config.ConfigResource;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.actuate.health.AbstractHealthIndicator;
    import org.springframework.boot.actuate.health.Health;
    import org.springframework.boot.actuate.health.Status;
    import org.springframework.kafka.core.KafkaAdmin;
    import org.springframework.stereotype.Component;
    
    import java.util.Collections;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Custom Spring Boot Actuator health indicator for a Kafka cluster, registered as the
     * bean {@code kafka}. Results are served by the Actuator health endpoint, e.g.
     * {@code localhost:8080/actuator/health}.
     *
     * <p>The cluster is reported {@link Status#UP} when the number of nodes returned by
     * {@code describeCluster} is at least the controller broker's
     * {@code transaction.state.log.replication.factor}; otherwise {@link Status#DOWN}.
     * Every blocking wait on a Kafka future is bounded by {@link #TIMEOUT_MS} so a slow or
     * unreachable cluster cannot stall the health endpoint indefinitely.
     *
     * <p>Reference implementation:
     * https://github.com/spring-projects/spring-boot/blob/7cd19822c6de99e835bcaff1307f104e863da265/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java
     *
     * <p>Spring Boot on GitHub:
     * https://github.com/spring-projects/spring-boot/tree/master/spring-boot-project
     */
    @Component("kafka")
    public class MyKafkaHealthIndicator extends AbstractHealthIndicator {
        private static final Logger logger = LoggerFactory.getLogger(MyKafkaHealthIndicator.class);

        /** Broker config key whose value is compared with the number of available nodes. */
        static final String REPLICATION_PROPERTY = "transaction.state.log.replication.factor";

        /** Upper bound, in milliseconds, for each wait on a Kafka admin future. */
        private static final int TIMEOUT_MS = 1000;

        @Autowired
        private KafkaAdmin kafkaAdmin;

        private final DescribeClusterOptions describeOptions = new DescribeClusterOptions().timeoutMs(TIMEOUT_MS);

        /**
         * Queries the cluster through a short-lived {@link AdminClient} and fills in the health status.
         *
         * @param builder health builder that receives the status and the details
         *                {@code clusterId}, {@code controller-node-brokerId} and
         *                {@code available-nodes-number}
         * @throws Exception if a Kafka request fails or times out, the controller is not known,
         *                   or the replication factor cannot be read for the controller
         */
        @Override
        protected void doHealthCheck(Health.Builder builder) throws Exception {
            try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfigurationProperties())) {
                // Issue describeCluster once and reuse its futures for every lookup below.
                DescribeClusterResult clusterResult = adminClient.describeCluster(this.describeOptions);

                // Log details of each node; a failure for a single node is logged and
                // does not abort the overall check.
                clusterResult.nodes()
                        .get(TIMEOUT_MS, TimeUnit.MILLISECONDS)
                        .forEach(node -> {
                            try {
                                int replicationNum = getReplicationNumber(node.idString(), adminClient);
                                logger.info("遍历kafka节点:node.id={}, host={}, port={}, replicationNum={}", node.id(), node.host(), node.port(), replicationNum);
                            } catch (InterruptedException e) {
                                // Restore the interrupt flag so later blocking calls observe it.
                                Thread.currentThread().interrupt();
                                logger.error(e.getMessage(), e);
                            } catch (Exception e) {
                                logger.error(e.getMessage(), e);
                            }
                        });

                // The node currently acting as the cluster controller; Kafka may yield null
                // while the controller id is not yet known.
                Node controller = clusterResult.controller().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (controller == null) {
                    throw new IllegalStateException("Kafka controller node is not known yet");
                }
                String brokerId = controller.idString();
                int replicationNum = getReplicationNumber(brokerId, adminClient);
                int availableNodesNum = clusterResult.nodes().get(TIMEOUT_MS, TimeUnit.MILLISECONDS).size();
                Status status = availableNodesNum >= replicationNum ? Status.UP : Status.DOWN;
                builder.status(status)
                        .withDetail("clusterId", clusterResult.clusterId().get(TIMEOUT_MS, TimeUnit.MILLISECONDS))
                        .withDetail("controller-node-brokerId", brokerId)
                        .withDetail("available-nodes-number", availableNodesNum);
            }
        }

        /**
         * Reads {@link #REPLICATION_PROPERTY} from the configuration of the given broker.
         *
         * @param brokerId    id of the broker whose configuration is described
         * @param adminClient open admin client used to issue the request
         * @return the configured replication factor
         * @throws ExecutionException    if the describeConfigs request fails
         * @throws InterruptedException  if the calling thread is interrupted while waiting
         * @throws TimeoutException      if no answer arrives within {@link #TIMEOUT_MS}
         * @throws IllegalStateException if the broker returns no value for the property
         * @throws NumberFormatException if the returned value is not an integer
         */
        private int getReplicationNumber(String brokerId, AdminClient adminClient)
                throws ExecutionException, InterruptedException, TimeoutException {
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
            ConfigEntry entry = adminClient
                    .describeConfigs(Collections.singletonList(configResource))
                    .all()
                    .get(TIMEOUT_MS, TimeUnit.MILLISECONDS)
                    .get(configResource)
                    .get(REPLICATION_PROPERTY);
            if (entry == null || entry.value() == null) {
                throw new IllegalStateException(
                        "Broker " + brokerId + " returned no value for config '" + REPLICATION_PROPERTY + "'");
            }
            return Integer.parseInt(entry.value());
        }
    }

    Register the KafkaAdmin bean in Spring Boot:

    /**
     * Registers the {@link KafkaAdmin} bean, configured with the bootstrap servers of the
     * Kafka cluster.
     *
     * @return a {@code KafkaAdmin} whose configuration contains only the bootstrap server list
     */
    @Bean
    public KafkaAdmin admin() {
        // Comma-separated host:port list of the brokers to bootstrap from.
        String bootstrapServers = "192.168.2.62:9092,192.168.2.63:9092";
        Map<String, Object> adminProps = new HashMap<>();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return new KafkaAdmin(adminProps);
    }

     Remember to import the dependency:

    <dependency>
           <groupId>org.springframework.kafka</groupId>
           <artifactId>spring-kafka</artifactId>
    </dependency>

    My Spring Boot version is 2.3.1.RELEASE.

    end.

  • 相关阅读:
    yii 引入文件
    CodeForces 621C Wet Shark and Flowers
    面试题题解
    POJ 2251 Dungeon Master
    HDU 5935 Car(模拟)
    HDU 5938 Four Operations(暴力枚举)
    CodeForces 722C Destroying Array(并查集)
    HDU 5547 Sudoku(dfs)
    HDU 5583 Kingdom of Black and White(模拟)
    HDU 5512 Pagodas(等差数列)
  • 原文地址:https://www.cnblogs.com/zhuwenjoyce/p/13583428.html
Copyright © 2011-2022 走看看