zoukankan      html  css  js  c++  java
  • Kafka入门之命令行操作

    1.创建topic

    [root@node01 kafka]$ bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --create --replication-factor 3 --partitions 3 --topic test
    
    # CDH版本
    
    kafka-topics --zookeeper cm1:2181,cm2:2181,cm3:2181 --create --replication-factor 3 --partitions 3 --topic test
    

    参数说明:

    • –replication-factor:指定每个分区的副本个数,默认1个(指定的副本数包含自身1个)

    • –partitions:指定当前创建的kafka分区数量,默认为1个

    • –topic:指定新建topic的名称)

    2.查看当前服务器中的所有topic

    [root@node01 kafka]$ bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --list
    test
    
    # CDH版本
    
    kafka-topics --zookeeper cm1:2181,cm2:2181,cm3:2181 --list
    

    3.删除topic

    [root@node01 kafka]$ bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --delete --topic test
    
    # CDH版本
    
    kafka-topics --zookeeper cm1:2181,cm2:2181,cm3:2181 --delete --topic test
    

    在kafka集群中删除topic,当前topic只是被标记成删除。

    标记为删除还可以读取数据,只是会在一周后自动删除所有数据

    要想执行删除命令后直接删除,则需要在config/server.properties配置delete.topic.enable=true

    4.创建生产者发送消息

    [root@node01 kafka]$ bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
    >hello world
    >cw  cw
    

    5.创建消费者消费消息

    # kafka0.9之前,指定zookeeper
    [root@node01 kafka]$ bin/kafka-console-consumer.sh 
    --zookeeper node01:2181,node02:2181,node03:2181 --topic test
    
    # kafka0.9及之后,指定的是kafka集群
    [root@node01 kafka]$ bin/kafka-console-consumer.sh 
    --bootstrap-server node01:9092,node02:9092,node03:9092 --topic test
    
    [root@node01 kafka]$ bin/kafka-console-consumer.sh 
    --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning --topic test
    
    --bootstrap-server cm1:9092,cm2:9092,cm3:9092
    # 在输入信息时,想要删除需要按ctrl+backspace
    

    –from-beginning:会把主题中以往所有的数据都读取出来。

    此时因为分为了三个分区,所以只能保证每个分区内部是有序的,分区之间无法保证有序

    此时如果生产者继续添加数据,消费者将按照添加顺序显示,因为此时consumer正在开启,producer生产一个它就消费一个,所以此时顺序能保证有序

    单分区有序,多分区无序

    6.查看某个Topic的详情

    [root@node01 kafka]$ bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --describe --topic test
    Topic:test	PartitionCount:3	ReplicationFactor:3	Configs:
    	Topic: test	Partition: 0	Leader: 2	Replicas: 2,1,0	Isr: 2,1,0
    	Topic: test	Partition: 1	Leader: 0	Replicas: 0,2,1	Isr: 0,2,1
    	Topic: test	Partition: 2	Leader: 1	Replicas: 1,0,2	Isr: 1,0,2
    	
    # CDH版本命令
    $ kafka-topics --zookeeper cm1:2181,cm2:2181,cm3:2181 --describe --topic test
    
    • Partition:所在分区

    • Leader:由哪个leader管理

    • Replication:副本所在位置(顺序就是优先副本的顺序)

    • Isr:检查数据的完整性

    7.修改分区数

    注意:topic的分区数只能增加,不能减少,因为partition中已经有数据了

    [root@node01 kafka]$ bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --alter --topic test --partitions 6
    

    8.在zookeeper上查看消费者和broker的消息

    # 启动zookeeper客户端:
    ./zkCli.sh
    # 查看topic相关信息:
    [zk: localhost:2181(CONNECTED) 0] ls /brokers/topics
    [test]
    [zk: localhost:2181(CONNECTED) 1] ls /brokers/topics/test
    [partitions]
    [zk: localhost:2181(CONNECTED) 2] ls /brokers/topics/test/partitions
    [0]
    [zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/test/partitions/0
    [state]
    [zk: localhost:2181(CONNECTED) 4] ls /brokers/topics/test/partitions/0/state
    []
    [zk: localhost:2181(CONNECTED) 5] ls
    [zk: localhost:2181(CONNECTED) 6] get /brokers/topics/test/partitions/0/state
    {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1]}
    cZxid = 0x4f00000113
    ctime = Sun Jul 28 17:17:33 CST 2019
    mZxid = 0x4f00000113
    mtime = Sun Jul 28 17:17:33 CST 2019
    pZxid = 0x4f00000113
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 72
    numChildren = 0
    
    # 查看消费者相关信息:
    [zk: localhost:2181(CONNECTED) 7] ls /consumers
    [console-consumer-21115]
    [zk: localhost:2181(CONNECTED) 8] ls /consumers/console-consumer-21115
    [ids, owners, offsets]
    [zk: localhost:2181(CONNECTED) 9] ls /consumers/console-consumer-21115/offsets
    [test]
    [zk: localhost:2181(CONNECTED) 10] ls /consumers/console-consumer-21115/offsets/test
    [0]
    [zk: localhost:2181(CONNECTED) 11] ls /consumers/console-consumer-21115/offsets/test/0
    []
    [zk: localhost:2181(CONNECTED) 12] get /consumers/console-consumer-21115/offsets/test/0
    2
    cZxid = 0x4f0000011e
    ctime = Sun Jul 28 17:18:23 CST 2019
    mZxid = 0x4f0000011e
    mtime = Sun Jul 28 17:18:23 CST 2019
    pZxid = 0x4f0000011e
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 1
    numChildren = 0
    

    9.如何完全删除kafka中的数据

    1. 在kafka集群中删除topic,当前topic被标记成删除。

      标记为删除还可以读取数据,只是会在一周后自动删除所有数据

      开启配置项:0.8版本中 执行删除命令后会直接删除。

      在config/server.properties配置delete.topic.enable=true

      [root@node01 ~]# kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --list
      test
      [root@node01 ~]# kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --delete --topic test
      Topic test is marked for deletion.
      Note: This will have no impact if delete.topic.enable is not set to true.
      
    2. 在每台broker节点上删除当前这个topic对应的真实数据。

      [root@node01 ~]# cd /opt/module/kafka/logs
      [root@node01 logs]# ls
      cleaner-offset-checkpoint  log-start-offset-checkpoint  meta.properties  recovery-point-offset-checkpoint  test-0  test-1  test-2  replication-offset-checkpoint
      [root@node01 logs]# rm -rf ./test-0
      [root@node01 logs]# rm -rf ./test-1
      [root@node01 logs]# rm -rf ./test-2
      
      [root@node02 logs]# rm -rf ./test-0
      [root@node02 logs]# rm -rf ./test-1
      [root@node02 logs]# rm -rf ./test-2
      
      [root@node03 logs]# rm -rf ./test-0
      [root@node03 logs]# rm -rf ./test-1
      [root@node03 logs]# rm -rf ./test-2
      
    3. 进入zookeeper客户端,删除topic信息

      [root@node01 logs]# zkCli.sh 
      [zk: localhost:2181(CONNECTED) 0] ls /brokers/topics
      [test]
      [zk: localhost:2181(CONNECTED) 1] rmr /brokers/topics/test
      [zk: localhost:2181(CONNECTED) 2] ls /brokers/topics     
      []
      
    4. 删除config中的元数据消息

      [zk: localhost:2181(CONNECTED) 5] ls /config/topics
      [test]
      [zk: localhost:2181(CONNECTED) 6] rmr /config/topics/test
      [zk: localhost:2181(CONNECTED) 7] ls /config/topics     
      []
      
    5. 删除zookeeper中被标记为删除的topic信息

      [zk: localhost:2181(CONNECTED) 9] rmr /admin/delete_topics/test
      

  • 相关阅读:
    Nginx 跨域配置支持
    数据结构与算法分析
    数据结构与算法分析
    数据结构与算法分析
    Bash shell
    Bash shell
    HHUOJ 1040
    HHUOJ 1114
    HDUOJ 1171
    HDUOJ 1428
  • 原文地址:https://www.cnblogs.com/chenxiaoge/p/13335417.html
Copyright © 2011-2022 走看看