1、添加依赖
修改pom文件,添加spring-data-redis的依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.5.6</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.yas</groupId> <artifactId>RabbitMQBoot</artifactId> <version>0.0.1-SNAPSHOT</version> <name>RabbitMQBoot</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project>
2、修改配置文件,application.yml:
spring: redis: host: ubu port: 6379 password: 123456
3、修改生产者代码:
1 package com.yas.rabbitmqboot; 2 3 import org.junit.jupiter.api.Test; 4 import org.springframework.amqp.rabbit.connection.CorrelationData; 5 import org.springframework.amqp.rabbit.core.RabbitTemplate; 6 import org.springframework.beans.factory.annotation.Autowired; 7 import org.springframework.boot.test.context.SpringBootTest; 8 9 import java.io.IOException; 10 import java.util.UUID; 11 12 @SpringBootTest 13 class RabbitMqBootApplicationTests { 14 @Autowired 15 RabbitTemplate rabbitTemplate; 16 17 @Test 18 void contextLoads() throws IOException { 19 CorrelationData messageid = new CorrelationData(UUID.randomUUID().toString()); 20 rabbitTemplate.convertAndSend("boot-topic-exchange", "slow.red.dog", "慢红狗", messageid); 21 System.out.println("消息已生产"); 22 System.in.read(); 23 } 24 }
4、修改消费者代码:
1 package com.yas.rabbitmqboot.listen; 2 3 import com.rabbitmq.client.Channel; 4 import org.springframework.amqp.core.Message; 5 import org.springframework.amqp.rabbit.annotation.RabbitListener; 6 import org.springframework.beans.factory.annotation.Autowired; 7 import org.springframework.data.redis.core.StringRedisTemplate; 8 import org.springframework.stereotype.Component; 9 10 import java.io.IOException; 11 import java.util.concurrent.TimeUnit; 12 13 @Component 14 public class Consumer { 15 16 @Autowired 17 StringRedisTemplate redisTemplate; 18 19 @RabbitListener(queues = "boot-queue") 20 public void getMessage(String msg, Channel channel, Message message) throws IOException { 21 22 //0 获取messageid 23 String messageid = message.getMessageProperties().getHeader("spring_returned_message_correlation"); 24 25 //1 26 if (redisTemplate.opsForValue().setIfAbsent(messageid, "0", 10, TimeUnit.SECONDS)) { 27 //执行返回true,表示原来不存在这个值 28 System.out.println("接收到消息:" + msg); 29 //处理完毕 30 redisTemplate.opsForValue().set(messageid, "1"); 31 //手动ack 32 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 33 34 } else { 35 //执行返回false,表示原来存在这个值 36 if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageid))) { 37 //手动ack 38 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 39 }else{ 40 System.out.println("不需要做任何处理"); 41 } 42 } 43 44 45 } 46 }