zoukankan      html  css  js  c++  java
  • kafka系列六、java管理kafka Topic

    package com.example.demo.topic;
    
    import kafka.admin.AdminUtils;
    import kafka.admin.RackAwareMode;
    import kafka.server.ConfigType;
    import kafka.utils.ZkUtils;
    import org.apache.kafka.common.requests.MetadataResponse;
    import org.apache.kafka.common.security.JaasUtils;
    import scala.collection.JavaConversions;
    
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    public class KafkaTopic {
        public static void main(String[] args) {
            //createTopic();
            //deleteTopic();
            //listAllTopic();
            // getTopic();
            listTopicAllConfig();
        }
    
        /**
        * 创建主题
        * kafka-topics.sh --zookeeper localhost:2181 --create --topic kafka-action --replication-factor 2 --partitions 3
        */
        private static void createTopic() {
            ZkUtils zkUtils = ZkUtils.apply("47.52.199.51:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());
            // 创建一个单分区单副本名为t1的topic
            AdminUtils.createTopic(zkUtils, "topic-20", 3, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
            zkUtils.close();
        }
    
        /**
         * 除某主题
         * kafka-topics.sh --zookeeper localhost:2181 --topic kafka-action --delete
         */
        private static void deleteTopic() {
            ZkUtils zkUtils = ZkUtils.apply("47.52.199.51:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());
            // 删除topic 't1'
            AdminUtils.deleteTopic(zkUtils, "topic-19");
            zkUtils.close();
        }
    
        /**
         * 修改主题配置     kafka-config --zookeeper localhost:2181 --entity-type topics --entity-name kafka-action     
    * --alter --add-config max.message.bytes=202480 --alter --delete-config flush.messages
    */ private static void updateTopic() { ZkUtils zkUtils = ZkUtils.apply("47.52.199.51:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled()); Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "topic-19"); // 增加topic级别属性 props.put("min.cleanable.dirty.ratio", "0.3"); // 删除topic级别属性 props.remove("max.message.bytes"); // 修改topic 'test'的属性 AdminUtils.changeTopicConfig(zkUtils, "test", props); zkUtils.close(); } /** * * 查看所有主题 kafka-topics.sh --zookeeper localhost:2181 --list */ public static void listAllTopic() { ZkUtils zkUtils = null; try { zkUtils = ZkUtils.apply("47.52.199.51:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled()); List<String> topics = JavaConversions.seqAsJavaList(zkUtils.getAllTopics()); topics.forEach(System.out::println); } catch (Exception e) { e.printStackTrace(); } finally { if (zkUtils != null) { zkUtils.close(); } } } /** * 得到所有topic的配置信息 kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --describe */ public static void listTopicAllConfig() { ZkUtils zkUtils = null; try { zkUtils = ZkUtils.apply("47.52.199.51:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled()); Map<String, Properties> configs = JavaConversions.mapAsJavaMap(AdminUtils.fetchAllTopicConfigs(zkUtils)); // 获取特定topic的元数据 MetadataResponse.TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk("topic-19",zkUtils); // 获取特定topic的配置信息 Properties properties = AdminUtils.fetchEntityConfig(zkUtils,"topics","kafka-test"); for (Map.Entry<String, Properties> entry : configs.entrySet()) { System.out.println("key=" + entry.getKey() + " ;value= " + entry.getValue()); } } catch (Exception e) { e.printStackTrace(); } finally { if (zkUtils != null) { zkUtils.close(); } } } }
  • 相关阅读:
    设计模式学习笔记——单例(Singleton)模式
    设计模式学习笔记——抽象工厂(Abstract Factory)模式
    一些C++的好书
    C++概念重载、覆盖、隐藏
    集合的子集和集合的全排列问题
    百度二面,悲剧了,附面试题,欢迎探讨。
    【翻译】ASP.NET MVC4 入门(二)添加一个Controller
    遇到问题应该多思考一下——由一个泛型方法想到的
    【翻译】ASP.NET MVC4 入门(四)添加一个Model
    我的第一篇博客——Delegate的秘密
  • 原文地址:https://www.cnblogs.com/wangzhuxing/p/10105961.html
Copyright © 2011-2022 走看看