1.原理图:
2.设计实现思路:
1.生产者(发送方)投递事务消息到Broker中,并将该消息标记为半消息(Half Message),此时该消息对消费者不可见,不可以被消费;
2.开始执行我们的本地事务,将本地事务执行的结果(回滚或者提交)发送给Broker;
3.Broker收到本地事务的执行结果后:如果是回滚,则删除该半消息;如果是提交,则该消息就可以被消费者消费;
4.如果Broker没有及时获取到发送方的本地事务结果,会主动回查发送方(即调用checkLocalTransaction)来查询本地事务结果。
核心代码发送方
1 @RestController 2 public class ProducerController { 3 @Autowired 4 private OrderService orderService; 5 6 @RequestMapping("/sendMsg") 7 public String sendMsg() throws MQClientException, RemotingException, InterruptedException, MQBrokerException { 8 String orderId = orderService.sendOrder(); 9 return orderId; 10 11 } 12 }
1 @Slf4j 2 @Component 3 @RocketMQTransactionListener(txProducerGroup = "mayiktProducer") 4 public class SyncProducerListener implements RocketMQLocalTransactionListener { 5 6 @Autowired 7 private TransationalUtils transationalUtils; 8 9 @Autowired 10 private OrderMapper orderMapper; 11 12 @Override 13 public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { 14 TransactionStatus beginStatus = null; 15 try { 16 beginStatus = transationalUtils.begin(); 17 MessageHeaders headers = message.getHeaders(); 18 String objMsg = (String) headers.get("msg"); 19 if (StringUtils.isEmpty(objMsg)) { 20 return RocketMQLocalTransactionState.ROLLBACK; 21 } 22 OrderEntity orderEntity = JSONObject.parseObject(objMsg, OrderEntity.class); 23 int result = orderMapper.addOrder(orderEntity); 24 if (result > 0) { 25 transationalUtils.commit(beginStatus); 26 } 27 log.info("【本地业务执行完毕】 msg:{}, Object:{}", message, o); 28 return null; 29 } catch (Exception e) { 30 e.printStackTrace(); 31 log.error("【执行本地业务异常】 exception message:{}", e.getMessage()); 32 if (beginStatus != null) { 33 transationalUtils.rollback(beginStatus); 34 } 35 return RocketMQLocalTransactionState.ROLLBACK; 36 } 37 } 38 39 @Override 40 public RocketMQLocalTransactionState checkLocalTransaction(Message message) { 41 log.info("【执行检查任务】"); 42 MessageHeaders headers = message.getHeaders(); 43 String objMsg = (String) headers.get("msg"); 44 if (StringUtils.isEmpty(objMsg)) { 45 return RocketMQLocalTransactionState.UNKNOWN; 46 } 47 OrderEntity orderEntity = JSONObject.parseObject(objMsg, OrderEntity.class); 48 String orderId = orderEntity.getOrderId(); 49 OrderEntity orderDbEntity = orderMapper.findOrderId(orderId); 50 if (orderDbEntity == null) { 51 return RocketMQLocalTransactionState.UNKNOWN; 52 } 53 return RocketMQLocalTransactionState.COMMIT; 54 } 55 }
消费者
1 @Service 2 @RocketMQMessageListener(topic = "orderTopic", consumerGroup = "mayiktTopic") 3 public class OrdeConsumer implements RocketMQListener<String> { 4 @Autowired 5 private DispatchMapper dispatchMapper; 6 7 @Override 8 public void onMessage(String msg) { 9 OrderEntity orderEntity = JSONObject.parseObject(msg, OrderEntity.class); 10 System.out.println(orderEntity.toString()); 11 } 12 13 }
手动事务
1 @Service 2 public class TransationalUtils { 3 @Autowired 4 public DataSourceTransactionManager transactionManager; 5 6 public TransactionStatus begin() { 7 TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionAttribute()); 8 return transaction; 9 } 10 11 public void commit(TransactionStatus transaction) { 12 transactionManager.commit(transaction); 13 14 } 15 16 public void rollback(TransactionStatus transaction) { 17 transactionManager.rollback(transaction); 18 } 19 20 }
注:该代码来源于蚂蚁课堂(www.mayikt.com),仅供本人学习使用。