Kafka是一种分布式的,基于发布/订阅的消息系统。
Kafka的组成包括:
- Kafka将消息以topic为单位进行归纳。
- 将向Kafka topic发布消息的程序成为producers.
- 将预订topics并消费消息的程序成为consumer.
- Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.
Kafka的下载:https://kafka.apache.org/downloads 下载最新版本
解压后修改配置(config/server.properties:broker.id、log.dirs)
vim config/server.properties
broker.id = 1
log.dirs = "日志目录地址"
启动服务:
- Kafka用到了Zookeeper,所有首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务。可以在命令的结尾加个&符号,这样就可以启动后离开控制台。
bin/zookeeper-server-start.sh config/zookeeper.properties &
- 现在启动Kafka
bin/kafka-server-start.sh config/server.properties
- 创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
- 发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
- 启动consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Springboot中整合Kafka:
pom文件中:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.0.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency>
application.yml中:
spring: # KAFKA kafka: # ָkafka服务器地址,可以指定多个 bootstrap-servers: 123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094 #=============== producer生产者配置 ======================= producer: retries: 0 # 每次批量发送消息的数量 batch-size: 16384 # 缓存容量 buffer-memory: 33554432 # ָ指定消息key和消息体的编解码方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer #=============== consumer消费者配置 ======================= consumer: #指定默认消费者的group id group-id: test-app #earliest #当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 #latest #当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 #none #topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 auto-offset-reset: latest enable-auto-commit: true auto-commit-interval: 100ms #指定消费key和消息体的编解码方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
定义一个生产者类:KafkaSender 负责消息推送:
@Component public class KafkaSender { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; private final Logger logger = LoggerFactory.getLogger(KafkaSender.class); public void send(String topic, String taskid, String jsonStr) { //发送消息 ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, taskid, jsonStr); future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override //推送成功 public void onSuccess(SendResult<String, Object> result) { logger.info(topic + " 生产者 发送消息成功:" + result.toString()); } @Override //推送失败 public void onFailure(Throwable ex) { logger.info(topic + " 生产者 发送消息失败:" + ex.getMessage()); } }); } }
定义一个消费者类:KafkaCustomer 用来接收消息
@Component public class KafkaConsumer { private final Logger logger = LoggerFactory.getLogger(this.getClass()); //下面的主题是一个数组,可以同时订阅多主题,只需按数组格式即可,也就是用“,”隔开 @KafkaListener(topics = {"testTopic"}) public void receive(ConsumerRecord<?, ?> record){ logger.info("消费得到的消息---key: " + record.key()); logger.info("消费得到的消息---value: " + record.value().toString()); } }