RocketMQ事务消息定义
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
RocketMQ事务消息和业务流程
- 发送方发送半事务消息
- Broker收到半事务消息存储后返回结果
- 发送半事务消息方处理本地事务
- 发送方把本地事务处理结果以消息形式发送到Broker
- Broker在固定的时间内(默认60秒)未收到4的确认消息,Broker为发送方发送回查消息
- 业务发送发收到Broker回查消息后,查询本地业务执行结果
- 业务方发送回查结果消息
1-4 是同步调用,5-7是异步调用。RocketMQ事务消息使用了2PC+事后补偿机制保证了最终一致性。
RocketMQ事务消息实现原理
- 业务消息发送方使用TransactionMQProducer 发送业务消息到指定的Topic(如:Griez_Topic),TransactionMQProducer 会填充Message的properties属性键值对 TRAN_MSG=true,表示该消息为事务消息。
- Broker收到消息,在做存储逻辑时根据消息的properties的TRAN_MSG属性判断是否是事务消息,如果true(事务消息),把原Topic名字保存到Message的properties属性REAL_TOPIC中,并把本消息的Topic替换成RMQ_SYS_TRANS_HALF_TOPIC,然后保存并返回结果(到此完成发送半事务消息)。
- 业务方收到消息发送结果后,处理完本地事务。
- 把3处理结果保存到EndTransactionRequestHeader并给Broker发送END_TRANSACTION指令消息。
- Broker收到END_TRANSACTION指令消息,在RMQ_SYS_TRANS_HALF_TOPIC中获取对应的消息(变异消息)。如果业务本地事务是COMMIT情况,把消息还原原始消息(Topic还原成Griez_Topic)并保存到Griez_Topic中(此时该消费者端才能消费该消息),保存成功后把变异消息保存到RMQ_SYS_TRANS_OP_HALF_TOPIC,表示该事务消息已经处理完成;如果业务本地事务是ROLLBACK情况,保存成功后把变异消息保存到RMQ_SYS_TRANS_OP_HALF_TOPIC,表示该事务消息已经处理完成。
- Broker在启动是时启动了一个定时(60秒)回查的服务,根据半事务消息Topic RMQ_SYS_TRANS_HALF_TOPIC 和处理半事务消息Topic RMQ_SYS_TRANS_OP_HALF_TOPIC 差值判断需要回查的半事务消息,发CHECK_TRANSACTION_STATE回查指令到业务方查询本地事务执行结果。
- 业务方收到CHECK_TRANSACTION_STATE回查指令之后执行回查接口,然后把结果跟4的操作一样处理。
RocketMQ事务消息源码解析
Producer发送事务消息以及本地事务确认消息
实现TransactionListener
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.ROLLBACK_MESSAGE;
case 2:
return LocalTransactionState.COMMIT_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
业务中需要实现TransactionListener接口。
executeLocalTransaction 方法是具体的业务逻辑处理(本地事务处理);
checkLocalTransaction 方法是Broker回查本地事务状态接口。
TransactionMQProducer使用
public static void main(String[] args) throws MQClientException, InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("griez_test"); // @1
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setExecutorService(executorService); // @2
producer.setTransactionListener(transactionListener);
producer.start();
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message("Griez_Topic", ("Hello Griez " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null); // @3
Thread.sleep(1000);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
latch.wait();
producer.shutdown();
}
- 创建事务消息生产者 TransactionMQProducer
- 把TransactionListener注册到TransactionMQProducer中
- 发送消息
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
// 标识为事务消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); // @1
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
// 发送半事务消息
sendResult = this.send(msg); // @2
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
// 处理本地事务
localTransactionState = transactionListener.executeLocalTransaction(msg, arg); // @3
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
// 处理结果
this.endTransaction(sendResult, localTransactionState, localException); // @4
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
- 添加属性trans_msg为true,表示该消息是半事务消息;封装group属性,给broker绑定客户端回查用。
- 发送消息(跟普通消息一样发送模式),并等待broker接收消息的结果。
- 如果broker返回SEND_OK,调用TransactionListener的executeLocalTransaction方法执行本地事务,并返回本地事务执行结果。
- 发送本地事务执行结果指令给broker
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#endTransaction
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());
}
以上是事务消息的投递过程。接下来是事务消息接收处理过程。
Broker接收事务处理消息
org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
// ~~~~~~ 省略N多无关代码
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // @1 事务提交
// 在topic RMQ_SYS_TRANS_HALF_TOPIC 中查询消息(类似消费者消费该消息)
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); // @2
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// 参数校验
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); // @3
if (res.getCode() == ResponseCode.SUCCESS) {
// 还原原始topic消息
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); // @4
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
// 存储原始topic消息,存储成功后消费者可以消费
RemotingCommand sendResult = sendFinalMessage(msgInner); // @5
if (sendResult.getCode() == ResponseCode.SUCCESS) {
// 还原原始topic成功后,往RMQ_SYS_TRANS_OP_HALF_TOPIC topic中添加消息,表示该事务已经完成
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());// @6
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { // @7 回滚
// 在topic RMQ_SYS_TRANS_HALF_TOPIC 中查询消息(类似消费者消费该消息)
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);// @8
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 往RMQ_SYS_TRANS_OP_HALF_TOPIC topic中添加消息,表示该事务已经完成
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());// @9
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}
- 收到指令类型为事务提交时。
- 在topic RMQ_SYS_TRANS_HALF_TOPIC 查询消息,这里名字有点误导。
- 对指令中需要处理的半事务消息做参数校验。
- 还原原始topic信息(Griez_Topic)。
- 在原始topic中存储消息,这一步完成后业务消费端才可以消息。
- 在topic RMQ_SYS_TRANS_OP_HALF_TOPIC 中保存消息,表示该事务消息已经处理。
- 收到指令类型为事务回滚时。
- 在topic RMQ_SYS_TRANS_HALF_TOPIC 查询消息,这里名字有点误导。
- 在topic RMQ_SYS_TRANS_OP_HALF_TOPIC 中保存消息,表示该事务消息已经处理。
本小节是Broker接收本地事务结果指令后的处理,下一节是Broker异步发回查指令到业务方。
Broker回查处理
Broker 启动时启动了一系列定时任务,代码跟踪链路 org.apache.rocketmq.broker.BrokerStartup#start -> org.apache.rocketmq.broker.BrokerController#start -> org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#start -> org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#run -> org.apache.rocketmq.common.ServiceThread#waitForRunning -> org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#onWaitEnd
Broker端关键代码
org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#onWaitEnd
@Override
protected void onWaitEnd() {
// 超时时间,默认60秒
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
// 回查次数 15
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#check
@Override
public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) {
try {
String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.info("Check topic={}, queues={}", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
// 获取 RMQ_SYS_TRANS_OP_HALF_TOPIC
MessageQueue opQueue = getOpQueue(messageQueue);
// 获取消费位点
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
if (halfOffset < 0 || opOffset < 0) {
log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, halfOffset, opOffset);
continue;
}
// 封装已经完成的
List<Long> doneOpOffset = new ArrayList<>();
// 已经处理过的
HashMap<Long, Long> removeMap = new HashMap<>();
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
if (null == pullResult) {
log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null", messageQueue, halfOffset, opOffset);
continue;
}
// single thread
int getMessageNullCount = 1;
long newOffset = halfOffset;
long i = halfOffset;
while (true) {
// 这里会不会有问题?
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
}
if (removeMap.containsKey(i)) { // 3
log.info("Half offset {} has been committed/rolled back", i);
removeMap.remove(i);
} else {
// 从 RMQ_SYS_TRANS_HALF_TOPIC 队列中 i 偏移量获取消息
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
if (msgExt == null) {
// 获取为空可以重试一次
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
break;
}
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.info("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
break;
} else {
log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
}
}
// 回查次数大于15次则丢弃 || 消息文件超过过期时间(72小时)则跳过
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { // 6
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
if (msgExt.getStoreTimestamp() >= startTime) {
log.info("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp()));
break;
}
// 已存储的时间
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
long checkImmunityTime = transactionTimeout;
// 消息指定的过期时间
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr) { // 8
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1;
i++;
continue;
}
}
} else {
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
log.info("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break;
}
}
List<MessageExt> opMsg = pullResult.getMsgFoundList();
// 是否需要回查
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
// 把消息再次写入 RMQ_SYS_TRANS_HALF_TOPIC 中
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
// 重点,回查逻辑, 以broker为生产者给client发送 CHECK_TRANSACTION_STATE 消息
listener.resolveHalfMsg(msgExt);
} else {
// 加载更多的消息
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult);
continue;
}
}
newOffset = i + 1;
i++;
}
if (newOffset != halfOffset) {
// 更新回查的偏移量
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
// 更新处理队列的偏移量
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
}
} catch (Exception e) {
e.printStackTrace();
log.error("Check error", e);
}
}
业务方接收回查指令关键代码:
org.apache.rocketmq.client.impl.ClientRemotingProcessor#processRequest
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
// 回查消息
case RequestCode.CHECK_TRANSACTION_STATE:
return this.checkTransactionState(ctx, request);
case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
return this.notifyConsumerIdsChanged(ctx, request);
case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
return this.resetOffset(ctx, request);
case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
return this.getConsumeStatus(ctx, request);
case RequestCode.GET_CONSUMER_RUNNING_INFO:
return this.getConsumerRunningInfo(ctx, request);
case RequestCode.CONSUME_MESSAGE_DIRECTLY:
return this.consumeMessageDirectly(ctx, request);
default:
break;
}
return null;
}
org.apache.rocketmq.client.impl.ClientRemotingProcessor#checkTransactionState
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final CheckTransactionStateRequestHeader requestHeader =
(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
if (messageExt != null) {
String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
messageExt.setTransactionId(transactionId);
}
final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (group != null) {
MQProducerInner producer = this.mqClientFactory.selectProducer(group);
if (producer != null) {
final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
producer.checkTransactionState(addr, messageExt, requestHeader);
} else {
log.debug("checkTransactionState, pick producer by group[{}] failed", group);
}
} else {
log.warn("checkTransactionState, pick producer group failed");
}
} else {
log.warn("checkTransactionState, decode message failed");
}
return null;
}
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkTransactionState
@Override
public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() {
private final String brokerAddr = addr;
private final MessageExt message = msg;
private final CheckTransactionStateRequestHeader checkRequestHeader = header;
private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
@Override
public void run() {
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
if (transactionCheckListener != null) {
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
log.debug("Used new check API in transaction message");
localTransactionState = transactionListener.checkLocalTransaction(message);
} else {
log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
}
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
}
this.processTransactionState(
localTransactionState,
group,
exception);
} else {
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}
private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true);
String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
uniqueKey = message.getMsgId();
}
thisHeader.setMsgId(uniqueKey);
thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
switch (localTransactionState) {
case COMMIT_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
log.warn("when broker check, client rollback this transaction, {}", thisHeader);
break;
case UNKNOW:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
break;
default:
break;
}
String remark = null;
if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}
try {
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
3000);
} catch (Exception e) {
log.error("endTransactionOneway exception", e);
}
}
};
this.checkExecutor.submit(request);
}