一.pom.xml
<!-- Parent POM: Spring Boot starter parent, version 2.3.0.RELEASE. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Maven coordinates and descriptive metadata of this project. -->
<groupId>com.wj</groupId>
<artifactId>boot-rabbit</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot-rabbit</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<!-- No dependency below declares its own version; presumably the parent's dependency management supplies them. -->
<dependencies>
<!-- Spring Boot AMQP starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Spring Boot test starter (test scope) with junit-vintage-engine excluded. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Spring AMQP test-support artifact (test scope). -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
二.application.yml
spring:
  rabbitmq:
    addresses: amqp://guest:guest@192.168.10.132:5672
    virtual-host: /
    connection-timeout: 10000
    # Producer-side settings
    publisher-confirm-type: CORRELATED # lets a listener receive the confirmation the broker sends back for each publish
    publisher-returns: true # have messages that cannot be routed returned to the publisher
    template:
      mandatory: true # must be true, otherwise the return listener has no effect
    # Consumer-side settings
    listener:
      simple:
        # Manual acknowledgement mode: the listener code must ack each delivery itself
        acknowledge-mode: manual
        # Initial and maximum number of concurrent consumers
        concurrency: 5
        max-concurrency: 10
三.配置类
/**
 * Declares the RabbitMQ topology used by this demo: one topic exchange,
 * one durable queue, and the binding that connects them.
 */
@Configuration
public class MainConfig {

    private static final String EXCHANGE_NAME = "boot-exchange";
    private static final String QUEUE_NAME = "boot.queue";
    private static final String ROUTING_PATTERN = "springboot.#";

    /**
     * @return the topic exchange, created durable and not auto-deleted
     */
    @Bean
    public TopicExchange topicExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(EXCHANGE_NAME, durable, autoDelete);
    }

    /**
     * @return the durable queue
     */
    @Bean
    public Queue queue() {
        final boolean durable = true;
        return new Queue(QUEUE_NAME, durable);
    }

    /**
     * @return the binding of the queue to the topic exchange under the
     *         {@code springboot.#} routing pattern
     */
    @Bean
    public Binding binding() {
        Queue boundQueue = queue();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(boundQueue).to(exchange).with(ROUTING_PATTERN);
    }
}
四.生产端代码
生产者类:
/**
 * Publishes messages to the {@code boot-exchange} exchange with routing key
 * {@code springboot.rabbit} and prints the confirm/return notifications
 * delivered to its callbacks.
 */
@Component
public class Producer {

    private RabbitTemplate rabbitTemplate;

    /** Prints the correlation data and ack flag of each confirm; a nack additionally prints a notice. */
    final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
        System.out.println(correlationData);
        System.out.println(ack);
        if (!ack) {
            System.out.println("异常处理中。。");
        }
    };

    /** Prints the exchange, routing key, reply code/text and body of a returned message. */
    final RabbitTemplate.ReturnCallback returnCallback = (message, replyCode, replyText, exchange, routingKey) -> {
        System.out.println(exchange + ":" + routingKey);
        System.out.println(replyCode + ":" + replyText);
        // Decode with an explicit charset so the output does not depend on the platform default.
        System.out.println(new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
    };

    /**
     * Receives the template and registers both callbacks once, at injection time,
     * so that every send path of this class has them in place instead of only
     * the paths that happen to run after {@link #send}.
     *
     * @param rabbitTemplate the template used for all publishing in this class
     */
    @Autowired
    void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
    }

    /**
     * Wraps the payload and the given entries into a message and publishes it.
     *
     * @param message    the payload to send
     * @param properties entries used to build the message headers
     */
    public void send(Object message, Map<String, Object> properties) {
        MessageHeaders headers = new MessageHeaders(properties);
        Message msg = MessageBuilder.createMessage(message, headers);
        // Globally unique id for this publish, passed along as correlation data.
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("boot-exchange", "springboot.rabbit", msg, correlationData);
    }
}
测试代码:
@Autowired
private Producer producer;
/** Sends a text payload together with two property entries through the producer. */
@Test
public void testSend() {
    final Map<String, Object> entries = new HashMap<>();
    entries.put("name", "wj");
    entries.put("id", "1234");
    final String payload = "hello spring boot amqp";
    producer.send(payload, entries);
}
测试结果:

五.消费端代码
消费端监听注解@RabbitListener
/**
 * Listens on {@code boot.queue} (bound to the {@code boot-exchange} topic
 * exchange with key {@code springboot.#}), prints each payload and then
 * acknowledges the delivery on the channel.
 */
@Component
public class Consumer {
    /**
     * Prints the payload and acknowledges the delivery.
     *
     * @param message the received message
     * @param channel the channel on which the acknowledgement is sent
     * @throws java.io.IOException if sending the acknowledgement fails
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "boot.queue", durable = "true")
                    , exchange = @Exchange(
                            value = "boot-exchange"
                            , type = "topic"
                            , ignoreDeclarationExceptions = "true"
                    )
                    , key = "springboot.#"
            )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws java.io.IOException {
        System.out.println(message.getPayload());
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        // Acknowledge explicitly after processing; skip when the header is absent
        // rather than failing on unboxing a null tag.
        if (deliveryTag != null) {
            channel.basicAck(deliveryTag, false);
        }
    }
}
六.发送实体类消息
注意:实体类需要实现 java.io.Serializable 接口(默认的消息转换器使用 Java 序列化来转换对象),否则发送时会报错
生产端:
/**
 * Publishes a {@code User} to {@code boot-exchange} with routing key
 * {@code springboot.rabbit}.
 *
 * @param user       the entity to send
 * @param properties entries copied into the message headers; may be {@code null},
 *                   in which case no headers are added
 */
public void sendUser(User user, Map<String, Object> properties) {
    // Globally unique id for this publish, passed along as correlation data.
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend("boot-exchange", "springboot.rabbit", user, outgoing -> {
        // Copy the caller's entries onto the converted message as headers.
        if (properties != null) {
            properties.forEach(outgoing.getMessageProperties()::setHeader);
        }
        return outgoing;
    }, correlationData);
}
消费端:
/**
 * Receives a {@code User} payload from {@code boot.queue}, prints it together
 * with the delivery tag, then acknowledges the delivery on the channel.
 *
 * @param user       the converted message payload
 * @param channel    the channel on which the acknowledgement is sent
 * @param properties the message headers
 * @throws java.io.IOException if sending the acknowledgement fails
 */
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "boot.queue", durable = "true")
                , exchange = @Exchange(
                        value = "boot-exchange"
                        , type = "topic"
                        , ignoreDeclarationExceptions = "true"
                )
                , key = "springboot.#"
        )
)
@RabbitHandler
public void onMessage(@Payload User user, Channel channel, @Headers Map<String, Object> properties) throws java.io.IOException {
    System.out.println(user);
    Object deliveryTag = properties.get(AmqpHeaders.DELIVERY_TAG);
    System.out.println(deliveryTag);
    // Acknowledge explicitly after processing; skip if the header is missing
    // or not a Long instead of failing on a bad cast.
    if (deliveryTag instanceof Long) {
        channel.basicAck((Long) deliveryTag, false);
    }
}
测试:
/** Sends a single {@code User} through the producer with no extra properties. */
@Test
public void sendUser() {
    producer.sendUser(new User("张三", 12), null);
}