zoukankan      html  css  js  c++  java
  • 数据一致性解决方案实践

    一、问题概述

      在实际业务中,有一个相对耗时的操作,但是客户端又需要急速响应,一般的处理就是使用缓存,但是这个处理如果涉及事务问题,那么就比较麻烦,一般情况下会使用消息队列,对简要数据做入库,后续的操作消费队列进行处理,这里就有个问题,就是前置的事务和消费消息的事务不是原子操作,那么就可能存在操作不一致的问题。

      举个例子,在秒杀场景中使用redis秒杀成功,但是发送完消息后,消息在消费过程中有可能成功也可能失败,这就造成了数据的不一致,那么这样的话,消息在秒杀的完成前发送MQ和秒杀完成后发送MQ都存在问题,在秒杀之前,那么有可能秒杀失败,在秒杀之后,有可能消息消费失败。

      对于上述的极端问题,实际在RocketMQ中已经有了对应的解决方案:事务消息。

      这里简单描述一下RocketMQ事务消息的流程和原理:

        

         1、首先生产者向Broker发送一个half消息(半事务消息),这时消息并不会被存储在原有的topic,而是会被存储在RMS_SYS_TRANS_HALF_TOPIC的topic中

        2、然后生产者继续执行本地事务,然后会将本地事务的执行结果发送给broker

        3、broker在接收到生产者本地事务的执行结果,如果结果是commit,则将消息放到原topic下,让消费者可以正常消费,如果结果是rollback,则将RMS_SYS_TRANS_HALF_TOPIC的topic中的消息置为已消费。

        4、同时broker内部启动了一个定时任务,会扫描RMS_SYS_TRANS_HALF_TOPIC的topic中没有被消费的消息,然后查询生产者本地事务结果后,按照上述的流程处理消息。

      更详细的流程如下图所示

          

      那么对于编码来说,就比较简单了,producer需要增加一个监听器,分别是本地事务的处理逻辑,还有就是消息回查实现方法。 

    二、实现

      1、秒杀业务实现

        首先处理秒杀的业务实现,因为事务消息也业务强相关,里面需要处理业务。

        public HttpResult startKilledMoreBetter(Long killId, String userId) throws BaseException {
            // 判断此商品是否已售罄
            Integer endStatus = (Integer) redisTemplate.opsForValue().get(Constants.REDIS_GOODS_END_KEY + killId);
    
            // 判断是否可以进行秒杀(省略)
            .........try {
                // 2、从缓存中扣减即可
                // 成功,失败
                // 扣减库存: 不考虑数据一致性问题,只需要在最终时候考虑数据一致性问题即可(这里是直接在扣减redis中的库存,保证不会扣超Long res = redisTemplate.opsForValue().increment(Constants.REDIS_GOODS_STOCK_KEY+killId, -1);)
                boolean res = this.reduceStock(killId);
    
                // 锁定库存
                // 支付完成后,对数据库减法 :
                // 1、数据库库存 - 锁定库存
                // 2、删除锁定库存
    
                if(!res){
                    return HttpResult.error("下单失败");
                }
    
    
                // 3、下单操作异步化
                // 队列:
                // BlockingQueue队列,disruptor队列
                // Redis消息队列
                // RocketMQ队列
                //下单
                TbSeckillOrder order = new TbSeckillOrder();
                order.setSeckillId(killId);
                order.setUserId(userId);
                // 把秒杀商品id,用户id成功放入队列,秒杀成功
                Boolean produce = SeckillQueue.getMailQueue().produce(order);
                if(!produce){
                    throw new BaseException(HttpStatus.SEC_GOODS_STOCK_FAIL,"下单失败");
                }
    
                seckillGoods.setStockCount(null);
                seckillGoods.setTranStatus(1);
                // 更新事务状态
                seckillGoodsMapper.updateByPrimaryKeySelective(seckillGoods);
    
    
                return HttpResult.ok("秒杀成功");
            } catch (Exception e) {
                e.printStackTrace();
    
                seckillGoods.setStockCount(null);
                seckillGoods.setTranStatus(2);
                // 更新事务状态
                seckillGoodsMapper.updateByPrimaryKeySelective(seckillGoods);
    
            }
            return null;
        }

      2、Producer

      生产者的定义,对于事务消息,最主要的是在发送时添加一个监听器,监听发送消息,发送成功时,执行事务;还有一个状态回查的方法。因此在处理本地事务时,需要加上一个判断表示,可以让消息回查事务成功失败状态。

    @Component
    public class MqProducer {
        @Autowired
        private MqConfigProperties configProperties;
        private TransactionMQProducer producer;
        @Autowired
        private SeckillOrderService orderService;
        @Autowired
        private SeckillGoodsMapper seckillGoodsMapper;
        @Autowired
        private RedisTemplate redisTemplate;
    
        @PostConstruct
        public void initProducer() {
            producer = new TransactionMQProducer(configProperties.getNamesvc_group());
            producer.setNamesrvAddr(configProperties.getNamesrvAddr());
            producer.setRetryTimesWhenSendFailed(3);
            try {
                producer.start();
                // 设置一个监听器
                producer.setTransactionListener(new TransactionListener() {
                    /*执行本地业务方法*/
                    @Override
                    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                        String seckillId = null;
                        try {
                            // 获取消息
                            String msg = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
                            // 把消息转换为map
                            Map<String,String> maps = JSON.parseObject(msg,Map.class);
                            // 获取数据
                            seckillId = maps.get("seckillId");
                            String userId = maps.get("userId");
                            // 调用下单业务方法,实现下单操作
                            orderService.startKilledMoreBetter(Long.parseLong(seckillId),userId);
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }catch (BaseException e){
                            // 订单下单异常现象,为了保证缓存操作一致性,需要对库存做回补
                            redisTemplate.opsForValue().increment(Constants.REDIS_GOODS_STOCK_KEY+seckillId, 1);
                        }
                        return null;
                    }
                    /*状态回查,确定事务提交,还是回滚*/
                    @Override
                    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                        try {
                            // 获取消息
                            String msg = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                            // 把消息转换为map
                            Map<String,String> maps = JSON.parseObject(msg,Map.class);
                            // 获取数据
                            String seckillId = maps.get("seckillId");
                            // 根据id查询事务状态
                            TbSeckillGoods seckillGoods = seckillGoodsMapper.selectByPrimaryKey(seckillId);
                            // 获取事务状态
                            Integer tranStatus = seckillGoods.getTranStatus();
                            if (null != tranStatus) {
                                switch (tranStatus) {
                                    case 0:
                                        return LocalTransactionState.UNKNOW;
                                    case 1:
                                        return LocalTransactionState.COMMIT_MESSAGE;
                                    case 2:
                                        return LocalTransactionState.ROLLBACK_MESSAGE;
                                }
                            }
                            return LocalTransactionState.COMMIT_MESSAGE;
    
    
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                        return null;
                    }
                });
                System.out.println("[Producer 已启动]");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public String send(String topic, String tags, String msg) {
            SendResult result = null;
            try {
                Message message = new Message(topic, tags, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
                result = producer.send(message);
                System.out.println("[Producer] msgID(" + result.getMsgId() + ") " + result.getSendStatus());
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "{"MsgId":"" + result.getMsgId() + ""}";
        }
    
        @PreDestroy
        public void shutDownProducer() {
            if (producer != null) {
                producer.shutdown();
            }
        }
    
        /**
         * @Description: 发送消息,同步数据库库存
         * @Author: hubin
         * @CreateDate: 2020/10/26 21:59
         * @UpdateUser: hubin
         * @UpdateDate: 2020/10/26 21:59
         * @UpdateRemark: 修改内容
         * @Version: 1.0
         */
        public boolean asncSendMsg(Long seckillId) {
            try {
                Message message = new Message("seckill_goods_asnc_stock", "increase", (seckillId+"").getBytes(RemotingHelper.DEFAULT_CHARSET));
                //发送消息
                producer.send(message);
            } catch (Exception e) {
                e.printStackTrace();
                //发送失败
                return false;
            }
            return true;
        }
    
    
    
        /**
         * @Description: 发送消息,使用事务型消息把所有的操作原子化
         * @Author: hubin
         * @CreateDate: 2020/10/26 21:59
         * @UpdateUser: hubin
         * @UpdateDate: 2020/10/26 21:59
         * @UpdateRemark: 修改内容
         * @Version: 1.0
         */
        public boolean asncSendTransactionMsg(Long seckillId,String userId) {
            try {
    
                Map<String,String> maps = new HashMap<>();
                maps.put("seckillId",seckillId+"");
                maps.put("userId",userId);
    
                //把对象转换为字符串
                String jsonStr = JSON.toJSONString(maps);
    
                // 发送sekillId,userId
                Message message = new Message("seckill_goods_asnc_stock", "increase", jsonStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
                //发送事务消息
                producer.sendMessageInTransaction(message,null);
            } catch (Exception e) {
                e.printStackTrace();
                //发送失败
                return false;
            }
            return true;
        }
    }

      3、Consumer

        对于消费者就非常简单了,就是处理业务就好了,和正常的MQ没有什么差别。

    @Component
    public class MqConsumer {
    
    
        @Autowired
        private MqConfigProperties configProperties;
    
    
        @Autowired
        private SeckillGoodsMapper seckillGoodsMapper;
    
        @Bean
        public DefaultMQPushConsumer defaultMQPushConsumer() {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(configProperties.getNamesvc_group());
            consumer.setNamesrvAddr(configProperties.getNamesrvAddr());
            try {
                //广播模式消费
                //consumer.setMessageModel(MessageModel.BROADCASTING);
                consumer.subscribe("seckill_goods_asnc_stock", "*");
    
                // 如果是第一次启动,从队列头部开始消费
                // 如果不是第一次启动,从上次消费的位置继续消费
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                    try {
                        for (MessageExt messageExt : list) {
                            String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
    
                            //
                            Map<String,String> maps = JSON.parseObject(messageBody, Map.class);
    
                            String seckillId = maps.get("seckillId");
    
                            //执行扣减库存的操作
                            //同步数据库的库存
                            seckillGoodsMapper.updateSeckillGoodsByPrimaryKeyByLock(Long.parseLong(seckillId));
                            System.out.println("[Consumer] msgID(" + messageExt.getMsgId() + ") msgBody : " + seckillId);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        //如果出现异常,必须告知消息进行重试
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                });
                consumer.start();
                System.out.println("[Consumer 已启动]");
            } catch (Exception e) {
                e.printStackTrace();
            }
            return consumer;
        }
    }

      4、方法调用

      实际就是在操作的时候,首先发送了一个 half message消息,然后在上面事务消息的监听器中,处理业务信息,处理完毕后自动发送确认消息,保证了消息和事务的一致性。

        @RequestMapping("/order/kill/better/{killId}/{token}")
        public HttpResult startKilledMoreBetter(@PathVariable Long killId, @PathVariable String token){
            //判断校验//获取userid
            String userId = user.getId()+"";
    
            try {
                // 在业务开始之前就发送一个消息:half message
                Object obj = executorService.submit(()-> {
                    // 发送消息
                    boolean res = producer.asncSendTransactionMsg(killId, userId);
                    // 判断
                    if(!res){
                        return HttpResult.error("下单失败");
                    }
                    return res;
                }).get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
    
            return HttpResult.ok();
    
        }
    ------------------------------------------------------------------
    -----------------------------------------------------------
    ---------------------------------------------
    朦胧的夜 留笔~~
  • 相关阅读:
    史上最全HashMap红黑树解析
    使用httpClient 调用get,Post接口
    VS 安装resharper 后 无法进行UnitTest
    [转]大数据的高并发的数据库优化
    【转】2019年7月份,阿里最新Java高频面试真题汇总
    【转】Apache的架构师们遵循的30条设计原则
    B树索引最通俗易懂的介绍
    spring Boot 学习(八、Spring Boot与与监控管理)
    spring Boot 学习(七、Spring Boot与开发热部署)
    vs快捷键
  • 原文地址:https://www.cnblogs.com/liconglong/p/15480259.html
Copyright © 2011-2022 走看看