zoukankan      html  css  js  c++  java
  • SpringBoot分布式事务RocketMQ

    原文链接:https://blog.csdn.net/Timeguys/article/details/107949660

    一、使用:

    一、引入依赖:

    1.  
      <dependency>
    2.  
      <groupId>org.apache.rocketmq</groupId>
    3.  
      <artifactId>rocketmq-spring-boot-starter</artifactId>
    4.  
      <version>2.0.3</version>
    5.  
      </dependency>

    二、举例:生产者创建订单---->生产者发送消息----->MQ服务接受消息----->消费者监听消息并减库存

    【生产者】:

    application.yml

    1.  
      rocketmq:
    2.  
      name-server: 192.168.85.128:9876 # rocketMQ地址
    3.  
      producer:
    4.  
      group: producer-group-test # 生产者的组名需要和消费者监听consumerGroup一致

    业务代码:

    1.  
      @Service
    2.  
      public class OrderServiceImpl extends ServiceImpl<OrderMapper, TbOrder> implements OrderService {
    3.  
       
    4.  
      @Resource
    5.  
      private RocketMQTemplate rocketMQTemplate;
    6.  
       
    7.  
      @Override
    8.  
      public void create() {
    9.  
      //创建订单--> 发送消息 --> 消息发送成功后调用本地事务提交 -->
    10.  
      TbOrder order = new TbOrder();
    11.  
      order.setCount(10);
    12.  
      order.setMoney(BigDecimal.valueOf(10));
    13.  
      order.setProductId(1L);
    14.  
      order.setStatus(1);
    15.  
      order.setUserId(1L);
    16.  
      sendMsg(order);
    17.  
      }
    18.  
      @Override
    19.  
      public void sendMsg(TbOrder order){
    20.  
      /**
    21.  
      * String txProducerGroup, 生产者分组
    22.  
      * String destination, topic
    23.  
      * Message<?> message, 消息
    24.  
      * Object arg 消息参数
    25.  
      */
    26.  
      Message<String> build = MessageBuilder.withPayload(JSONObject.toJSONString(order)).build();
    27.  
      rocketMQTemplate.sendMessageInTransaction("tx-producer-group","txmsg-topic",build , null);
    28.  
      }
    29.  
      }

    创建  ProducerTxmsgListener 并实现 RocketMQLocalTransactionListener:

    1.  
      @Component
    2.  
      // txProducerGroup 的值和发送事务消息指定的 txProducerGroup 相同
    3.  
      @RocketMQTransactionListener(txProducerGroup = "txmsg-producer-group")
    4.  
      public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {
    5.  
      @Resource
    6.  
      private OrderService orderService;
    7.  
       
    8.  
      /**
    9.  
      * @Description: 执行本地事务提交
    10.  
      */
    11.  
      @Override
    12.  
      @Transactional
    13.  
      public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    14.  
      try {
    15.  
      TbOrder tbOrder = JSONObject.parseObject(message.getPayload().toString(), TbOrder.class);
    16.  
      System.out.println(tbOrder);
    17.  
      orderService.save(tbOrder);
    18.  
      return RocketMQLocalTransactionState.COMMIT; //变更消息状态为:可消费
    19.  
      }catch (Exception e){
    20.  
      return RocketMQLocalTransactionState.ROLLBACK; //本地事务执行异常,将消息遗弃
    21.  
      }
    22.  
      }
    23.  
       
    24.  
      /**
    25.  
      * @Description: 检查本地事务是否执行成功
    26.  
      */
    27.  
      @Override
    28.  
      public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
    29.  
       
    30.  
      TbOrder tbOrder = JSONObject.parseObject(message.getPayload().toString(), TbOrder.class);
    31.  
      TbOrder order = orderService.getById(tbOrder.getId());
    32.  
      // 不为null 则表示执行成功
    33.  
      if (order != null){
    34.  
      return RocketMQLocalTransactionState.COMMIT; //变更消息状态为:可消费
    35.  
      }
    36.  
      // 执行本地事务发生问题或还没执行完成, UNKNOWN 表示会继续回查
    37.  
      return RocketMQLocalTransactionState.UNKNOWN;
    38.  
      }
    39.  
      }

    【消费者】:

    application.yml

    1.  
      rocketmq:
    2.  
      name-server: 127.0.0.1:9876 # rocketMQ地址
    3.  
      producer:
    4.  
      group: producer-test-group # 生产者的组名需要和消费者监听consumerGroup一致

    创建MyListener 并实现 RocketMQListener 接口:

    1.  
      // topic 对应生产者发消息是的topic
    2.  
      @RocketMQMessageListener(topic = "test-topic" , consumerGroup = "consumer-group")
    3.  
      public class MyListener implements RocketMQListener<String> {
    4.  
       
    5.  
      @Override
    6.  
      public void onMessage(String message) {
    7.  
      //执行 减库存业务 如果发生异常,则消息会隔段时间再次消费
    8.  
      System.out.println(message);
    9.  
      }
    10.  
      }

    原理图:

  • 相关阅读:
    ffmpeg rtmp推流 视频转码
    java日志发展史 log4j slf4j log4j2 jul jcl 日志和各种桥接包的关系
    nginx stream 流转发,可以转发rtmp、mysql访问流,转发rtmp、jdbc请求
    java web http 转https 通过nginx代理访问
    linux 服务器磁盘挂载
    novnc 通过websockify代理 配置多点访问
    linux 文件服务 minio 安装部署配置
    AOP实现原理,手写aop
    java 泛型
    JAVA反射getGenericSuperclass()用法
  • 原文地址:https://www.cnblogs.com/fswhq/p/13853532.html
Copyright © 2011-2022 走看看