zoukankan      html  css  js  c++  java
  • RocketMQ4.x 分布式事务消息

    分布式事务常见解决方案

    • 2PC:两阶段提交, 基于XA协议
    • TCC:Try、Confirm、Cancel
    • 等等

    框架

    • GTS -> 开源 Fescar。地址:https://github.com/alibaba/fescar
    • LCN -> 地址:https://github.com/codingapi/tx-lcn

    RokcetMQ 分布式事务消息的总体架构

    RocketMQ 事务消息:

    • RocketMQ 提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致。

    半消息 Half Message:

    • 暂不能投递的消息(暂不能消费),Producer 已经将消息成功发送到了 Broker 端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。

    消息回查:

    • 由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。

    整体交互流程

    1. Producer向broker端发送消息。
    2. 服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
    3. 发送方开始执行本地事务逻辑。
    4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半消息,订阅方将不会接受该消息
    5. 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查
    6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果
    7. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半消息进行操作

    RocketMQ 事务消息的状态

    • COMMIT_MESSAGE:提交事务消息,消费者可以消费此消息
    • ROLLBACK_MESSAGE:回滚事务消息,消息会在 broker 中删除,消费者不能消费
    • UNKNOW:Broker需要回查确认消息的状态

    关于事务消息的消费

    事务消息 consumer 端的消费方式和普通消息是一样的,RocketMQ 能保证消息能被 consumer 收到(消息重试等机制,最后也存在 consumer 消费失败的情况,这种情况出现的概率极低)。

    RocketMQ 分布式事务消息实战

    项目结构如下图所示:

    PayController 类代码如下:

    @RestController
    public class PayController {
    
        @Autowired
        private TransactionProducer transactionMQProducer;
    
    
        @RequestMapping("/api/v1/pay_cb")
        public Object callback(String tag, String otherParam) throws Exception {
            Message message = new Message(JmsConfig.TOPIC, tag, tag + "_key", tag.getBytes());
    
            SendResult sendResult = transactionMQProducer.getProducer().
                    sendMessageInTransaction(message, otherParam);
    
            System.out.printf("发送结果=%s, sendResult=%s 
    ", sendResult.getSendStatus(), sendResult.toString());
    
            return new HashMap<>();
        }
    }

    JmsConfig 类代码如下:

    public class JmsConfig {
        public static final String NAME_SERVER = "192.168.159.133:9876;192.168.159.130:9876";
    
        // public static final String NAME_SERVER = "192.168.159.130:9876";
        public static final String TOPIC = "xdclass_pay_test_topic_0";
    
        public static final String ORDERLY_TOPIC = "xdclass_pay_order_topic_orderly";
    }

    PayConsumer 类代码如下:

    @Component
    public class PayConsumer {
        private DefaultMQPushConsumer consumer;
    
        private String consumerGroup = "pay_consumer_group";
    
        public PayConsumer() throws MQClientException {
    
            consumer = new DefaultMQPushConsumer(consumerGroup);
            consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //默认是集群方式,可以更改为广播,但是广播方式不支持重试
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.subscribe(JmsConfig.TOPIC, "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    MessageExt msg = msgs.get(0);
                    String key = msg.getKeys();
    
                    try {
    
                        System.out.printf("%s 2 Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
    
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    
                    } catch (Exception e) {
                        System.out.println("消费异常");
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
    
            consumer.start();
            System.out.println("consumer start ...");
        }
    }

    PayProducer 类代码如下:

    @Component
    public class PayProducer {
        private String producerGroup = "pay_producer_group";
    
        private DefaultMQProducer producer;
        
        public PayProducer() {
            producer = new DefaultMQProducer(producerGroup);
    
            //生产者投递消息重试次数
            producer.setRetryTimesWhenSendFailed(3);
    
            //指定NameServer地址,多个地址以 ; 隔开
            producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
    
            start();
        }
    
        public DefaultMQProducer getProducer() {
            return this.producer;
        }
    
        /**
         * 对象在使用之前必须要调用一次,只能初始化一次
         */
        public void start() {
            try {
                this.producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 一般在应用上下文,使用上下文监听器,进行关闭
         */
        public void shutdown() {
            this.producer.shutdown();
        }
    }

    TransactionProducer 和 TransactionListenerImpl 类代码如下:

    @Component
    public class TransactionProducer {
    
        private String producerGroup = "trac_producer_group";
    
        //事务监听器
        private TransactionListener transactionListener = new TransactionListenerImpl();
    
        private TransactionMQProducer producer = null;
        
        //一般自定义线程池的时候,需要给线程加个名称
        private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
    
        public TransactionProducer() {
            producer = new TransactionMQProducer(producerGroup);
    
            producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
    
            producer.setTransactionListener(transactionListener);
    
            producer.setExecutorService(executorService);
    
            //指定NameServer地址,多个地址以 ; 隔开
    
            start();
        }
    
        public TransactionMQProducer getProducer() {
            return this.producer;
        }
        /**
         * 对象在使用之前必须要调用一次,只能初始化一次
         */
        public void start() {
            try {
                this.producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
        /**
         * 一般在应用上下文,使用上下文监听器,进行关闭
         */
        public void shutdown() {
            this.producer.shutdown();
        }
    }
    
    class TransactionListenerImpl implements TransactionListener {
    
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    
            System.out.println("====executeLocalTransaction=======");
            String body = new String(msg.getBody());
            String key = msg.getKeys();
            String transactionId = msg.getTransactionId();
            System.out.println("transactionId=" + transactionId + ", key=" + key + ", body=" + body);
            // 执行本地事务begin TODO
            
            // 执行本地事务end TODO
    
            int status = Integer.parseInt(arg.toString());
    
            //二次确认消息,然后消费者可以消费
            if (status == 1) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
    
            //回滚消息,broker端会删除半消息
            if (status == 2) {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
    
            //broker端会进行回查消息,再或者什么都不响应
            if (status == 3) {
                return LocalTransactionState.UNKNOW;
            }
            return null;
        }
    
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    
            System.out.println("====checkLocalTransaction=======");
            String body = new String(msg.getBody());
            String key = msg.getKeys();
            String transactionId = msg.getTransactionId();
            System.out.println("transactionId=" + transactionId + ", key=" + key + ", body=" + body);
    
            //要么commit 要么rollback
    
            //可以根据key去检查本地事务消息是否完成
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }

    TransactionListener使用

    • executeLocalTransaction:执行本地事务
    • checkLocalTransaction:回查消息,要么 commit 要么rollback,reconsumeTimes 不生效

    注意点:TransactionMQProducer 的 groupName 要唯一,不能和普通的 producer 一样

  • 相关阅读:
    公共服务领域英文译写规范
    [.NET Core]
    [WebAPI]
    [C#]
    [ES]
    [.Net Core]
    [SourceTree]
    如何使用一个库中不存在的函数
    在使用masm32 rc编译资源文件.rc出现的问题
    MSDN 2005 安装问题
  • 原文地址:https://www.cnblogs.com/jwen1994/p/12384871.html
Copyright © 2011-2022 走看看