zoukankan      html  css  js  c++  java
  • 分布式事务解决方案4--消息队列MQ(事务最终一致方案)

    1、消息队列MQ(事务最终一致方案)介绍

    原理、流程与本地消息表类似

    不同点:

    1) 本地消息表改为MQ

    2) 定时任务改为MQ的消费者

    架构图

    优点: 

    不依赖定时任务,基于MQ更高效、更可靠。

    适合于公司内的系统 (比如公司内的多个系统,要做一致性处理)

    不同公司之间无法基于MQ,本地消息表更合适

    2、Rabbit MQ安装

    参考:Windown10下Rocket MQ 安装

    3、Rocket MQ生产者和消费者配置

    @Configuration
    public class RocketMQConfig {
    
        /**
         * 生产者 配置
         * @return
         */
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        public DefaultMQProducer producer(){
    
            DefaultMQProducer producer = new
                    DefaultMQProducer("paymentGroup");
            producer.setNamesrvAddr("localhost:9876");
            return producer;
        }
    
        /**
         * 消费者配置
         */
        @Bean(initMethod = "start", destroyMethod = "shutdown" )
        public DefaultMQPushConsumer consumer(@Qualifier("messageListener") MessageListenerConcurrently messageListener) throws MQClientException {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("paymentConsumerGroup");
    
            // Specify name server addresses.
            consumer.setNamesrvAddr("localhost:9876");
    
            // Subscribe one more more topics to consume.
            consumer.subscribe("payment", "*");
    
            consumer.registerMessageListener(messageListener);
            return  consumer;
        }
    }
    

      

    4、业务实现

     1) 配置RocketMQ

    @Configuration
    public class RocketMQConfig {
    
        /**
         * 生产者 配置
         * @return
         */
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        public DefaultMQProducer producer(){
    
            DefaultMQProducer producer = new
                    DefaultMQProducer("paymentGroup");
            producer.setNamesrvAddr("localhost:9876");
            return producer;
        }
    
        /**
         * 消费者配置
         */
        @Bean(initMethod = "start", destroyMethod = "shutdown" )
        public DefaultMQPushConsumer consumer(@Qualifier("messageListener") MessageListenerConcurrently messageListener) throws MQClientException {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("paymentConsumerGroup");
    
            // Specify name server addresses.
            consumer.setNamesrvAddr("localhost:9876");
    
            // Subscribe one more more topics to consume.
            consumer.subscribe("payment", "*");
    
            consumer.registerMessageListener(messageListener);
            return  consumer;
        }
    }
    

      这里的主题名称为payment

    2) 服务层PaymentService

    增加支付接口。扣掉余额后,将消息放到消息队列

        /**
         * 支付接口(消息队列) 。扣掉余额后,将消息放到消息队列
         * @param userId 用户Id
         * @param orderId 订单Id
         * @param amount 支付金额
         * @return 0: 成功; 1:用户不存在  2:余额不足
         */
        @Transactional(transactionManager = "tm134", rollbackFor = Exception.class)
        public int paymentMQ(int userId, int orderId, BigDecimal amount) throws Exception {
    
            //支付操作 扣款
            AccountA accountA = accountAMapper.selectByPrimaryKey(userId);
            if(accountA == null){
                return  1;
            }
            if(accountA.getBalance().compareTo(amount) < 0){
                return 2;
            }
    
            accountA.setBalance(accountA.getBalance().subtract(amount));
            accountAMapper.updateByPrimaryKey(accountA);
    
            Message message = new Message();
            message.setTopic("payment");
            message.setKeys(orderId +"");
            message.setBody("订单已支付".getBytes());
            try {
               SendResult result = producer.send(message);
               if(result.getSendStatus() == SendStatus.SEND_OK){
                   return  0;
               }else {
                   throw  new Exception("消息发送失败");
               }
            } catch (Exception e) {
                e.printStackTrace();
                throw  e;
            }
    
    
    
        }
    

      

    3)、控制层增加支付接口

    @RestController
    public class PaymentController {
    
        @Autowired
        private PaymentService paymentService;
    
    
        //localhost:8080/paymentMQ?userId=1&orderId=10010&amount=200
        @RequestMapping("paymentMQ")
        public String paymentMQ(int userId, int orderId, BigDecimal amount) throws Exception {
            int result = paymentService.paymentMQ(userId, orderId,amount);
            return  "支付结果:" + result;
        }
    }
    

      

    4、消费者

    @Component("messageListener")
    public class ChangeOrderStatus implements MessageListenerConcurrently {
    
    
        @Resource
        OrderMapper orderMapper;
    
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            if (list == null || list.size() == 0) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            System.out.println("Rocket MQ 消费者接收到消息");
            //默认list最大值为1
            for (MessageExt messageExt : list) {
                String orderId = messageExt.getKeys();
                String msg = new String(messageExt.getBody());
                System.out.println("msg=" + msg);
    
                Order order = orderMapper.selectByPrimaryKey(Integer.parseInt(orderId));
    
                if (order == null) {
                    //消费者订单查询不到,根据业务是否要再次消费
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                try {
                    order.setOrderStatus(1); //已支付
                    order.setUpdateTime(new Date());
                    order.setUpdateUser(0); //系统更新
                    orderMapper.updateByPrimaryKey(order);
                }catch (Exception e){
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
    
    
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }  

    消费者收到消息队列的消息后,更新订单状态为已支付。

    5、数据准备

    134和129两台数据库

     129数据库的t_order表中,order_status为0 未支付

    6、测试

    localhost:8080/paymentMQ?userId=1&orderId=10010&amount=200

     可以看到金额扣款200,并且订单状态order_status=1 已支付。

  • 相关阅读:
    生活感悟
    shell语法
    mycat
    阐述 如何高效理解学习
    部署ETCD集群
    文件修改
    文件处理
    a's
    shell中备份web站点及数据库
    openssh版本升级修复漏洞
  • 原文地址:https://www.cnblogs.com/linlf03/p/14010865.html
Copyright © 2011-2022 走看看