zoukankan      html  css  js  c++  java
  • Spring Cloud Alibaba学习笔记(11)

    在Spring中,我们要实现事务,一般通过@Transactional注解实现。这在引入RocketMQ之前没有问题,但是在引入了RocketMQ之后,如果消息发送之后的业务逻辑处理发生了异常的话,这时候消息已经发送出去了,就会导致业务的问题。
    为了解决这一问题,RocketMQ引入了Transactional Message【事务消息】。
    RocketMQ事务消息流程图

    1. 生产者向MQServer发送半消息【半消息:会存储进MQ Server,但是被标记为不能投递状态】
    2. 发送半消息成功,生产者实行本地事务
    3. 根据本地事务结果向MQ Server发送二次确认请求
    4. MQ Server根据接受到的消息投递或者丢弃消息
    5. 若在本地事务执行过程中缺少二次确认消息或生产者处于等待状态,MQ服务器将向同一组中的每个生产者发送检查消息,然后继续3,4的操作

    PS:消息三态

    • Commit:提交事务信息,消费者可以消费此消息
    • Rollback:回滚事务消息,broker会删除这条消息,消费者不能消费
    • UNKNOWN:broker需要回查确认消息状态

    代码实现

    发送半消息

    利用rocketMQTemplate类的sendMessageInTransaction实现半消息发送
    第一个参数为txProducerGroup:就是group名称,根据业务自定义
    第二次信息为destination:topic名称
    第三个信息为message:消息体,利用MessageBuilder.withPayload构建
    第四个信息为arg:业务对象,用于处理本地业务
    代码如下:

    // 发送半消息
    rocketMQTemplate.sendMessageInTransaction(
            "test-transactional",
            "test-topic",
            MessageBuilder.withPayload(
                    Demo.builder().demoId(1).remark("哈哈哈").build()
            ).setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString()).build(),
            forObject
    );
    

    实现RocketMQLocalTransactionListener接口

    新建demoTransactionalListener类,继承RocketMQLocalTransactionListener接口,实现了两个方法
    executeLocalTransaction:处理本地业务
    checkLocalTransaction:MQ Server发送检查信息相应
    添加@RocketMQTransactionListener注解,txProducerGroup属性值与半消息的txProducerGroup参数值相同
    代码如下:

    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.apache.rocketmq.spring.support.RocketMQHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    
    @RocketMQTransactionListener(txProducerGroup = "test-transactional")
    public class demoTransactionalListener implements RocketMQLocalTransactionListener {
        /**
         * 处理本地事务
         */
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
            // 消息头
            MessageHeaders headers = message.getHeaders();
            String transactionalId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
    
            // arg:sendMessageInTransaction方法的第四个参数,用于处于本地业务
    
            try {
                // 本地业务
                return RocketMQLocalTransactionState.COMMIT;
            } catch (Exception e) {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
    
        /**
         * 若在本地事务执行过程中缺少二次确认消息或生产者处于等待状态
         * MQ Server将向同一组中的每个生产者发送检查消息
         */
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
    
            try {
                // 检查
                return RocketMQLocalTransactionState.COMMIT;
            } catch (Exception e) {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
    }
    

    PS:MQ Server发送检查:可以通过新建一个表,记录本地业务成功或者失败,然后检查相应只需要查一下数据就可以了

  • 相关阅读:
    leetcode 334. Increasing Triplet Subsequence
    leetcode 235. Lowest Common Ancestor of a Binary Search Tree
    leetcode 459. Repeated Substring Pattern
    python爬虫之Xpath
    python爬虫之bs4 美丽汤
    python3爬虫的模拟浏览器
    python爬虫之requests库
    ModelForm:表单中的 Field 和模型中的 Field重复解决
    django中关于表单自定义验证器和常用验证器
    关于http连接的本质 已经cookies和session
  • 原文地址:https://www.cnblogs.com/fx-blog/p/11738419.html
Copyright © 2011-2022 走看看