  • Solving a Distributed Transaction Problem with RabbitMQ

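    Scenario:

    A classic example is online food delivery. When a user places an order, the order service is called, and the order service then calls the dispatch system to notify a courier to deliver it. The order system and the dispatch system communicate asynchronously over MQ, and the order table and the dispatch table must stay consistent.

    Synchronous calls over plain HTTP do not hold up under high concurrency, which is why MQ is used here.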

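    How RabbitMQ is used to solve the distributed transaction

    1. Make sure the producer actually delivers the message to the MQ server.
      • The producer uses the publisher confirm mechanism.
      • If delivery fails, the producer retries.
    2. Make sure the MQ consumer consumes the message correctly.
      • Use manual ACK mode together with a compensation mechanism, and watch out for idempotency.
    3. Use an order compensation mechanism.
      • Create an additional compensation consumer that listens as well. If the order was created and then rolled back (leaving the data inconsistent), the order has to be compensated.
      • The exchange is a direct exchange (routing-key mode); the compensation queue and the dispatch queue are both bound with the same routing key.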
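
    The idempotency concern in step 2 arises because a retried or redelivered message can reach the consumer more than once. One common way to handle it is to remember which message IDs have already been processed. The following is a minimal sketch in plain Java; the class `IdempotentHandler` and its methods are hypothetical and not part of the original post, and the in-memory set stands in for what would normally be a database unique key on orderId.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: tracks message IDs that were already handled.
// Assumption: in a real service this state would live in a database
// (for example a unique index on orderId), not in memory.
class IdempotentHandler {
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private int dispatchCount = 0;

    // Returns true if the message was processed now, false if it was a duplicate.
    synchronized boolean handle(String messageId) {
        // Set.add returns false when the ID is already present
        if (!processedIds.add(messageId)) {
            return false;
        }
        dispatchCount++; // stands in for inserting one row into the dispatch table
        return true;
    }

    synchronized int getDispatchCount() {
        return dispatchCount;
    }
}
```

    With this check in place, delivering the same order message twice results in only one dispatch record.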

    Code

    Dispatch table:

    CREATE TABLE platoon(
      id INT PRIMARY KEY AUTO_INCREMENT,
      orderId VARCHAR(255),
      takeout_userId INT
    );

    Order table:

    CREATE TABLE order_info(
      id INT PRIMARY KEY AUTO_INCREMENT,
      name VARCHAR(30),
      order_money INT,
      orderId VARCHAR(255)
    );

    Producer

     1. Implement the interface: implements RabbitTemplate.ConfirmCallback

     2. Override the callback method, which is invoked for both successful and failed confirms

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // ack == true: the broker confirmed the message; otherwise cause describes the failure
    }

    Register the callback inside the send method:

    this.rabbitTemplate.setMandatory(true);
    this.rabbitTemplate.setConfirmCallback(this);

    Publisher confirms also need to be enabled in the yml:

    ### Enable publisher confirms
    publisher-confirms: true
    publisher-returns: true

     Note: retries must be capped. Once the limit is exceeded, manual compensation is required.
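
    The retry cap from the note above can be sketched as a per-message attempt counter. This is plain Java under stated assumptions: the class `RetryBudget` and its methods are hypothetical names introduced for illustration, and the counter is kept in memory only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: counts failed confirms per message ID and tells the
// caller whether another resend is still allowed.
class RetryBudget {
    private final int maxRetries;
    private final Map<String, Integer> attempts = new ConcurrentHashMap<>();

    RetryBudget(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    // Call on a negative confirm. Returns true if the message may be resent,
    // false once the limit is exceeded and manual compensation is needed.
    boolean shouldRetry(String messageId) {
        int used = attempts.merge(messageId, 1, Integer::sum);
        return used <= maxRetries;
    }

    // Call on a positive confirm to forget the message.
    void clear(String messageId) {
        attempts.remove(messageId);
    }
}
```

    In a confirm callback, the failure branch would consult `shouldRetry(orderId)` before resending and hand the message over for manual handling when it returns false.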

     Order service:

    pom:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.mayikt</groupId>
        <artifactId>rabbitmq_order</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.1.RELEASE</version>
        </parent>
        <dependencies>
    
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
                <version>1.1.1</version>
            </dependency>
            <!-- MySQL driver -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
            </dependency>
            <!-- Alibaba Druid data source -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid</artifactId>
                <version>1.0.14</version>
            </dependency>
            <!-- Spring Boot web starter -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- Lombok -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
            <!-- Spring Boot AMQP support -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
            </dependency>
            <!--fastjson -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.49</version>
            </dependency>
        </dependencies>
    </project>

    Base package:

    public interface ApiConstants {
        // Message for a successful response
        String HTTP_RES_CODE_200_VALUE = "success";
        // Message for a system error
        String HTTP_RES_CODE_500_VALUE = "fail";
        // Code for a successful response
        Integer HTTP_RES_CODE_200 = 200;
        // Code for a system error
        Integer HTTP_RES_CODE_500 = 500;
        // QQ account not linked
        Integer HTTP_RES_CODE_201 = 201;

    }
    import org.springframework.stereotype.Component;
    
    @Component
    public class BaseApiService {
    
        public ResponseBase setResultError(Integer code, String msg) {
            return setResult(code, msg, null);
        }
    
        // Return an error with a custom msg
        public ResponseBase setResultError(String msg) {
            return setResult(ApiConstants.HTTP_RES_CODE_500, msg, null);
        }
    
        // Return success with a data payload
        public ResponseBase setResultSuccess(Object data) {
            return setResult(ApiConstants.HTTP_RES_CODE_200, ApiConstants.HTTP_RES_CODE_200_VALUE, data);
        }
    
        // Return success without data
        public ResponseBase setResultSuccess() {
            return setResult(ApiConstants.HTTP_RES_CODE_200, ApiConstants.HTTP_RES_CODE_200_VALUE, null);
        }
    
        // Return success with a custom msg and no data
        public ResponseBase setResultSuccess(String msg) {
            return setResult(ApiConstants.HTTP_RES_CODE_200, msg, null);
        }
    
        // Common wrapper used by all of the methods above
        public ResponseBase setResult(Integer code, String msg, Object data) {
            return new ResponseBase(code, msg, data);
        }
    
    }
    import lombok.Data;
    
    @Data
    public class ResponseBase {
    
        private Integer rtnCode;
        private String msg;
        private Object data;
    
        public ResponseBase() {
    
        }
    
        public ResponseBase(Integer rtnCode, String msg, Object data) {
            super();
            this.rtnCode = rtnCode;
            this.msg = msg;
            this.data = data;
        }
    
        @Override
        public String toString() {
            return "ResponseBase [rtnCode=" + rtnCode + ", msg=" + msg + ", data=" + data + "]";
        }
    
    }

    entity:

    import lombok.Data;
    
    @Data
    public class OrderEntity {
    
        private Long id;
        // Order name
        private String name;
        // Order amount
        private Double orderMoney;
        // Order ID
        private String orderId;
    }

    mapper:

    import org.apache.ibatis.annotations.Insert;
    import org.apache.ibatis.annotations.Options;
    import org.apache.ibatis.annotations.Param;
    import org.apache.ibatis.annotations.Select;
    
    import com.mayikt.entity.OrderEntity;
    
    public interface OrderMapper {
    
        @Insert(value = "INSERT INTO `order_info` VALUES (#{id}, #{name}, #{orderMoney},#{orderId})")
        @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
        public int addOrder(OrderEntity orderEntity);
    
        @Select("SELECT id as id ,name as name , order_money as orderMoney,orderId as orderId from order_info where orderId=#{orderId};")
        public OrderEntity findOrderId(@Param("orderId") String orderId);
    
    }

    MQ configuration:

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitmqConfig {

        // Dispatch queue: receives the order so that it can be dispatched
        public static final String ORDER_DIC_QUEUE = "order_dic_queue";
        // Compensation queue: used to check whether the order was actually created
        public static final String ORDER_CREATE_QUEUE = "order_create_queue";
        // Exchange shared by ordering and dispatching
        private static final String ORDER_EXCHANGE_NAME = "order_exchange_name";

        // 1. Declare the dispatch queue
        @Bean
        public Queue directOrderDicQueue() {
            return new Queue(ORDER_DIC_QUEUE);
        }

        // 2. Declare the compensation queue
        @Bean
        public Queue directCreateOrderQueue() {
            return new Queue(ORDER_CREATE_QUEUE);
        }

        // 3. Declare the direct exchange
        @Bean
        DirectExchange directOrderExchange() {
            return new DirectExchange(ORDER_EXCHANGE_NAME);
        }

        // 4. Bind the dispatch queue to the exchange
        @Bean
        Binding bindingExchangeOrderDicQueue() {
            return BindingBuilder.bind(directOrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");
        }

        // 5. Bind the compensation queue to the exchange with the same routing key
        @Bean
        Binding bindingExchangeCreateOrder() {
            return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange()).with("orderRoutingKey");
        }

    }
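
    Both queues are bound to the same direct exchange with the same routing key, so every published order message is copied into both of them. The toy model below illustrates that routing rule in plain Java. It is not RabbitMQ's API: `ToyDirectExchange` and its methods are hypothetical names used only to show the behaviour.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a direct exchange (illustration only, not a RabbitMQ class).
class ToyDirectExchange {
    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages delivered to it
    private final Map<String, List<String>> queues = new HashMap<>();

    void bind(String queue, String routingKey) {
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    // Copies the message into every queue bound with exactly this key.
    // Returns the number of queues that received it.
    int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, new ArrayList<>());
        for (String queue : targets) {
            queues.get(queue).add(message);
        }
        return targets.size();
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, new ArrayList<>());
    }
}
```

    Binding order_dic_queue and order_create_queue with orderRoutingKey and publishing once leaves one copy in each queue, which is what lets the dispatch consumer and the compensation consumer work from the same event.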

    service:

    import java.util.UUID;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageBuilder;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import com.alibaba.fastjson.JSONObject;
    import com.mayikt.base.BaseApiService;
    import com.mayikt.base.ResponseBase;
    import com.mayikt.entity.OrderEntity;
    import com.mayikt.mapper.OrderMapper;
    
    @Service
    public class OrderService extends BaseApiService implements RabbitTemplate.ConfirmCallback {
        @Autowired
        private OrderMapper orderMapper;
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public ResponseBase addOrderAndDispatch() {
            // Place the order first: build the row for the order table
            OrderEntity orderEntity = new OrderEntity();
            orderEntity.setName("Braised chicken with rice");
            // The price is 300 yuan
            orderEntity.setOrderMoney(300d);
            // Order ID
            String orderId = UUID.randomUUID().toString();
            orderEntity.setOrderId(orderId);
            // 1. Create the order (insert one row into the order database)
            int orderResult = orderMapper.addOrder(orderEntity);
            System.out.println("orderResult:" + orderResult);
            if (orderResult <= 0) {
                return setResultError("Failed to place the order!");
            }
            // 2. Once the order row is inserted, send a message so that a courier gets dispatched
            send(orderId);
        //    int i = 1/0;   // simulate an exception after the message has been sent
            return setResultSuccess();
        }
    
        private void send(String orderId) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("orderId", orderId);
            String msg = jsonObject.toJSONString();
            System.out.println("msg:" + msg);
            // Build the message; encode explicitly as UTF-8 to match the declared content encoding
            Message message = MessageBuilder.withBody(msg.getBytes(java.nio.charset.StandardCharsets.UTF_8))
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                    .setContentEncoding("utf-8").setMessageId(orderId).build();
            // Correlation data handed back to the confirm callback
            CorrelationData correlationData = new CorrelationData(orderId);
            // Send the message
            this.rabbitTemplate.setMandatory(true);
            this.rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.convertAndSend("order_exchange_name", "orderRoutingKey", message, correlationData);

        }
    
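        // Publisher confirm: the broker acknowledges each message the producer sends
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String orderId = correlationData.getId(); // same value as the message ID, used as a global ID
            System.out.println("Message id: " + correlationData.getId());
            if (ack) { // the broker confirmed the message
                System.out.println("Message confirmed");
            } else {
                // Retry by resending; the number of retries should be capped
                send(orderId);
                System.out.println("Message confirm failed: " + cause);
            }

        }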
    
    }
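
    The post describes a compensation consumer on order_create_queue but does not show its code. Its core decision is simple: look the order up and re-create it only if it is missing. The following is a minimal sketch of that decision in plain Java, under stated assumptions: `OrderCompensator` is a hypothetical class, the map stands in for the order_info table, and the message is assumed to carry enough data (here a name) to rebuild the order, which the message in this post does not.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the compensation check; the map stands in for the
// order_info table that OrderMapper reads and writes.
class OrderCompensator {
    private final Map<String, String> orderTable = new HashMap<>();

    void saveOrder(String orderId, String name) {
        orderTable.put(orderId, name);
    }

    boolean orderExists(String orderId) {
        return orderTable.containsKey(orderId);
    }

    // Called for every message arriving on the compensation queue.
    // Returns true if the order had to be re-created.
    boolean compensateIfMissing(String orderId, String name) {
        if (orderExists(orderId)) {
            return false; // the order is present, nothing to do
        }
        saveOrder(orderId, name); // re-create the order that was rolled back
        return true;
    }
}
```

    In the real service the lookup would presumably go through OrderMapper.findOrderId, and the re-insert through OrderMapper.addOrder.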

    controller:

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.mayikt.base.BaseApiService;
    import com.mayikt.base.ResponseBase;
    import com.mayikt.service.OrderService;
    
    @RestController
    public class OrderController extends BaseApiService {
        @Autowired
        private OrderService orderService;
    
        @RequestMapping("/addOrder")
        public ResponseBase addOrder() {
            return orderService.addOrderAndDispatch();
        }
    
    }

    yml:

    spring:
      rabbitmq:
        # broker host
        host: 192.168.91.6
        # port
        port: 5672
        # username
        username: admin
        # password
        password: admin
        # virtual host
        virtual-host: /admin_toov5
        # enable publisher confirms and returns
        publisher-confirms: true
        publisher-returns: true
      # database connection settings
      datasource:
        name: test
        url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&rewriteBatchedStatements=TRUE
        username: root
        password: root
        # use the Druid data source
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.jdbc.Driver

    Application entry point:

    package com.mayikt;
    
    import org.mybatis.spring.annotation.MapperScan;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @MapperScan("com.mayikt.mapper")
    @SpringBootApplication
    public class AppOrder {
    
        public static void main(String[] args) {
            SpringApplication.run(AppOrder.class, args);
        }
    
    }

    Dispatch service:

     

    Entity:

    package com.mayikt.entity;
    
    import lombok.Data;
    
    @Data
    public class DispatchEntity {
    
        private Long id;
        // Order number
        private String orderId;
        // Courier ID
        private Long takeoutUserId;
        
    }

    Mapper:

    package com.mayikt.mapper;
    
    import org.apache.ibatis.annotations.Insert;
    
    import com.mayikt.entity.DispatchEntity;
    
    public interface DispatchMapper {
    
        /**
         * Insert a new dispatch task
         */
        @Insert("INSERT into platoon values (null,#{orderId},#{takeoutUserId});")
        public int insertDistribute(DispatchEntity distributeEntity);
    
    }

    consumer:

     Declare the exchange, the queue, and the routing-key binding

    package com.mayikt.rabbitmq.consumer;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitmqConfig {

        // Dispatch queue: receives the order so that it can be dispatched
        public static final String ORDER_DIC_QUEUE = "order_dic_queue";
        // Compensation queue: used to check whether the order was actually created
        public static final String ORDER_CREATE_QUEUE = "order_create_queue";
        // Exchange shared by ordering and dispatching
        private static final String ORDER_EXCHANGE_NAME = "order_exchange_name";

        // 1. Declare the dispatch queue
        @Bean
        public Queue OrderDicQueue() {
            return new Queue(ORDER_DIC_QUEUE);
        }
    /*
        // 2. Declare the compensation queue
        @Bean
        public Queue directCreateOrderQueue() {
            return new Queue(ORDER_CREATE_QUEUE);
        }*/

        // 2. Declare the direct exchange
        @Bean
        DirectExchange directOrderExchange() {
            return new DirectExchange(ORDER_EXCHANGE_NAME);
        }

        // 3. Bind the dispatch queue to the exchange
        @Bean
        Binding bindingExchangeOrderDicQueue() {
            return BindingBuilder.bind(OrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");
        }

        /*// 3. Bind the compensation queue to the exchange
        @Bean
        Binding bindingExchangeCreateOrder() {
            return BindingBuilder.bind(OrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");
        }*/

    }

    Dispatch consumer:

    package com.mayikt.rabbitmq.consumer;
    
    import java.util.Map;
    
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.stereotype.Component;
    
    import com.alibaba.fastjson.JSONObject;
    import com.mayikt.entity.DispatchEntity;
    import com.mayikt.mapper.DispatchMapper;
    import com.rabbitmq.client.Channel;
    
    /**
     * Dispatch service
     */
    @Component
    public class DispatchConsumer {
        @Autowired
        private DispatchMapper dispatchMapper;
    
        @RabbitListener(queues = "order_dic_queue")
        public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
            String messageId = message.getMessageProperties().getMessageId();
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("Dispatch service received: " + msg + ", message id: " + messageId);
            JSONObject jsonObject = JSONObject.parseObject(msg);
            String orderId = jsonObject.getString("orderId");
            if (StringUtils.isEmpty(orderId)) {
                // Log the malformed message and reject it without requeueing,
                // so that it does not stay unacknowledged in manual ACK mode
                channel.basicNack(deliveryTag, false, false);
                return;
            }
            DispatchEntity dispatchEntity = new DispatchEntity();
            // Order ID
            dispatchEntity.setOrderId(orderId);
            // Courier ID
            dispatchEntity.setTakeoutUserId(12L);

            try {
                int insertDistribute = dispatchMapper.insertDistribute(dispatchEntity);
                if (insertDistribute > 0) {
                    // Acknowledge manually so that the MQ server removes the message
                    channel.basicAck(deliveryTag, false);
                }
            } catch (Exception e) {
                e.printStackTrace();
                // Discard the message (no requeue)
                channel.basicNack(deliveryTag, false, false);
            }
        }
    
    }

    Application entry point:

    package com.mayikt;
    
    import org.mybatis.spring.annotation.MapperScan;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @MapperScan("com.mayikt.mapper")
    @SpringBootApplication
    public class AppDispatch {
    
        public static void main(String[] args) {
            SpringApplication.run(AppDispatch.class, args);
        }
    
    }

    yml:

    spring:
      rabbitmq:
        # broker host
        host: 192.168.91.6
        # port
        port: 5672
        # username
        username: admin
        # password
        password: admin
        # virtual host
        virtual-host: /admin_toov5
        listener:
          simple:
            retry:
              # retry when the consumer throws an exception
              enabled: true
              # maximum number of attempts
              max-attempts: 5
              # interval between retries, in milliseconds
              initial-interval: 3000
            # manual acknowledgement
            acknowledge-mode: manual

      # database connection settings
      datasource:
        name: test
        url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&rewriteBatchedStatements=TRUE
        username: root
        password: root
        # use the Druid data source
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.jdbc.Driver
    server:
      port: 8081

    pom:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.itmayiedu</groupId>
        <artifactId>rabbitmq_stock</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.1.RELEASE</version>
        </parent>
        <dependencies>
    
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
                <version>1.1.1</version>
            </dependency>
            <!-- MySQL driver -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
            </dependency>
            <!-- Alibaba Druid data source -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid</artifactId>
                <version>1.0.14</version>
            </dependency>
            <!-- Spring Boot web starter -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- Lombok -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
            <!-- Spring Boot AMQP support -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
            </dependency>
            <!--fastjson -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.49</version>
            </dependency>
        </dependencies>
    </project>
  • Original article: https://www.cnblogs.com/toov5/p/10289999.html