事务型消息简介
- RocketMQ事务消息(Transactional Message) 是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
过程
- 阶段一(发送消息,执行本地事务,更行消息状态)
- 发送消息(half消息)
- 服务端响应消息写入结果
- 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
- 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
- 阶段二(补偿)
- 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
- Producer收到回查消息,检查回查消息对应的本地事务的状态
- 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
- 详细过程可以到参考文档中
事务型消息的使用
@Component
public class MqProducer {
@Value("${mq.nameserver.addr}")
private String nameAddr;
@Value("${mq.topicname}")
private String topicName;
//事务型生成者
private TransactionMQProducer transactionMQProducer;
@PostConstruct
public void init() throws MQClientException {
transactionMQProducer=new TransactionMQProducer("transaction_producer_group");
//设置nameSrv
transactionMQProducer.setNamesrvAddr(nameAddr);
//开启事务型生产者
transactionMQProducer.start();
//监听本地事务(关键),可以自定义一个类实现 TransactionListener 接口,或是通过匿名内部类
transactionMQProducer.setTransactionListener(new TransactionListener() {
/*
执行本地事务
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object args) {
try {
//执行本地事务
} catch (Exception e) {
e.printStackTrace();
//当执行到这里,说明本地事务执行异常,回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
}
//执行本地事务成功,提交
return LocalTransactionState.COMMIT_MESSAGE;
}
/*
回查方法
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//当LocalTransaction无法更新prepared消息,会通过这个方法执行本地策略
if(判断1){
//说明本地事务执行成功,提交
return LocalTransactionState.COMMIT_MESSAGE;
}else if(判断2){
//无法判断原因,当消息为UNKNOW时,会定时回查
return LocalTransactionState.UNKNOW;
}else{
//其他则一律回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
});
}
/*
发送消息
*/
public boolean sendMessage(Object value){
Map<String,Object> mapArgs= new HashMap<>();
mapArgs.put("key",value);;
Message message=new Message(topicName,"Tag", "message");
TransactionSendResult transactionSendResult=null;
try {
transactionSendResult = transactionMQProducer.sendMessageInTransaction(message, mapArgs);
} catch (MQClientException e) {
e.printStackTrace();
return false;
}
if(transactionSendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE){
return true;
}else if(transactionSendResult.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE){
return false;
}else {
return false;
}
}
}
public class Consumer {
public static void main(String[] args) throws MQClientException {
//创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumerGroup");
//设置NamesrvAddr 及消费位置ConsumeFromWhere
consumer.setNamesrvAddr(Const.NAMESRV_ADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//订阅topic, * 指该主题下的所有消息都能消费
consumer.subscribe("test_topic","*");
//注册监听并消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//获取消息
MessageExt msg = list.get(0);
try {
//可以根据消息来执行其他业务,达到和本地事务的最终一致性
}catch (Exception e){
e.printStackTrace();
int reconsumeTimes = msg.getReconsumeTimes();
if(reconsumeTimes == 2){
//记录日志,补偿处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费服务
System.out.println("消费服务启动...");
consumer.start();
}
}
参考