zoukankan      html  css  js  c++  java
  • RocketMQ 事务消息

    RocketMQ事务消息定义

    RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

    RocketMQ事务消息和业务流程

    1. 发送方发送半事务消息
    2. Broker收到半事务消息存储后返回结果
    3. 发送半事务消息方处理本地事务
    4. 发送方把本地事务处理结果以消息形式发送到Broker
    5. Broker在固定的时间内(默认60秒)未收到4的确认消息,Broker为发送方发送回查消息
    6. 业务发送发收到Broker回查消息后,查询本地业务执行结果
    7. 业务方发送回查结果消息

    1-4 是同步调用,5-7是异步调用。RocketMQ事务消息使用了2PC+事后补偿机制保证了最终一致性。

    RocketMQ事务消息实现原理

    1. 业务消息发送方使用TransactionMQProducer 发送业务消息到指定的Topic(如:Griez_Topic),TransactionMQProducer 会填充Message的properties属性键值对 TRAN_MSG=true,表示该消息为事务消息。
    2. Broker收到消息,在做存储逻辑时根据消息的properties的TRAN_MSG属性判断是否是事务消息,如果true(事务消息),把原Topic名字保存到Message的properties属性REAL_TOPIC中,并把本消息的Topic替换成RMQ_SYS_TRANS_HALF_TOPIC,然后保存并返回结果(到此完成发送半事务消息)。
    3. 业务方收到消息发送结果后,处理完本地事务。
    4. 把3处理结果保存到EndTransactionRequestHeader并给Broker发送END_TRANSACTION指令消息。
    5. Broker收到END_TRANSACTION指令消息,在RMQ_SYS_TRANS_HALF_TOPIC中获取对应的消息(变异消息)。如果业务本地事务是COMMIT情况,把消息还原原始消息(Topic还原成Griez_Topic)并保存到Griez_Topic中(此时该消费者端才能消费该消息),保存成功后把变异消息保存到RMQ_SYS_TRANS_OP_HALF_TOPIC,表示该事务消息已经处理完成;如果业务本地事务是ROLLBACK情况,保存成功后把变异消息保存到RMQ_SYS_TRANS_OP_HALF_TOPIC,表示该事务消息已经处理完成。
    6. Broker在启动是时启动了一个定时(60秒)回查的服务,根据半事务消息Topic RMQ_SYS_TRANS_HALF_TOPIC 和处理半事务消息Topic RMQ_SYS_TRANS_OP_HALF_TOPIC 差值判断需要回查的半事务消息,发CHECK_TRANSACTION_STATE回查指令到业务方查询本地事务执行结果。
    7. 业务方收到CHECK_TRANSACTION_STATE回查指令之后执行回查接口,然后把结果跟4的操作一样处理。

    RocketMQ事务消息源码解析

    Producer发送事务消息以及本地事务确认消息

    实现TransactionListener

    public class TransactionListenerImpl implements TransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);
    
        private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                case 2:
                    return LocalTransactionState.COMMIT_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
    
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    default:
                        return LocalTransactionState.COMMIT_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
    

    业务中需要实现TransactionListener接口。

    executeLocalTransaction 方法是具体的业务逻辑处理(本地事务处理);

    checkLocalTransaction 方法是Broker回查本地事务状态接口。

    TransactionMQProducer使用

    public static void main(String[] args) throws MQClientException, InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        TransactionListener transactionListener = new TransactionListenerImpl(); 
        TransactionMQProducer producer = new TransactionMQProducer("griez_test"); // @1
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
    
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setExecutorService(executorService); // @2
        producer.setTransactionListener(transactionListener);
        producer.start();
    
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("Griez_Topic",  ("Hello Griez " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null); // @3
                Thread.sleep(1000);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        latch.wait();
        producer.shutdown();
    }
    
    1. 创建事务消息生产者 TransactionMQProducer
    2. 把TransactionListener注册到TransactionMQProducer中
    3. 发送消息

    org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction

    public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }
        Validators.checkMessage(msg, this.defaultMQProducer);
    
        SendResult sendResult = null;
        // 标识为事务消息
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); // @1
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            // 发送半事务消息
            sendResult = this.send(msg); // @2
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }
    
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        // 处理本地事务
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg); // @3
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }
    
                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
    
        try {
            // 处理结果
            this.endTransaction(sendResult, localTransactionState, localException); // @4
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }
    
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }
    
    1. 添加属性trans_msg为true,表示该消息是半事务消息;封装group属性,给broker绑定客户端回查用。
    2. 发送消息(跟普通消息一样发送模式),并等待broker接收消息的结果。
    3. 如果broker返回SEND_OK,调用TransactionListener的executeLocalTransaction方法执行本地事务,并返回本地事务执行结果。
    4. 发送本地事务执行结果指令给broker

    org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#endTransaction

    public void endTransaction(
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        String transactionId = sendResult.getTransactionId();
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }
    
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());
    }
    

    以上是事务消息的投递过程。接下来是事务消息接收处理过程。

    Broker接收事务处理消息

    org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest

    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
        // ~~~~~~ 省略N多无关代码
        OperationResult result = new OperationResult();
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {    // @1 事务提交
            // 在topic RMQ_SYS_TRANS_HALF_TOPIC 中查询消息(类似消费者消费该消息)
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); // @2
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                // 参数校验
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); // @3
                if (res.getCode() == ResponseCode.SUCCESS) {
                    // 还原原始topic消息
                    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); // @4
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                    // 存储原始topic消息,存储成功后消费者可以消费
                    RemotingCommand sendResult = sendFinalMessage(msgInner);  // @5
                    if (sendResult.getCode() == ResponseCode.SUCCESS) {
                 // 还原原始topic成功后,往RMQ_SYS_TRANS_OP_HALF_TOPIC topic中添加消息,表示该事务已经完成
                       this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());// @6
                    }
                    return sendResult;
                }
                return res;
            }
        } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { //  @7 回滚
            // 在topic RMQ_SYS_TRANS_HALF_TOPIC 中查询消息(类似消费者消费该消息)
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);// @8
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    // 往RMQ_SYS_TRANS_OP_HALF_TOPIC topic中添加消息,表示该事务已经完成
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());// @9
                }
                return res;
            }
        }
        response.setCode(result.getResponseCode());
        response.setRemark(result.getResponseRemark());
        return response;
    }
    
    1. 收到指令类型为事务提交时。
    2. 在topic RMQ_SYS_TRANS_HALF_TOPIC 查询消息,这里名字有点误导。
    3. 对指令中需要处理的半事务消息做参数校验。
    4. 还原原始topic信息(Griez_Topic)。
    5. 在原始topic中存储消息,这一步完成后业务消费端才可以消息。
    6. 在topic RMQ_SYS_TRANS_OP_HALF_TOPIC 中保存消息,表示该事务消息已经处理。
    7. 收到指令类型为事务回滚时。
    8. 在topic RMQ_SYS_TRANS_HALF_TOPIC 查询消息,这里名字有点误导。
    9. 在topic RMQ_SYS_TRANS_OP_HALF_TOPIC 中保存消息,表示该事务消息已经处理。

    本小节是Broker接收本地事务结果指令后的处理,下一节是Broker异步发回查指令到业务方。

    Broker回查处理

    Broker 启动时启动了一系列定时任务,代码跟踪链路 org.apache.rocketmq.broker.BrokerStartup#start -> org.apache.rocketmq.broker.BrokerController#start -> org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#start -> org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#run -> org.apache.rocketmq.common.ServiceThread#waitForRunning -> org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#onWaitEnd

    Broker端关键代码

    org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#onWaitEnd

    @Override
    protected void onWaitEnd() {
        // 超时时间,默认60秒
        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        // 回查次数 15
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
        long begin = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", begin);
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
    }
    

    org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#check

    @Override
    public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) {
        try {
            String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.info("Check topic={}, queues={}", topic, msgQueues);
            for (MessageQueue messageQueue : msgQueues) {
                long startTime = System.currentTimeMillis();
                // 获取 RMQ_SYS_TRANS_OP_HALF_TOPIC
                MessageQueue opQueue = getOpQueue(messageQueue);
                // 获取消费位点
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, halfOffset, opOffset);
                    continue;
                }
    
                // 封装已经完成的
                List<Long> doneOpOffset = new ArrayList<>();
                // 已经处理过的
                HashMap<Long, Long> removeMap = new HashMap<>();
                PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                if (null == pullResult) {
                    log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null", messageQueue, halfOffset, opOffset);
                    continue;
                }
                // single thread
                int getMessageNullCount = 1;
                long newOffset = halfOffset;
                long i = halfOffset;
                while (true) {
                    // 这里会不会有问题?
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    if (removeMap.containsKey(i)) { // 3
                        log.info("Half offset {} has been committed/rolled back", i);
                        removeMap.remove(i);
                    } else {
                        // 从 RMQ_SYS_TRANS_HALF_TOPIC 队列中 i 偏移量获取消息
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        MessageExt msgExt = getResult.getMsg();
                        if (msgExt == null) {
                            // 获取为空可以重试一次
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                break;
                            }
                            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                                log.info("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                break;
                            } else {
                                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                i = getResult.getPullResult().getNextBeginOffset();
                                newOffset = i;
                                continue;
                            }
                        }
    
                        // 回查次数大于15次则丢弃 || 消息文件超过过期时间(72小时)则跳过
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { // 6
                            listener.resolveDiscardMsg(msgExt);
                            newOffset = i + 1;
                            i++;
                            continue;
                        }
                        if (msgExt.getStoreTimestamp() >= startTime) {
                            log.info("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp()));
                            break;
                        }
    
                        // 已存储的时间
                        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                        long checkImmunityTime = transactionTimeout;
                        // 消息指定的过期时间
                        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                        if (null != checkImmunityTimeStr) { // 8
                            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                    newOffset = i + 1;
                                    i++;
                                    continue;
                                }
                            }
                        } else {
                            if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                                log.info("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                break;
                            }
                        }
                        List<MessageExt> opMsg = pullResult.getMsgFoundList();
                        // 是否需要回查
                        boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                            || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                            || (valueOfCurrentMinusBorn <= -1);
                        if (isNeedCheck) {
                            // 把消息再次写入 RMQ_SYS_TRANS_HALF_TOPIC 中
                            if (!putBackHalfMsgQueue(msgExt, i)) {
                                continue;
                            }
                            // 重点,回查逻辑, 以broker为生产者给client发送 CHECK_TRANSACTION_STATE 消息
                            listener.resolveHalfMsg(msgExt);
                        } else {
                            // 加载更多的消息
                            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                            log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult);
                            continue;
                        }
                    }
                    newOffset = i + 1;
                    i++;
                }
                if (newOffset != halfOffset) {
                    // 更新回查的偏移量
                    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                if (newOpOffset != opOffset) {
                    // 更新处理队列的偏移量
                    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("Check error", e);
        }
    
    }
    

    业务方接收回查指令关键代码:

    org.apache.rocketmq.client.impl.ClientRemotingProcessor#processRequest

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            // 回查消息
            case RequestCode.CHECK_TRANSACTION_STATE:
                return this.checkTransactionState(ctx, request);
            case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
                return this.notifyConsumerIdsChanged(ctx, request);
            case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
                return this.resetOffset(ctx, request);
            case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
                return this.getConsumeStatus(ctx, request);
            case RequestCode.GET_CONSUMER_RUNNING_INFO:
                return this.getConsumerRunningInfo(ctx, request);
            case RequestCode.CONSUME_MESSAGE_DIRECTLY:
                return this.consumeMessageDirectly(ctx, request);
            default:
                break;
        }
        return null;
    }
    

    org.apache.rocketmq.client.impl.ClientRemotingProcessor#checkTransactionState

    public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final CheckTransactionStateRequestHeader requestHeader =
            (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
        final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
        final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
        if (messageExt != null) {
            String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null != transactionId && !"".equals(transactionId)) {
                messageExt.setTransactionId(transactionId);
            }
            final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
            if (group != null) {
                MQProducerInner producer = this.mqClientFactory.selectProducer(group);
                if (producer != null) {
                    final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    producer.checkTransactionState(addr, messageExt, requestHeader);
                } else {
                    log.debug("checkTransactionState, pick producer by group[{}] failed", group);
                }
            } else {
                log.warn("checkTransactionState, pick producer group failed");
            }
        } else {
            log.warn("checkTransactionState, decode message failed");
        }
    
        return null;
    }
    

    org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkTransactionState

    @Override
    public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) {
        Runnable request = new Runnable() {
            private final String brokerAddr = addr;
            private final MessageExt message = msg;
            private final CheckTransactionStateRequestHeader checkRequestHeader = header;
            private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
    
            @Override
            public void run() {
                TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
                TransactionListener transactionListener = getCheckListener();
                if (transactionCheckListener != null || transactionListener != null) {
                    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                    Throwable exception = null;
                    try {
                        if (transactionCheckListener != null) {
                            localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
                        } else if (transactionListener != null) {
                            log.debug("Used new check API in transaction message");
                            localTransactionState = transactionListener.checkLocalTransaction(message);
                        } else {
                            log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
                        }
                    } catch (Throwable e) {
                        log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
                        exception = e;
                    }
    
                    this.processTransactionState(
                        localTransactionState,
                        group,
                        exception);
                } else {
                    log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
                }
            }
    
            private void processTransactionState(
                final LocalTransactionState localTransactionState,
                final String producerGroup,
                final Throwable exception) {
                final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
                thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
                thisHeader.setProducerGroup(producerGroup);
                thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
                thisHeader.setFromTransactionCheck(true);
    
                String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (uniqueKey == null) {
                    uniqueKey = message.getMsgId();
                }
                thisHeader.setMsgId(uniqueKey);
                thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
                switch (localTransactionState) {
                    case COMMIT_MESSAGE:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                        break;
                    case ROLLBACK_MESSAGE:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                        log.warn("when broker check, client rollback this transaction, {}", thisHeader);
                        break;
                    case UNKNOW:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                        log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
                        break;
                    default:
                        break;
                }
    
                String remark = null;
                if (exception != null) {
                    remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
                }
    
                try {
                    DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                        3000);
                } catch (Exception e) {
                    log.error("endTransactionOneway exception", e);
                }
            }
        };
    
        this.checkExecutor.submit(request);
    }
    
  • 相关阅读:
    mysql 慢查询分析工具
    php+redis实现消息队列
    Mysql数据库千万级数据查询优化方案.....
    windows下安装docker详细步骤
    Git基础使用教程
    redis实现消息队列&发布/订阅模式使用
    macos上改变输入法顺序
    ssh动态转发小记
    ubuntu上runsv/runit小记
    使用libcurl下载https地址的文件
  • 原文地址:https://www.cnblogs.com/wolf-bin/p/15128162.html
Copyright © 2011-2022 走看看