zoukankan      html  css  js  c++  java
  • RocketMQ解决分布式事务

    1.原理图:

     2.设计实现思路:

    1.生产者(发送方)投递事务消息到Broker中,设置该消息为半消息 可以被消费;

    2.开始执行我们的本地事务,本地事务执行的结果(回滚或者提交)发送Broker;

    3.Broker获取回滚或者提交,如果是回滚的情况则删除该消息、如果是提交的话,该消息就可以被消费者消费;

    4.Broker如果没有及时的获取发送方本地事务结果的话,会主动查询本地事务结果

    核心代码发送方

     1 @RestController
     2 public class ProducerController {
     3     @Autowired
     4     private OrderService orderService;
     5 
     6     @RequestMapping("/sendMsg")
     7     public String sendMsg() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
     8         String orderId = orderService.sendOrder();
     9         return orderId;
    10 
    11     }
    12 }
     1 @Slf4j
     2 @Component
     3 @RocketMQTransactionListener(txProducerGroup = "mayiktProducer")
     4 public class SyncProducerListener implements RocketMQLocalTransactionListener {
     5 
     6     @Autowired
     7     private TransationalUtils transationalUtils;
     8 
     9     @Autowired
    10     private OrderMapper orderMapper;
    11 
    12     @Override
    13     public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    14         TransactionStatus beginStatus = null;
    15         try {
    16             beginStatus = transationalUtils.begin();
    17             MessageHeaders headers = message.getHeaders();
    18             String objMsg = (String) headers.get("msg");
    19             if (StringUtils.isEmpty(objMsg)) {
    20                 return RocketMQLocalTransactionState.ROLLBACK;
    21             }
    22             OrderEntity orderEntity = JSONObject.parseObject(objMsg, OrderEntity.class);
    23             int result = orderMapper.addOrder(orderEntity);
    24             if (result > 0) {
    25                 transationalUtils.commit(beginStatus);
    26             }
    27             log.info("【本地业务执行完毕】 msg:{}, Object:{}", message, o);
    28             return null;
    29         } catch (Exception e) {
    30             e.printStackTrace();
    31             log.error("【执行本地业务异常】 exception message:{}", e.getMessage());
    32             if (beginStatus != null) {
    33                 transationalUtils.rollback(beginStatus);
    34             }
    35             return RocketMQLocalTransactionState.ROLLBACK;
    36         }
    37     }
    38 
    39     @Override
    40     public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
    41         log.info("【执行检查任务】");
    42         MessageHeaders headers = message.getHeaders();
    43         String objMsg = (String) headers.get("msg");
    44         if (StringUtils.isEmpty(objMsg)) {
    45             return RocketMQLocalTransactionState.UNKNOWN;
    46         }
    47         OrderEntity orderEntity = JSONObject.parseObject(objMsg, OrderEntity.class);
    48         String orderId = orderEntity.getOrderId();
    49         OrderEntity orderDbEntity = orderMapper.findOrderId(orderId);
    50         if (orderDbEntity == null) {
    51             return RocketMQLocalTransactionState.UNKNOWN;
    52         }
    53         return RocketMQLocalTransactionState.COMMIT;
    54     }
    55 }

    消费者

     1 @Service
     2 @RocketMQMessageListener(topic = "orderTopic", consumerGroup = "mayiktTopic")
     3 public class OrdeConsumer implements RocketMQListener<String> {
     4     @Autowired
     5     private DispatchMapper dispatchMapper;
     6 
     7     @Override
     8     public void onMessage(String msg) {
     9         OrderEntity orderEntity = JSONObject.parseObject(msg, OrderEntity.class);
    10         System.out.println(orderEntity.toString());
    11     }
    12 
    13 }

    手动事务

     1 @Service
     2 public class TransationalUtils {
     3     @Autowired
     4     public DataSourceTransactionManager transactionManager;
     5 
     6     public TransactionStatus begin() {
     7         TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionAttribute());
     8         return transaction;
     9     }
    10 
    11     public void commit(TransactionStatus transaction) {
    12         transactionManager.commit(transaction);
    13 
    14     }
    15 
    16     public void rollback(TransactionStatus transaction) {
    17         transactionManager.rollback(transaction);
    18     }
    19 
    20 }

    注:该代码来源于蚂蚁课堂(www.mayikt.com),于本人学习使用。

  • 相关阅读:
    RS485通信和Modbus协议(转)
    Modbus通讯错误检测方法
    Modbus消息帧
    Modbus通讯两种传输方式
    echarts自定义图例legend文字和样式
    host文件的作用
    webpack --watch和supervisor的不同
    vue中如何使用echarts
    Vue父子组件生命周期
    Git总结笔记
  • 原文地址:https://www.cnblogs.com/lmyupupblogs/p/12143386.html
Copyright © 2011-2022 走看看