zoukankan      html  css  js  c++  java
  • RocketMQ事务消息实现原理上篇

    1 发送事务消息的入口为:TransactionMQProducer#sendMessageInTransaction:

        public TransactionSendResult sendMessageInTransaction(Message msg, Object arg) throws MQClientException {
            if (null == this.transactionListener) { // @1
                throw new MQClientException("TransactionListener is null", (Throwable)null);
            } else {
                return this.defaultMQProducerImpl.sendMessageInTransaction(msg, (LocalTransactionExecuter)null, arg);  // @2
            }
        }

      代码@1:如果transactionListener为空,则直接抛出异常。
         代码@2:调用defaultMQProducerImpl的sendMessageInTransaction方法。


    2. DefaultMQProducerImpl#sendMessageInTransaction

    public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter localTransactionExecuter, Object arg) throws MQClientException {
            TransactionListener transactionListener = this.getCheckListener();
            if (null == localTransactionExecuter && null == transactionListener) {
                throw new MQClientException("tranExecutor is null", (Throwable)null);
            } else {
                Validators.checkMessage(msg, this.defaultMQProducer);
                SendResult sendResult = null;
                MessageAccessor.putProperty(msg, "TRAN_MSG", "true");   // @1
                MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup());
    
                try {
                    sendResult = this.send(msg);
                } catch (Exception var11) {
                    throw new MQClientException("send message Exception", var11);
                }
    
                LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                Throwable localException = null;
                switch(sendResult.getSendStatus()) {
                case SEND_OK:
                    try {
                        if (sendResult.getTransactionId() != null) {
                            msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                        }
    
                        String transactionId = msg.getProperty("UNIQ_KEY");
                        if (null != transactionId && !"".equals(transactionId)) {
                            msg.setTransactionId(transactionId);
                        }
    
                        if (null != localTransactionExecuter) {
                            localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                        } else if (transactionListener != null) {
                            this.log.debug("Used new transaction API");
                            localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                        }
    
                        if (null == localTransactionState) {
                            localTransactionState = LocalTransactionState.UNKNOW;
                        }
    
                        if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                            this.log.info("executeLocalTransactionBranch return {}", localTransactionState);
                            this.log.info(msg.toString());
                        }
                    } catch (Throwable var10) {
                        this.log.info("executeLocalTransactionBranch exception", var10);
                        this.log.info(msg.toString());
                        localException = var10;
                    }
                    break;
                case FLUSH_DISK_TIMEOUT:
                case FLUSH_SLAVE_TIMEOUT:
                case SLAVE_NOT_AVAILABLE:
                    localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                }
    
                try {
                    this.endTransaction(sendResult, localTransactionState, localException);
                } catch (Exception var9) {
                    this.log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", var9);
                }
    
                TransactionSendResult transactionSendResult = new TransactionSendResult();
                transactionSendResult.setSendStatus(sendResult.getSendStatus());
                transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
                transactionSendResult.setMsgId(sendResult.getMsgId());
                transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
                transactionSendResult.setTransactionId(sendResult.getTransactionId());
                transactionSendResult.setLocalTransactionState(localTransactionState);
                return transactionSendResult;
            }
        }
    

      Step1:首先先阐述一下参数含义。final Message msg:消息;TransactionListener tranExecuter:事务监听器; Object arg:其他附加参数,该参数会再TransactionListener 回调函数中原值传入。

      Step2:代码@1 在消息属性中,添加两个属性:TRAN_MSG,其值为true,表示为事务消息;PGROUP:消息所属发送者组,然后以同步方式发送消息。

      DefaultMQProducerImpl#sendKernelImpl

    final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                        sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                    }
    

      在消息发送之前,会先检查消息的属性TRAN_MSG,如果存在并且值为true,则通过设置消息系统标记的方式,设置消息为MessageSysFlag.TRANSACTION_PREPARED_TYPE。

    SendMessageProcessor#sendMessage

    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (traFlag != null && Boolean.parseBoolean(traFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                 response.setCode(ResponseCode.NO_PERMISSION);
                 response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                            + "] sending transaction message is forbidden");
                 return response;
           }
          putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
    } else {
          putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    }
    

      Step3:Broker端首先客户发送消息请求后,判断消息类型,如果是事务消息,则调用TransactionalMessageService#prepareMessage方法,否则走原先的逻辑,调用MessageStore#putMessage方法。

    org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#prepareMessage

    public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
            return transactionalMessageBridge.putHalfMessage(messageInner);
     }
    

      step4:事务消息,将调用TransactionalMessageServiceImpl#prepareMessage方法,继而调用TransactionalMessageBridge#prepareMessage方法。

    TransactionalMessageBridge#parseHalfMessageInner

    public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
            return store.putMessage(parseHalfMessageInner(messageInner));
        }
    
        private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
            MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
            MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
                String.valueOf(msgInner.getQueueId()));
            msgInner.setSysFlag(
                MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
            msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
            msgInner.setQueueId(0);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
            return msgInner;
        }
    

      

     Step5:备份消息的原主题名称与原队列ID,然后取消是事务消息的消息标签,重新设置消息的主题为:RMQ_SYS_TRANS_HALF_TOPIC,队列ID固定为0。然后调用MessageStore#putMessage方法将消息持久化,这里TransactionalMessageBridge桥接类,就是封装事务消息的相关流程,最终调用MessageStore完成消息的持久化。消息入库后,会继续回到DefaultMQProducerImpl#sendMessageInTransaction,上文的Step2后面,也就是通过同步将消息发送到消息服务端。
    DefaultMQProducerImpl#sendMessageInTransaction

    switch (sendResult.getSendStatus()) {
                case SEND_OK: {
                    try {
                        if (sendResult.getTransactionId() != null) {
                            msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                        }
                        String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                        if (null != transactionId && !"".equals(transactionId)) {
                            msg.setTransactionId(transactionId);
                        }
                        localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);
                        if (null == localTransactionState) {
                            localTransactionState = LocalTransactionState.UNKNOW;
                        }
    
                        if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                            log.info("executeLocalTransactionBranch return {}", localTransactionState);
                            log.info(msg.toString());
                        }
                    } catch (Throwable e) {
                        log.info("executeLocalTransactionBranch exception", e);
                        log.info(msg.toString());
                        localException = e;
                    }
                }
                break;
                case FLUSH_DISK_TIMEOUT:
                case FLUSH_SLAVE_TIMEOUT:
                case SLAVE_NOT_AVAILABLE:
                    localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                    break;
                default:
                    break;
            }
    

      

    Step6:如果消息发送成功,会回调TransactionListener#executeLocalTransaction方法,执行本地事务,并且返回本地事务状态为:public enum LocalTransactionState {COMMIT_MESSAGE,ROLLBACK_MESSAGE,
    UNKNOW,} 之一,注意:TransactionListener#executeLocalTransaction是在发送者成功发送PREPARED消息后,会执行本地事务方法,然后返回本地事务状态;如果PREPARED消息发送失败,则不会调用
    TransactionListener#executeLocalTransaction,并且本地事务消息,设置为
    LocalTransactionState.ROLLBACK_MESSAGE,表示消息需要被回滚。
       DefaultMQProducerImpl#sendMessageInTransaction

    try {
    this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
    log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }
    

      step7:调用endTransaction方法结束事务(提交或回滚)。
       DefaultMQProducerImpl#endTransaction

    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
             requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
             break;
        case ROLLBACK_MESSAGE:
             requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
             break;
         case UNKNOW:
             requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
             break;
         default:
             break;
    }
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    

      

     step8:组装结束事务请求,主要参数为:事务ID、事务操作(commitOrRollback)、消费组、消息队列偏移量、消息ID,fromTransactionCheck,从这里发出的请求,默认为false。Broker端的请求处理器为:EndTransactionProcessor。
       step9:EndTransactionProcessor根据事务提交类型:TRANSACTION_COMMIT_TYPE(提交事务)、TRANSACTION_ROLLBACK_TYPE(回滚事务)、TRANSACTION_NOT_TYPE、忽略该请求,会记录info级别的日志相关的代码将在下文详细分析,在这里,我们先大概梳理一条消息发送的路径TransactionMQProducer#sendMessageInTransaction的调用链来总结一下事务消息的发送流程。

    本文到这里,初步展示了事务消息的发送流程,总的说来,RocketMQ的事务消息发送使用二阶段提交思路,首先,在消息发送时,先发送消息类型为Prepread类型的消息,然后在将该消息成功存入到消息服务器后,会回调   TransactionListener#executeLocalTransaction,执行本地事务状态回调函数,然后根据该方法的返回值,结束事务:
       1、COMMIT_MESSAGE :提交事务。
       2、ROLLBACK_MESSAGE:回滚事务。
       3、UNKNOW:未知事务状态,此时消息服务器(Broker)收到EndTransaction命令时,将不对这种消息做处理,消息还处于Prepared类型,存储在主题为:RMQ_SYS_TRANS_HALF_TOPIC的队列中,然后消息发送流程将结束,那这些消息如何提交或回滚呢?为了实现避免客户端需要再次发送提交、回滚命令,RocketMQ会采取定时任务将RMQ_SYS_TRANS_HALF_TOPIC中的消息取出,然后回到客户端,判断该消息是否需要提交或回滚,来完成事务消息的声明周期,该部分内容将在下节重点探讨。


    作者:唯有坚持不懈
    来源:CSDN
    原文:https://blog.csdn.net/prestigeding/article/details/81263833

  • 相关阅读:
    re | [SWPU2019]ReverseMe
    wp | re | 2020“巅峰极客”网络安全技能挑战赛
    re | [CFI-CTF 2018]IntroToPE
    re | [FlareOn1]Bob Doge
    re | [ACTF新生赛2020]SoulLike
    re | [GKCTF2020]Chelly's identity
    ospf配置与ofps选举DR/BDR
    静态路由的配置
    配置三层交换机实现vlan间通信
    hybrid接口
  • 原文地址:https://www.cnblogs.com/yx88/p/11147941.html
Copyright © 2011-2022 走看看