1、引入依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2、application配置
spring.kafka.bootstrap-servers=192.168.1.107:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
3、写消息
/**
 * Spring component that publishes a freshly built {@code Message} to Kafka as a JSON string.
 */
@Component
public class Sender {

    /** Name of the Kafka topic that {@link #sendMessage()} publishes to. */
    private static final String TOPIC = "test1";

    /**
     * Template used to publish records. Parameterized (instead of the raw type) so that
     * {@code send} is type-checked; the value handed to it below is always a JSON {@code String}.
     */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /** JSON serializer for outgoing messages. */
    private final Gson gson = new GsonBuilder().create();

    /**
     * Builds a new {@code Message} and sends its JSON form to the {@code test1} topic.
     *
     * <p>The message id is the current time in milliseconds, the text is a random UUID string,
     * and the send time is the current date.
     *
     * <p>The value returned by {@code kafkaTemplate.send} is not inspected here, so this method
     * itself does not report the outcome of the send.
     */
    public void sendMessage() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());

        String payload = gson.toJson(message);
        kafkaTemplate.send(TOPIC, payload);
    }
}
4、读消息
/**
 * Spring component that listens on the {@code test1} Kafka topic and prints the text of each
 * received {@code Message}.
 */
@Component
public class Receiver {

    /** JSON deserializer for incoming record payloads. */
    private final Gson gson = new GsonBuilder().create();

    /**
     * Handles one record from the {@code test1} topic: parses the JSON payload into a
     * {@code Message} and prints its {@code msg} value to standard output.
     *
     * @param content the record value, expected to be the JSON form of a {@code Message}
     */
    @KafkaListener(topics = "test1")
    public void processMessage(String content) {
        Message message = gson.fromJson(content, Message.class);
        if (message == null) {
            // Gson yields null for a null or empty payload; skip it rather than fail with a
            // NullPointerException on getMsg().
            System.err.println("Skipping Kafka record with empty payload");
            return;
        }
        System.out.println(message.getMsg());
    }
}