zoukankan      html  css  js  c++  java
  • kafka入门2:java 创建及删除 topic

    1.pom

      <!-- Kafka core artifact (the "_2.10" suffix is the Scala build it targets).
           Presumably the source of the kafka.admin.* and kafka.utils.* classes
           imported by KafkaUtil; confirm against the artifact contents. -->
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.10.2.1</version>
      </dependency>
      <!-- Kafka Java client library, pinned to the same version as the core
           artifact above. Presumably the source of the
           org.apache.kafka.common.* classes (e.g. JaasUtils) imported by
           KafkaUtil; confirm against the artifact contents. -->
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
      </dependency>

    2.KafkaTopicBean

    /**
     * Mutable value holder describing a Kafka topic: its name, partition
     * count, replication count and a free-form description.
     *
     * <p>All properties default to {@code null} until set.
     */
    public class KafkaTopicBean {

        /** Topic name. */
        private String topicName;

        /** Number of partitions. */
        private Integer partition;

        /** Number of replicas. */
        private Integer replication;

        /** Free-form description (property name keeps its original spelling). */
        private String descrbe;

        /** @return the topic name, or {@code null} if not set */
        public String getTopicName() {
            return this.topicName;
        }

        /** @param topicName the topic name */
        public void setTopicName(String topicName) {
            this.topicName = topicName;
        }

        /** @return the number of partitions, or {@code null} if not set */
        public Integer getPartition() {
            return this.partition;
        }

        /** @param partition the number of partitions */
        public void setPartition(Integer partition) {
            this.partition = partition;
        }

        /** @return the number of replicas, or {@code null} if not set */
        public Integer getReplication() {
            return this.replication;
        }

        /** @param replication the number of replicas */
        public void setReplication(Integer replication) {
            this.replication = replication;
        }

        /** @return the description, or {@code null} if not set */
        public String getDescrbe() {
            return this.descrbe;
        }

        /** @param descrbe the description */
        public void setDescrbe(String descrbe) {
            this.descrbe = descrbe;
        }

        /**
         * Renders every property as
         * {@code KafkaTopicBean [topicName=..., partition=..., replication=..., descrbe=...]}.
         * Unset properties are rendered as {@code null}.
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("KafkaTopicBean [");
            text.append("topicName=").append(topicName);
            text.append(", partition=").append(partition);
            text.append(", replication=").append(replication);
            text.append(", descrbe=").append(descrbe);
            text.append("]");
            return text.toString();
        }

    }

    3.KafkaUtil

    import java.util.Properties;
    import org.apache.kafka.common.security.JaasUtils;
    import kafka.admin.AdminUtils;
    import kafka.admin.RackAwareMode;
    import kafka.utils.ZkUtils;
    
    /**
     * Static helpers that create and delete Kafka topics through ZooKeeper
     * using {@code kafka.admin.AdminUtils}.
     *
     * <p>Each call opens its own {@code ZkUtils} connection and always closes
     * it, including when the admin operation throws.
     */
    public class KafkaUtil {

        /** Session timeout passed to {@code ZkUtils.apply}. */
        private static final int ZK_SESSION_TIMEOUT_MS = 30000;

        /** Connection timeout passed to {@code ZkUtils.apply}. */
        private static final int ZK_CONNECTION_TIMEOUT_MS = 30000;

        /**
         * Creates the topic described by {@code topic}.
         *
         * @param ZkStr ZooKeeper connect string, e.g. {@code host:port}
         * @param topic supplies the topic name, partition count and replication count
         */
        public static void createKafaTopic(String ZkStr, KafkaTopicBean topic) {
            ZkUtils zkUtils = openZkUtils(ZkStr);
            try {
                // Enforced is a Scala object: use its singleton (MODULE$) rather
                // than constructing a second instance of the backing class.
                AdminUtils.createTopic(zkUtils, topic.getTopicName(), topic.getPartition(),
                        topic.getReplication(), new Properties(), RackAwareMode.Enforced$.MODULE$);
            } finally {
                // Release the ZooKeeper connection even if creation fails.
                zkUtils.close();
            }
        }

        /**
         * Deletes the topic named by {@code topic.getTopicName()}.
         *
         * @param ZkStr ZooKeeper connect string, e.g. {@code host:port}
         * @param topic supplies the name of the topic to delete
         */
        public static void deleteKafaTopic(String ZkStr, KafkaTopicBean topic) {
            ZkUtils zkUtils = openZkUtils(ZkStr);
            try {
                AdminUtils.deleteTopic(zkUtils, topic.getTopicName());
            } finally {
                // Release the ZooKeeper connection even if deletion fails.
                zkUtils.close();
            }
        }

        /** Opens a ZkUtils connection with the shared timeouts and the JAAS-derived security flag. */
        private static ZkUtils openZkUtils(String zkConnect) {
            return ZkUtils.apply(zkConnect, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS,
                    JaasUtils.isZkSecurityEnabled());
        }

    }

    4.调用方式

        /**
         * Example usage: creates a single-partition, single-replica topic named
         * {@code testTopic} and then deletes it again.
         *
         * @param args unused
         */
        public static void main(String[] args) {

            // ZooKeeper address as host:port. The original literal started with
            // "912", which is not a valid IPv4 octet (max 255).
            String ZkStr = "192.168.0.1:2181";

            // Topic descriptor
            KafkaTopicBean topic = new KafkaTopicBean();
            topic.setTopicName("testTopic");  // topic name
            topic.setPartition(1);            // one partition
            topic.setReplication(1);          // one replica

            // Create the topic
            KafkaUtil.createKafaTopic(ZkStr, topic);
            // Delete the topic
            KafkaUtil.deleteKafaTopic(ZkStr, topic);

        }
  • 相关阅读:
    [Angular2 Form] Build Select Dropdowns for Angular 2 Forms
    [Angular2 Form] Create Radio Buttons for Angular 2 Forms
    [Angular2 Router] Exiting an Angular 2 Route
    [Angular2 Router] Optional Route Query Parameters
    JS 实现地区,省份,城市,县区4级联动
    Linux web工程部署远程必备软件安装
    [置顶] 白话01背包
    APUE读书笔记-第17章-高级进程间通信
    UVA 10779 Collectors Problem(最大流)
    (二) win8+XAML Binding(数据绑定)
  • 原文地址:https://www.cnblogs.com/MIC2016/p/9020562.html
Copyright © 2011-2022 走看看