分布式事务常见解决方案
- 2PC:两阶段提交, 基于XA协议
- TCC:Try、Confirm、Cancel
- 等等
框架
- GTS -> 开源 Fescar。地址:https://github.com/alibaba/fescar
- LCN -> 地址:https://github.com/codingapi/tx-lcn
RokcetMQ 分布式事务消息的总体架构
RocketMQ 事务消息:
- RocketMQ 提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致。
半消息 Half Message:
- 暂不能投递的消息(暂不能消费),Producer 已经将消息成功发送到了 Broker 端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。
消息回查:
- 由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。
整体交互流程
- Producer向broker端发送消息。
- 服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
- 发送方开始执行本地事务逻辑。
- 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半消息,订阅方将不会接受该消息
- 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果
- 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半消息进行操作
RocketMQ 事务消息的状态
- COMMIT_MESSAGE:提交事务消息,消费者可以消费此消息
- ROLLBACK_MESSAGE:回滚事务消息,消息会在 broker 中删除,消费者不能消费
- UNKNOW:Broker需要回查确认消息的状态
关于事务消息的消费
事务消息 consumer 端的消费方式和普通消息是一样的,RocketMQ 能保证消息能被 consumer 收到(消息重试等机制,最后也存在 consumer 消费失败的情况,这种情况出现的概率极低)。
RocketMQ 分布式事务消息实战
项目结构如下图所示:
PayController 类代码如下:
@RestController public class PayController { @Autowired private TransactionProducer transactionMQProducer; @RequestMapping("/api/v1/pay_cb") public Object callback(String tag, String otherParam) throws Exception { Message message = new Message(JmsConfig.TOPIC, tag, tag + "_key", tag.getBytes()); SendResult sendResult = transactionMQProducer.getProducer(). sendMessageInTransaction(message, otherParam); System.out.printf("发送结果=%s, sendResult=%s ", sendResult.getSendStatus(), sendResult.toString()); return new HashMap<>(); } }
JmsConfig 类代码如下:
public class JmsConfig { public static final String NAME_SERVER = "192.168.159.133:9876;192.168.159.130:9876"; // public static final String NAME_SERVER = "192.168.159.130:9876"; public static final String TOPIC = "xdclass_pay_test_topic_0"; public static final String ORDERLY_TOPIC = "xdclass_pay_order_topic_orderly"; }
PayConsumer 类代码如下:
@Component public class PayConsumer { private DefaultMQPushConsumer consumer; private String consumerGroup = "pay_consumer_group"; public PayConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //默认是集群方式,可以更改为广播,但是广播方式不支持重试 consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe(JmsConfig.TOPIC, "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt msg = msgs.get(0); String key = msg.getKeys(); try { System.out.printf("%s 2 Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { System.out.println("消费异常"); e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); System.out.println("consumer start ..."); } }
PayProducer 类代码如下:
@Component public class PayProducer { private String producerGroup = "pay_producer_group"; private DefaultMQProducer producer; public PayProducer() { producer = new DefaultMQProducer(producerGroup); //生产者投递消息重试次数 producer.setRetryTimesWhenSendFailed(3); //指定NameServer地址,多个地址以 ; 隔开 producer.setNamesrvAddr(JmsConfig.NAME_SERVER); start(); } public DefaultMQProducer getProducer() { return this.producer; } /** * 对象在使用之前必须要调用一次,只能初始化一次 */ public void start() { try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 一般在应用上下文,使用上下文监听器,进行关闭 */ public void shutdown() { this.producer.shutdown(); } }
TransactionProducer 和 TransactionListenerImpl 类代码如下:
@Component public class TransactionProducer { private String producerGroup = "trac_producer_group"; //事务监听器 private TransactionListener transactionListener = new TransactionListenerImpl(); private TransactionMQProducer producer = null; //一般自定义线程池的时候,需要给线程加个名称 private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); public TransactionProducer() { producer = new TransactionMQProducer(producerGroup); producer.setNamesrvAddr(JmsConfig.NAME_SERVER); producer.setTransactionListener(transactionListener); producer.setExecutorService(executorService); //指定NameServer地址,多个地址以 ; 隔开 start(); } public TransactionMQProducer getProducer() { return this.producer; } /** * 对象在使用之前必须要调用一次,只能初始化一次 */ public void start() { try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 一般在应用上下文,使用上下文监听器,进行关闭 */ public void shutdown() { this.producer.shutdown(); } } class TransactionListenerImpl implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { System.out.println("====executeLocalTransaction======="); String body = new String(msg.getBody()); String key = msg.getKeys(); String transactionId = msg.getTransactionId(); System.out.println("transactionId=" + transactionId + ", key=" + key + ", body=" + body); // 执行本地事务begin TODO // 执行本地事务end TODO int status = Integer.parseInt(arg.toString()); //二次确认消息,然后消费者可以消费 if (status == 1) { return LocalTransactionState.COMMIT_MESSAGE; } //回滚消息,broker端会删除半消息 if (status == 2) { return LocalTransactionState.ROLLBACK_MESSAGE; } //broker端会进行回查消息,再或者什么都不响应 if (status == 3) { return LocalTransactionState.UNKNOW; } return null; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("====checkLocalTransaction======="); String body = new String(msg.getBody()); String key = msg.getKeys(); String transactionId = msg.getTransactionId(); System.out.println("transactionId=" + transactionId + ", key=" + key + ", body=" + body); //要么commit 要么rollback //可以根据key去检查本地事务消息是否完成 return LocalTransactionState.COMMIT_MESSAGE; } }
TransactionListener使用
- executeLocalTransaction:执行本地事务
- checkLocalTransaction:回查消息,要么 commit 要么rollback,reconsumeTimes 不生效
注意点:TransactionMQProducer 的 groupName 要唯一,不能和普通的 producer 一样