
                    Common Apache Kafka Operations Commands

                                            Author: Yin Zhengjie

    Copyright notice: this is original work; reproduction is prohibited, and violations will be pursued legally.

    I. Commands for managing the Kafka service

    1>. Start the Kafka service

    [root@node106.yinzhengjie.org.cn ~]# kafka-server-start.sh /home/softwares/kafka_2.11-0.10.2.1/config/server.properties >> /dev/null  &          
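
    If you prefer not to manage the output redirection and backgrounding yourself, kafka-server-start.sh also accepts a -daemon flag that starts the broker in the background (a minimal alternative sketch, using the same install path as above):

    [root@node106.yinzhengjie.org.cn ~]# kafka-server-start.sh -daemon /home/softwares/kafka_2.11-0.10.2.1/config/server.properties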

    2>. Stop the Kafka service (flushes logs to disk and migrates leadership of every partition this broker leads)

    [root@node106.yinzhengjie.org.cn ~]# kafka-server-stop.sh         


    Tip:
      When stopping the cluster, if the partition replication factor is 2, do not stop two brokers at the same time. Also, in production this script may need some time before the broker process fully exits; during that window it is best not to force-kill the process with the kill command.

    Graceful shutdown:
      A Kafka broker may stop because of a failure, but much of the time it is stopped for planned maintenance. In that case the broker process should be shut down gracefully rather than simply killed. A graceful shutdown has two benefits:
        1>. It flushes all logs to disk, so the broker can skip the log-recovery step on restart. Log recovery is time-consuming, so this makes planned restarts faster.
        2>. Before shutting down, the broker migrates leadership of every partition it leads to replicas on other brokers. This speeds up the leadership transfer and shrinks each partition's unavailable window to a few milliseconds.
      The graceful-shutdown script, "kafka-server-stop.sh", lives under the bin directory of the Kafka installation; reading it shows that it still works via kill. Note that the first benefit needs no extra configuration, while the second requires controlled.shutdown.enable=true; since that is the official default, there is nothing to change.
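
      For reference, a minimal server.properties fragment covering the settings mentioned above; the values shown are, to my knowledge, the defaults, so this is illustrative only:

        # server.properties (fragment)
        # Migrate leadership off the broker before it exits (default: true)
        controlled.shutdown.enable=true
        # How many times to retry a controlled shutdown before falling back to an unclean one
        controlled.shutdown.max.retries=3
        # Pause between retries, in milliseconds
        controlled.shutdown.retry.backoff.ms=5000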

    3>. Check the Kafka process ID

    [root@node106.yinzhengjie.org.cn ~]# jps | grep Kafka
    16434 Kafka
    [root@node106.yinzhengjie.org.cn ~]# 

    II. kafka-topics.sh usage examples

    1>. Create a topic (kafka-topics.sh)

    [root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node107.yinzhengjie.org.cn:2181,node108.yinzhengjie.org.cn:2181 --create --partitions 10 --replication-factor 2 --topic yinzhengjie-kafka
    Created topic "yinzhengjie-kafka".
    [root@node106.yinzhengjie.org.cn ~]# 

    2>. List topics

    [root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node107.yinzhengjie.org.cn:2181,node108.yinzhengjie.org.cn:2181 --list
    yinzhengjie-kafka
    [root@node106.yinzhengjie.org.cn ~]# 
    [root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs'
    node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 4
    -rw-r--r-- 1 root root  0 Jul 11 15:45 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root 56 Jul 11 15:45 meta.properties
    -rw-r--r-- 1 root root  0 Jul 11 15:45 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root  0 Jul 11 15:45 replication-offset-checkpoint
    
    node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 12
    -rw-r--r-- 1 root root   0 Jul 11 15:45 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root  56 Jul 11 15:45 meta.properties
    -rw-r--r-- 1 root root  70 Jul 11 17:43 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root  70 Jul 11 17:44 replication-offset-checkpoint
    drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-0
    drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-1
    drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-3
    
    node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 12
    -rw-r--r-- 1 root root   0 Jul 11 15:18 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root  56 Jul 11 15:18 meta.properties
    -rw-r--r-- 1 root root  48 Jul 11 17:43 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root  48 Jul 11 17:44 replication-offset-checkpoint
    drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-0
    drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-8
    
    [root@node106.yinzhengjie.org.cn ~]# 
    [root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs'        # I configured three log directories, so you can see the partition data directories spread across all three.
    [root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs2'
    node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 4
    -rw-r--r-- 1 root root  0 Jul 11 15:45 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root 56 Jul 11 15:45 meta.properties
    -rw-r--r-- 1 root root  0 Jul 11 15:45 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root  0 Jul 11 15:45 replication-offset-checkpoint
    
    node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 12
    -rw-r--r-- 1 root root   0 Jul 11 15:18 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root  56 Jul 11 15:18 meta.properties
    -rw-r--r-- 1 root root  48 Jul 11 17:44 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root  48 Jul 11 17:44 replication-offset-checkpoint
    drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-5
    drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-6
    
    node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 12
    -rw-r--r-- 1 root root   0 Jul 11 15:45 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root  56 Jul 11 15:45 meta.properties
    -rw-r--r-- 1 root root  48 Jul 11 17:44 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root  48 Jul 11 17:44 replication-offset-checkpoint
    drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-5
    drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-9
    
    [root@node106.yinzhengjie.org.cn ~]# 
    [root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs3'
    node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 12
    -rw-r--r-- 1 root root   0 Jul 11 15:18 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root  56 Jul 11 15:18 meta.properties
    -rw-r--r-- 1 root root  48 Jul 11 17:44 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root  48 Jul 11 17:45 replication-offset-checkpoint
    drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-2
    drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-4
    
    node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 4
    -rw-r--r-- 1 root root  0 Jul 11 15:45 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root 56 Jul 11 15:45 meta.properties
    -rw-r--r-- 1 root root  0 Jul 11 15:45 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root  0 Jul 11 15:45 replication-offset-checkpoint
    
    node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 12
    -rw-r--r-- 1 root root   0 Jul 11 15:45 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root  56 Jul 11 15:45 meta.properties
    -rw-r--r-- 1 root root  48 Jul 11 17:45 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root  48 Jul 11 17:45 replication-offset-checkpoint
    drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-6
    drwxr-xr-x 2 root root 110 Jul 11 17:43 yinzhengjie-kafka-7
    
    [root@node106.yinzhengjie.org.cn ~]# 

    3>. Delete a topic

    [root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node107.yinzhengjie.org.cn:2181,node108.yinzhengjie.org.cn:2181 --delete --topic yinzhengjie-kafka
    Topic yinzhengjie-kafka is marked for deletion.                    # The topic "yinzhengjie-kafka" has been marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.     # For the delete to actually happen, delete.topic.enable must be set to true. The actual deletion time also depends on "log.retention.check.interval.ms", the interval at which the deletion policy is checked. For experiments I recommend setting it to 10000 (10 seconds); the cluster must be restarted for the change to take effect.
    [root@node106.yinzhengjie.org.cn ~]# 
    [root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node107.yinzhengjie.org.cn:2181,node108.yinzhengjie.org.cn:2181 --list
    yinzhengjie-kafka - marked for deletion
    [root@node106.yinzhengjie.org.cn ~]# 
    [root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs'
    node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 4
    -rw-r--r-- 1 root root  0 Jul 11 15:45 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root 56 Jul 11 15:45 meta.properties
    -rw-r--r-- 1 root root  0 Jul 11 15:45 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root  0 Jul 11 15:45 replication-offset-checkpoint
    
    node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 16
    -rw-r--r-- 1 root root  4 Jul 11 17:55 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root 56 Jul 11 15:18 meta.properties
    -rw-r--r-- 1 root root 48 Jul 11 17:55 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root 48 Jul 11 17:55 replication-offset-checkpoint
    
    node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 16
    -rw-r--r-- 1 root root  4 Jul 11 17:55 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root 56 Jul 11 15:45 meta.properties
    -rw-r--r-- 1 root root 70 Jul 11 17:55 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root 70 Jul 11 17:55 replication-offset-checkpoint
    
    [root@node106.yinzhengjie.org.cn ~]# 
    [root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs'          # Since I configured three Kafka data directories, verify in each of them that the corresponding partition directories have been removed.
    [root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs2'
    node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 16
    -rw-r--r-- 1 root root  4 Jul 11 17:55 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root 56 Jul 11 15:18 meta.properties
    -rw-r--r-- 1 root root 48 Jul 11 17:55 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root 48 Jul 11 17:55 replication-offset-checkpoint
    
    node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 4
    -rw-r--r-- 1 root root  0 Jul 11 15:45 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root 56 Jul 11 15:45 meta.properties
    -rw-r--r-- 1 root root  0 Jul 11 15:45 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root  0 Jul 11 15:45 replication-offset-checkpoint
    
    node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 16
    -rw-r--r-- 1 root root  4 Jul 11 17:55 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root 56 Jul 11 15:45 meta.properties
    -rw-r--r-- 1 root root 48 Jul 11 17:55 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root 48 Jul 11 17:55 replication-offset-checkpoint
    
    [root@node106.yinzhengjie.org.cn ~]# 
    [root@node106.yinzhengjie.org.cn ~]# ansible kafka -m shell -a 'ls -l /home/data/kafka/logs3'
    node107.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 4
    -rw-r--r-- 1 root root  0 Jul 11 15:45 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root 56 Jul 11 15:45 meta.properties
    -rw-r--r-- 1 root root  0 Jul 11 15:45 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root  0 Jul 11 15:45 replication-offset-checkpoint
    
    node108.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 16
    -rw-r--r-- 1 root root  4 Jul 11 17:55 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root 56 Jul 11 15:18 meta.properties
    -rw-r--r-- 1 root root 48 Jul 11 17:55 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root 48 Jul 11 17:55 replication-offset-checkpoint
    
    node106.yinzhengjie.org.cn | SUCCESS | rc=0 >>
    total 16
    -rw-r--r-- 1 root root  4 Jul 11 17:55 cleaner-offset-checkpoint
    -rw-r--r-- 1 root root 56 Jul 11 15:45 meta.properties
    -rw-r--r-- 1 root root 48 Jul 11 17:55 recovery-point-offset-checkpoint
    -rw-r--r-- 1 root root 48 Jul 11 17:55 replication-offset-checkpoint
    
    [root@node106.yinzhengjie.org.cn ~]# 
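    As the note above says, the delete only takes effect if the broker permits it. A minimal server.properties sketch for a test cluster (in 0.10.x the delete switch defaults to false; the 10-second check interval is the experiment-friendly value suggested above, not the default of 300000):

        # server.properties (fragment)
        # Allow topics to actually be deleted instead of only being marked
        delete.topic.enable=true
        # How often the log retention/deletion check runs, in milliseconds
        log.retention.check.interval.ms=10000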

    4>. Describe an existing topic

    [root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --create --zookeeper node106.yinzhengjie.org.cn:2181/kafka01 --partitions 5 --replication-factor 2 --topic yinzhengjie-kafka001
    Created topic "yinzhengjie-kafka001".
    [root@node106.yinzhengjie.org.cn ~]# 
    [root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --describe --topic yinzhengjie-kafka001
    Topic:yinzhengjie-kafka001    PartitionCount:5    ReplicationFactor:2    Configs:
        Topic: yinzhengjie-kafka001    Partition: 0    Leader: 108    Replicas: 108,106    Isr: 108,106
        Topic: yinzhengjie-kafka001    Partition: 1    Leader: 106    Replicas: 106,107    Isr: 106,107
        Topic: yinzhengjie-kafka001    Partition: 2    Leader: 107    Replicas: 107,108    Isr: 107,108
        Topic: yinzhengjie-kafka001    Partition: 3    Leader: 108    Replicas: 108,107    Isr: 108,107
        Topic: yinzhengjie-kafka001    Partition: 4    Leader: 106    Replicas: 106,108    Isr: 106,108
    [root@node106.yinzhengjie.org.cn ~]# 

    5>. Increase the partition count and add a topic-level config

    [root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --alter --topic yinzhengjie-kafka001 --partitions 10  --config max.message.bytes=10000000
    WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
             Going forward, please use kafka-configs.sh for this functionality
    Updated config for topic "yinzhengjie-kafka001".                                           
    WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
    Adding partitions succeeded!
    [root@node106.yinzhengjie.org.cn ~]# 
    [root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --describe --topic yinzhengjie-kafka001
    Topic:yinzhengjie-kafka001    PartitionCount:10    ReplicationFactor:2    Configs:max.message.bytes=10000000        # Note: this is exactly the config we just set with the --config flag.
        Topic: yinzhengjie-kafka001    Partition: 0    Leader: 108    Replicas: 108,106    Isr: 108,106
        Topic: yinzhengjie-kafka001    Partition: 1    Leader: 106    Replicas: 106,107    Isr: 106,107
        Topic: yinzhengjie-kafka001    Partition: 2    Leader: 107    Replicas: 107,108    Isr: 107,108
        Topic: yinzhengjie-kafka001    Partition: 3    Leader: 108    Replicas: 108,107    Isr: 108,107
        Topic: yinzhengjie-kafka001    Partition: 4    Leader: 106    Replicas: 106,108    Isr: 106,108
        Topic: yinzhengjie-kafka001    Partition: 5    Leader: 107    Replicas: 107,108    Isr: 107,108
        Topic: yinzhengjie-kafka001    Partition: 6    Leader: 108    Replicas: 108,107    Isr: 108,107
        Topic: yinzhengjie-kafka001    Partition: 7    Leader: 106    Replicas: 106,108    Isr: 106,108
        Topic: yinzhengjie-kafka001    Partition: 8    Leader: 107    Replicas: 107,106    Isr: 107,106
        Topic: yinzhengjie-kafka001    Partition: 9    Leader: 108    Replicas: 108,106    Isr: 108,106              # Clearly, the partition count has grown from 5 to 10!
    [root@node106.yinzhengjie.org.cn ~]# 
    [root@node106.yinzhengjie.org.cn ~]# 
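    As the warning above says, changing topic configs through kafka-topics.sh is deprecated. The same config change expressed with kafka-configs.sh would look roughly like this (a sketch against the same cluster; note that kafka-configs.sh only handles configs, so increasing the partition count still goes through kafka-topics.sh --alter --partitions):

    [root@node106.yinzhengjie.org.cn ~]# kafka-configs.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --alter --entity-type topics --entity-name yinzhengjie-kafka001 --add-config max.message.bytes=10000000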

    6>. Remove a topic-level config

    [root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --describe --topic yinzhengjie-kafka001
    Topic:yinzhengjie-kafka001    PartitionCount:10    ReplicationFactor:2    Configs:max.message.bytes=10000000
        Topic: yinzhengjie-kafka001    Partition: 0    Leader: 108    Replicas: 108,106    Isr: 106,108
        Topic: yinzhengjie-kafka001    Partition: 1    Leader: 106    Replicas: 106,107    Isr: 106,107
        Topic: yinzhengjie-kafka001    Partition: 2    Leader: 107    Replicas: 107,108    Isr: 107,108
        Topic: yinzhengjie-kafka001    Partition: 3    Leader: 108    Replicas: 108,107    Isr: 107,108
        Topic: yinzhengjie-kafka001    Partition: 4    Leader: 106    Replicas: 106,108    Isr: 106,108
        Topic: yinzhengjie-kafka001    Partition: 5    Leader: 107    Replicas: 107,108    Isr: 107,108
        Topic: yinzhengjie-kafka001    Partition: 6    Leader: 108    Replicas: 108,107    Isr: 107,108
        Topic: yinzhengjie-kafka001    Partition: 7    Leader: 106    Replicas: 106,108    Isr: 106,108
        Topic: yinzhengjie-kafka001    Partition: 8    Leader: 107    Replicas: 107,106    Isr: 106,107
        Topic: yinzhengjie-kafka001    Partition: 9    Leader: 108    Replicas: 108,106    Isr: 106,108
    [root@node106.yinzhengjie.org.cn ~]# 
    [root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --alter --topic yinzhengjie-kafka001 --delete-config max.message.bytes    # Remove the corresponding config entry
    WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
             Going forward, please use kafka-configs.sh for this functionality
    Updated config for topic "yinzhengjie-kafka001".
    [root@node106.yinzhengjie.org.cn ~]# 
    [root@node106.yinzhengjie.org.cn ~]# kafka-topics.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --describe --topic yinzhengjie-kafka001
    Topic:yinzhengjie-kafka001    PartitionCount:10    ReplicationFactor:2    Configs:                  # The config entry has been removed!
        Topic: yinzhengjie-kafka001    Partition: 0    Leader: 108    Replicas: 108,106    Isr: 106,108
        Topic: yinzhengjie-kafka001    Partition: 1    Leader: 106    Replicas: 106,107    Isr: 106,107
        Topic: yinzhengjie-kafka001    Partition: 2    Leader: 107    Replicas: 107,108    Isr: 107,108
        Topic: yinzhengjie-kafka001    Partition: 3    Leader: 108    Replicas: 108,107    Isr: 107,108
        Topic: yinzhengjie-kafka001    Partition: 4    Leader: 106    Replicas: 106,108    Isr: 106,108
        Topic: yinzhengjie-kafka001    Partition: 5    Leader: 107    Replicas: 107,108    Isr: 107,108
        Topic: yinzhengjie-kafka001    Partition: 6    Leader: 108    Replicas: 108,107    Isr: 107,108
        Topic: yinzhengjie-kafka001    Partition: 7    Leader: 106    Replicas: 106,108    Isr: 106,108
        Topic: yinzhengjie-kafka001    Partition: 8    Leader: 107    Replicas: 107,106    Isr: 106,107
        Topic: yinzhengjie-kafka001    Partition: 9    Leader: 108    Replicas: 108,106    Isr: 106,108
    [root@node106.yinzhengjie.org.cn ~]# 
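    Again, the non-deprecated way to remove a topic config is kafka-configs.sh; a sketch under the same assumptions as above:

    [root@node106.yinzhengjie.org.cn ~]# kafka-configs.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --alter --entity-type topics --entity-name yinzhengjie-kafka001 --delete-config max.message.bytes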

    III. Kafka functional testing

    1>. Start a producer

    [root@node106.yinzhengjie.org.cn ~]# kafka-console-producer.sh --broker-list node108.yinzhengjie.org.cn:9092 --topic yinzhengjie-kafka2019
    尹正杰到此一游!
    https://www.cnblogs.com/yinzhengjie/

    [root@node106.yinzhengjie.org.cn ~]# kafka-console-producer.sh 
    Read data from standard input and publish it to Kafka.
    Option                                   Description                            
    ------                                   -----------                            
    --batch-size <Integer: size>             Number of messages to send in a single 
                                               batch if they are not being sent     
                                               synchronously. (default: 200)        
    --broker-list <String: broker-list>      REQUIRED: The broker list string in    
                                               the form HOST1:PORT1,HOST2:PORT2.    
    --compression-codec [String:             The compression codec: either 'none',  
      compression-codec]                       'gzip', 'snappy', or 'lz4'.If        
                                               specified without value, then it     
                                               defaults to 'gzip'                   
    --key-serializer <String:                The class name of the message encoder  
      encoder_class>                           implementation to use for            
                                               serializing keys. (default: kafka.   
                                               serializer.DefaultEncoder)           
    --line-reader <String: reader_class>     The class name of the class to use for 
                                               reading lines from standard in. By   
                                               default each line is read as a       
                                               separate message. (default: kafka.   
                                               tools.                               
                                               ConsoleProducer$LineMessageReader)   
    --max-block-ms <Long: max block on       The max time that the producer will    
      send>                                    block for during a send request      
                                               (default: 60000)                     
    --max-memory-bytes <Long: total memory   The total memory used by the producer  
      in bytes>                                to buffer records waiting to be sent 
                                               to the server. (default: 33554432)   
    --max-partition-memory-bytes <Long:      The buffer size allocated for a        
      memory in bytes per partition>           partition. When records are received 
                                               which are smaller than this size the 
                                               producer will attempt to             
                                               optimistically group them together   
                                               until this size is reached.          
                                               (default: 16384)                     
    --message-send-max-retries <Integer>     Brokers can fail receiving the message 
                                               for multiple reasons, and being      
                                               unavailable transiently is just one  
                                               of them. This property specifies the 
                                               number of retires before the         
                                               producer give up and drop this       
                                               message. (default: 3)                
    --metadata-expiry-ms <Long: metadata     The period of time in milliseconds     
      expiration interval>                     after which we force a refresh of    
                                               metadata even if we haven't seen any 
                                               leadership changes. (default: 300000)
    --old-producer                           Use the old producer implementation.   
    --producer-property <String:             A mechanism to pass user-defined       
      producer_prop>                           properties in the form key=value to  
                                               the producer.                        
    --producer.config <String: config file>  Producer config properties file. Note  
                                               that [producer-property] takes       
                                               precedence over this config.         
    --property <String: prop>                A mechanism to pass user-defined       
                                               properties in the form key=value to  
                                               the message reader. This allows      
                                               custom configuration for a user-     
                                               defined message reader.              
    --queue-enqueuetimeout-ms <Integer:      Timeout for event enqueue (default:    
      queue enqueuetimeout ms>                 2147483647)                          
    --queue-size <Integer: queue_size>       If set and the producer is running in  
                                               asynchronous mode, this gives the    
                                               maximum amount of  messages will     
                                               queue awaiting sufficient batch      
                                               size. (default: 10000)               
    --request-required-acks <String:         The required acks of the producer      
      request required acks>                   requests (default: 1)                
    --request-timeout-ms <Integer: request   The ack timeout of the producer        
      timeout ms>                              requests. Value must be non-negative 
                                               and non-zero (default: 1500)         
    --retry-backoff-ms <Integer>             Before each retry, the producer        
                                               refreshes the metadata of relevant   
                                               topics. Since leader election takes  
                                               a bit of time, this property         
                                               specifies the amount of time that    
                                               the producer waits before refreshing 
                                               the metadata. (default: 100)         
    --socket-buffer-size <Integer: size>     The size of the tcp RECV size.         
                                               (default: 102400)                    
    --sync                                   If set message send requests to the    
                                               brokers are synchronously, one at a  
                                               time as they arrive.                 
    --timeout <Integer: timeout_ms>          If set and the producer is running in  
                                               asynchronous mode, this gives the    
                                               maximum amount of time a message     
                                               will queue awaiting sufficient batch 
                                               size. The value is given in ms.      
                                               (default: 1000)                      
    --topic <String: topic>                  REQUIRED: The topic id to produce      
                                               messages to.                         
    --value-serializer <String:              The class name of the message encoder  
      encoder_class>                           implementation to use for            
                                               serializing values. (default: kafka. 
                                               serializer.DefaultEncoder)           
    [root@node106.yinzhengjie.org.cn ~]# 
    [root@node106.yinzhengjie.org.cn ~]# kafka-console-producer.sh     # Producer script help output

    2>. Start a consumer

    [root@node106.yinzhengjie.org.cn ~]# kafka-console-consumer.sh --bootstrap-server node107.yinzhengjie.org.cn:9092 --topic yinzhengjie-kafka2019 
    尹正杰到此一游!
    https://www.cnblogs.com/yinzhengjie/
    [root@node106.yinzhengjie.org.cn ~]# kafka-console-consumer.sh 
    The console consumer is a tool that reads data from Kafka and outputs it to standard output.
    Option                                   Description                            
    ------                                   -----------                            
    --blacklist <String: blacklist>          Blacklist of topics to exclude from    
                                               consumption.                         
    --bootstrap-server <String: server to    REQUIRED (unless old consumer is       
      connect to>                              used): The server to connect to.     
    --consumer-property <String:             A mechanism to pass user-defined       
      consumer_prop>                           properties in the form key=value to  
                                               the consumer.                        
    --consumer.config <String: config file>  Consumer config properties file. Note  
                                               that [consumer-property] takes       
                                               precedence over this config.         
    --csv-reporter-enabled                   If set, the CSV metrics reporter will  
                                               be enabled                           
    --delete-consumer-offsets                If specified, the consumer path in     
                                               zookeeper is deleted when starting up
    --enable-systest-events                  Log lifecycle events of the consumer   
                                               in addition to logging consumed      
                                               messages. (This is specific for      
                                               system tests.)                       
    --formatter <String: class>              The name of a class to use for         
                                               formatting kafka messages for        
                                               display. (default: kafka.tools.      
                                               DefaultMessageFormatter)             
    --from-beginning                         If the consumer does not already have  
                                               an established offset to consume     
                                               from, start with the earliest        
                                               message present in the log rather    
                                               than the latest message.             
    --key-deserializer <String:                                                     
      deserializer for key>                                                         
    --max-messages <Integer: num_messages>   The maximum number of messages to      
                                               consume before exiting. If not set,  
                                               consumption is continual.            
    --metrics-dir <String: metrics           If csv-reporter-enable is set, and     
      directory>                               this parameter isset, the csv        
                                               metrics will be outputed here        
    --new-consumer                           Use the new consumer implementation.   
                                               This is the default.                 
    --offset <String: consume offset>        The offset id to consume from (a non-  
                                               negative number), or 'earliest'      
                                               which means from beginning, or       
                                               'latest' which means from end        
                                               (default: latest)                    
    --partition <Integer: partition>         The partition to consume from.         
    --property <String: prop>                The properties to initialize the       
                                               message formatter.                   
    --skip-message-on-error                  If there is an error when processing a 
                                               message, skip it instead of halt.    
    --timeout-ms <Integer: timeout_ms>       If specified, exit if no message is    
                                               available for consumption for the    
                                               specified interval.                  
    --topic <String: topic>                  The topic id to consume on.            
    --value-deserializer <String:                                                   
      deserializer for values>                                                      
    --whitelist <String: whitelist>          Whitelist of topics to include for     
                                               consumption.                         
    --zookeeper <String: urls>               REQUIRED (only when using old          
                                               consumer): The connection string for 
                                               the zookeeper connection in the form 
                                               host:port. Multiple URLS can be      
                                               given to allow fail-over.            
    [root@node106.yinzhengjie.org.cn ~]# 
    [root@node106.yinzhengjie.org.cn ~]# 
    [root@node106.yinzhengjie.org.cn ~]# kafka-console-consumer.sh      # Consumer script help output
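
    Putting the two tools together, a quick non-interactive smoke test might look like this (the topic is the one used earlier; the flags come from the help output above):

    # Produce one message from stdin, then read it back, exiting after one message or 10 seconds.
    [root@node106.yinzhengjie.org.cn ~]# echo "hello kafka" | kafka-console-producer.sh --broker-list node108.yinzhengjie.org.cn:9092 --topic yinzhengjie-kafka2019
    [root@node106.yinzhengjie.org.cn ~]# kafka-console-consumer.sh --bootstrap-server node107.yinzhengjie.org.cn:9092 --topic yinzhengjie-kafka2019 --from-beginning --max-messages 1 --timeout-ms 10000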

    IV. Other operations

    1>. Leadership balancing

      When a broker shuts down or fails, leadership of every partition it leads migrates to replicas on other brokers. This means that once the broker restarts, all of its partitions are mere followers, and no client will use it for reads or writes.

      To avoid this imbalance, Kafka has the concept of a preferred replica. If a partition's replica list is 108,106,107, then node 108 is preferred as leader over 106 and 107 because it appears first in the list.

      You can run "kafka-preferred-replica-election.sh" to make the cluster try to restore leadership to the preferred replicas; a concrete execution example is shown below.

      Running this command by hand over and over is tedious; Kafka can do it automatically if you set auto.leader.rebalance.enable=true. Since that is already the default, there is normally nothing to configure.
    [root@node106.yinzhengjie.org.cn ~]# kafka-preferred-replica-election.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01
    Created preferred replica election path with {"version":1,"partitions":[{"topic":"yinzhengjie-kafka2019","partition":5},{"topic":"__consumer_offsets","partition":34},{"topic":"__consumer_offsets","partition":36},{"topic":"__c
    onsumer_offsets","partition":27},{"topic":"yinzhengjie-kafka2019","partition":4},{"topic":"yinzhengjie-kafka","partition":15},{"topic":"__consumer_offsets","partition":1},{"topic":"__consumer_offsets","partition":20},{"topic":"__consumer_offsets","partition":7},{"topic":"__consumer_offsets","partition":42},{"topic":"yinzhengjie-kafka","partition":6},{"topic":"__consumer_offsets","partition":49},{"topic":"yinzhengjie-kafka2019","partition":11},{"topic":"test","partition":2},{"topic":"__consumer_offsets","partition":4},{"topic":"__consumer_offsets","partition":33},{"topic":"yinzhengjie-kafka001","partition":4},{"topic":"yinzhengjie-kafka2019","partition":13},{"topic":"__consumer_offsets","partition":14},{"topic":"__consumer_offsets","partition":46},{"topic":"__consumer_offsets","partition":24},{"topic":"__consumer_offsets","partition":28},{"topic":"yinzhengjie-kafka","partition":1},{"topic":"__consumer_offsets","partition":6},{"topic":"yinzhengjie-kafka2019","partition":7},{"topic":"yinzhengjie-kafka","partition":12},{"topic":"yinzhengjie-kafka","partition":16},{"topic":"__consumer_offsets","partition":37},{"topic":"yinzhengjie-kafka2019","partition":3},{"topic":"yinzhengjie-kafka001","partition":3},{"topic":"__consumer_offsets","partition":43},{"topic":"yinzhengjie-kafka001","partition":0},{"topic":"yinzhengjie-kafka","partition":11},{"topic":"test","partition":1},{"topic":"__consumer_offsets","partition":21},{"topic":"yinzhengjie-kafka2019","partition":0},{"topic":"yinzhengjie-kafka","partition":2},{"topic":"__consumer_offsets","partition":15},{"topic":"__consumer_offsets","partition":11},{"topic":"yinzhengjie-kafka","partition":0},{"topic":"yinzhengjie-kafka","partition":5},{"topic":"__consumer_offsets","partition":30},{"topic":"yinzhengjie-kafka001","partition":7},{"topic":"__consumer_offsets","partition":2},{"topic":"__consumer_offsets","partition":47},{"topic":"yinzhengjie-kafka2019","partition":17},{"topic":"__consumer_offsets","partition":25},{"topic":"__consumer_offsets","partition":29},{"topic":"kafka001","partition":2},{"topic":"yinzhengjie-kafka","partition":4},{"topic":"yinzhengjie-kafka2019","partition":16},{"topic":"yinzhengjie-kafka2019","partition":2},{"topic":"__consumer_offsets","partition":8},{"topic":"__consumer_offsets","partition":23},{"topic":"yinzhengjie-kafka2019","partition":12},{"topic":"yinzhengjie-kafka001","partition":8},{"topic":"test","partition":0},{"topic":"__consumer_offsets","partition":40},{"topic":"__consumer_offsets","partition":31},{"topic":"yinzhengjie-kafka001","partition":5},{"topic":"yinzhengjie-kafka001","partition":2},{"topic":"yinzhengjie-kafka","partition":8},{"topic":"__consumer_offsets","partition":19},{"topic":"__consumer_offsets","partition":16},{"topic":"kafka001","partition":1},{"topic":"__consumer_offsets","partition":38},{"topic":"yinzhengjie-kafka","partition":9},{"topic":"yinzhengjie-kafka","partition":13},{"topic":"__consumer_offsets","partition":44},{"topic":"__consumer_offsets","partition":10},{"topic":"yinzhengjie-kafka001","partition":1},{"topic":"__consumer_offsets","partition":3},{"topic":"yinzhengjie-kafka","partition":17},{"topic":"__consumer_offsets","partition":35},{"topic":"kafka001","partition":4},{"topic":"yinzhengjie-kafka2019","partition":1},{"topic":"yinzhengjie-kafka2019","partition":10},{"topic":"kafka001","partition":3},{"topic":"yinzhengjie-kafka2019","partition":6},{"topic":"yinzhengjie-kafka","partition":18},{"topic":"__consumer_offsets","partition":26},{"topic":"__consumer_offsets","partition":39},{"topic":
"__consumer_offsets","partition":13},{"topic":"yinzhengjie-kafka","partition":10},{"topic":"__consumer_offsets","partition":17},{"topic":"yinzhengjie-kafka","partition":14},{"topic":"__consumer_offsets","partition":22},{"topic":"yinzhengjie-kafka2019","partition":8},{"topic":"yinzhengjie-kafka","partition":3},{"topic":"__consumer_offsets","partition":9},{"topic":"__consumer_offsets","partition":0},{"topic":"__consumer_offsets","partition":41},{"topic":"yinzhengjie-kafka2019","partition":18},{"topic":"yinzhengjie-kafka001","partition":6},{"topic":"yinzhengjie-kafka2019","partition":9},{"topic":"__consumer_offsets","partition":48},{"topic":"yinzhengjie-kafka2019","partition":14},{"topic":"yinzhengjie-kafka","partition":7},{"topic":"__consumer_offsets","partition":18},{"topic":"kafka001","partition":0},{"topic":"__consumer_offsets","partition":32},{"topic":"yinzhengjie-kafka2019","partition":15},{"topic":"yinzhengjie-kafka","partition":19},{"topic":"yinzhengjie-kafka2019","partition":19},{"topic":"__consumer_offsets","partition":12},{"topic":"yinzhengjie-kafka001","partition":9},{"topic":"__consumer_offsets","partition":45},{"topic":"__consumer_offsets","partition":5}]}Successfully started preferred replica election for partitions Set([yinzhengjie-kafka,6], [__consumer_offsets,32], [__consumer_offsets,16], [__consumer_offsets,49], [__consumer_offsets,44], [yinzhengjie-kafka001,1], [yinzheng
    jie-kafka2019,15], [test,0], [__consumer_offsets,28], [yinzhengjie-kafka,8], [yinzhengjie-kafka,3], [kafka001,1], [yinzhengjie-kafka2019,12], [__consumer_offsets,17], [yinzhengjie-kafka,5], [yinzhengjie-kafka2019,6], [__consumer_offsets,23], [yinzhengjie-kafka,19], [__consumer_offsets,7], [yinzhengjie-kafka2019,10], [__consumer_offsets,4], [yinzhengjie-kafka,12], [__consumer_offsets,29], [yinzhengjie-kafka2019,19], [__consumer_offsets,35], [yinzhengjie-kafka,17], [__consumer_offsets,3], [__consumer_offsets,24], [__consumer_offsets,41], [kafka001,2], [__consumer_offsets,0], [kafka001,4], [__consumer_offsets,38], [yinzhengjie-kafka2019,11], [yinzhengjie-kafka001,7], [__consumer_offsets,13], [yinzhengjie-kafka2019,3], [__consumer_offsets,8], [yinzhengjie-kafka2019,17], [yinzhengjie-kafka2019,16], [__consumer_offsets,5], [yinzhengjie-kafka2019,13], [test,2], [__consumer_offsets,39], [__consumer_offsets,36], [yinzhengjie-kafka,11], [yinzhengjie-kafka,13], [__consumer_offsets,40], [yinzhengjie-kafka2019,4], [yinzhengjie-kafka001,3], [__consumer_offsets,45], [__consumer_offsets,15], [__consumer_offsets,33], [yinzhengjie-kafka,0], [__consumer_offsets,37], [yinzhengjie-kafka2019,18], [yinzhengjie-kafka001,8], [__consumer_offsets,21], [yinzhengjie-kafka001,9], [yinzhengjie-kafka,15], [yinzhengjie-kafka,18], [yinzhengjie-kafka,9], [yinzhengjie-kafka2019,2], [test,1], [__consumer_offsets,6], [yinzhengjie-kafka2019,5], [yinzhengjie-kafka,2], [yinzhengjie-kafka,7], [__consumer_offsets,11], [__consumer_offsets,20], [yinzhengjie-kafka001,0], [__consumer_offsets,47], [kafka001,3], [__consumer_offsets,2], [__consumer_offsets,27], [yinzhengjie-kafka001,5], [yinzhengjie-kafka2019,8], [__consumer_offsets,34], [__consumer_offsets,9], [__consumer_offsets,22], [__consumer_offsets,42], [__consumer_offsets,14], [yinzhengjie-kafka001,2], [__consumer_offsets,25], [yinzhengjie-kafka,10], [yinzhengjie-kafka001,6], [__consumer_offsets,10], [yinzhengjie-kafka001,4], [__consumer_offsets,48], [__consumer_offsets,31], [__consumer_offsets,18], [__consumer_offsets,19], [yinzhengjie-kafka2019,1], [__consumer_offsets,12], [yinzhengjie-kafka,4], [yinzhengjie-kafka2019,7], [yinzhengjie-kafka,1], [yinzhengjie-kafka2019,14], [kafka001,0], [yinzhengjie-kafka,16], [__consumer_offsets,46], [__consumer_offsets,43], [__consumer_offsets,1], [yinzhengjie-kafka2019,0], [yinzhengjie-kafka,14], [__consumer_offsets,26], [yinzhengjie-kafka2019,9], [__consumer_offsets,30])[root@node106.yinzhengjie.org.cn ~]# 

    2>. Cross-cluster data mirroring

      Replicating data between different Kafka clusters is called "mirroring". Kafka's built-in cross-cluster replication tool is MirrorMaker, which reads data from a source cluster and writes it to a target cluster. Typical use cases are migrating old data and cross-datacenter backup.

      Multiple mirroring processes can be run to increase throughput and for fault tolerance.

      Data is read from topics in the source cluster and written to topics with the same names in the target cluster. In effect, MirrorMaker is just a Kafka consumer and producer chained together.

      The source and target clusters are completely independent: they can have different partition counts and different offsets. A mirror cluster is therefore not a fault-tolerance mechanism for the source cluster, because the consumers and offsets differ.

      For fault tolerance within a cluster, use replication instead. MirrorMaker does preserve and use the partition key, however, so ordering is maintained per key.

      MirrorMaker is highly configurable. It uses one producer and multiple consumers, so all producer and consumer configuration parameters can be used to configure MirrorMaker. In addition, MirrorMaker has configuration parameters of its own, and the dependencies among them can be fairly complex.

      Let's look at a MirrorMaker example and walk through the important configuration parameters:

          [root@node106.yinzhengjie.org.cn ~]# kafka-mirror-maker.sh --consumer.config /home/softwares/kafka_2.11-0.10.2.1/config/consumer.properties --producer.config /home/softwares/kafka_2.11-0.10.2.1/config/producer.properties --new.consumer --num.streams 2 --whitelist ".*"

      consumer.config:
        This parameter specifies the consumer configuration file. All of the consumers share this one configuration, which means you can only configure one source cluster and one group.id; all of the consumers then belong to the same consumer group, which is exactly what we want. The file has two required parameters: bootstrap.servers (the address of the source cluster) and group.id. Beyond those, any other consumer configuration parameter can be set. The auto.commit.enable parameter should generally be left at its default of false: MirrorMaker commits offsets only after messages have safely reached the target cluster, so automatic commits must not be used. Another parameter worth knowing is auto.offset.reset: with its default of latest, MirrorMaker only mirrors data that arrives at the source cluster after MirrorMaker starts; to mirror the earlier data as well, set it to earliest.
      producer.config:
        This parameter specifies the producer configuration file. The only required parameter in that file is bootstrap.servers (the address of the target cluster).
      new.consumer:
        MirrorMaker can only use the 0.8-style or the 0.9-style consumer; the 0.9-style consumer is recommended because it is more stable.
      num.streams:
        As explained before, one stream is one consumer. All of the consumers share a single producer, and MirrorMaker uses these streams to feed that one producer. If you need extra throughput, start another MirrorMaker process.
      whitelist:
        A regular expression matching the names of the topics to mirror. Every topic matched by the expression is mirrored. In this example we mirror all topics, but in practice it is better to use an expression like "ad.*" so that test topics are not mirrored. You can also join multiple topic names with "," or "|", for example "data.ad|adta.op|data.er". If only a few topics should be excluded, use the --blacklist parameter.
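
      For reference, a minimal pair of configuration files matching the command above might look like this (the hostnames are placeholders for your source and target clusters, not values from the original post):

        # consumer.properties -- points at the SOURCE cluster
        bootstrap.servers=source-broker1:9092,source-broker2:9092
        group.id=yinzhengjie-mirrormaker
        # Mirror pre-existing data too, not just messages that arrive after startup
        auto.offset.reset=earliest

        # producer.properties -- points at the TARGET cluster
        bootstrap.servers=target-broker1:9092,target-broker2:9092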

    3>. Decommissioning and adding brokers (installing a monitoring tool such as kafka-manager or Kafka Eagle first is recommended)

      Adding a new node to a Kafka cluster is easy: just assign it a unique broker id and start the Kafka process. But a new server does not automatically receive any partitions; until existing partitions are moved onto it, or new topics are created, it does no work.

      The partition reassignment tool cannot automatically generate a reassignment plan for a decommissioned node, but you can still move the partitions hosted on the retiring node onto the remaining nodes, either with an auto-generated plan or with a hand-written one (a bit more work). Kafka provides this tool as kafka-reassign-partitions.sh. It can generate a reassignment plan automatically or accept a hand-written one, but it cannot detect partition imbalance on its own, so an administrator still has to trigger it.

      The tool runs in one of three mutually exclusive modes (an end-to-end sketch follows the list):
        (1) generate:
            Given a set of topics and a set of brokers, the tool generates a reassignment plan to spread those topics across those brokers. This mode only proposes a plan (use it with caution: it can cause a large amount of data movement).
        (2) execute:
            The tool executes a user-supplied reassignment plan, which may be hand-edited or produced by "--generate" mode.
        (3) verify:
            The tool checks the status of the last "--execute" run. The status can be successfully completed, failed, or in progress.
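
      A hedged end-to-end sketch of the three modes (the topic name and broker ids reuse earlier examples; the JSON file names are arbitrary):

        # topics-to-move.json -- the topics to reassign
        {"version":1,"topics":[{"topic":"yinzhengjie-kafka001"}]}

        # 1) Generate a candidate plan targeting brokers 106 and 107; save the proposed reassignment it prints as plan.json
        [root@node106.yinzhengjie.org.cn ~]# kafka-reassign-partitions.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --topics-to-move-json-file topics-to-move.json --broker-list "106,107" --generate

        # 2) Execute the plan
        [root@node106.yinzhengjie.org.cn ~]# kafka-reassign-partitions.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --reassignment-json-file plan.json --execute

        # 3) Check its status
        [root@node106.yinzhengjie.org.cn ~]# kafka-reassign-partitions.sh --zookeeper node108.yinzhengjie.org.cn:2181/kafka01 --reassignment-json-file plan.json --verify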

      For details, see: https://www.cnblogs.com/yinzhengjie/p/9808125.html

    4>. Summary of Kafka offset-related commands

      For details, see: https://www.cnblogs.com/yinzhengjie/p/10514206.html

    5>. Setting quotas (a quota is an upper limit on a client's read/write throughput)

      Quotas can be set at the broker configuration level, in which case they apply to all clients. By default every client gets an unlimited quota. The following settings cap the producer and consumer quotas at 10MB/s:
        quota.producer.default=10485760
        quota.consumer.default=10485760
      A quota can also be set for an individual client (rarely used; just know it exists):
        [root@node106.yinzhengjie.org.cn ~]# kafka-configs.sh --zookeeper node108.yinzhengjie.org.cn:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-name ClientA --entity-type clients
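      To check what was set, kafka-configs.sh can also describe the entity (same cluster and client name as above):
        [root@node106.yinzhengjie.org.cn ~]# kafka-configs.sh --zookeeper node108.yinzhengjie.org.cn:2181 --describe --entity-type clients --entity-name ClientA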

    6>. Key Kafka monitoring metrics (taken from the "broker metrics" material in Kafka: The Definitive Guide)

      Kafka exposes server and client metrics via JMX, and tools such as jconsole can display them. Here we only cover the metrics visible on the server side and their JMX MBean names. Once you know the metric names, you can monitor Kafka with Zabbix's Java gateway plugin, or with tools such as Open-Falcon.
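
      The launch scripts honor the JMX_PORT environment variable, so exposing the metrics is a one-line change; a minimal sketch (the port 9999 is an arbitrary choice, not from the original post):

        # Start the broker with remote JMX on port 9999, then point jconsole at it
        [root@node106.yinzhengjie.org.cn ~]# JMX_PORT=9999 kafka-server-start.sh -daemon /home/softwares/kafka_2.11-0.10.2.1/config/server.properties
        [root@node106.yinzhengjie.org.cn ~]# jconsole node106.yinzhengjie.org.cn:9999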
