1 配置 生产者
和普通的生成者不同,需要使用事物消息的生产者,并且需要指定 TransactionListenerImpl ,TransactionListenerImpl
/** * 初始化生产者 * * @param rockterMQConfig * @return * @throws MQClientException */ @Bean public DefaultMQProducer defaultMQProducer(RockterMQConfig rockterMQConfig) throws MQClientException { TransactionMQProducer producer = new TransactionMQProducer(rockterMQConfig.getProducerGroup()); producer.setNamesrvAddr(rockterMQConfig.getNamesrvAddr()); producer.setInstanceName(rockterMQConfig.getInstanceName()); producer.setTransactionListener( new TransactionListenerImpl() ); producer.start(); return producer; }
2 实现 TransactionListener ,TransactionListener 里面有两个方法,分别描述怎么持久化这个消息发送记录,和 提供会查,以供决定未被确认的消息应该丢弃,还是确认,或者重复回查
package com.xyebank.hzx.message.listener; import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; public class TransactionListenerImpl implements TransactionListener { // 存储事务状态信息 key:事务id value:当前事务执行的状态 private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); // 执行本地事务 @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { // 事务id String transactionId = message.getTransactionId(); // 0:执行中,状态未知 1:执行成功 2:执行失败 localTrans.put(transactionId, 0); // 业务执行,本地事务,service System.out.println("hello-demo-transaction"); try { System.out.println("正在执行本地事务---"); Thread.sleep( 60*1000 ); System.out.println("本地事务执行成功---"); localTrans.put(transactionId, 1); } catch (InterruptedException e) { e.printStackTrace(); localTrans.put(transactionId, 2); return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; } // 消息回查 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { // 获取对应事务的状态信息 String transactionId = messageExt.getTransactionId(); // 获取对应事务id执行状态 Integer status = localTrans.get(transactionId); // 消息回查 System.out.println("消息回查---transactionId:" + transactionId + "状态:" + status); switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.UNKNOW; } }
备注:正常我们肯定不会这么干,因为这样重启一次就丢了。最好的做法是和本地数据库使用同一套事务环境,持久到本地数据库中。写到每次写入都持久化的redis也是可以的。
3 事务消息的发送
@Override @Transactional public void send(MessageInfo mqMessage) throws Exception { if (mqMessage == null) { log.debug("无效的消息"); return; } String body = com.alibaba.fastjson.JSONObject.toJSONString(mqMessage); Message message = new Message("log-topic", "user-tag", body.getBytes()); producer.sendMessageInTransaction(message, "args2"); log.debug("发送了消息:{}", body); }
备注:这里是发送事务消息,不是普通消息,第二个参数是可以随便填写,可以在TransactionListener 里面获取到
4 接收者那边就正常接收了。没啥特别的,只有在消息发
原理,发送的事务消息会在 mq中存着,不会投递出去,TransactionListener 鉴定到 事务提交,并且做好记录以后,会确认这个消息,然后消息会被投递到消费者,如果长时间没有收到确认消息,mq会调用回查接口,然后决定应该重发,删除,还是确认这个消息。
关于rocktermq 的 两篇博客
https://blog.csdn.net/u012921921/article/details/104331412
https://blog.51cto.com/qiangsh/1937184