在RocketMQ中生产者有三种角色NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务),
根据名字大概可以看出各个代表着什么作用,我们这里用TransactionProducer(事务)来解决分布式事务问题。
说到分布式事务,就会谈到那个经典的”账户转账”问题:2个账户,分布处于2个不同的DB,或者说2个不同的子系统里面,A要扣钱,B要加钱,如何保证原子性?
一般的思路都是通过消息中间件来实现“最终一致性”:A系统扣钱,然后发条消息给中间件,B系统接收此消息,进行加钱。
但这里面有个问题:
A是先update DB,后发送消息呢?还是先发送消息,后update DB?
假设先update DB成功,发送消息网络失败,重发又失败,怎么办?
假设先发送消息成功,update DB失败。消息已经发出去了,又不能撤回,怎么办?
所以,这里下个结论:只要发送消息和update DB这2个操作不是原子的,无论谁先谁后,都是有问题的。
那这个问题怎么解决呢??
为了能解决该问题,同时又不和业务耦合,RocketMQ提出了“事务消息”的概念。
原理描述
生产者向broker发送一条未经commit不可消费的事务性消息(半消息)。如果发送成功返回SEND_OK
,则执行本地事务,执行成功则commit,commit过的消息可正常被服务端消费。执行失败则rollback,rollback的消息则被删除。还有一种情况,就是broker没有收到确认消息,则会回查本地事务的状态,看是commit,还是rollback。
具体来说,就是把消息的发送分成了2个阶段:Prepare阶段和确认阶段。
具体来说,上面的2个步骤,被分解成3个步骤:
(1) 发送Prepared消息
(2) update DB
(3) 根据update DB结果成功或失败,Confirm或者取消Prepared消息。
可能有人会问了,前2步执行成功了,最后1步失败了怎么办?这里就涉及到了RocketMQ的关键点:RocketMQ会定期(默认是1分钟)扫描所有的Prepared消息,询问发送方,到底是要确认这条消息发出去?还是取消此条消息?
(1) 执行业务逻辑的部分
package com.lynch.simple.demo; import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.common.message.Message; /** * 执行业务逻辑的部分 * * @author jqlin * */ public class TransactionExecuterImpl implements LocalTransactionExecuter { @Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { System.out.println("执行本地事务msg = " + new String(msg.getBody())); System.out.println("执行本地事务arg = " + arg); //DB操作 应该带上事务 service -> dao //如果数据操作失败 需要回滚 同时返回RocketMQ一条失败消息 意味着消费者无法消费到这条失败的消息 //如果成功 就要返回一条rocketMQ成功的消息,意味着消费者将读取到这条消息 //o就是attachment String tags = msg.getTags(); if (tags.equals("transaction2")) { System.out.println("===> 本地事务执行失败,进行MQ ROLLBACK"); return LocalTransactionState.ROLLBACK_MESSAGE; } System.out.println("===> 本地事务执行成功,发送确认消息"); // return LocalTransactionState.UNKNOW; return LocalTransactionState.COMMIT_MESSAGE; } }
(2) 处理事务回查的代码部分
package com.lynch.simple.demo; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.common.message.MessageExt; /** * 处理事务回查的代码部分 * * @author jqlin * */ public class TransactionCheckListenerImpl implements TransactionCheckListener { @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) { System.out.println("未决事务,服务器回查客户端msg =" + new String(msg.getBody().toString())); //由于RocketMQ迟迟没有收到消息的确认消息,因此主动询问这条prepare消息,是否正常? //可以查询数据库看这条数据是否已经处理 return LocalTransactionState.COMMIT_MESSAGE; } }
(3) 启动生产者
package com.lynch.simple.demo; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.client.producer.TransactionMQProducer; import com.alibaba.rocketmq.common.message.Message; public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { // 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务 TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); // 构造事务消息的生产者 TransactionMQProducer producer = new TransactionMQProducer("transactionProducer"); producer.setNamesrvAddr("127.0.0.1:9876"); // 事务回查最小并发数 producer.setCheckThreadPoolMinSize(2); // 事务回查最大并发数 producer.setCheckThreadPoolMaxSize(2); // 队列数 producer.setCheckRequestHoldMax(2000); // 设置事务决断处理类 producer.setTransactionCheckListener(transactionCheckListener); producer.start(); // 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑 TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); for (int i = 1; i <= 2; i++) { try { String tags = "transaction" + i; String keys = "KEY" + i; byte[] body = ("Hello RocketMQ " + i).getBytes(); Message msg = new Message("topicTransaction", tags, keys, body); SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); System.out.println(sendResult); } catch (MQClientException e) { e.printStackTrace(); } } producer.shutdown(); } }
(4) 启动消费消息
package com.lynch.simple.demo; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; public class TransactionConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transactionConsumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumeMessageBatchMaxSize(10); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("topicTransaction", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { System.out.println(msg + ",内容:" + new String(msg.getBody())); } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试 } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功 } }); consumer.start(); System.out.println("transaction_Consumer Started."); } }
重点来了:3.2.6之前的版本这样写就可以了,但是之后的版本被关于事务回查这个接口被阉割了,不会在进行事务回查操作。
那么向MQ发送消息如果失败的话,会造成A银行扣款成功而B银行收款未成功的数据不一致的情况
事务回查机制
由于开源版本的rocketMQ3.0.6之后的版本被阉割了事务会回查机制,所以这部分的实现需要自己来实现。