zoukankan      html  css  js  c++  java
  • 分布式事务解决方案4--消息队列MQ(事务最终一致方案)

    1、消息队列MQ(事务最终一致方案)介绍

    原理、流程与本地消息表类似

    不同点:

    1) 本地消息表改为MQ

    2) 定时任务改为MQ的消费者

    架构图

    优点: 

    不依赖定时任务,基于MQ更高效、更可靠。

    适合于公司内的系统 (比如公司内的多个系统,要做一致性处理)

    不同公司之间无法基于MQ,本地消息表更合适

    2、Rabbit MQ安装

    参考:Windown10下Rocket MQ 安装

    3、Rocket MQ生产者和消费者配置

    @Configuration
    public class RocketMQConfig {
    
        /**
         * 生产者 配置
         * @return
         */
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        public DefaultMQProducer producer(){
    
            DefaultMQProducer producer = new
                    DefaultMQProducer("paymentGroup");
            producer.setNamesrvAddr("localhost:9876");
            return producer;
        }
    
        /**
         * 消费者配置
         */
        @Bean(initMethod = "start", destroyMethod = "shutdown" )
        public DefaultMQPushConsumer consumer(@Qualifier("messageListener") MessageListenerConcurrently messageListener) throws MQClientException {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("paymentConsumerGroup");
    
            // Specify name server addresses.
            consumer.setNamesrvAddr("localhost:9876");
    
            // Subscribe one more more topics to consume.
            consumer.subscribe("payment", "*");
    
            consumer.registerMessageListener(messageListener);
            return  consumer;
        }
    }
    

      

    4、业务实现

     1) 配置RocketMQ

    @Configuration
    public class RocketMQConfig {
    
        /**
         * 生产者 配置
         * @return
         */
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        public DefaultMQProducer producer(){
    
            DefaultMQProducer producer = new
                    DefaultMQProducer("paymentGroup");
            producer.setNamesrvAddr("localhost:9876");
            return producer;
        }
    
        /**
         * 消费者配置
         */
        @Bean(initMethod = "start", destroyMethod = "shutdown" )
        public DefaultMQPushConsumer consumer(@Qualifier("messageListener") MessageListenerConcurrently messageListener) throws MQClientException {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("paymentConsumerGroup");
    
            // Specify name server addresses.
            consumer.setNamesrvAddr("localhost:9876");
    
            // Subscribe one more more topics to consume.
            consumer.subscribe("payment", "*");
    
            consumer.registerMessageListener(messageListener);
            return  consumer;
        }
    }
    

      这里的主题名称为payment

    2) 服务层PaymentService

    增加支付接口。扣掉余额后,将消息放到消息队列

        /**
         * 支付接口(消息队列) 。扣掉余额后,将消息放到消息队列
         * @param userId 用户Id
         * @param orderId 订单Id
         * @param amount 支付金额
         * @return 0: 成功; 1:用户不存在  2:余额不足
         */
        @Transactional(transactionManager = "tm134", rollbackFor = Exception.class)
        public int paymentMQ(int userId, int orderId, BigDecimal amount) throws Exception {
    
            //支付操作 扣款
            AccountA accountA = accountAMapper.selectByPrimaryKey(userId);
            if(accountA == null){
                return  1;
            }
            if(accountA.getBalance().compareTo(amount) < 0){
                return 2;
            }
    
            accountA.setBalance(accountA.getBalance().subtract(amount));
            accountAMapper.updateByPrimaryKey(accountA);
    
            Message message = new Message();
            message.setTopic("payment");
            message.setKeys(orderId +"");
            message.setBody("订单已支付".getBytes());
            try {
               SendResult result = producer.send(message);
               if(result.getSendStatus() == SendStatus.SEND_OK){
                   return  0;
               }else {
                   throw  new Exception("消息发送失败");
               }
            } catch (Exception e) {
                e.printStackTrace();
                throw  e;
            }
    
    
    
        }
    

      

    3)、控制层增加支付接口

    @RestController
    public class PaymentController {
    
        @Autowired
        private PaymentService paymentService;
    
    
        //localhost:8080/paymentMQ?userId=1&orderId=10010&amount=200
        @RequestMapping("paymentMQ")
        public String paymentMQ(int userId, int orderId, BigDecimal amount) throws Exception {
            int result = paymentService.paymentMQ(userId, orderId,amount);
            return  "支付结果:" + result;
        }
    }
    

      

    4、消费者

    @Component("messageListener")
    public class ChangeOrderStatus implements MessageListenerConcurrently {
    
    
        @Resource
        OrderMapper orderMapper;
    
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            if (list == null || list.size() == 0) {
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            System.out.println("Rocket MQ 消费者接收到消息");
            //默认list最大值为1
            for (MessageExt messageExt : list) {
                String orderId = messageExt.getKeys();
                String msg = new String(messageExt.getBody());
                System.out.println("msg=" + msg);
    
                Order order = orderMapper.selectByPrimaryKey(Integer.parseInt(orderId));
    
                if (order == null) {
                    //消费者订单查询不到,根据业务是否要再次消费
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                try {
                    order.setOrderStatus(1); //已支付
                    order.setUpdateTime(new Date());
                    order.setUpdateUser(0); //系统更新
                    orderMapper.updateByPrimaryKey(order);
                }catch (Exception e){
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
    
    
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }  

    消费者收到消息队列的消息后,更新订单状态为已支付。

    5、数据准备

    134和129两台数据库

     129数据库的t_order表中,order_status为0 未支付

    6、测试

    localhost:8080/paymentMQ?userId=1&orderId=10010&amount=200

     可以看到金额扣款200,并且订单状态order_status=1 已支付。

  • 相关阅读:
    centos7.6 安装与配置 MongoDB yum方式
    MongoDB 介绍
    centos 关闭selinux
    前端 HTML标签属性
    前端 HTML 标签嵌套规则
    前端 HTML 标签分类
    前端 HTML body标签相关内容 常用标签 表单标签 form里面的 input标签介绍
    前端 HTML body标签相关内容 常用标签 表单标签 form 表单控件分类
    前端 HTML form表单标签 select标签 option 下拉框
    POJ 1426
  • 原文地址:https://www.cnblogs.com/linlf03/p/14010865.html
Copyright © 2011-2022 走看看