一、springboot版本和依赖
- springboot 版本 2.1.5
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
- dependencies
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
二、生产端
1、yml 文件配置
server:
port: 8001
servlet:
context-path: /
spring:
rabbitmq:
addresses: ip:5672,ip:5672,ip:5672
username: guest
password: guest
virtual-host: /
connection-timeout: 15000
publisher-confirms: true
publisher-returns: true
template:
mandatory: true
application:
name: rabbit-producer
http:
encoding:
charset: UTF-8
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: NON_NULL
rabbitmq-exchange: exchange-1
rabbitmq-routingKey: springboot.abc
- address server的ip地址加上端口号
- username passowrd 账号密码
- virtual-host 最上层的一个域 ,例如 /order /logistics
- 连接超时时间
- 生产端的相关配置
- publisher-confirms 是否开启消息确认模式,举例:生产者发送消息到 broker(mq) ,我不确定我的消息是否100%已经投递到mq 中,我们会进行线程监听,mq 会返回一个成功或者是失败的情况
- publisher-returns 是否开启发布者退货模式,举例:生产者发送routingkey: spring.xxx ,queue routingkey:为 springboot.xxx 。那么不匹配路由规则。publisher-returns 设置为false 的话,这条消息就丢掉了,消失了。设置为true的话,会将消息 执行到我们指定的一对 exchange 和 queue 上。 需要和 mandatory 一起使用
- mandatory 是否开启强制性消息
2、编写发送消息方法
package com.example.producer.component;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
/**
* @Author: qiuj
* @Description:
* @Date: 2020-05-31 11:57
*/
@Component
public class Sender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${rabbitmq-exchange}")
private String exchange;
@Value("${rabbitmq-routingKey}")
private String routingKey;
/**
* 这里就是确认消息的回调监听接口,用于确认消息是否被broker 所收到
*/
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("唯一id:" + correlationData);
System.out.println("消息是否成功投递" + ack );
System.out.println("如果失败则会返回错误消息" + cause);
}
};
public void sends (Object msg, Map<String,Object> properties) {
// 第一步将消息包装成boot 支持的方式
MessageHeaders messageHeaders = new MessageHeaders(properties);
Message<?> message = MessageBuilder.createMessage(msg,messageHeaders);
// 指定唯一id
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 回调方法
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
System.out.println("在发送消息之前的前置方法" + message);
return message;
}
};
// 前置方法
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.convertAndSend(exchange,
routingKey,
message,
messagePostProcessor,
correlationData);
}
}
三、消费端
1、yml 文件配置
server:
port: 8002
spring:
rabbitmq:
addresses: ip:5672,ip:5672,ip:5672
username: guest
password: guest
virtual-host: /
connection-timeout: 15000
listener:
simple:
acknowledge-mode: manual
concurrency: 5
max-concurrency: 10
prefetch: 2
application:
name: rabbit-consumer
http:
encoding:
charset: UTF-8
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: NON_NULL
queue-name: queue-1
queue-durable: true
exchange-name: exchange-1
exchange-topic: topic
exchange-durable: true
exchange-ignoreDeclarationExceptions: true
routingkey: springboot.*
- 消费端的相关配置
- acknowledge-mode 默认auto ,也就是自动签收消息,生产环境不建议,我们设置为 manual 手工的进行签收
- concurrency max-concurrency 监听器调用线程的最小数量 和最大数量
- prefetch 在单个请求中处理的消息个数,开启限流,指定每次处理消息最多只能处理2条消息
2、编写接受消息方法
package com.example.consumer.component;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @Author: qiuj
* @Description:
* @Date: 2020-05-30 19:29
*/
@Component
public class RabbitReceive {
/**
* 组合使用监听
* @RabbitListener @QueueBinding @Queue @Exchange
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "${queue-name}",durable = "${queue-durable}"),
exchange = @Exchange(
value = "${exchange-name}",
type = "${exchange-topic}",
durable = "${exchange-durable}",
ignoreDeclarationExceptions = "${exchange-ignoreDeclarationExceptions}"
),
key = "${routingkey}"
))
@RabbitHandler
public void onMessage (Message message, Channel channel) throws IOException {
// 1:收到消息以后进行业务端处理
System.out.println("消费消息:" + message.getPayload());
// 2:处理成功之后 获取deliveryTag 并进行手工的ACK操作,因为我们配置文件里配置的是 手工签收
// spring.rabbitmq.listener.simple.acknowiedge-mode=manual
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag,false);
}
}