2.1 创建表结构
-- Producer-side record: one row per message handed to the broker, with its send status.
-- The primary key already enforces uniqueness of message_id, so no separate
-- UNIQUE index is declared on that column (it would only duplicate the PK index).
CREATE TABLE `infrastructure_mq_producer` (
    `message_id` bigint(20) NOT NULL COMMENT '消息标识',
    `content_id` bigint(20) NOT NULL COMMENT '消息内容id,关联 infrastructure_message表',
    `type` varchar(4) NOT NULL DEFAULT '' COMMENT '消息类型',
    `status` int(2) NOT NULL DEFAULT '0' COMMENT '发送状态: 0:下发中;1:下发失败;2下发成功',
    `try_count` int(3) NOT NULL DEFAULT '0' COMMENT '重试次数',
    `next_retry_time` datetime DEFAULT NULL COMMENT '下一次执行时间',
    `error_message_id` bigint(20) DEFAULT NULL COMMENT '错误消息内容id,关联 infrastructure_message表',
    `create_time` datetime DEFAULT NULL COMMENT '创建时间',
    `update_time` datetime DEFAULT NULL COMMENT '更新时间',
    PRIMARY KEY (`message_id`),
    -- Composite secondary index over (status, create_time, type).
    KEY `idx_query` (`status`,`create_time`,`type`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='MQ生产者记录';
-- Consumer-side record: one row per message that has been consumed.
-- The primary key already enforces uniqueness of message_id, so no separate
-- UNIQUE index is declared on that column (it would only duplicate the PK index).
CREATE TABLE `infrastructure_mq_consumer` (
    `message_id` bigint(20) NOT NULL COMMENT '消息标识',
    `content_id` bigint(20) NOT NULL COMMENT '消息内容id,关联 infrastructure_message表',
    `type` varchar(4) NOT NULL DEFAULT '' COMMENT '消息类型',
    `create_time` datetime DEFAULT NULL COMMENT '创建时间',
    `update_time` datetime DEFAULT NULL COMMENT '更新时间',
    PRIMARY KEY (`message_id`),
    -- Composite secondary index over (create_time, type).
    KEY `idx_query` (`create_time`,`type`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='MQ消费者记录';
-- Message payload storage, referenced by content_id / error_message_id in the
-- producer and consumer tables.
-- Changes from the previous definition:
--   * idx_key now indexes `key1`; it previously named a non-existent column
--     `key`, which makes CREATE TABLE fail.
--   * the UNIQUE index on `id` was dropped because the primary key already
--     enforces uniqueness on that column.
--   * `key1` is declared as varchar so it follows the table's utf8mb4 default
--     instead of the national character set implied by nvarchar.
CREATE TABLE `infrastructure_message` (
    `id` bigint(20) NOT NULL COMMENT '消息标识',
    `key1` varchar(20) DEFAULT NULL COMMENT '相关主键',
    `content` text COMMENT '消息内容',
    `create_time` datetime DEFAULT NULL COMMENT '创建时间',
    PRIMARY KEY (`id`),
    KEY `idx_key` (`key1`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='报文数据';
2.2 引入依赖
<!-- Maven dependencies used by the example: web endpoints, MySQL driver, tests, MyBatis and RabbitMQ (AMQP). -->
<dependencies>
<!-- Spring MVC starter; version is not declared here, so it must come from dependency management. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- MySQL JDBC driver, needed at runtime only. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Spring Boot test support, test scope only. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- MyBatis integration; version is pinned explicitly. -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>
<!-- Spring AMQP (RabbitMQ) starter. -->
<!-- NOTE(review): this is the only Spring Boot starter with an explicit version; the others inherit theirs. -->
<!-- Confirm that 2.3.1.RELEASE matches the managed Spring Boot version, or drop the version to inherit it. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.3.1.RELEASE</version>
</dependency>
</dependencies>
2.3 相关配置
# Database configuration
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
spring.datasource.username=root
# NOTE(review): credentials are hard-coded; presumably demo-only values - externalize before real use
spring.datasource.password=sa000
# MyBatis configuration: mapper XML files are resolved one directory level below classpath:mapper/
mybatis.mapper-locations=classpath:mapper/*/*Mapper.xml
# RabbitMQ configuration
## Broker host address
spring.rabbitmq.host=111.231.83.100
## Broker port
spring.rabbitmq.port=5672
## Virtual host
spring.rabbitmq.virtual-host=/
## Connection timeout (presumably milliseconds, since no unit suffix is given - confirm)
spring.rabbitmq.connection-timeout=15000
## Listener containers use manual acknowledgement: the consumer code must ack/nack each delivery itself
spring.rabbitmq.listener.simple.acknowledge-mode=manual
## Minimum number of concurrent consumers per listener (not the number of messages per consume)
spring.rabbitmq.listener.simple.concurrency=1
## Maximum number of concurrent consumers per listener
spring.rabbitmq.listener.simple.max-concurrency=5
## Enable publisher returns (the Return mechanism for unroutable messages)
spring.rabbitmq.publisher-returns=true
## Enable publisher confirms, correlated with the CorrelationData supplied on send
spring.rabbitmq.publisher-confirm-type=correlated
## Mark template sends as mandatory so unroutable messages are returned instead of dropped
spring.rabbitmq.template.mandatory=true
2.4 实现MQ消息数据库持久化
/**
 * Persists the life cycle of MQ messages: the payload, the producer-side send
 * status and the consumer-side consumption record.
 *
 * NOTE(review): each public method issues more than one write and none of them
 * is wrapped in a transaction here, so a failure between the writes can leave
 * a payload row without its producer/consumer row - confirm whether callers
 * provide transactional boundaries.
 */
@Repository
public class MQRepository {

    private final MqProducerMapper producerMapper;
    private final MqConsumerMapper consumerMapper;
    private final MessageMapper messageMapper;
    private final MQFactory mqFactory;

    public MQRepository(MqProducerMapper producerMapper, MqConsumerMapper consumerMapper, MessageMapper messageMapper, MQFactory mqFactory) {
        this.producerMapper = producerMapper;
        this.consumerMapper = consumerMapper;
        this.messageMapper = messageMapper;
        this.mqFactory = mqFactory;
    }

    /**
     * Stores the payload and a producer record for a message about to be sent.
     *
     * @param messageId identifier of the message, used as the producer record key
     * @param content   message payload
     * @param type      message type
     */
    public void saveMessage(Long messageId, String content, String type) {
        Date now = new Date();
        Message payload = persistPayload(content, now);
        final MqProducer record = mqFactory.createProducer(messageId, payload.getId(), type, now);
        producerMapper.insertSelective(record);
    }

    /**
     * Marks a producer record as sent successfully (status 2).
     * The update only matches rows that are still in status 0, so a record
     * already moved to another status is left untouched.
     *
     * @param messageId identifier of the message that was confirmed
     */
    public void sendSuccess(Long messageId) {
        MqProducerExample stillSending = new MqProducerExample();
        stillSending.createCriteria()
                .andMessageIdEqualTo(messageId)
                .andStatusEqualTo(0);

        MqProducer changes = new MqProducer();
        changes.setMessageId(messageId);
        changes.setStatus(2);
        changes.setUpdateTime(new Date());

        producerMapper.updateByExampleSelective(changes, stillSending);
    }

    /**
     * Stores the error text as a payload row and marks the producer record as
     * failed (status 1), linking it to that error row.
     *
     * @param messageId identifier of the message whose send failed
     * @param errorInfo failure description to persist
     */
    public void sendFailure(Long messageId, String errorInfo) {
        Message errorPayload = persistPayload(errorInfo, new Date());

        MqProducer changes = new MqProducer();
        changes.setMessageId(messageId);
        changes.setStatus(1);
        changes.setUpdateTime(new Date());
        changes.setErrorMessageId(errorPayload.getId());

        producerMapper.updateByPrimaryKeySelective(changes);
    }

    /**
     * Stores the consumed payload and a consumer record for it.
     *
     * @param messageId identifier of the consumed message, used as the consumer record key
     * @param content   consumed payload
     */
    public void consumeSuccess(Long messageId, String content){
        Date now = new Date();
        Message payload = persistPayload(content, now);
        final MqConsumer record = mqFactory.createConsumer(messageId, payload.getId(), now);
        consumerMapper.insertSelective(record);
    }

    /** Builds a payload row through the factory, inserts it and returns it. */
    private Message persistPayload(String text, Date createdAt) {
        Message payload = mqFactory.createMessage(text, createdAt);
        messageMapper.insertSelective(payload);
        return payload;
    }
}
/**
 * Builds the persistence objects (producer record, payload, consumer record)
 * used by the MQ repository.
 */
@Component
public class MQFactory {

    /**
     * Last payload id handed out by this instance. Ids are seeded from the
     * wall clock but forced to be strictly increasing, so two payloads created
     * within the same millisecond no longer get the same primary key.
     */
    private final java.util.concurrent.atomic.AtomicLong lastMessageId =
            new java.util.concurrent.atomic.AtomicLong();

    /**
     * Creates a producer record in its initial state: status 0, zero retries,
     * and next retry / create / update times all set to {@code currentTime}.
     *
     * @param messageId   identifier of the message
     * @param contentId   id of the stored payload row
     * @param type        message type
     * @param currentTime timestamp applied to all time fields
     * @return the populated producer record
     */
    public MqProducer createProducer(Long messageId, Long contentId, String type, Date currentTime) {
        MqProducer producer = new MqProducer();
        producer.setMessageId(messageId);
        producer.setContentId(contentId);
        producer.setType(type);
        producer.setStatus(0);
        producer.setTryCount(0);
        producer.setNextRetryTime(currentTime);
        producer.setCreateTime(currentTime);
        producer.setUpdateTime(currentTime);
        return producer;
    }

    /**
     * Creates a payload row with a freshly generated id.
     *
     * @param content     payload text
     * @param currentTime creation timestamp
     * @return the populated payload object
     */
    public Message createMessage(String content, Date currentTime) {
        Message message = new Message();
        message.setId(nextMessageId());
        message.setCreateTime(currentTime);
        message.setContent(content);
        return message;
    }

    /**
     * Creates a consumer record with an empty type and create / update times
     * set to {@code currentTime}.
     *
     * @param messageId   identifier of the consumed message
     * @param contentId   id of the stored payload row
     * @param currentTime timestamp applied to both time fields
     * @return the populated consumer record
     */
    public MqConsumer createConsumer(Long messageId, Long contentId, Date currentTime) {
        MqConsumer consumer = new MqConsumer();
        consumer.setMessageId(messageId);
        consumer.setContentId(contentId);
        consumer.setType("");
        consumer.setCreateTime(currentTime);
        consumer.setUpdateTime(currentTime);
        return consumer;
    }

    /**
     * Returns the current time in milliseconds, or the previous id plus one
     * when the clock has not advanced since the last call.
     * Uniqueness is only guaranteed within this instance; several application
     * instances writing to the same table can still collide.
     */
    private long nextMessageId() {
        return lastMessageId.updateAndGet(
                previous -> Math.max(previous + 1, System.currentTimeMillis()));
    }
}
2.5 新建回调事件实现接口
/**
 * Publisher-confirm callback: records in the repository whether the broker
 * acknowledged a sent message.
 */
@Component
public class ProducerSendConfirmCallback implements RabbitTemplate.ConfirmCallback {

    private final MQRepository mqRepository;

    public ProducerSendConfirmCallback(MQRepository mqRepository) {
        this.mqRepository = mqRepository;
    }

    /**
     * Marks the message as sent on ack, or as failed (with the broker's cause)
     * on nack.
     *
     * @param correlationData correlation data supplied when sending; its id is
     *                        parsed as the numeric message id. May be null when
     *                        a message was sent without correlation data.
     * @param ack             true when the broker acknowledged the message
     * @param cause           failure description on nack
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // A confirm without correlation data cannot be matched to a stored
        // producer record, so there is nothing to update.
        if (correlationData == null || correlationData.getId() == null) {
            return;
        }
        Long messageId = Long.valueOf(correlationData.getId());
        if (ack) {
            mqRepository.sendSuccess(messageId);
        } else {
            mqRepository.sendFailure(messageId, cause);
        }
    }
}
/**
 * Publisher-return callback: records a send failure for messages the broker
 * hands back to the producer.
 */
@Component
public class ProducerSendReturnedCallback implements RabbitTemplate.ReturnCallback {

    private final MQRepository mqRepository;

    public ProducerSendReturnedCallback(MQRepository mqRepository) {
        this.mqRepository = mqRepository;
    }

    /**
     * Marks the returned message as failed, storing the reply text as the error.
     *
     * @param message    returned message; its "spring_returned_message_correlation"
     *                   header is parsed as the numeric message id
     * @param replyCode  error code
     * @param replyText  error text
     * @param exchange   exchange the message was sent to
     * @param routingKey routing key the message was sent with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        String correlationId = message.getMessageProperties()
                .getHeader("spring_returned_message_correlation");
        // Without the correlation header the return cannot be matched to a
        // stored producer record, so there is nothing to update.
        if (correlationId == null) {
            return;
        }
        Long messageId = Long.valueOf(correlationId);
        mqRepository.sendFailure(messageId, replyText);
    }
}
2.6 新建 RabbitMQConfig 配置
/**
 * RabbitMQ wiring: registers the producer callbacks on the RabbitTemplate and
 * declares the order exchange, queue and binding.
 */
@Configuration
public class RabbitMQConfig {

    private final RabbitTemplate rabbitTemplate;
    private final ProducerSendReturnedCallback producerSendReturnedCallback;
    private final ProducerSendConfirmCallback producerSendConfirmCallback;

    public RabbitMQConfig(RabbitTemplate rabbitTemplate, ProducerSendReturnedCallback producerSendReturnedCallback,
                          ProducerSendConfirmCallback producerSendConfirmCallback) {
        this.rabbitTemplate = rabbitTemplate;
        this.producerSendReturnedCallback = producerSendReturnedCallback;
        this.producerSendConfirmCallback = producerSendConfirmCallback;
    }

    /** Attaches the return and confirm callbacks once the bean is constructed. */
    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(producerSendReturnedCallback);
        rabbitTemplate.setConfirmCallback(producerSendConfirmCallback);
    }

    /** Topic exchange named "orderExchange". */
    @Bean
    public TopicExchange orderExchange() {
        TopicExchange exchange = new TopicExchange("orderExchange");
        return exchange;
    }

    /** Queue named "orderQueue". */
    @Bean
    public Queue orderQueue() {
        Queue queue = new Queue("orderQueue");
        return queue;
    }

    /** Binds the order queue to the order exchange for routing keys matching "order.#". */
    @Bean
    public Binding orderBinding(TopicExchange orderExchange, Queue orderQueue) {
        Binding binding = BindingBuilder
                // the queue being bound
                .bind(orderQueue)
                // the exchange it is bound to
                .to(orderExchange)
                // the routing-key pattern for the binding
                .with("order.#");
        return binding;
    }
}
2.7 创建生产者
@RestController
public class ProducerController {
private final RabbitTemplate rabbitTemplate;
private final MQRepository mqRepository;
public TestController(RabbitTemplate rabbitTemplate, MQRepository mqRepository) {
this.rabbitTemplate = rabbitTemplate;
this.mqRepository = mqRepository;
}
@GetMapping("/sendNotExistExchangeAndNotExistRoutingKeyMessage")
public void sendNotExistExchangeAndNotExistRoutingKeyMessage() {
Long messageId = new Date().getTime();
String type = "test";
String content = "123";
CorrelationData correlationData = new CorrelationData(String.valueOf(messageId));
mqRepository.saveMessage(messageId, content, type);
rabbitTemplate.convertAndSend("NotExistExchange", "NotExistRoutingKey", content, correlationData);
}
@GetMapping("/sendNotExistRoutingKeyMessage")
public void sendNotExistRoutingKeyMessage() {
Long messageId = new Date().getTime();
String type = "test";
String content = "123";
CorrelationData correlationData = new CorrelationData(String.valueOf(messageId));
mqRepository.saveMessage(messageId, content, type);
rabbitTemplate.convertAndSend("orderExchange", "NotExistRoutingKey", "123", correlationData);
}
@GetMapping("/sendMessage")
public void sendMessage() {
Long messageId = new Date().getTime();
String type = "test";
CorrelationData correlationData = new CorrelationData(String.valueOf(messageId));
Order order = new Order("10001", BigDecimal.valueOf(150));
mqRepository.saveMessage(messageId, order.toString(), type);
rabbitTemplate.convertAndSend("orderExchange", "order.create", order, correlationData);
}
}
2.8 创建消费者
/**
 * Consumes orders from "orderQueue", records the consumption and acknowledges
 * the delivery manually.
 */
@Component
public class OrderConsumer {

    private final MQRepository mqRepository;

    public OrderConsumer(MQRepository mqRepository) {
        this.mqRepository = mqRepository;
    }

    /**
     * Handles one delivery: persists the consumption record, then acks.
     * If processing fails, the delivery is negatively acknowledged without
     * requeue and the exception is rethrown. Previously a failure left the
     * delivery unacknowledged, since this listener acks manually.
     *
     * @param order   converted message payload
     * @param headers message headers; must contain the delivery tag and the
     *                "spring_returned_message_correlation" header holding the
     *                numeric message id
     * @param channel channel used for the manual ack / nack
     * @throws Exception when processing or acknowledging fails
     */
    @RabbitListener(queues = "orderQueue")
    public void onMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        try {
            Object correlationId = headers.get("spring_returned_message_correlation");
            if (correlationId == null) {
                throw new IllegalArgumentException(
                        "Missing header spring_returned_message_correlation");
            }
            Long messageId = Long.valueOf(String.valueOf(correlationId));
            mqRepository.consumeSuccess(messageId, order.toString());
            System.err.println("消费端消费:" + order.toString());
        } catch (Exception e) {
            // requeue=false: an immediate redelivery of a message that just
            // failed would most likely fail again.
            channel.basicNack(deliveryTag, false, false);
            throw e;
        }
        channel.basicAck(deliveryTag, false);
    }
}