zoukankan      html  css  js  c++  java
  • kafka主题管理


    title: kafka主题管理
    top: false
    cover: false
    toc: true
    mathjax: true
    date: 2021-03-28 21:49:14
    password:
    summary:
    tags:

    • kafka
      categories:
    • kafka

    主题增删改查

    创建

    bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name  --partitions 1 --replication-factor 1
    

    从 Kafka 2.2 版本开始,社区推荐用 --bootstrap-server 参数替换 --zookeeper 参数

    查询

    # 查询所有主题的列表
    bin/kafka-topics.sh --bootstrap-server broker_host:port --list
    # 查询单个主题的详细数据
    bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>
    

    修改

    # 增加分区
    bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数>
    
    # 修改主题级别参数,常规的主题级别参数,使用 --zookeeper;动态参数使用 --bootstrap-server 
    bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
    
    # 变更副本数
    # reassign.json
    {"version":1, "partitions":[
     {"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]}, 
      {"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
      {"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
      {"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]},
      ...
      {"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
    ]}` 
    bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute
    
     # 修改test主题限速
    bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0
    bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test
    
    # 主题分区迁移
    kafka-reassign-partitions
    

    删除

    bin/kafka-topics.sh --bootstrap-server broker_host:port --delete  --topic <topic_name>
    

    特殊主体管理

    查看消费者组提交的位移数

    bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" --from-beginning
    

    查看消费者组的状态信息

    bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager$GroupMetadataMessageFormatter" --from-beginning
    

    常见主题错误处理

    主题删除失败

    • 副本所在的 Broker 宕机了;--重启即可
    • 待删除主题的部分分区依然在执行迁移过程

    解决

    第 1 步,手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode。

    第 2 步,手动删除该主题在磁盘上的分区目录。

    第 3 步,在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存。--非必须,可能造成大面积的分区 Leader 重选举

    __consumer_offsets 占用太多的磁盘

    jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态。通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题。

  • 相关阅读:
    理解WebKit和Chromium: Web应用和Web运行环境
    理解WebKit和Chromium: 网页渲染的基本过程
    【闲谈】我的大学
    使用GDAL将下载的Google卫星图像转为带坐标的tif
    Linux下使用GDAL进行开发(automake使用)
    Linux下编译GDAL
    【Unity技巧】统一管理回调函数——观察者模式
    【Unity技巧】使用单例模式Singleton
    【Unity插件】LitJson杂谈
    理解WebKit和Chromium:Chromium资源磁盘缓存
  • 原文地址:https://www.cnblogs.com/hainingwyx/p/14612541.html
Copyright © 2011-2022 走看看