zoukankan      html  css  js  c++  java
  • RocketMQ笔记3-事务型消息

    事务型消息简介

    • RocketMQ事务消息(Transactional Message) 是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

    过程

    • 阶段一(发送消息,执行本地事务,更行消息状态)
      • 发送消息(half消息)
      • 服务端响应消息写入结果
      • 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
      • 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
    • 阶段二(补偿)
      • 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
      • Producer收到回查消息,检查回查消息对应的本地事务的状态
      • 根据本地事务状态,重新Commit或者Rollback
        其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
    • 详细过程可以到参考文档中

    事务型消息的使用

    • 事务型生产者
    @Component
    public class MqProducer {
    
        @Value("${mq.nameserver.addr}")
        private String nameAddr;
    
        @Value("${mq.topicname}")
        private String topicName;
    
        //事务型生成者
        private TransactionMQProducer transactionMQProducer;
    
        @PostConstruct
        public void init() throws MQClientException {
            transactionMQProducer=new TransactionMQProducer("transaction_producer_group");
            //设置nameSrv
            transactionMQProducer.setNamesrvAddr(nameAddr);
            
            //开启事务型生产者
            transactionMQProducer.start();
            
            //监听本地事务(关键),可以自定义一个类实现  TransactionListener  接口,或是通过匿名内部类
            transactionMQProducer.setTransactionListener(new TransactionListener() {
                 /*
                     执行本地事务
                */
                @Override
                public LocalTransactionState executeLocalTransaction(Message message, Object args) {                
                    try {
                       //执行本地事务
                    } catch (Exception e) {
                        e.printStackTrace();
                        //当执行到这里,说明本地事务执行异常,回滚
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                    //执行本地事务成功,提交
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
                
                /*
                     回查方法
                */
                @Override
                public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                    //当LocalTransaction无法更新prepared消息,会通过这个方法执行本地策略
                    if(判断1){
                        //说明本地事务执行成功,提交
                        return LocalTransactionState.COMMIT_MESSAGE;
                    }else if(判断2){
                        //无法判断原因,当消息为UNKNOW时,会定时回查
                        return LocalTransactionState.UNKNOW;
                    }else{
                        //其他则一律回滚
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                }
            });
        }
    
         /*
                发送消息
         */
        public boolean sendMessage(Object value){  
            Map<String,Object> mapArgs= new HashMap<>();
            mapArgs.put("key",value);;
    
            Message message=new Message(topicName,"Tag", "message");
    
            TransactionSendResult transactionSendResult=null;
            try {
                 transactionSendResult = transactionMQProducer.sendMessageInTransaction(message, mapArgs);
            } catch (MQClientException e) {
                e.printStackTrace();
                return false;
            }
            if(transactionSendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE){
                return true;
            }else if(transactionSendResult.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE){
                return false;
            }else {
                return false;
            }
        }
    }
    
    

    • 消费者
    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            //创建消费者对象
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumerGroup");
            
            //设置NamesrvAddr 及消费位置ConsumeFromWhere
            consumer.setNamesrvAddr(Const.NAMESRV_ADDR);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            
            //订阅topic,  * 指该主题下的所有消息都能消费
            consumer.subscribe("test_topic","*");
            
            //注册监听并消费
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    //获取消息
                    MessageExt msg = list.get(0);
                    try {
                            //可以根据消息来执行其他业务,达到和本地事务的最终一致性
                    }catch (Exception e){
                        e.printStackTrace();
                        int reconsumeTimes = msg.getReconsumeTimes();
                        if(reconsumeTimes == 2){
                            //记录日志,补偿处理
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //启动消费服务
            System.out.println("消费服务启动...");
            consumer.start();
        }
    }
    

    参考

  • 相关阅读:
    在Ubuntu20.04上安装MySQL8.0及正确配置[已验证]
    Linux Ubuntu 20.04 —添加开机启动(服务/脚本)
    Ubuntu进入文件夹路径及查看文件夹目录
    Ubuntu20.04安装JDK
    How to install shutter on Linux 20.04
    在Ubuntu 20.04 中以管理员Root 身份用可视化的方式打开根目录文件夹
    在Ubuntu 20.04 上安装Chromium
    Ubuntu20.04安装搜狗输入法的详细步骤
    Ubuntu 20.04换国内源 清华源 阿里源 中科大源 163源
    linux Deepin 20 系统更新源.txt
  • 原文地址:https://www.cnblogs.com/wuba/p/11772953.html
Copyright © 2011-2022 走看看