zoukankan      html  css  js  c++  java
  • 【RocketMQ源码学习】- 4. Client 事务消息源码解析

    介绍

    > 基于4.5.2版本的源码
    1. RocketMQ是从4.3.0版本开始支持事务消息的。
    2. RocketMQ的消息队列能够保证生产端,执行数据和发送MQ消息事务一致性,而消费端的事务一致则有消费重试来补偿实现
    3. 基于2PC思想来实现,增加一个补偿逻辑来处理二阶段超时或者失败的消息

    名词解释

    名词解释
    prepare消息 又名Half Message,半消息,标识该消息处于"暂时不能投递"状态,不会被Comsumer所消费,待服务端收到生成者对该消息的commit或者rollback响应后,消息会被正常投递或者回滚(丢弃)消息
    RMQ_SYS_TRANS_HALF_TOPIC prepare消息在被投递到Mq服务器后,会存储于Topic为RMQ_SYS_TRANS_HALF_TOPIC的消费队列中
    RMQ_SYS_TRANS_OP_HALF_TOPIC 在prepare消息被commit或者rollback处理后,会存储到Topic为RMQ_SYS_TRANS_OP_HALF_TOPIC的队列中,标识prepare消息已被处理

    RocketMQ事务消息流程概要

     

    1.事务消息发送及提交:
    1. 发送消息(half消息)。
    2. 服务端响应消息写入结果。
    3. 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
    4. 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
    2.补偿流程:
    1. 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
    2. Producer收到回查消息,检查回查消息对应的本地事务的状态
    3. 根据本地事务状态,重新Commit或者Rollback 

    Producer发送事务消息示例Demo

    下面是发送事务消息的Demo, 省略了一部分代码,不可运行,包含了 一、需要去设置事务监听,二、设置检查事务的线程

    public class TransactionProducer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            TransactionListener transactionListener = new TransactionListenerImpl();
            TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
            ExecutorService executorService = new ThreadPoolExecutor()// 创建线程池代码,省略了一部分
    
            producer.setExecutorService(executorService);
            producer.setTransactionListener(transactionListener);
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
            // 省略的code。。。
    
            // 发送事务消息
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    
            producer.shutdown();
        }
    }

    手动实现事务监听的类

    1. 需要去设置事务监听
    2. 设置检查事务的线程
    public class TransactionListenerImpl implements TransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);
    
        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            System.out.println("执行本地事务" + msg.getTransactionId());
            return LocalTransactionState.UNKNOW;
        }
    
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            System.out.println("检查本地事务:" + msg.getTransactionId());
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    default:
                        return LocalTransactionState.COMMIT_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }

     发送消息的源码

    1. 发送消息时,RocketMQ会先以同步的方式发送Half消息
    2. 当发送成功后执行实现的executeLocalTransaction方法
    3. 把方法的事务状态以oneWay的发送告诉broker
    4. 针对事务为unknow的,broker会发起调用checkLocalTransaction来检查本地事务状态。
    // DefaultMQProducerImpl#sendMessageInTransaction
    public
    TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
            // 1. 获取事务监听,事务监听不能为空
            TransactionListener transactionListener = getCheckListener();
            if (null == localTransactionExecuter && null == transactionListener) {
                throw new MQClientException("tranExecutor is null", null);
            }
            Validators.checkMessage(msg, this.defaultMQProducer);
             // 2. 为msg添加属性{"TRAN_MSG": "true", "PGROUP": defaultMQProducer.getProducerGroup()}
             // 给消息设置Half属性, 用于后续broker接收到消息判断是否是事务消息的prepare
            SendResult sendResult = null;
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
            try {
                // 3.发送第一阶段消息,以同步的方式发送,并获取返回值
                sendResult = this.send(msg);
            } catch (Exception e) {
                throw new MQClientException("send message Exception", e);
            }
    
            LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
            Throwable localException = null;
            switch (sendResult.getSendStatus()) {
                case SEND_OK: {  // 第一阶段消息发送成功
                    try {
                        if (sendResult.getTransactionId() != null) {
                  // 设置事务id属性
                            msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                        }
                        String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                        // 设置事务ID transactionId=UNIQ_KEY
                if (null != transactionId && !"".equals(transactionId)) {
                            msg.setTransactionId(transactionId);
                        }
                        if (null != localTransactionExecuter) {
                            // 用线程来执行本地事务
                            localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                        } else if (transactionListener != null) {
                            log.debug("Used new transaction API");
                            // 发送消息成功后,执行本地操作
                            localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                        }
                        if (null == localTransactionState) {  // 若返回的事务状态为空,则设置本地事务状态为unknow
                            localTransactionState = LocalTransactionState.UNKNOW;
                        }
    
                        if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                            log.info("executeLocalTransactionBranch return {}", localTransactionState);
                            log.info(msg.toString());
                        }
                    } catch (Throwable e) {
                        log.info("executeLocalTransactionBranch exception", e);
                        log.info(msg.toString());
                        localException = e;
                    }
                }
                break;
                case FLUSH_DISK_TIMEOUT:        // 刷盘超时
                case FLUSH_SLAVE_TIMEOUT:       // 从节点刷盘超时
                case SLAVE_NOT_AVAILABLE:       // 从节点不可用
                    // 设置rollBack
                    localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                    break;
                default:
                    break;
            }
    
            try {
                // 重点:设置结束事务
                this.endTransaction(sendResult, localTransactionState, localException);
            } catch (Exception e) {
                log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
            }
    
            // 组装TransactionSendResult, 并返回
            TransactionSendResult transactionSendResult = new TransactionSendResult();
            transactionSendResult.setSendStatus(sendResult.getSendStatus());
            transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
            transactionSendResult.setMsgId(sendResult.getMsgId());
            transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
            transactionSendResult.setTransactionId(sendResult.getTransactionId());
            transactionSendResult.setLocalTransactionState(localTransactionState);
            return transactionSendResult;
        }

    注:发送同步消息的代码解析在另一篇文章 https://www.cnblogs.com/milicool/p/11836450.html

    上面的代码做了三件事:

      1. 发送一阶段消息

      2. 执行本地事务,返回事务执行的状态,有三种,未知、回滚、提交

      3. 执行endTransaction方法,想事务执行的状态告诉broker,

    结束事务endTransaction方法解析

    1. 发送的第一阶段消息,在broker接收端,会把第一阶段消息的topic修改为RMQ_SYS_TRANS_HALF_TOPIC,这个topic是对消费者不可见的
    2. endTranscation方法会把本地事务执行的状态告诉broker
    3. 如果是提交或者回滚则把消息存储到RMQ_SYS_TRANS_OP_HALF_TOPIC, 如果是事务状态是unknow,会过一段时间执行检查事务的方法

    查看endTransaction方法

    // DefaultMQProducerImpl#endTransaction方法
    public void endTransaction(
            final SendResult sendResult,
            final LocalTransactionState localTransactionState,
            final Throwable localException) {
        // 获取broker地址
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublis
        // 构建EndTransactionRequestHeader
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());   // offset是prepare消息中offsetMsgId中获取的
        requestHeader.setCommitOrRollback   // 设置回滚/提交状态
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); 
        requestHeader.setMsgId(sendResult.getMsgId());
        // 调用无返回值的发消息队列的方法
        mQClientFactory.getMQClientAPIImpl().endTransactionOneway
    }
    
    // MQClientAPIImpl#endTransactionOneway方法
    public void endTransactionOneway(
            final String addr,
            final EndTransactionRequestHeader requestHeader,
            final String remark,
            final long timeoutMillis
        ) throws RemotingException, MQBrokerException, InterruptedException {
        // 创建远程调用的命令 cmd=END_TRANSACTION 在broker会根据cmd执行事务消息逻辑
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
    
        request.setRemark(remark);
        /** 
          *  调用无返回值发送消息的方法, 内部做的操作:
          *   1. 根据broker的iP地址, 获取连接的channel, 并把channel缓存到channelTables的map中 (在读写channelTables中会使用重入锁来控制并发)
          *   2. 调用doBeforeRpcHooks方法做acl的鉴权
          *   3. 调用this.invokeOnewayImpl(channel, request, timeoutMillis);发送请求
          */
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
    }
    
    // invokeOneway内部是调用NettyRemotingAbstract#invokeOnewayImpl方法
    public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) {
        // 对semaphoreOneway上锁
        acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        // 上锁成功
            // netty写请求
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {}
            
    
            // 写完释放锁,
            // 报异常也释放锁
        // 上锁失败
            // timeoutMillis <= 0 请求太多异常
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
            // timeoutMillis > 0
            throw new RemotingTimeoutException(info);
    }

    到这里,就完成了事务消息流程中1,2,3,4这个几个步骤,下面还有两个重要的流程:

    1. 针对一阶段发送的Half消息,broker要进行处理
    2. 针对endTransaction提交进来的事务状态,broker端要进行处理,如提交的是unknow状态,要进行检查本地事务的状态,如果是提交或者回滚则把消息存储到RMQ_SYS_TRANS_OP_HALF_TOPIC

    Broker端处理消息

    接下来,我们查看下收到消息的broker是如何处理消息。从服务端接受到消息流程是这样的

    重点查看SendMessageProcessor#sendMessage方法是如何存储消息的
      1 // SendMessageProcessor#sendMessage
      2 private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
      3                                         final RemotingCommand request,
      4                                         final SendMessageContext sendMessageContext,
      5                                         final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
      6     // 生成command返回值类
      7     final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
      8     final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
      9 
     10     // 设置requestId
     11     response.setOpaque(request.getOpaque());
     12     // 设置broker所在的区域ID,取自BrokerConfig#regionId
     13     response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
     14     // 设置是否需要跟踪 traceOn
     15     response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
     16 
     17     log.debug("receive SendMessage request command, {}", request);
     18     // startTimstamp可以判断broke是否可用
     19     final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
     20     if (this.brokerController.getMessageStore().now() < startTimstamp) {
     21         response.setCode(ResponseCode.SYSTEM_ERROR);
     22         response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
     23         return response;
     24     }
     25 
     26     response.setCode(-1);
     27     // 检查 broker是否有写的权限 请求的topic是否可以发消息 、topicConfig、queueIdInt
     28     super.msgCheck(ctx, requestHeader, response);
     29     if (response.getCode() != -1) {
     30         return response;
     31     }
     32 
     33     final byte[] body = request.getBody();
     34 
     35     int queueIdInt = requestHeader.getQueueId();
     36     TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
     37 
     38     if (queueIdInt < 0) {   // queueIdInt小于0,则重新设置queueIdInt
     39         queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
     40     }
     41 
     42     MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
     43     // 此时的topic还是真实的topic
     44     msgInner.setTopic(requestHeader.getTopic());
     45     msgInner.setQueueId(queueIdInt);
     46 
     47     // 如果topic是%RETRY%重试消息,则需要重新设置重试次数,消费次数信息
     48     if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
     49         return response;
     50     }
     51 
     52     msgInner.setBody(body);
     53     msgInner.setFlag(requestHeader.getFlag());  
     54     MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
     55     msgInner.setPropertiesString(requestHeader.getProperties());    // 属性的赋值
     56     msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
     57     msgInner.setBornHost(ctx.channel().remoteAddress());    // channel客户端的地址
     58     msgInner.setStoreHost(this.getStoreHost()); // 要存储的store的IP地址
     59     msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
     60     PutMessageResult putMessageResult = null;
     61     Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
     62     // 发送Half消息时,在属性中设置了PROPERTY_TRANSACTION_PREPARED为true,这里根据这个属性判断是否是事务消息
     63     String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);  
     64     if (traFlag != null && Boolean.parseBoolean(traFlag)) { // 
     65         if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
     66             response.setCode(ResponseCode.NO_PERMISSION);
     67             response.setRemark(
     68                 "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
     69                     + "] sending transaction message is forbidden");
     70             return response;
     71         }
     72         // 重点:事务消息进入这里,把消息的topic改成RMQ_SYS_TRANS_HALF_TOPIC,以同步刷盘的方式存入store
     73         // prepareMessage方法会调用TransactionalMessageServiceImpl类下transactionalMessageBridge.putHalfMessage方法
     74         putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
     75     } else {
     76         putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
     77     }
     78 
     79     // 处理返回值
     80     return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
     81 }
     82 
     83 // TransactionalMessageBridge#putHalfMessage
     84 public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
     85     // putMessage把topic消息写到commitlog
     86     // parseHalfMessageInner方法如下
     87     return store.putMessage(parseHalfMessageInner(messageInner));
     88 }
     89 
     90 // 解析Half消息
     91 private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
     92     // 把真实的topic和真实的queueId放在消息的属性中
     93     MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
     94     MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
     95         String.valueOf(msgInner.getQueueId()));
     96     // 设置默认的事务状态为TRANSACTION_NOT_TYPE=>unknow
     97     msgInner.setSysFlag(
     98         MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
     99     // 将消息的topic设置为RMQ_SYS_TRANS_HALF_TOPIC,这个是对消费者不可见的
    100     msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    101     // 设置queueId=0
    102     msgInner.setQueueId(0);
    103     msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    104     return msgInner;
    105 }

    Broker处理EndTransaction消息

    接下来,我们看一下,Broker是如何处理client用oneway方式发过来的endTransaction消息
    做的事:
    用offset查询prepare消息,如果是提交,把prepare消息的改为真实的topic消息写到commitlog中,写成功则进行删除prepare消息
    如果是回滚操作,

     1 // EndTransactionProcessor#processRequest
     2 public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
     3     RemotingCommandException {
     4     // 创建默认response返回值,
     5     final RemotingCommand response = RemotingCommand.createResponseCommand(null);
     6     final EndTransactionRequestHeader requestHeader =
     7         (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
     8 
     9     // 判断是slave节点,直接返回 ,代码省略。。。
    10     // 打印日志:根据是否是检查本地事务的日志,是否是提交、回滚、未知,打印不同的日志,代码省略。。。
    11     
    12     OperationResult result = new OperationResult();
    13     if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // 提交事务请求
    14         // 根据commitLogOffset获取文件中的message,获取到了返回success
    15         result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    16         if (result.getResponseCode() == ResponseCode.SUCCESS) {
    17             // 检查文件中的消息和请求的是否一致
    18             RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
    19             if (res.getCode() == ResponseCode.SUCCESS) {
    20                 // 生成要保存的消息 这里是commit操作,会把topic修改为真实的topic,queueId也修改为真实的
    21                 MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
    22                 msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
    23                 msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
    24                 msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
    25                 msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
    26                 // 重要:把真实的topic消息存储到commitlog
    27                 RemotingCommand sendResult = sendFinalMessage(msgInner);
    28                 if (sendResult.getCode() == ResponseCode.SUCCESS) {
    29                     // 重要:删除prepare消息, 其实并没有删除prepare消息,而是把消息改为RMQ_SYS_TRANS_OP_HALF_TOPIC,存入opQueueMap和store
    30                     this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
    31                 }
    32                 return sendResult;
    33             }
    34             return res;
    35         }
    36     } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { // 回滚事务请求
    37         // 查询到half消息则返回成功
    38         result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    39         if (result.getResponseCode() == ResponseCode.SUCCESS) {
    40             // 检查请求request是否和获取到half消息一致
    41             RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
    42             if (res.getCode() == ResponseCode.SUCCESS) {
    43                 // 重要:删除prepare消息, 其实并没有删除prepare消息,而是把消息改为RMQ_SYS_TRANS_OP_HALF_TOPIC,存入opQueueMap和store
    44                 this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
    45             }
    46             return res;
    47         }
    48     }
    49 
    50     // 返回
    51     response.setCode(result.getResponseCode());
    52     response.setRemark(result.getResponseRemark());
    53     return response;
    54 }
    55 
    56 // 查看一下TransactionalMessageServiceImpl#deletePrepareMessage方法,删除prepareMassage
    57 1. deletePrepareMessage方法调用的是 transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)方法
    58 2. putOpMessage方法调用的是addRemoveTagInTransactionOp(messageExt, messageQueue)方法
    59 private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
    60     // buildOpTopic方法设置消息的topic为RMQ_SYS_TRANS_OP_HALF_TOPIC
    61     Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
    62         String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
    63     // 把RMQ_SYS_TRANS_OP_HALF_TOPIC消息存入opQueueMap,同时写入store
    64     writeOp(message, messageQueue);
    65     return true;
    66 }

    本地事务状态回查

     》基于数据库模式的事务状态回查

    问题

    1. 说说为什么要回查
      整体的流程是:
      a. 发送half消息
      b. 发送失败,流程结束;发送成功,执行本地事务
      c. 把本地事务执行的状态,发送消息给broker
      所以在b中事务执行的状态可能为UNKNOW状态,或者c过程中,消息发送超时或者失败,broker不知道事务执行是否是成功还是失败,所以broker会启动事务补偿机制来检查本地事务的执行状态
    2. 最大回查次数?
      @ImportantField
      private int transactionCheckMax = 15;
    3. 第一次间隔多久回查?
      60s
      private long transactionCheckInterval = 60 * 1000;

    本地事务回查的流程

     1 /**
     2  * ClientRemotingProcessor#checkTransactionState
     3  * 内部获取请求的messageExt等信息后,会调用 DefaultMQProducerImpl#checkTransactionState
     4  */
     5  // DefaultMQProducerImpl#checkTransactionState
     6 public void checkTransactionState(final String addr, final MessageExt msg,
     7         final CheckTransactionStateRequestHeader header) {
     8     Runnable request = new Runnable() {
     9         // 获取broker地址、消息体、请求体、生产的group,留后续发送使用 【代码省略】
    10 
    11         // 调用线程
    12         @Override
    13         public void run() {
    14             // 新版本TransactionCheckListener不用了
    15             TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
    16             // getCheckListener()来获取我们重写的检查本地事务的方法
    17             TransactionListener transactionListener = getCheckListener();
    18             if (transactionCheckListener != null || transactionListener != null) {
    19                 // 初始化结果
    20                 LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    21                 Throwable exception = null;
    22                 try {
    23                     if (transactionCheckListener != null) {
    24                         // 老版本从这里调用执行检查本地事务的接口
    25                         localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
    26                     } else if (transactionListener != null) {
    27                         // 新版本调用我们实现的checkLocalTransaction检查本地线程
    28                         log.debug("Used new check API in transaction message");
    29                         localTransactionState = transactionListener.checkLocalTransaction(message);
    30                     } else {
    31                         log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
    32                     }
    33                 } catch (Throwable e) {
    34                     log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
    35                     exception = e;
    36                 }
    37 
    38                 // 处理事务状态的方法, 代码详情如下:
    39                 this.processTransactionState(
    40                     localTransactionState,
    41                     group,
    42                     exception);
    43             } else {
    44                 log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
    45             }
    46         }
    47 
    48         // 处理事务状态的方法
    49         private void processTransactionState(
    50             final LocalTransactionState localTransactionState,  // 存储检查事务执行的状态值
    51             final String producerGroup,
    52             final Throwable exception) {
    53             // 设置
    54             final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
    55             thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
    56             thisHeader.setProducerGroup(producerGroup);
    57             thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
    58             thisHeader.setFromTransactionCheck(true);   // 检查事务的标志位
    59             // 获取uniqueKey,此时uniquekey=msgId
    60             String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
    61             if (uniqueKey == null) {
    62                 uniqueKey = message.getMsgId();
    63             }
    64             thisHeader.setMsgId(uniqueKey);
    65             thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
    66             switch (localTransactionState) {
    67                 case COMMIT_MESSAGE:    // 提交状态
    68                     thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
    69                     break;
    70                 case ROLLBACK_MESSAGE:  // 回滚状态
    71                     thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
    72                     log.warn("when broker check, client rollback this transaction, {}", thisHeader);
    73                     break;
    74                 case UNKNOW:    // 未知状态
    75                     thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
    76                     log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
    77                     break;
    78                 default:
    79                     break;
    80             }
    81 
    82             String remark = null;
    83             if (exception != null) {
    84                 remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
    85             }
    86 
    87             try {
    88                 // 再次执行endTransactionOneway, 上文已经对结束事务的状态做了说明
    89                 DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
    90                     3000);
    91             } catch (Exception e) {
    92                 log.error("endTransactionOneway exception", e);
    93             }
    94         }
    95     };
    96 
    97     // 提交线程的任务
    98     this.checkExecutor.submit(request);
    99 }

    结语

    重要的

    1. 自定义的TransactionListenerImpl事务监听类
    2. 发一阶段half消息的方法sendMessageInTransaction
    3. broker接受到TRAN_MSG属性为true的消息会把他的topic修改为RMQ_SYS_TRANS_HALF_TOPIC
    4. Broke处理EndTransaction消息会把提交和回滚状态的消息,存放到RMQ_SYS_TRANS_OP_HALF_TOPIC的topic中

    希望

    希望写的这篇文字,能帮助到想看这部分知识的人,当然由于本人的水平有限,难免出现错误,欢迎评论

     ======  【多学一点,for Better】======
     
  • 相关阅读:
    python常用函数年初大总结
    Linux系统巡检常用命令
    码率
    视频码率计算问题
    Python快速教程
    MFC消息机制
    MySQL多表查询
    VoIP的话音质量测量方法
    用Py2exe打包Python脚本简单介绍
    在Eclipse中执行Andorid test preject提示The connection to adb is down, and a severe error has occured.解决方法
  • 原文地址:https://www.cnblogs.com/milicool/p/11884841.html
Copyright © 2011-2022 走看看