概念
Kafka 是一个快速、可扩展和高可用的基于发布-订阅模式(pub-sub model)的消息系统,用作消息中间件,在系统之间传递消息。其核心概念有:
- Topic(话题)
- Producer(生产者)
- Consumer(消费者)
- Broker(经纪人)
在 Kafka 中,所有的消息都由 Topic 来组织,Producer 把消息发送到特定的 Topic,Consumer 从特定的 Topic 中读取消息。作为一个分布式系统,Kafka 运行在集群中,集群中的每个节点称之为 Broker。
快速开始
安装
下载 kafka_2.11-2.1.0.tgz
,也可以使用命令 wget "http://mirrors.hust.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz"
下载,注意 kafka 中已经包含了 zookeeper。执行以下命令解压并切换工作目录:
tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0
启动
# 启动 zookeeper,默认监听端口 2181
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动 kafka,默认监听端口 9092
bin/kafka-server-start.sh config/server.properties
# 启动 zookeeper & kafka,以守护进程方式
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
停止
# 停止 zookeeper
bin/zookeeper-server-stop.sh config/zookeeper.properties
# 停止 kafka
ps aux | grep kafka | grep -v grep | awk '{print $2}'
kill [pid]
发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
接收消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
查看 Topic 列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
查看指定 Topic 的信息
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic a_topic
修改指定 Topic 的 partition
bin/kafka-topics.sh --alter --topic STOP_JOB --zookeeper localhost:2181 --partitions 3
查看所有消费组
# 新版
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 172.16.0.4:9092 --list
# 旧版
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
查看消费组偏移量
# 新版
bin/kafka-consumer-groups.sh --bootstrap-server 172.16.0.4:9092 --describe --group console-consumer-85731
# 旧版
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --group console-consumer-11967 --describe
重设偏移量(new)
# 指定偏移量
bin/kafka-consumer-groups.sh --bootstrap-server 172.16.0.4:9092 --group ABOT_MASTER --topic JOB_LEGACY --reset-offsets --to-offset 29261788 –execute
./kafka-consumer-groups.sh --bootstrap-server 172.16.0.4:9092 --reset-offsets --group ABOT_SLAVE --all-topics --by-duration PT06H0M0S --execute
./kafka-consumer-groups.sh --bootstrap-server 172.16.0.4:9092 --reset-offsets --group ABOT_MASTER --all-topics --by-duration PT06H0M0S --execute
#最新偏移量
bin/kafka-consumer-groups.sh --bootstrap-server 172.16.0.4:9092 --group ABOT_MASTER --topic JOB_LEGACY --reset-offsets --to-latest –execute
bin/kafka-consumer-groups.sh --bootstrap-server 172.16.0.4:9092 --reset-offsets --group ABOT_MASTER --all-topics --to-latest --execute
bin/kafka-consumer-groups.sh --bootstrap-server 172.16.0.4:9092 --reset-offsets --group ABOT_SLAVE --all-topics --to-latest --execute
进一步学习
Kafka Client 消息接收的三种模式
模式 | 特点 | 设置 |
---|---|---|
At-most-once(最多一次) | 消息最多被消费一次 | 让 Kafka 在特定时间间隔内自动提交,如网络中断恢复或程序重启可能会使消息未被处理完就被提交了。设置 enable.auto.commit 为 true,设置 auto.commit.interval.ms 为一个较小的时间间隔,不用手动调用 commitSync() 。 |
At-least-once(最少一次) | 消息至少被消费一次 | 手动提交,如提交失败,则下次重复推送。设置 enable.auto.commit 为 false,然后调用 commitSync() 。 |
Exactly-once(正好一次) | 消息有且只有被消费一次 | 保证消息处理和提交反馈在同一个事务中(ACID,原子性、一致性、隔离性和持久性)。设置 enable.auto.commit 为 false,保存 ConsumeRecord 中的 offset 到数据库,实现 ConsumerRebalanceListener ,监听 Consumer Rebalance 事件,然后使用seek 方法将数据库的 offset 更新到 Kafka。 |
Consumer Group 与 Partition
- 按照如上的算法,所以如果kafka的消费组需要增加组员,最多增加到和partition数量一致,超过的组员只会占用资源,而不起作用。
- kafka的partition的个数一定要大于消费组组员的个数,并且partition的个数对于消费组组员取模一定要为0,不然有些消费者会占用资源却不起作用。
3.如果需要增加消费组的组员个数,那么也需要根据上面的算法,调整partition的个数。
作者:ens
链接:https://juejin.im/post/5baca032e51d450e735e74af
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
$ /bin/kafka-topics.sh --alter --topic A_TASK --zookeeper localhost:2181 --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
监控
kafka 版本要求 <=1.1.0 ( wget "https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz" )
可以通过 Morningstar/kafka-offset-monitor 监控 Kafka 偏移量,具体是
-
下载 Jar
-
运行
java -Djava.security.auth.login.config=conf/server-client-jaas.conf -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --offsetStorage kafka --kafkaBrokers kafkabroker01:6667,kafkabroker02:6667 --kafkaSecurityProtocol SASL_PLAINTEXT --zk zkserver01,zkserver02 --port 8081 --refresh 10.seconds --retain 2.days --dbName offsetapp_kafka
-
通过 http://{{HOST}}:9090 打开 web 页面监控。
排错
kafka 占用磁盘过多
设置 log.retention.hours=6
,只保存日志 6 小时,然后重启 kafka,这样之前保留的日志会被自动清除。
Connection to node -1 could not be established. Broker may not be available.
- 检查连接的 kafka 集群地址是否正确;
- 检查 kafka 集群收发消息是否正常;
- 检查 kafka 配置文件(server.properties)是否正确。
默认配置如下:
listeners=PLAINTEXT://:9092
如果被修改为错误地址那么将无法连接。
ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
一个线程只能有一个 Consumer,如果多个线程共用一个 Consumer,那么就会出现这个错误。
解决方案就是去解决线程问题,确保只有一个线程调用一个 Consumer。
Kafka 消息不按顺序消费
背景:查看消息日志发现原本按顺序发送的消息 1,2,3
被消费的时候变成 1,3,2
了。
原因:Kafka 不保证全局的消息有序性,但保证同一个 Partition 消息的有序性。
解决方案:发消息时设置同一个 Key 或者直接指定同一个 Partition 即可。
Kafka 日志过多问题
调整日志级别为 info,在 logback.xml 文件中写入
<logger name="org.apache.kafka" level="INFO" />