1、消息队列MQ(事务最终一致方案)介绍
原理、流程与本地消息表类似
不同点:
1) 本地消息表改为MQ
2) 定时任务改为MQ的消费者
架构图

优点:
不依赖定时任务,基于MQ更高效、更可靠。
适合于公司内的系统 (比如公司内的多个系统,要做一致性处理)
不同公司之间无法基于MQ,本地消息表更合适
2、Rabbit MQ安装
3、Rocket MQ生产者和消费者配置
@Configuration
public class RocketMQConfig {
/**
* 生产者 配置
* @return
*/
@Bean(initMethod = "start", destroyMethod = "shutdown")
public DefaultMQProducer producer(){
DefaultMQProducer producer = new
DefaultMQProducer("paymentGroup");
producer.setNamesrvAddr("localhost:9876");
return producer;
}
/**
* 消费者配置
*/
@Bean(initMethod = "start", destroyMethod = "shutdown" )
public DefaultMQPushConsumer consumer(@Qualifier("messageListener") MessageListenerConcurrently messageListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("paymentConsumerGroup");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("payment", "*");
consumer.registerMessageListener(messageListener);
return consumer;
}
}
4、业务实现
1) 配置RocketMQ
@Configuration
public class RocketMQConfig {
/**
* 生产者 配置
* @return
*/
@Bean(initMethod = "start", destroyMethod = "shutdown")
public DefaultMQProducer producer(){
DefaultMQProducer producer = new
DefaultMQProducer("paymentGroup");
producer.setNamesrvAddr("localhost:9876");
return producer;
}
/**
* 消费者配置
*/
@Bean(initMethod = "start", destroyMethod = "shutdown" )
public DefaultMQPushConsumer consumer(@Qualifier("messageListener") MessageListenerConcurrently messageListener) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("paymentConsumerGroup");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("payment", "*");
consumer.registerMessageListener(messageListener);
return consumer;
}
}
这里的主题名称为payment
2) 服务层PaymentService
增加支付接口。扣掉余额后,将消息放到消息队列
/**
* 支付接口(消息队列) 。扣掉余额后,将消息放到消息队列
* @param userId 用户Id
* @param orderId 订单Id
* @param amount 支付金额
* @return 0: 成功; 1:用户不存在 2:余额不足
*/
@Transactional(transactionManager = "tm134", rollbackFor = Exception.class)
public int paymentMQ(int userId, int orderId, BigDecimal amount) throws Exception {
//支付操作 扣款
AccountA accountA = accountAMapper.selectByPrimaryKey(userId);
if(accountA == null){
return 1;
}
if(accountA.getBalance().compareTo(amount) < 0){
return 2;
}
accountA.setBalance(accountA.getBalance().subtract(amount));
accountAMapper.updateByPrimaryKey(accountA);
Message message = new Message();
message.setTopic("payment");
message.setKeys(orderId +"");
message.setBody("订单已支付".getBytes());
try {
SendResult result = producer.send(message);
if(result.getSendStatus() == SendStatus.SEND_OK){
return 0;
}else {
throw new Exception("消息发送失败");
}
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
3)、控制层增加支付接口
@RestController
public class PaymentController {
@Autowired
private PaymentService paymentService;
//localhost:8080/paymentMQ?userId=1&orderId=10010&amount=200
@RequestMapping("paymentMQ")
public String paymentMQ(int userId, int orderId, BigDecimal amount) throws Exception {
int result = paymentService.paymentMQ(userId, orderId,amount);
return "支付结果:" + result;
}
}
4、消费者
@Component("messageListener")
public class ChangeOrderStatus implements MessageListenerConcurrently {
@Resource
OrderMapper orderMapper;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (list == null || list.size() == 0) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
System.out.println("Rocket MQ 消费者接收到消息");
//默认list最大值为1
for (MessageExt messageExt : list) {
String orderId = messageExt.getKeys();
String msg = new String(messageExt.getBody());
System.out.println("msg=" + msg);
Order order = orderMapper.selectByPrimaryKey(Integer.parseInt(orderId));
if (order == null) {
//消费者订单查询不到,根据业务是否要再次消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
try {
order.setOrderStatus(1); //已支付
order.setUpdateTime(new Date());
order.setUpdateUser(0); //系统更新
orderMapper.updateByPrimaryKey(order);
}catch (Exception e){
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
消费者收到消息队列的消息后,更新订单状态为已支付。
5、数据准备
134和129两台数据库

129数据库的t_order表中,order_status为0 未支付
6、测试
localhost:8080/paymentMQ?userId=1&orderId=10010&amount=200

可以看到金额扣款200,并且订单状态order_status=1 已支付。