1、消息队列MQ(事务最终一致方案)介绍
原理、流程与本地消息表类似
不同点:
1) 本地消息表改为MQ
2) 定时任务改为MQ的消费者
架构图
优点:
不依赖定时任务,基于MQ更高效、更可靠。
适合于公司内的系统 (比如公司内的多个系统,要做一致性处理)
不同公司之间无法基于MQ,本地消息表更合适
2、Rabbit MQ安装
3、Rocket MQ生产者和消费者配置
@Configuration public class RocketMQConfig { /** * 生产者 配置 * @return */ @Bean(initMethod = "start", destroyMethod = "shutdown") public DefaultMQProducer producer(){ DefaultMQProducer producer = new DefaultMQProducer("paymentGroup"); producer.setNamesrvAddr("localhost:9876"); return producer; } /** * 消费者配置 */ @Bean(initMethod = "start", destroyMethod = "shutdown" ) public DefaultMQPushConsumer consumer(@Qualifier("messageListener") MessageListenerConcurrently messageListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("paymentConsumerGroup"); // Specify name server addresses. consumer.setNamesrvAddr("localhost:9876"); // Subscribe one more more topics to consume. consumer.subscribe("payment", "*"); consumer.registerMessageListener(messageListener); return consumer; } }
4、业务实现
1) 配置RocketMQ
@Configuration public class RocketMQConfig { /** * 生产者 配置 * @return */ @Bean(initMethod = "start", destroyMethod = "shutdown") public DefaultMQProducer producer(){ DefaultMQProducer producer = new DefaultMQProducer("paymentGroup"); producer.setNamesrvAddr("localhost:9876"); return producer; } /** * 消费者配置 */ @Bean(initMethod = "start", destroyMethod = "shutdown" ) public DefaultMQPushConsumer consumer(@Qualifier("messageListener") MessageListenerConcurrently messageListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("paymentConsumerGroup"); // Specify name server addresses. consumer.setNamesrvAddr("localhost:9876"); // Subscribe one more more topics to consume. consumer.subscribe("payment", "*"); consumer.registerMessageListener(messageListener); return consumer; } }
这里的主题名称为payment
2) 服务层PaymentService
增加支付接口。扣掉余额后,将消息放到消息队列
/** * 支付接口(消息队列) 。扣掉余额后,将消息放到消息队列 * @param userId 用户Id * @param orderId 订单Id * @param amount 支付金额 * @return 0: 成功; 1:用户不存在 2:余额不足 */ @Transactional(transactionManager = "tm134", rollbackFor = Exception.class) public int paymentMQ(int userId, int orderId, BigDecimal amount) throws Exception { //支付操作 扣款 AccountA accountA = accountAMapper.selectByPrimaryKey(userId); if(accountA == null){ return 1; } if(accountA.getBalance().compareTo(amount) < 0){ return 2; } accountA.setBalance(accountA.getBalance().subtract(amount)); accountAMapper.updateByPrimaryKey(accountA); Message message = new Message(); message.setTopic("payment"); message.setKeys(orderId +""); message.setBody("订单已支付".getBytes()); try { SendResult result = producer.send(message); if(result.getSendStatus() == SendStatus.SEND_OK){ return 0; }else { throw new Exception("消息发送失败"); } } catch (Exception e) { e.printStackTrace(); throw e; } }
3)、控制层增加支付接口
@RestController public class PaymentController { @Autowired private PaymentService paymentService; //localhost:8080/paymentMQ?userId=1&orderId=10010&amount=200 @RequestMapping("paymentMQ") public String paymentMQ(int userId, int orderId, BigDecimal amount) throws Exception { int result = paymentService.paymentMQ(userId, orderId,amount); return "支付结果:" + result; } }
4、消费者
@Component("messageListener") public class ChangeOrderStatus implements MessageListenerConcurrently { @Resource OrderMapper orderMapper; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { if (list == null || list.size() == 0) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } System.out.println("Rocket MQ 消费者接收到消息"); //默认list最大值为1 for (MessageExt messageExt : list) { String orderId = messageExt.getKeys(); String msg = new String(messageExt.getBody()); System.out.println("msg=" + msg); Order order = orderMapper.selectByPrimaryKey(Integer.parseInt(orderId)); if (order == null) { //消费者订单查询不到,根据业务是否要再次消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } try { order.setOrderStatus(1); //已支付 order.setUpdateTime(new Date()); order.setUpdateUser(0); //系统更新 orderMapper.updateByPrimaryKey(order); }catch (Exception e){ e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
消费者收到消息队列的消息后,更新订单状态为已支付。
5、数据准备
134和129两台数据库
129数据库的t_order表中,order_status为0 未支付
6、测试
localhost:8080/paymentMQ?userId=1&orderId=10010&amount=200
可以看到金额扣款200,并且订单状态order_status=1 已支付。