zoukankan      html  css  js  c++  java
  • kafka相关业务必会操作命令整理

    参考:https://kafka.apache.org

    • 服务相关命令

    1、启动/停止zk

    > bin/zookeeper-server-start.sh config/zookeeper.properties
    > bin/zookeeper-server-stop.sh

    2、启动kafka

    > bin/kafka-server-start.sh config/server.properties
    > bin/kafka-server-stop.sh

    3、配置多节点brokers集群

    首先复制出两个配置文件:

    > cp config/server.properties config/server-1.properties
    > cp config/server.properties config/server-2.properties
    然后配置:
    broker.id是两个broker的唯一标志,port需要修改为不同的两个
    config/server-1.properties:
        broker.id=1
        listeners=PLAINTEXT://:9093
        log.dirs=/tmp/kafka-logs-1
    config/server-2.properties:
        broker.id=2
        listeners=PLAINTEXT://:9094
        log.dirs=/tmp/kafka-logs-2
    启动两个节点:
    > bin/kafka-server-start.sh config/server-1.properties &
    > bin/kafka-server-start.sh config/server-2.properties &
    • topic命令

    1、创建一个topic
    > bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TEST_TOPIC

    2、查看topic列表

    > bin/kafka-topics.sh --list --bootstrap-server localhost:9092

    Alternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.

    除了手动创建topic,还可以配置auto-create topics,当一个不存在的topic发布时可自动创建

    3、查看某个topic信息

    > bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic TEST_TOPIC

    4、修改某个topic的partition个数(分区只能增不能减)

    > bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic TEST_TOPIC --partitions 10

    5、删除某个topic(只会删除zookeeper内的元数据,消息文件需要手动删除)

    > bin/kafka-topics.sh  --delete --zookeeper localhost:2181  --topic TEST_TOPIC
    • consumer相关

    1、查看所有consumer的group列表

    > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

    2、查看指定group(group_test)的消费情况

    > bin/kafka-consumer-groups.sh --describe --group group_test --bootstrap-server localhost:9092
    • console相关

    1、生产数据

    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TEST_TOPIC

    2、消费数据

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic TEST_TOPIC
    • 其他

    1、重置指定topic的offset

    背景,因为程序写的不够好,辛苦造了2亿多数据跑入库测性能,跑到中间数据库服务器撑爆了,结果都写进日志了。。。

    > bin/kafka-consumer-groups --bootstrap-server localhost:9092 --group group_test --reset-offsets -to-offset 97000000 --topic TEST_TOPIC --execute

    Other supported arguments:

    --shift-by [positive or negative integer] - Shifts offset forward or backward from given integer.往前或往后加减offset

    --to-current and --to-latest are same as --to-offset [integer] and --to-earliest.

    --to-datetime [Datetime format is yyyy-MM-ddTHH:mm:ss.xxx]

     > bin/kafka-consumer-groups --bootstrap-server localhost:9092 --group group_test --reset-offsets --to-datetime 2017-08-04T00:00:00.000 [ --all-topics or --topic <topic-name> ] --execute

    how to validata:

    > bin/kafka-consumer-groups --bootstrap-server locahost:9092 --group <group_id> --describe

    注意:这里重置的offset是针对每个partition而言,重置offset之前,所有的consumer必须是inactive的,也就是应用消费必须暂停

  • 相关阅读:
    洛谷-P5734 【深基6.例6】文字处理软件
    洛谷-P5733 【深基6.例1】自动修正
    洛谷-P5730 【深基5.例10】显示屏
    洛谷-P2615 神奇的幻方
    栈和堆的狗屎笔记
    值类型和引用类型笔记(连接到栈和堆的芝士点)
    总结Linq或者lamdba的写法(2)
    安装VS和SQL的默认文件夹路径
    托管调试助手 "ContextSwitchDeadlock":“CLR 无法从 COM 上下文 0x2d59360 转换为 COM 上下文 0x2d592a8,这种状态已持续 60 秒。拥有目标上下文/单元的线程很有可能执行的是非泵式等待或者在不发送 Windows 消息的情况下处理一个运行时间非常长的操作。这种情况通常会影响
    超详细的Canal入门,看这篇就够了!
  • 原文地址:https://www.cnblogs.com/yb38156/p/11840731.html
Copyright © 2011-2022 走看看