zoukankan      html  css  js  c++  java
  • RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)

    在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群

    根据上文的描述,发送事务消息的入口为:

        TransactionMQProducer#sendMessageInTransaction:
        public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
                if (null == this.transactionListener) {    // @1
                    throw new MQClientException("TransactionListener is null", null);
                }
         
                return this.defaultMQProducerImpl.sendMessageInTransaction(msg, transactionListener, arg);  // @2
            }

    代码@1:如果transactionListener为空,则直接抛出异常。
    代码@2:调用defaultMQProducerImpl的sendMessageInTransaction方法。

    接下来重点分享sendMessageInTransaction方法

        DefaultMQProducerImpl#sendMessageInTransaction
        public TransactionSendResult sendMessageInTransaction(final Message msg,
                   final TransactionListener tranExecuter, final Object arg)  throws MQClientException {

    Step1:首先先阐述一下参数含义。

        final Message msg:消息
        TransactionListener tranExecuter:事务监听器
        Object arg:其他附加参数

        DefaultMQProducerImpl#sendMessageInTransaction
        SendResult sendResult = null;
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
               sendResult = this.send(msg);
        } catch (Exception e) {
               throw new MQClientException("send message Exception", e);
        }

    Step2:在消息属性中增加两个属性:TRAN_MSG,其值为true,表示为事务消息;PGROUP:消息所属发送者组,然后以同步方式发送消息。在消息发送之前,会先检查消息的属性TRAN_MSG,如果存在并且值为true,则通过设置消息系统标记的方式,设置消息为MessageSysFlag.TRANSACTION_PREPARED_TYPE。

        DefaultMQProducerImpl#sendKernelImpl
        final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
               sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
        }
         
        SendMessageProcessor#sendMessage
        String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (traFlag != null && Boolean.parseBoolean(traFlag)) {
                if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                     response.setCode(ResponseCode.NO_PERMISSION);
                     response.setRemark(
                            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                     return response;
               }
              putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
              putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }

    Step3:Broker端收到客户端发送消息请求后,判断消息类型。如果是事务消息,则调用TransactionalMessageService#prepareMessage方法,否则走原先的逻辑,调用MessageStore#putMessage方法将消息存入Broker服务端。
    本节重点阐述事务消息的实现原理,故接下来将重点关注prepareMessage方法,如想了解RocketMQ消息存储相关,可以关注作者源码分析RocketMQ系列。

        org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#prepareMessage
        public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
                return transactionalMessageBridge.putHalfMessage(messageInner);
         }

    step4:事务消息,将调用TransactionalMessageServiceImpl#prepareMessage方法,继而调用TransactionalMessageBridge#prepareMessage方法。

        TransactionalMessageBridge#parseHalfMessageInner
        public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
                return store.putMessage(parseHalfMessageInner(messageInner));
            }
         
            private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
                MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
                MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
                    String.valueOf(msgInner.getQueueId()));
                msgInner.setSysFlag(
                    MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
                msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
                msgInner.setQueueId(0);
                msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
                return msgInner;
            }

    Step5:备份消息的原主题名称与原队列ID,然后取消消息的事务消息标签,重新设置消息的主题为:RMQ_SYS_TRANS_HALF_TOPIC,队列ID固定为0。然后调用MessageStore#putMessage方法将消息持久化,这里TransactionalMessageBridge桥接类,就是封装事务消息的相关流程,最终调用MessageStore完成消息的持久化。消息入库后,会继续回到DefaultMQProducerImpl#sendMessageInTransaction,上文的Step2后面,也就是通过同步将消息发送到消息服务端。

    注:这是事务消息Prepare状态的处理逻辑,消息是存储在消息服务器了,但存储的并不是原主题,而是RMQ_SYS_TRANS_HALF_TOPIC,故此时消费端是无法消费shen
    生产者发送的消息的。看到这里,如果对RocketMQ比较熟悉的话,肯定会有一个“定时任务”去取这个主题下的消息,然后则“合适”的时机将消息的主题恢复。

        DefaultMQProducerImpl#sendMessageInTransaction
        switch (sendResult.getSendStatus()) {
                    case SEND_OK: {
                        try {
                            if (sendResult.getTransactionId() != null) {
                                msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                            }
                            String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                            if (null != transactionId && !"".equals(transactionId)) {
                                msg.setTransactionId(transactionId);
                            }
                            localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);
                            if (null == localTransactionState) {
                                localTransactionState = LocalTransactionState.UNKNOW;
                            }
         
                            if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                                log.info("executeLocalTransactionBranch return {}", localTransactionState);
                                log.info(msg.toString());
                            }
                        } catch (Throwable e) {
                            log.info("executeLocalTransactionBranch exception", e);
                            log.info(msg.toString());
                            localException = e;
                        }
                    }
                    break;
                    case FLUSH_DISK_TIMEOUT:
                    case FLUSH_SLAVE_TIMEOUT:
                    case SLAVE_NOT_AVAILABLE:
                        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                        break;
                    default:
                        break;
                }

    Step6:如果消息发送成功,会回调TransactionListener#executeLocalTransaction方法,执行本地事务,并且返回本地事务状态为LocalTransactionState,枚举值如下:

        COMMIT_MESSAGE,
        ROLLBACK_MESSAGE,
        UNKNOW

    注意:TransactionListener#executeLocalTransaction是在发送者成功发送PREPARED消息后,会执行本地事务方法,然后返回本地事务状态;如果PREPARED消息发送失败,则不会调用TransactionListener#executeLocalTransaction,并且本地事务消息,设置为LocalTransactionState.ROLLBACK_MESSAGE,表示消息需要被回滚。

        DefaultMQProducerImpl#sendMessageInTransaction
        try {
        this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }

    step7:调用endTransaction方法结束事务(提交或回滚)。

        DefaultMQProducerImpl#endTransaction
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                 requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                 break;
            case ROLLBACK_MESSAGE:
                 requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                 break;
             case UNKNOW:
                 requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                 break;
             default:
                 break;
        }
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());

    step8:组装结束事务请求,主要参数为:事务ID、事务操作(commitOrRollback)、消费组、消息队列偏移量、消息ID,fromTransactionCheck,从这里发出的请求,默认为false。Broker端的请求处理器为:EndTransactionProcessor。

    step9:EndTransactionProcessor根据事务提交类型:TRANSACTION_COMMIT_TYPE(提交事务)、TRANSACTION_ROLLBACK_TYPE(回滚事务)、TRANSACTION_NOT_TYPE(忽略该请求)。

    到目前为止,已详细梳理了RocketMQ事务消息的发送流程,更加准确的说是Prepare状态的消息发送流程。具体流程如图所示:

    本文到这里,初步展示了事务消息的发送流程,总的说来,RocketMQ的事务消息发送使用二阶段提交思路,首先,在消息发送时,先发送消息类型为Prepread类型的消息,然后在将该消息成功存入到消息服务器后,会回调TransactionListener#executeLocalTransaction,执行本地事务状态回调函数,然后根据该方法的返回值,结束事务:
    1、COMMIT_MESSAGE :提交事务。
    2、ROLLBACK_MESSAGE:回滚事务。
    3、UNKNOW:未知事务状态,此时消息服务器(Broker)收到EndTransaction命令时,将不对这种消息做处理,消息还处于Prepared类型,存储在主题为:RMQ_SYS_TRANS_HALF_TOPIC的队列中,然后消息发送流程将结束,那这些消息如何提交或回滚呢?

    为了实现避免客户端需要再次发送提交、回滚命令,RocketMQ会采取定时任务将RMQ_SYS_TRANS_HALF_TOPIC中的消息取出,然后回到客户端,判断该消息是否需要提交或回滚,来完成事务消息的声明周期,该部分内容将在下节重点探讨。

    若您对RocketMQ技术感兴趣,请加入作者所在的 RocketMQ技术交流群
    ————————————————
    版权声明:本文为CSDN博主「阿里云技术」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/weixin_43970890/article/details/86076377

  • 相关阅读:
    Oracle EBS-SQL (PO-3):检查期间手工下达的采购订单记录数.sql
    Oracle EBS-SQL (PO-2):检查当月到货补单的记录数.sql
    Oracle EBS-SQL (PO-1):检查供货比例异常.sql
    Oracle EBS-SQL (MRP-2):检查期间主计划录入记录数.sql
    Oracle EBS-SQL (MRP-1):检查期间内计划完成的任务.sql
    Oracle EBS-SQL (INV-3):检查仓库库存价值明细.sql
    Oracle EBS-SQL (INV-2):检查帐户别名发放记录.sql
    Oracle EBS-SQL (INV-1):检查物料成本为0并且物料状态不是'NEW'的物料.sql
    Oracle EBS-SQL (BOM-13):检查未定义库存分的物料类.sql
    Oracle EBS-SQL (BOM-12):BOM清单查询
  • 原文地址:https://www.cnblogs.com/lishiqi-blog/p/11898862.html
Copyright © 2011-2022 走看看