zoukankan      html  css  js  c++  java
  • Kafka常用命令及配置文件

    创建topic,指定备份分区数

    bin/kafka-topics.sh --create --zookeeper zk:2181 --replication-factor 2 --partitions 4 --topic test-topic

    查看topic

    查看topic列表

    bin/kafka-topics.sh --zookeeper zk:2181–list

    查看topic分区情况

    bin/kafka-topics.sh --zookeeper zk:2181 --describe

    指定topic

    bin/kafka-topics.sh --zookeeper zk:2181 --describe --topic test-topic

    创建producer

    bin/kafka-console-producer.sh --broker-list broker:9092 --topic test-topic

    创建consumer

    bin/kafka-console-consumer.sh --zookeeper zk:2181 --topic test-topic

    bin/kafka-console-consumer.sh --zookeeper zk:2181 --topic test-topic --from-beginning

    新生产者(支持0.9版本+)

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties

    新消费者(支持0.9版本+)

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties

    修改主题分区

    ./kafka-topics.sh --zookeeper zk:2181 --alter --topic TEST_16 --partitions 16

    修改主题参数

    修改保存时间

    bin/kafka-topics.sh --zookeeper zk:2181 -topic test--alter --config retention.ms=86400000

    基于0.8.0版本。

    重新分配分区kafka-reassign-partitions.sh

    这个命令可以分区指定到想要的--broker-list上 bin/kafka-reassign-partitions.sh --topics-to-move-json-file topics-to-move.json --broker-list "171" --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --execute cat topic-to-move.json {"topics": [{"topic": "test2"}], "version":1 }

    为Topic增加 partition数目kafka-add-partitions.sh

    bin/kafka-add-partitions.sh --topic test --partition 2 --zookeeper 192.168.197.170:2181,192.168.197.171:2181 (为topic test增加2个分区)

    手动均衡topic

    bin/kafka-preferred-replica-election.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --path-to-json-file preferred-click.json

    cat preferred-click.json

    { "partitions": [ {"topic": "click", "partition": 0}, {"topic": "click", "partition": 1}, {"topic": "click", "partition": 2}, {"topic": "click", "partition": 3}, {"topic": "click", "partition": 4}, {"topic": "click", "partition": 5}, {"topic": "click", "partition": 6}, {"topic": "click", "partition": 7}, {"topic": "play", "partition": 0}, {"topic": "play", "partition": 1}, {"topic": "play", "partition": 2}, {"topic": "play", "partition": 3}, {"topic": "play", "partition": 4}, {"topic": "play", "partition": 5}, {"topic": "play", "partition": 6}, {"topic": "play", "partition": 7}

    ] }

    分区迁移,扩容

    cat > increase-replication-factor.json <<EOF {"version":1, "partitions":[ {"topic":"__consumer_offsets","partition":0,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":1,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":2,"replicas":[0,1]}, {"topic":"__consumer_offsets","partition":3,"replicas":[0,1]}] } EOF

    执行

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute

    验证

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify

    0.8版本查询offset信息

    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk:2181 --group test--topic test

    新消费者列表查询(支持0.9版本+)

    bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

    显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)

    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test

    显示某个消费组的消费详情(支持0.9版本+)

    bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group

    平衡leader

    bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot

    kafka自带压测命令

    bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092

    broker配置项 默认值 推荐值 示例 说明
    broker.id null null broker.id=1 当前broker的id,集群内互异
    host.name null null host.name=10.10.10.10 broker的主机地址,若有配置,会绑定到该地址,若没有,会绑定到所有的地址
    0.10版本引入listeners配置,若listeners有配置,该项不生效
    port 9092 9092 port=9092 broker监听端口
    0.10版本引入listeners配置,若listeners有配置,该项不生效
    listeners "" PLAINTEXT://10.10.10.10:9092 listeners=PLAINTEXT://10.10.10.10:9092 服务监听配置
    auto.create.topics.enable true false auto.create.topics.enable=false 允许自动创建主题
    auto.leader.rebalance.enable true true auto.leader.rebalance.enable=true 后台定期自动重分配leader
    default.replication.factor 1 3 default.replication.factor=3 默认副本数,同时也决定了集群所有主题均可以正常使用的允许的宕机台数为N-1(N为配置值)
    delete.topic.enable true true delete.topic.enable=true 允许删除主题
    group.initial.rebalance.delay.ms 3000 3000 group.initial.rebalance.delay.ms=3000 消费组内各节点第一次启动的负载均衡分配等待时间,配置越大意味着重新平衡次数越少,但会增加等待时间
    log.cleaner.enable true true log.cleaner.enable=true 清理策略定义的过期数据日志
    log.dirs null pathtokafka/kafka-instance-logs log.dirs=/app/kafka/kafka-test-logs kafka数据日志目录
    log.flush.interval.messages 9223372036854775807 20000(根据实际业务情况配置) log.flush.interval.messages=20000 将分区消息刷到磁盘的消息量
    log.flush.interval.ms =log.flush.scheduler.interval.ms=9223372036854775807 300000 log.flush.interval.ms=300000 将消息刷到磁盘的时间间隔
    log.retention.check.interval.ms 300000 600000 log.retention.check.interval.ms=600000 失效数据日志检查间隔
    log.retention.hours 168 72 log.retention.hours=72 数据保留时间
    log.segment.bytes 1073741824 1073741824 log.segment.bytes=1073741824 单个数据文件大小
    num.io.threads 8 8 num.io.threads=8 服务器用于处理请求的线程数,包括磁盘I/O,可根据核数适当增加
    num.network.threads 3 8 num.network.threads=8 处理网络请求的线程数,主要用于读写缓冲区数据,一般不需要太大,可根据核数适当增加
    num.partitions 1 4 num.partitions=4 默认主题分区数
    num.recovery.threads.per.data.dir 1 16(各目录总和核数一半左右) num.recovery.threads.per.data.dir=16 宕机后恢复数据,清理数据的线程数,比较消耗CPU
    num.replica.fetchers 1 8 num.replica.fetchers=8 数据同步线程数
    socket.receive.buffer.bytes 102400 1048576 socket.receive.buffer.bytes=1048576 socket的接收缓冲区 (SO_RCVBUF)
    socket.send.buffer.bytes 102400 1048576 socket.send.buffer.bytes=1048576 socket的发送缓冲区(SO_SNDBUF)
    socket.request.max.bytes 104857600 104857600 socket.request.max.bytes=104857600 socket请求的最大字节数
    transaction.state.log.min.isr 1 1 transaction.state.log.min.isr=1 消息提交后返回成功的最小副本数,ISR中副本数
    zookeeper.connect "" "" zookeeper.connect=10.10.10.10:2181/kafka/instance zk连接串
    zookeeper.connection.timeout.ms =zookeeper.session.timeout.ms=6000 6000 zookeeper.connection.timeout.ms zk连接超时时间
  • 相关阅读:
    python基础(str,list,tuple)
    MySQL数据类型概念
    Ubuntu安装sublime
    Ubuntu安装pycharm
    ubuntu安装mysql
    微信小程序注册开发流程
    新开篇
    被玩坏了的题——马的遍历
    一道数学恶心题——小凯的疑惑
    搜索基础题:八皇后
  • 原文地址:https://www.cnblogs.com/lwhctv/p/12317624.html
Copyright © 2011-2022 走看看