之前也学习过消息队列,但一直没有使用的场景,今天项目中遇到了 kafka 那便有了应用场景
1. Kafka
Kafka 是一个分布式、支持分区,多副本的基于 zookeeper 的消息队列。使用消息队列,是应用 A 将要处理的信息发送到消息队列然后继续下面的任务,需要该信息的应用 B 从消息队列里面获取信息再做处理,这样做像是多此一举,应用 A 直接发信息给应用 B 不就可以了吗?存在即合理,使用消息队列其作用如下:
- 异步处理:用户注册后发送邮件、短信、验证码等可以异步处理,使注册这个过程写入数据库后就可立即返回
- 流量消峰:秒杀活动超过阈值的请求丢弃转向错误页面,然后根据消息队列的消息做业务处理
- 日志处理:可以将error的日志单独给消息队列进行持久化处理
- 应用解耦:购物的下单操作,订单系统与库存系统中间加消息队列,使二者解耦,若后者故障也不会导致消息丢失
之前 笔者也写过 RabbitMQ 的笔记,传送门
2. 生产消费模型
结合 kafka 的下面这些名词来解释其模型会更加容易理解
名称 | 解释 |
---|---|
Broker | kafka 的实例,部署多台 kafka 就是有多个 broker |
Topic | 消息订阅的话题,是这些消息的分类,类似于消息订阅的频道 |
Producer | 生产者,负责往 kafka 发送消息 |
Consumer | 消费者,从 kafka 读取消息来进行消费 |
3. 安装部署
kafka 和依赖的 zookeeper 是 java 编写的工具,其需要 jdk8 及其以上。笔者这里使用 Docker 安装,偷懒了贪图方便快捷
# 使用 wurstmeister 制作的镜像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
# 启动 zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
# 单机启动 kafka
docker run -d --name kafka -p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=xxx.xxx.xxx.xxx:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxx.xxx.xxx.xxx:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
4. Quickstart
kafka 官网也有很好的介绍,quickstart
# 进入kafka容器
docker exec -it kafka /bin/sh
# 进入 bin 目录
cd /opt/kafka_2.13-2.8.1/bin
# partitions 分区
# replication 副本因子
# 创建一个主题(参数不懂可直接填写,后面会讲解说明)
./kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server localhost:9092
# 查看
./kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
# 写入 topic(回车表示一条消息,ctrl + c 结束输入)
# 消息默认存储 7 天,下一步的消费可以验证
./kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
# 读取 topic(运行多次可以读取消息,因为默认存储 7 天)
./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
./kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
5. SpringBoot 集成
SpringBoot 集成了 Kafka,添加依赖后可使用内置的 KafkaTemplate 模板方法来操作 kafka 消息队列
5.1 添加依赖
<!-- sprinboot版本管理中有kafka可不写版本号 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
5.2 配置文件
server:
port: 8080
spring:
# 消息队列
kafka:
producer:
# broker地址,重试次数,确认接收个数,消息的编解码方式
bootstrap-servers: 101.200.197.22:9092
retries: 3
acks: 1
key-serializer: org.springframework.kafka.support.serializer.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.StringSerializer
consumer:
# broker地址,自动提交,分区offset设置
bootstrap-servers: 101.200.197.22:9092
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
5.3 生产者
@RestController
@RequestMapping("/kafka")
public class Producer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("/producer1")
public String sendMessage1(@RequestParam(value = "message", defaultValue = "123") String message) throws ExecutionException, InterruptedException {
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topic1", message);
SendResult<String, Object> sendResult = future.get();
return sendResult.toString();
}
@GetMapping("/producer2")
public String sendMessage2(@RequestParam(value = "message", defaultValue = "123") String message) throws ExecutionException, InterruptedException {
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topic1", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("faile");
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("success");
}
});
return "";
}
}
5.4 消费者
@Component
public class Consumer {
@KafkaListener(topics = {"topic1"})
public void onMessage(ConsumerRecord<?, ?> record) {
System.out.println(record.value());
}
}
6. 存储目录结构
kafka
|____kafka-logs
|____topic1
| |____00000000000000000000.log(存储接收的消息)
| |____consumer_offsets-01(消费者偏移量)
| |____consumer_offsets-02
|____topic2
|____00000000000000000000.log
|____consumer_offsets-01
|____consumer_offsets-02
每台 broker 实例接收到消息后将之存储到 00000.log 里面,保存的方式是先入先出。消息被消费后不会被删除,相反可以设置 topic 的消息保留时间,重要的是 Kafka 的性能在数据大小方面实际上是恒定的,因此长时间存储数据是完全没问题的
消费者会将自己消费偏移量 offset 提交给 topic 在 _consumer_offsets 里面保存,然后通过偏移量来确定消息的位置,默认从上次消费的位置开始,添加参数 --frombeginning 则从头开始消费,可获取之前所有存储的消息。kafka 也会定期清除内部的消息,直到保存最新的一条(文件保存的消息默认保存 7 天)
7. 消费组
这个在笔者配置消费者的时候发现的问题,启动时报错说没有指定消费组
- 每条分区消息只能被同组的一个消费者消费,consumer1 和 consumer2 同组,所以只有其中一个能消费同条消息
- 每条分区消息能被不同组的单个消费者消费,consumer2 和 consumer4 不同组,所以都能消费同条消息
- 以上二个规则同时成立
- 其作用是可以保证消费顺序,同个分区里的消息会被同个消费者顺序消费
8. 分区和副本
topic 消息保存的文件 0000.log 可以进行物理切分,这就是分区的概念,类似于数据库的分库分表。这样做的好处在于单个保存的文件不会太大从而影响性能,最重要的是分区后不是单个文件串行执行了,而是多区多文件可并行执行提高了并发能力
分区:消费者会消费同一 topic 的不同分区,所以会保存不同分区的偏移量,其格式为:GroupId + topic + 分区号
副本:副本是对分区的备份,集群中不同的分区在不同的 broker 上,但副本会对该分区备份到指定数量的 broker 上,这些副本有 leader 和 follower 的区别,leader负责读写,挂了再重新选举,副本为了保持数据一致性
9. 常见问题
9.1 生产者同步和异步消息
生产者发送消息给 broker,之后 broker 会响应 ack 给生产者,生产者等待接收 ack 信号 3 秒,超时则重试 3 次
生产者 ack 确认配置:
- ack = 0:不需要同步消息
- ack = 1:则 leader 收到消息,并保存到本地 log 之后才响应 ack 信息
- ack 默认配置为 2
9.2 消费者自动提交和手动提交
- 自动提交:消费者 pull 消息之后马上将自身的偏移量提交到 broker 中,这个过程是自动的
- 手动提交:消费者 pull 消息时或之后,在代码里将偏移量提交到 broker
- 二者区别:防止消费者 pull 消息之后挂掉,在消息还没消费但又提交了偏移量
9.3 消息丢失和重复消费
- 消息丢失
- 生产者:配置 ack ,以及配置副本和分区数值一致
- 消费者:设置手动提交
- 重复消费
- 设置唯一主键,Mysql 主键唯一则插入失败
- 分布式锁
9.4 顺序消费方案
- 生产者:关闭重试,使用同步发送,成功了再发下一条
- 消费者:消息发送到一个分区中,只有一个消费组的消费者能接收消息