zoukankan      html  css  js  c++  java
  • Kafka的常用管理命令

    1. 查看kafka都有那些topic

    a. list
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper test1.hadoop.puhuifinance.com:2181
    b. describe
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --zookeeper test1.hadoop.puhuifinance.com:2181 --topic test
    c. delete, 这里要设置 delete.topic.enable=true
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --zookeeper test1.hadoop.puhuifinance.com:2181 --topic test
    d. 查看topic中的数据
    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper test1.hadoop.puhuifinance.com:2181 --topic test 
    # --from-beginning 从头开始读取
    # --property print.key=true 打印key
    e. 查看针对topic每个group 消费的程度, 是不是特定的zookeeper path?
    /usr/hdp/current/kafka-broker/bin/kafka-consumer-offset-checker.sh --zookeeper=test1.hadoop.puhuifinance.com:2181 --group=test
    f. 生产者:
    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list test1.hadoop.puhuifinance.com:6667 --topic test < flume-spark.log
    2. topic的创建与修改.
    a. create
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper test1.hadoop.puhuifinance.com:2181 --replication-factor 2 --partitions 4 --topic test
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper test1.hadoop.puhuifinance.com:2181 --replication-factor 3 --partitions 3 --topic test
    b. alter partitions
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --alter --zookeeper test1.hadoop.puhuifinance.com:2181 --partitions 3 --topic test
    3. 更改replication 
    a. 首先查看topic当前的partition 及replication状态.
    cat topics-info.json
    {"topics": [{"topic": "TaoBaoTopic"}],
    "version":1
    }
    /usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper test1.hadoop.puhuifinance.com:2181 --topics-to-move-json-file topics-info.json --broker-list "1,2" --generate
    b. 会得到类似下面的结果:
    Current partition replica assignment
    {
    "version": 1,
    "partitions": [
    {
    "topic": "TaoBaoTopic",
    "partition": 2,
    "replicas": [
    1
    ]
    },
    {
    "topic": "TaoBaoTopic",
    "partition": 0,
    "replicas": [
    2
    ]
    },
    {
    "topic": "TaoBaoTopic",
    "partition": 1,
    "replicas": [
    0
    ]
    }
    ]
    }
    c. 根据要改动的replication策略, 例如:
    cat TaoBao_Topic_alter_replication.json
    {
    "version": 1,
    "partitions": [
    {
    "topic": "TaoBaoTopic",
    "partition": 2,
    "replicas": [
    1,2,0
    ]
    },
    {
    "topic": "TaoBaoTopic",
    "partition": 0,
    "replicas": [
    2,0,1
    ]
    },
    {
    "topic": "TaoBaoTopic",
    "partition": 1,
    "replicas": [
    0,1,2
    ]
    }
    ]
    }

    d. 将更改的策略进行实施, TaoBao_Topic_alter_replication.json
    /usr/hdp/current/kafka-broker/bin//kafka-reassign-partitions.sh --zookeeper test1.hadoop.puhuifinance.com:2181 --reassignment-json-file TaoBao_Topic_alter_replication.json --execute
    Current partition replica assignment
    {"version":1,"partitions":[{"topic":"TaoBaoTopic","partition":2,"replicas":[1]},{"topic":"TaoBaoTopic","partition":0,"replicas":[2]},{"topic":"TaoBaoTopic","partition":1,"replicas":[0]}]}
    Save this to use as the --reassignment-json-file option during rollback
    Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"TaoBaoTopic","partition":2,"replicas":[1,2,0]},{"topic":"TaoBaoTopic","partition":0,"replicas":[2,0,1]},{"topic":"TaoBaoTopic","partition":1,"replicas":[0,1,2]}]}
    e. 查看replication的过程,或者是服务迁移的过程, 例如将原有的topic分配到新增加的服务器上.
    /usr/hdp/current/kafka-broker/bin//kafka-reassign-partitions.sh --zookeeper test1.hadoop.puhuifinance.com:2181 --reassignment-json-file TaoBao_Topic_alter_replication.json --verify
    Status of partition reassignment:
    Reassignment of partition [TaoBaoTopic,2] completed successfully
    Reassignment of partition [TaoBaoTopic,0] completed successfully
    Reassignment of partition [TaoBaoTopic,1] completed successfully
    f. 通过之前的命令, 可以再查看下当前的状态.
    /usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper test1.hadoop.puhuifinance.com:2181 --topics-to-move-json-file topics-info.json --broker-list "0,1,2" --generate
    或者是describe
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --zookeeper test1.hadoop.puhuifinance.com:2181 --topic test
    4. 一些参数设置
    a. producer http://kafka.apache.org/documentation.html#producerconfigs
    request.required.acks=1 
    producer.type=sync
    compression.codec=snappy

  • 相关阅读:
    72. Edit Distance
    电脑常识
    java try·····catch·····异常处理学习
    java链接sqlserver数据库
    HTTP Status 500
    初识NDA
    Sublime Text_v2.02包含中文包以及使用方法
    ol 与ul 的区别
    word-break: break-all word-break:keep-all word-wrap: break-word三者的区别
    用deamon打开ISO文件,提示命令行错误!!
  • 原文地址:https://www.cnblogs.com/lingfengblogs/p/5242297.html
Copyright © 2011-2022 走看看