(1) Add the Kafka dependency to the pom
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
(2) Add the Kafka settings to the configuration file
spring:
  kafka:
    consumer:
      group-id: test111
      enable-auto-commit: true
      # Start from the newest messages when the group has no committed offset
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      retries: 3
      # Batch size in bytes (16 KiB)
      batch-size: 16384
      # Total producer buffer memory in bytes (32 MiB)
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    bootstrap-servers: localhost:9092
(3) Consumer class that consumes messages from Kafka
package com.vincent.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
@Slf4j
public class KafkaListeners {

    // Listens on the "test" topic; the consumer group comes from spring.kafka.consumer.group-id.
    @KafkaListener(topics = {"test"})
    public void roadConditionMessage(ConsumerRecord<?, ?> record) {
        try {
            long startTime = System.currentTimeMillis();
            log.info("Started receiving message, startTime: {}", startTime);
            // record.value() may be null, so print only when a value is present.
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            kafkaMessage.ifPresent(System.out::println);
        } catch (Exception e) {
            log.error("Exception while consuming message", e);
        }
    }
}
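The listener wraps record.value() in Optional.ofNullable because a Kafka record's value can be null (for example, a tombstone record on a log-compacted topic). Calling get() on an empty Optional throws NoSuchElementException, so it is safer to map the value and supply a fallback. Below is a minimal sketch of that pattern in plain Java, with no Kafka dependency so it can be run on its own. The class name ValueHandling, the method describe, and the "<no value>" placeholder are hypothetical names chosen for illustration, not part of the original project.

```java
import java.util.Optional;

// Hypothetical helper for illustration; not part of the original project.
class ValueHandling {

    // Returns the value's string form, or a placeholder when the value is null.
    static String describe(Object value) {
        return Optional.ofNullable(value)
                .map(Object::toString)
                .orElse("<no value>");
    }

    public static void main(String[] args) {
        System.out.println(describe("hello")); // prints "hello"
        System.out.println(describe(null));    // prints "<no value>"
    }
}
```

In the listener, the same idea would amount to passing record.value() to such a helper instead of calling get() directly.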
(4) Summary
With the steps above, the Spring Boot and Kafka integration is complete.