  • Kafka: changing partitions, deleting topics, querying offsets

    Change the number of partitions:

    ./kafka-topics.sh --zookeeper 127.0.0.1:2181/kafka/<id_of_kafka> --alter --partitions 10 --topic test_topic

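    The command above sets the number of partitions of test_topic to 10. 127.0.0.1 is the ZooKeeper address and 2181 is the ZooKeeper port. Note that Kafka only allows the partition count to be increased, never decreased.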

    Delete a topic:

    ./kafka-topics.sh --zookeeper 127.0.0.1:2181/kafka/<id_of_kafka> --delete --topic test_topic

    The command above marks the topic test_topic for deletion, but it has no effect unless delete.topic.enable is set to true on the brokers.
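
    Before deleting, it can help to confirm the broker setting. The following is a minimal sketch, assuming the broker config is the config/server.properties file used in the start commands later in this post. The function name delete_topic_enabled is made up for illustration. A missing key falls back to the broker default, which differs between Kafka versions, so the sketch only reports an explicit setting.

```shell
#!/bin/sh
# Hypothetical helper: succeeds only if the given properties file
# explicitly sets delete.topic.enable=true (value compared case-insensitively).
delete_topic_enabled() {
    props_file="$1"
    # Take the last uncommented assignment of the key, if any.
    value=$(grep -E '^[[:space:]]*delete\.topic\.enable[[:space:]]*=' "$props_file" \
        | tail -n 1 | cut -d= -f2 | tr -d '[:space:]' | tr '[:upper:]' '[:lower:]')
    [ "$value" = "true" ]
}
```

    Example use: delete_topic_enabled config/server.properties && echo "topic deletion is enabled"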

    Change the retention time:

    ./kafka-topics.sh --zookeeper 127.0.0.1:2181/kafka/<id_of_kafka> --alter --topic test_topic --config retention.ms=86400000 --config cleanup.policy=delete
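
    retention.ms is expressed in milliseconds, so the 86400000 in the command above is one day (24 × 60 × 60 × 1000). A small sketch of the conversion follows; the helper name days_to_ms is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: convert a whole number of days to milliseconds,
# the unit that retention.ms expects.
days_to_ms() {
    echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

days_to_ms 1   # prints 86400000, the value used in the command above
days_to_ms 7   # prints 604800000
```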

    Query topic offsets:

    ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <ip:port of kafka instead of zookeeper> --topic topic_to_select --time -1  # latest (largest) offset
    ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <ip:port of kafka instead of zookeeper> --topic topic_to_select --time -2  # earliest (smallest) offset
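
    GetOffsetShell prints one topic:partition:offset line per partition. Subtracting the summed earliest offsets from the summed latest offsets gives a rough count of the messages currently retained (rough because compacted topics and transaction markers leave gaps in the offsets). A minimal sketch, with a hypothetical helper sum_offsets and made-up sample numbers:

```shell
#!/bin/sh
# Hypothetical helper: sum the offset field (third colon-separated column)
# of GetOffsetShell output read from stdin.
sum_offsets() {
    awk -F: 'NF >= 3 { total += $3 } END { printf "%.0f\n", total }'
}

# Sample output in the topic:partition:offset format (made-up numbers).
latest=$(printf 'topic_to_select:0:120\ntopic_to_select:1:95\n' | sum_offsets)
earliest=$(printf 'topic_to_select:0:20\ntopic_to_select:1:5\n' | sum_offsets)
echo "approximate messages retained: $((latest - earliest))"
```

    In real use, pipe the two GetOffsetShell commands above into sum_offsets instead of the printf samples.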

    Show topic details:

    ./kafka-topics.sh --zookeeper 127.0.0.1:2181/kafka/<id_of_kafka> --describe --topic topic_to_select

    Look up consumer group information, including message lag. The tool distinguishes between the new consumer and the old consumer; the new consumer is the default.

    ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test_group_id --describe
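
    The describe output contains a LAG column with one row per partition. To get a single number for the whole group, the column can be summed. The sketch below is an assumption-laden illustration: the helper name total_lag is hypothetical, the sample rows are made up, and the column is located by its header name because the column layout differs between Kafka versions.

```shell
#!/bin/sh
# Hypothetical helper: sum the LAG column of `kafka-consumer-groups.sh --describe`
# output read from stdin.
total_lag() {
    awk '
        lag_col == 0 {                       # still looking for the header row
            for (i = 1; i <= NF; i++) if ($i == "LAG") lag_col = i
            next
        }
        $lag_col ~ /^[0-9]+$/ { total += $lag_col }   # skip "-" and blank rows
        END { printf "%.0f\n", total }
    '
}

# Made-up sample output, for illustration only.
total_lag <<'EOF'
TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST        CLIENT-ID
test_topic  0          100             120             20   consumer-1   /127.0.0.1  client-1
test_topic  1          90              95              5    -            -           -
EOF
```

    In real use, pipe the kafka-consumer-groups.sh command above into total_lag.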

    Check configs. The entity-type can be brokers, topics, or users:

    ./kafka-configs.sh --describe --zookeeper 127.0.0.1:2181/kafka/<id_of_kafka> --entity-type topics

    Set configs

    Topics have broker-wide configs that apply by default to any topic that doesn't set its own value, but topics can also have topic-specific configs that override or complement the broker-wide topic configs. Broker-wide topic configs are set in your server.properties file. Topic-specific configs are set using the bin/kafka-topics.sh script.

    To set retention.ms on a new topic:

    bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name \
      --partitions 20 --replication-factor 3 --config retention.ms=-1

    Modify the config of an existing topic:

    ./kafka-configs.sh --alter --zookeeper 127.0.0.1:2181/kafka/<id_of_kafka> --add-config cleanup.policy=delete --entity-type topics --entity-name <topic_name>

    Starting and stopping Kafka

    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties  # start ZooKeeper as a daemon
    bin/zookeeper-server-stop.sh  # stop ZooKeeper
    export JMX_PORT=9999   # set the JMX port so that Kafka Manager can display metrics
    bin/kafka-server-start.sh -daemon config/server.properties  # start Kafka as a daemon
    bin/kafka-server-stop.sh  # stop Kafka

    Check Kafka broker status through ZooKeeper

    ./zookeeper-shell.sh 172.18.185.123:2181,172.18.185.124:2181  <<< "ls /brokers/ids"
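
    The ZooKeeper shell prints connection messages before the result, and the broker IDs arrive as a bracketed list such as [0, 1, 2]. A minimal sketch for extracting one ID per line, assuming that output shape; the helper name list_broker_ids and the sample text are made up for illustration.

```shell
#!/bin/sh
# Hypothetical helper: read zookeeper-shell output from stdin and print
# the broker IDs from the last bracketed list, one per line.
list_broker_ids() {
    grep -E '^\[[0-9, ]*\]$' | tail -n 1 | tr -cd '0-9,\n' | tr ',' '\n'
}

# Made-up sample output, for illustration only.
printf 'Connecting to 172.18.185.123:2181\nWATCHER::\n[0, 1, 2]\n' | list_broker_ids
```

    In real use, pipe the zookeeper-shell.sh command above into list_broker_ids; counting the lines with wc -l gives the number of registered brokers.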

    ref: https://stackoverflow.com/questions/48504123/choosing-the-right-cleanup-policy-in-kafka-configuration

    https://stackoverflow.com/questions/40146921/command-to-get-kafka-broker-list-from-zookeeper

  • Original post: https://www.cnblogs.com/buxizhizhoum/p/8251494.html