zoukankan      html  css  js  c++  java
  • Kafka客户端操作

    引入kafka依赖

             <!--kafka-->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>1.1.7.RELEASE</version>
            </dependency>  
    

      

    1、创建Topic

     /**
         * 创建topic实例
         */
        public static  void createTopic() throws Exception{
            AdminClient adminClient = adminClient();
            //副本因子
            Short rs = 1;
            NewTopic newTopic = new NewTopic(TOPIC_NAME,1,rs);
            CreateTopicsResult createTopicsResult =  adminClient.createTopics(Arrays.asList(newTopic));
            System.out.println("createTopicsResult: " + createTopicsResult );
            createTopicsResult.all().get();
        }
    
    
        public static AdminClient adminClient(){
            Properties properties = new Properties();
            properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"118.xx.xx.101:9092");
    
            AdminClient adminClient = AdminClient.create(properties);
    
            return  adminClient;
        }
    

      

    2、显示topic列表

     /**
         * 获取Topic列表
         */
        public static  void topicLists() throws Exception{
            AdminClient adminClient = adminClient();
            //是否查看internal选项
            ListTopicsOptions options = new ListTopicsOptions();
            options.listInternal(true);
            ListTopicsResult listTopicsResult = adminClient.listTopics(options);
            Set<String> names  = listTopicsResult.names().get();
    
            //打印names
            names.stream().forEach(System.out::println);
        }
    

      

    3、删除Topic

        /**
         * 删除Topic
         */
        public static  void delTopics() throws Exception{
            AdminClient adminClient = adminClient();
            DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
            deleteTopicsResult.all().get();
        }
    

      

    4、topic描述

        /**
         * 描述Topic
         */
        public static  void describeTopics() throws Exception{
            AdminClient adminClient = adminClient();
            DescribeTopicsResult describeTopicsResult  = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
            Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
            Set<Map.Entry<String,TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
            entries.stream().forEach((entry) ->{
                System.out.println("name:" + entry.getKey() + ", desc:" + entry.getValue());
            });
        } 

    输出结果

    name:test5,

    desc:(name=test5,

           internal=false,

        partitions=

        (partition=0,

        leader=118.xx.xx.101:9092

        (id: 0 rack: null),

        replicas=118.xx.xx.101:9092

         (id: 0 rack: null),

        isr=118.xx.xx.101:9092

        (id: 0 rack: null)),

        authorizedOperations=null)

    5、查看查看Config

        /**
         * 查看Config
         */
        public static  void describeConfig() throws Exception{
            AdminClient adminClient = adminClient();
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
            DescribeConfigsResult describeConfigsResult  = adminClient.describeConfigs(Arrays.asList(configResource));
            Map<ConfigResource, Config>  configResourceConfigMap =describeConfigsResult.all().get();
            configResourceConfigMap.entrySet().stream().forEach((entry) -> {
                System.out.println("configResource:" + entry.getKey()  + ",Config: " + entry.getValue());
            });
    
        }
    

      

    6、修改Config

     /**
         * 修改Config
         */
        public static  void alertConfig() throws Exception{
            AdminClient adminClient = adminClient();
            Map<ConfigResource,Config> configMap = new HashMap<>();
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
            Config config = new Config(Arrays.asList(new ConfigEntry("preallocate","false")));
            configMap.put(configResource, config);
    
            AlterConfigsResult alterConfigsResult  = adminClient.alterConfigs(configMap);
            alterConfigsResult.all().get();
        }
    

      

    7、增加partitions数量

      /**
         * 增加partitions数量
         */
        public static  void incrPartitions(int partitions) throws Exception{
            AdminClient adminClient = adminClient();
            Map<String,NewPartitions> partitionsMap = new HashMap<>();
            NewPartitions newPartitions = NewPartitions.increaseTo(partitions);
            partitionsMap.put(TOPIC_NAME, newPartitions);
            CreatePartitionsResult createPartitionsResult =  adminClient.createPartitions(partitionsMap);
            createPartitionsResult.all().get();
        }
    

      

    作者:Work Hard Work Smart
    出处:http://www.cnblogs.com/linlf03/
    欢迎任何形式的转载,未经作者同意,请保留此段声明!

  • 相关阅读:
    Celery
    MongoDB-简介
    人工智障
    Flask-session,WTForms,POOL,Websocket通讯原理 -握手,加密解密过程
    web-socket
    flask基础2
    flask的基础1
    项目部署
    nginx简单学习
    redis的安装与配置
  • 原文地址:https://www.cnblogs.com/linlf03/p/15255251.html
Copyright © 2011-2022 走看看