zoukankan      html  css  js  c++  java
  • Spring Boot 中使用 Kafka AdminClient 管理 Kafka

    2021-03-27

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.ExecutionException;
    
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.clients.admin.CreateTopicsResult;
    import org.apache.kafka.clients.admin.DescribeClusterOptions;
    import org.apache.kafka.clients.admin.DescribeClusterResult;
    import org.apache.kafka.clients.admin.DescribeConfigsResult;
    import org.apache.kafka.clients.admin.ListTopicsResult;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.admin.TopicListing;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.config.ConfigResource;
    import org.junit.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaAdmin;
    
    import com.ibm.dsw.quote.preentitlement.base.AbstractBaseTest;
    
    /**
     * Demonstrates basic Kafka {@link AdminClient} operations against the cluster
     * described by the autowired {@link KafkaAdmin}: creating a topic, listing topics,
     * reading a topic's configuration and printing cluster information.
     *
     * <p>NOTE(review): this test talks to a real broker; the Spring context is
     * presumably supplied by {@code AbstractBaseTest} - confirm against that class.
     */
    public class AdminClientTest extends AbstractBaseTest {

        /** Name of the topic that the test creates and then inspects. */
        private static final String TOPIC_NAME = "test1";

        /** Partition count used when the demo topic has to be created. */
        private static final int PARTITIONS = 3;

        /** Replication factor used when the demo topic has to be created. */
        private static final short REPLICATION_FACTOR = 1;

        @Autowired
        private KafkaAdmin admin;

        /**
         * Runs the whole AdminClient walkthrough and prints the results to stdout.
         *
         * @throws InterruptedException if the thread is interrupted while waiting on a Kafka future
         * @throws ExecutionException   if any admin request fails
         */
        @Test
        public void listTopic() throws InterruptedException, ExecutionException {
            // try-with-resources guarantees the client is closed even when a request fails.
            try (AdminClient client = AdminClient.create(admin.getConfig())) {
                createTopicIfAbsent(client);
                printTopicNames(client);
                printTopicConfig(client);
                printClusterInfo(client);
            }
        }

        /** Creates {@link #TOPIC_NAME} unless the broker already lists it, so the test can be re-run. */
        private static void createTopicIfAbsent(AdminClient client)
                throws InterruptedException, ExecutionException {
            Collection<String> existingNames = client.listTopics().names().get();
            if (existingNames.contains(TOPIC_NAME)) {
                return;
            }
            NewTopic newTopic = new NewTopic(TOPIC_NAME, PARTITIONS, REPLICATION_FACTOR);
            CreateTopicsResult createTopicResult = client.createTopics(Collections.singleton(newTopic));
            // Block until the broker has acknowledged the creation.
            createTopicResult.all().get();
        }

        /** Prints the name of every topic returned by the list-topics request. */
        private static void printTopicNames(AdminClient client)
                throws InterruptedException, ExecutionException {
            ListTopicsResult result = client.listTopics();
            Collection<TopicListing> listings = result.listings().get();
            for (TopicListing listing : listings) {
                System.out.println(listing.name());
            }
        }

        /** Prints every configuration entry of {@link #TOPIC_NAME}. */
        private static void printTopicConfig(AdminClient client)
                throws InterruptedException, ExecutionException {
            // Describe the topic this test created, not an unrelated placeholder name.
            ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
            DescribeConfigsResult described = client.describeConfigs(Collections.singleton(topicResource));
            Map<ConfigResource, Config> configs = described.all().get();
            for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
                ConfigResource resource = entry.getKey();
                System.out.println(String.format("Resource type: %s, resource name: %s",
                        resource.type(), resource.name()));
                for (ConfigEntry configEntry : entry.getValue().entries()) {
                    System.out.println(configEntry.name() + " = " + configEntry.value());
                }
            }
        }

        /** Prints the cluster id followed by the host of every node in the cluster. */
        private static void printClusterInfo(AdminClient client)
                throws InterruptedException, ExecutionException {
            DescribeClusterResult cluster = client.describeCluster(new DescribeClusterOptions());
            String clusterId = cluster.clusterId().get();
            System.out.println("----------------clusterId------------" + clusterId);
            Collection<Node> nodes = cluster.nodes().get();
            for (Node node : nodes) {
                System.out.println(node.host());
            }
        }

    }
  • 相关阅读:
    Java设计模式(十二) 策略模式
    Java设计模式(二) 工厂方法模式
    Java设计模式(一) 简单工厂模式不简单
    Kafka设计解析(四)- Kafka Consumer设计解析
    Kafka设计解析(三)- Kafka High Availability (下)
    Kafka设计解析(二)- Kafka High Availability (上)
    Spark 灰度发布在十万级节点上的成功实践 CI CD
    Spark SQL / Catalyst 内部原理 与 RBO
    Java进阶(七)正确理解Thread Local的原理与适用场景
    Kafka设计解析(八)- Exactly Once语义与事务机制原理
  • 原文地址:https://www.cnblogs.com/Ivyduan/p/14586400.html
Copyright © 2011-2022 走看看