引入依赖
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.7.1</version> </dependency>
如果启动报错
Caused by: java.lang.NoClassDefFoundError: org/springframework/core/log/LogAccessor
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.<init>(KafkaListenerAnnotationBeanPostProcessor.java:148)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:172)
... 19 common frames omitted
就把指定的 <version> 去掉,让 Spring Boot 的依赖管理自动选择与项目中 Spring Framework 版本兼容的 spring-kafka 版本。该报错的原因是 spring-kafka 2.7.1 依赖较新的 Spring Framework 中的 org.springframework.core.log.LogAccessor 类,而项目当前的 Spring 版本过旧、没有这个类,属于版本不兼容问题。
配置文件yml
修改kafka连接地址 其他按需修改
#kafka的topic名称
kafkaTopic: topic-test
spring:
kafka:
bootstrap-servers: 192.168.1.12:9092 #kafka连接地址
producer:
acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
batch-size: 16384 #批量大小
properties:
linger.ms: 0 # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
buffer-memory: 33554432 #生产端缓冲区大小
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: defaultConsumerGroup # 默认的消费组ID
enable-auto-commit: true # 是否自动提交offset
## 当kafka中没有初始offset或offset超出范围时将自动重置offset
## earliest:重置为分区中最小的offset;
## latest:重置为分区中最新的offset(消费分区中新产生的数据);
## none:只要有一个分区不存在已提交的offset,就抛出异常;
auto-commit-interval:
ms: 1000
auto-offset-reset: latest
properties:
session.timeout.ms: 120000 # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
request.timeout.ms: 180000 # 消费请求超时时间
listener:
missing-topics-fatal: false # 监听的主题不存在时是否让应用启动失败;false 表示主题不存在也允许正常启动(注意:该配置不会自动创建主题,主题自动创建由 broker 端的 auto.create.topics.enable 决定),true 表示主题不存在时启动报错
flyway:
connect-retries: 0 #重试次数
消费者:
KafkaConsumer.java
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * @author yvioo */ @Component public class KafkaConsumer { /** * 消费监听 * @param record */ @KafkaListener(topics = "${kafkaTopic}") public void onMessage(ConsumerRecord<?, ?> record){ System.out.println("收到消息:topic名称:"+record.topic()+",分区:"+record.partition()+",值:"+record.value()); } }
生产者
KafkaProducer.java
import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * @author 。 */ @RestController public class KafkaProducer { @Value("${kafkaTopic}") private String kafkaTopic; @Resource private KafkaTemplate<String, Object> kafkaTemplate; /** * 发送消息 * @param message */ @GetMapping("/send") public void sendMessage1(String message) { kafkaTemplate.send(kafkaTopic, message); } /** * 有发送结果回调 * @param message */ @GetMapping("/send/callback") public void sendMessage3(String message) { kafkaTemplate.send(kafkaTopic, message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable ex) { System.out.println("fail:"+ex.getMessage()); } @Override public void onSuccess(SendResult<String, Object> result) { System.out.println("success:topic名称:" + result.getRecordMetadata().topic() + ",分区:" + result.getRecordMetadata().partition() + ",消息在分区中的标识:" + result.getRecordMetadata().offset()); } }); } }