zoukankan      html  css  js  c++  java
  • kafka shell简单使用

    将kafka添加到环境变量中

    vim /etc/profile
    export KAFKA_HOME=/opt/iDataFusion/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
    source /etc/profile

    创建topic:

    --create: 指定创建topic动作
    --topic:指定新建topic的名称
    --zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样
    --config:指定当前topic上有效的参数值,参数列表参考文档为: Topic-level configuration
    --partitions:指定当前创建的kafka分区数量,默认为1个
    --replication-factor:指定每个分区的复制因子个数,默认1个
    
    kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic my-kafka-topic
    
    在创建Topic的时候,有两个参数是需要填写的,那就是partions和replication-factor。
    
    partions
      主题分区数。kafka通过分区策略,将不同的分区分配在一个集群中的broker上,一般会分散在不同的broker上,当只有一个broker时,所有的分区就只分配到该Broker上。
    消息会通过负载均衡发布到不同的分区上,消费者会监测偏移量来获取哪个分区有新数据,从而从该分区上拉取消息数据。
    分区数越多,在一定程度上会提升消息处理的吞吐量,因为kafka是基于文件进行读写,因此也需要打开更多的文件句柄,也会增加一定的性能开销。
    如果分区过多,那么日志分段也会很多,写的时候由于是批量写,其实就会变成随机写了,随机 I/O 这个时候对性能影响很大。所以一般来说 Kafka 不能有太多的 Partition。
    
    replication-factor
      用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。

    生产者操作:

    kafka-console-producer.sh --broker-list localhost:9092 --topic my-kafka-topic

    消费者操作:

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-kafka-topic

    查看所有topic列表:

    kafka-topics.sh --zookeeper localhost:2181 --list

    查看指定topic信息:

    kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-kafka-topic

    控制台向topic生产数据:

    kafka-console-producer.sh --broker-list node86:9092 --topic my-kafka-topic

    控制台消费topic的数据:

    kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-kafka-topic --from-beginning
    增加topic分区数(Kafka分区数量只允许增加,不允许减少)
    为topic my-kafka-topic 增加10个分区
    kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-kafka-topic --partitions 10

    删除topic(确定后再谨慎使用)

    默认情况下Kafka的Topic是没法直接删除的,需要进行相关参数配置
    
    kafka-topics.sh --delete --topic test0 --zookeeper ocalhost:2181
    
    Note: This will have no impact if delete.topic.enable is not set to true.## 默认情况下,删除是标记删除,没有实际删除这个Topic;如果运行删除Topic,两种方式:
    方式一:通过delete命令删除后,手动将本地磁盘以及zk上的相关topic的信息删除即可
    方式二:配置server.properties文件,给定参数delete.topic.enable=true,重启kafka服务,此时执行delete命令表示允许进行Topic的删除

    查看topic消费进度

    kafka-consumer-groups.sh --bootstrap-server test1:9092 --list
    kafka-consumer-groups.sh --bootstrap-server test1:9092 --describe --group groupname
  • 相关阅读:
    网络编程
    网络编程
    网络编程
    Java——倒序输出Map集合
    java——assert(断言)方法
    Linux文件列表查询ll和ls区别
    Java——关键字和保留字
    Java——动态创建Class(不写入文件,直接从内存中创建class)
    web项目——启动时tomcat报错:Server Tomcat v7.0 Server at localhost failed to start.
    eclipse——管理远程资源的缓存,例如从Internet下载的资源。
  • 原文地址:https://www.cnblogs.com/alisapine/p/15090095.html
Copyright © 2011-2022 走看看