zoukankan      html  css  js  c++  java
  • 使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载

    原文:http://blog.csdn.net/changong28/article/details/39325079

    使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题将使用默认值,先改变需要使用命令bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000显示的修改,我们也希望将此过程在Producer调用之前通过API的方式进行设定,无需在之前或之后使用脚本进行操作,所以才了这篇文章。查看源码发现,其实内部所有的实现都是通过TopicCommand的main方法,在此记录两种方式:

    1、创建主题(Topic)

    【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y

    【JAVA API方式】:

    1. String[] options = new String[]{  
    2.     "--create",  
    3.     "--zookeeper",  
    4.     "zk_host:port/chroot",  
    5.     "--partitions",  
    6.     "20",  
    7.     "--topic",  
    8.     "my_topic_name",  
    9.     "--replication-factor",  
    10.     "3",  
    11.     "--config",  
    12.     "x=y"  
    13. };  
    14. TopicCommand.main(options);  

    2、查看所有主题

    【命令方式】:bin/kafka-topics.sh --list --zookeeper localhost:2181

    【JAVA API方式】:

    1. String[] options = new String[]{  
    2.     "--list",  
    3.     "--zookeeper",  
    4.     "localhost:2181"  
    5. };  
    6. TopicCommand.main(options);  


    3、查看指定主题:

    【命令方式】:bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

    【JAVA API方式】: 

    1. String[] options = new String[]{  
    2.     "--describe",  
    3.     "--zookeeper",  
    4.     "localhost:2181",  
    5.     "--topic",  
    6.     "my-replicated-topic",  
    7. };  
    8. TopicCommand.main(options);  


    4、修改主题:

    【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
    【JAVA API方式】:

     
    1. String[] options = new String[]{  
    2.     "--alter",  
    3.     "--zookeeper",  
    4.     "zk_host:port/chroot",  
    5.     "--topic",  
    6.     "my_topic_name",  
    7.     "--deleteConfig",  
    8.     "x"  
    9. };  
    10. TopicCommand.main(options);  



    5、删除出题:

       【命令方式】:无

       【JAVA API方式】:

     
      1. String[] options = new String[]{  
      2.     "--zookeeper",  
      3.     "zk_host:port/chroot",  
      4.     "--topic",  
      5.     "my_topic_name"  
      6. };  
      7. DeleteTopicCommand.main(options);  

    另:下文kafka删除topic的方法(出自 “菜光光的博客” 博客,出处http://caiguangguang.blog.51cto.com/1652935/1548069)

    0.8的官方文档提供了一个删除topic的命令:

    kafka-topics.sh --delete 但是在运行时会报错找不到这个方法。

    kafka-topics.sh最终是运行了kafka.admin.TopicCommand这个类,在0.8的源码中这个类中没有找到有delete topic相关的代码。

    在kafka的admin包下,提供了一个DeleteTopicCommand的类,可以实现删除topic的功能。 

    kafka.admin.DeleteTopicCommand 

    其中删除topic的具体实现代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
    .......
        val topic = options.valueOf(topicOpt)
        val zkConnect = options.valueOf(zkConnectOpt)
        var zkClient: ZkClient = null
        try {
          zkClient = new ZkClient(zkConnect, 3000030000, ZKStringSerializer)
          zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))  //其实最终还是通过删除zk里面对应的路径来实现删除topic的功能
          println("deletion succeeded!")
        }
        catch {
          case e: Throwable =>
            println("delection failed because of " + e.getMessage)
            println(Utils.stackTrace(e))
        }
        finally {
          if (zkClient != null)
            zkClient.close()
        }

    因为这个命令只会删除zk里面的信息,真实的数据还是没有删除,所以需要登录各个broker,把对应的topic的分区数据目录删除,也可能正因为这一点,delete命令才没有集成到kafka.admin.TopicCommand这个类。

  • 相关阅读:
    肥胖儿筛选标准
    文章索引
    面向对象66原则
    [精]Xpath路径表达式
    [精]XPath入门教程
    孕产期高危因素
    “华而不实”的转盘菜单(pie menu)
    xmind用例导excel用例,然后再用python排版
    NSObject
    [self class]与[super class]
  • 原文地址:https://www.cnblogs.com/davidwang456/p/4313784.html
Copyright © 2011-2022 走看看