zoukankan      html  css  js  c++  java
  • CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

    一、相关基础内容

    然后正常kafka的指令是 :  ./bin/kafka-topics.sh --zookeeper hadoop300:2181 .......

    但是使用CDH安装的kafka则不需要全写出此  ./bin/kafka-topics.sh  部分。只许直接写 kafka-topics 即可,这是很重要的一个区别,使用CDH安装的kafka时候要特别注意一下。

    具体有哪些指令可以看此路径下:

    /opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/bin

    二、topic主题使用

    所以我们使用topic的指令格式应该都类似:

    kafka-topics --zookeeper hadoop300:2181/kafka ......

    • 创建一个名为test 的主题(Topic):
    kafka-topics --zookeeper hadoop300:2181/kafka --create -replication-factor 1 --partitions 3 --topic test
    • 若是上述中的 ZooKeeper Root 的Kafka服务范围为: " "。则这里的创建主题指令改为:

    kafka-topics --zookeeper hadoop300:2181--create -replication-factor 1 --partitions 3 --topic test

    • 删除创建的topic
    kafka-topics --zookeeper hadoop300:2181/kafka --delete --topic test

    这里如果直接删除,则会输出    Topic *** is marked for deletion  如果我们topic中消息堆积的太多,或者kafka所在磁盘空间满了等等,则会需要彻底清理一下kafka topic。

    方法一:修改kafaka配置文件server.properties, 添加 delete.topic.enable=true,重启kafka,之后通过kafka命令行就可以直接删除topic。

    方法二:通过命令行删除topic:  ./bin/kafka-topics.sh --delete --zookeeper {zookeeper server} --topic {topic name}

    因为kafaka配置文件中server.properties没有配delete.topic.enable=true此时的删除并不是真正的删除,只是把topic标记为:marked for deletion     你可以通过命令:./bin/kafka-topics --zookeeper {zookeeper server} --list 来查看所有topic

    方法三:若需要真正删除它

    需要登录zookeeper客户端:zookeeper-client

    找到topic所在的目录:ls /kafka/brokers/topics

    执行命令,即可,此时topic被彻底删除:rmr /kafka/brokers/topics/{topic name}

    • 修改topic的分区数
    kafka-topics --zookeeper hadoop300:2181/kafka --alter --topic test  partitions 5
    • 接着启动消费者
    kafka-console-consumer --bootstrap-server hadoop300:9092 --topic test --from-beginning
    • 查看消费数据后的偏移量 kafka-run-class
    (1)查看每个Partition的最新偏移量
    kafka-run-class kafka.tools.GetOffsetShell --broker-list hadoop:9092 --topic yourTopic  --time -1
     
    (2)查看每个Partition的最早的偏移量
    kafka-run-class kafka.tools.GetOffsetShell --broker-list hadoop:9092 --topic yourTopic  --time -2
     
    (3)查看consumer组内消费的offset
    kafka-run-class  kafka.tools.ConsumerOffsetChecker --zookeeper hadoop:2181  --topic yourTopic
    • 获取topic消费组的偏移量
    kafka-consumer-offset-checker --zookeeper=hadoop300:2181 --topic=mytopic  --group=my_consumer_group
  • 相关阅读:
    反射
    定义类
    集合list,set,Map
    API util
    进程与多线程
    泛型
    API string
    JDBC存储过程
    预处理
    JDBC
  • 原文地址:https://www.cnblogs.com/hyunbar/p/13584957.html
Copyright © 2011-2022 走看看