zoukankan      html  css  js  c++  java
  • SpringBoot+RabbitMQ实现消息可靠性投递

    摘抄自简书:https://www.jianshu.com/p/9feddd4af8ee

    RabbitMQ是目前主流的消息中间件,非常适用于高并发环境。各大互联网公司都在使用的MQ技术,晋级技术骨干、团队核心的必备技术!

    谈到消息的可靠性投递,无法避免的,在实际的工作中会经常碰到,比如一些核心业务需要保障消息不丢失,接下来我们看一个可靠性投递的流程图,说明可靠性投递的概念:


     
     
    • Step 1: 首先把消息信息(业务数据)存储到数据库中,紧接着,我们再把这个消息记录也存储到一张消息记录表里(或者另外一个同源数据库的消息记录表)
    • Step 2:发送消息到MQ Broker节点(采用confirm方式发送,会有异步的返回结果)
    • Step 3、4:生产者端接受MQ Broker节点返回的Confirm确认消息结果,然后进行更新消息记录表里的消息状态。比如默认Status = 0 当收到消息确认成功后,更新为1即可!
    • Step 5:但是在消息确认这个过程中可能由于网络闪断、MQ Broker端异常等原因导致 回送消息失败或者异常。这个时候就需要发送方(生产者)对消息进行可靠性投递了,保障消息不丢失,100%的投递成功!(有一种极限情况是闪断,Broker返回的成功确认消息,但是生产端由于网络闪断没收到,这个时候重新投递可能会造成消息重复,需要消费端去做幂等处理)所以我们需要有一个定时任务,(比如每5分钟拉取一下处于中间状态的消息,当然这个消息可以设置一个超时时间,比如超过1分钟 Status = 0 ,也就说明了1分钟这个时间窗口内,我们的消息没有被确认,那么会被定时任务拉取出来)
    • Step 6:接下来我们把中间状态的消息进行重新投递 retry send,继续发送消息到MQ ,当然也可能有多种原因导致发送失败
    • Step 7:我们可以采用设置最大努力尝试次数,比如投递了3次,还是失败,那么我们可以将最终状态设置为Status = 2 ,最后 交由人工解决处理此类问题(或者把消息转储到失败表中)。
      接下来,我们使用SpringBoot2.x实现这一可靠性投递策略:

    废话不多说,直接上代码:

    数据库库表结构:订单表和消息记录表

    -- 表 order 订单结构
    CREATE TABLE IF NOT EXISTS `t_order` (
      `id` varchar(128) NOT NULL, -- 订单ID
      `name` varchar(128), -- 订单名称 其他业务熟悉忽略
      `message_id` varchar(128) NOT NULL, -- 消息唯一ID
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    -- 表 broker_message_log 消息记录结构
    CREATE TABLE IF NOT EXISTS `broker_message_log` (
      `message_id` varchar(128) NOT NULL, -- 消息唯一ID
      `message` varchar(4000) DEFAULT NULL, -- 消息内容
      `try_count` int(4) DEFAULT '0', -- 重试次数
      `status` varchar(10) DEFAULT '', -- 消息投递状态  0 投递中 1 投递成功   2 投递失败
      `next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',  -- 下一次重试时间 或 超时时间
      `create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', -- 创建时间
      `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', -- 更新时间
      PRIMARY KEY (`message_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    

    整合SpringBoot实现生产端代码如下:

    修改 pom.xml配置

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
     
        <groupId>com.bfxy</groupId>
        <artifactId>rabbitmq-springboot-producer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
     
        <name>rabbitmq-springboot-producer</name>
        <description>rabbitmq-springboot-producer</description>
     
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.2.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
     
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
     
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency> 
     
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
     
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>        
     
            <!-- 添加JDBC jar --> 
            <dependency>
              <groupId>org.mybatis.spring.boot</groupId>
              <artifactId>mybatis-spring-boot-starter</artifactId>
              <version>1.1.1</version>
            </dependency>
            <dependency>
              <groupId>tk.mybatis</groupId>
              <artifactId>mapper-spring-boot-starter</artifactId>
              <version>1.1.0</version>
            </dependency>    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid</artifactId>
                <version>1.0.24</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
            </dependency>
            <!-- mybatis分页插件 -->
            <dependency>  
                <groupId>com.github.miemiedev</groupId>  
                <artifactId>mybatis-paginator</artifactId>  
                <version>1.2.17</version>  
                <exclusions>
                    <exclusion>
                         <groupId>org.mybatis</groupId>
                        <artifactId>mybatis</artifactId>
                    </exclusion>
                </exclusions>            
            </dependency>                    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
            </dependency>
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>2.4</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.1.26</version>
            </dependency>    
            <dependency>
                <groupId>javax.servlet</groupId>
                <artifactId>javax.servlet-api</artifactId>
                <scope>provided</scope>    
            </dependency>  
                    <dependency>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                        <version>1.2.17</version>
                    </dependency>              
        </dependencies>
     
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
     
     
    </project>
    

    application.properties配置:

    spring.rabbitmq.addresses=192.168.11.76:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
     
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true
     
    server.servlet.context-path=/
    server.port=8001
     
    spring.http.encoding.charset=UTF-8
    spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
    spring.jackson.time-zone=GMT+8
    spring.jackson.default-property-inclusion=NON_NULL
     
    spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
    spring.datasource.url=jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true
    spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    spring.datasource.username=root
    spring.datasource.password=root
     
    mybatis.type-aliases-package=com.bfxy.springboot
    mybatis.mapper-locations=classpath:com/bfxy/springboot/mapping/*.xml
     
    logging.level.tk.mybatis=TRACE
    

    数据源druid.properties配置

    ##下面为连接池的补充设置,应用到上面所有数据源中
    #初始化大小,最小,最大
    druid.initialSize=5
    druid.minIdle=10
    druid.maxActive=300
    #配置获取连接等待超时的时间
    druid.maxWait=60000
    #配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 
    druid.timeBetweenEvictionRunsMillis=60000
    #配置一个连接在池中最小生存的时间,单位是毫秒
    druid.minEvictableIdleTimeMillis=300000
    druid.validationQuery=SELECT 1 FROM DUAL
    druid.testWhileIdle=true
    druid.testOnBorrow=false
    druid.testOnReturn=false
    #打开PSCache,并且指定每个连接上PSCache的大小
    druid.poolPreparedStatements=true
    druid.maxPoolPreparedStatementPerConnectionSize=20
    #配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙 
    druid.filters=stat,wall,log4j
    #通过connectProperties属性来打开mergeSql功能;慢SQL记录
    druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
    #合并多个DruidDataSource的监控数据
    druid.useGlobalDataSourceStat=true
    

    Entity

    Order

    package com.bfxy.springboot.entity;
     
    import java.io.Serializable;
     
    public class Order implements Serializable {
     
        private static final long serialVersionUID = 9111357402963030257L;
     
        private String id;
     
        private String name;
     
        private String messageId;
     
        public String getId() {
            return id;
        }
     
        public void setId(String id) {
            this.id = id == null ? null : id.trim();
        }
     
        public String getName() {
            return name;
        }
     
        public void setName(String name) {
            this.name = name == null ? null : name.trim();
        }
     
        public String getMessageId() {
            return messageId;
        }
     
        public void setMessageId(String messageId) {
            this.messageId = messageId == null ? null : messageId.trim();
        }
    }
    

    BrokerMessageLog

    package com.bfxy.springboot.entity;
     
    import java.util.Date;
     
    public class BrokerMessageLog {
        private String messageId;
     
        private String message;
     
        private Integer tryCount;
     
        private String status;
     
        private Date nextRetry;
     
        private Date createTime;
     
        private Date updateTime;
     
        public String getMessageId() {
            return messageId;
        }
     
        public void setMessageId(String messageId) {
            this.messageId = messageId == null ? null : messageId.trim();
        }
     
        public String getMessage() {
            return message;
        }
     
        public void setMessage(String message) {
            this.message = message == null ? null : message.trim();
        }
     
        public Integer getTryCount() {
            return tryCount;
        }
     
        public void setTryCount(Integer tryCount) {
            this.tryCount = tryCount;
        }
     
        public String getStatus() {
            return status;
        }
     
        public void setStatus(String status) {
            this.status = status == null ? null : status.trim();
        }
     
        public Date getNextRetry() {
            return nextRetry;
        }
     
        public void setNextRetry(Date nextRetry) {
            this.nextRetry = nextRetry;
        }
     
        public Date getCreateTime() {
            return createTime;
        }
     
        public void setCreateTime(Date createTime) {
            this.createTime = createTime;
        }
     
        public Date getUpdateTime() {
            return updateTime;
        }
     
        public void setUpdateTime(Date updateTime) {
            this.updateTime = updateTime;
        }
    }
    

    实体类可以使用Lombok插件进行修改

    数据库连接池代码

    package com.bfxy.springboot.config.database;
     
     
    import java.sql.SQLException;
     
    import javax.sql.DataSource;
     
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
    import org.springframework.jdbc.datasource.DataSourceTransactionManager;
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.annotation.EnableTransactionManagement;
     
    import com.alibaba.druid.pool.DruidDataSource; 
     
    @Configuration
    @EnableTransactionManagement
    public class DruidDataSourceConfig {
         
        private static Logger logger = LoggerFactory.getLogger(DruidDataSourceConfig.class);
         
        @Autowired
        private DruidDataSourceSettings druidSettings;
         
        public static String DRIVER_CLASSNAME ;
         
        @Bean
        public static PropertySourcesPlaceholderConfigurer propertyConfigure(){
            return new PropertySourcesPlaceholderConfigurer();
        }    
         
        @Bean
        public DataSource dataSource() throws SQLException {
            DruidDataSource ds = new DruidDataSource();
            ds.setDriverClassName(druidSettings.getDriverClassName());
            DRIVER_CLASSNAME = druidSettings.getDriverClassName();
            ds.setUrl(druidSettings.getUrl());
            ds.setUsername(druidSettings.getUsername());
            ds.setPassword(druidSettings.getPassword());
            ds.setInitialSize(druidSettings.getInitialSize());
            ds.setMinIdle(druidSettings.getMinIdle());
            ds.setMaxActive(druidSettings.getMaxActive());
            ds.setTimeBetweenEvictionRunsMillis(druidSettings.getTimeBetweenEvictionRunsMillis());
            ds.setMinEvictableIdleTimeMillis(druidSettings.getMinEvictableIdleTimeMillis());
            ds.setValidationQuery(druidSettings.getValidationQuery());
            ds.setTestWhileIdle(druidSettings.isTestWhileIdle());
            ds.setTestOnBorrow(druidSettings.isTestOnBorrow());
            ds.setTestOnReturn(druidSettings.isTestOnReturn());
            ds.setPoolPreparedStatements(druidSettings.isPoolPreparedStatements());
            ds.setMaxPoolPreparedStatementPerConnectionSize(druidSettings.getMaxPoolPreparedStatementPerConnectionSize());
            ds.setFilters(druidSettings.getFilters());
            ds.setConnectionProperties(druidSettings.getConnectionProperties());
            logger.info(" druid datasource config : {} ", ds);
            return ds;
        }
     
        @Bean
        public PlatformTransactionManager transactionManager() throws Exception {
            DataSourceTransactionManager txManager = new DataSourceTransactionManager();
            txManager.setDataSource(dataSource());
            return txManager;
        }
         
    }
    

    继续

    package com.bfxy.springboot.config.database;
     
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.PropertySource;
    import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
    import org.springframework.stereotype.Component;
     
    @Component
    @ConfigurationProperties(prefix="spring.datasource") 
    @PropertySource("classpath:druid.properties")
    public class DruidDataSourceSettings {
     
        private String driverClassName;
        private String url;
        private String username;
        private String password;
         
        @Value("${druid.initialSize}")
        private int initialSize;
         
        @Value("${druid.minIdle}")
        private int minIdle;
         
        @Value("${druid.maxActive}")
        private int maxActive;
         
        @Value("${druid.timeBetweenEvictionRunsMillis}")
        private long timeBetweenEvictionRunsMillis;
         
        @Value("${druid.minEvictableIdleTimeMillis}")
        private long minEvictableIdleTimeMillis;
         
        @Value("${druid.validationQuery}")
        private String validationQuery;
         
        @Value("${druid.testWhileIdle}")
        private boolean testWhileIdle;
         
        @Value("${druid.testOnBorrow}")
        private boolean testOnBorrow;
         
        @Value("${druid.testOnReturn}")
        private boolean testOnReturn;
         
        @Value("${druid.poolPreparedStatements}")
        private boolean poolPreparedStatements;
         
        @Value("${druid.maxPoolPreparedStatementPerConnectionSize}")
        private int maxPoolPreparedStatementPerConnectionSize;
         
        @Value("${druid.filters}")
        private String filters;
         
        @Value("${druid.connectionProperties}")
        private String connectionProperties;
         
        @Bean
        public static PropertySourcesPlaceholderConfigurer properdtyConfigure(){
            return new PropertySourcesPlaceholderConfigurer();
        }
         
        public String getDriverClassName() {
            return driverClassName;
        }
        public void setDriverClassName(String driverClassName) {
            this.driverClassName = driverClassName;
        }
        public String getUrl() {
            return url;
        }
        public void setUrl(String url) {
            this.url = url;
        }
        public String getUsername() {
            return username;
        }
        public void setUsername(String username) {
            this.username = username;
        }
        public String getPassword() {
            return password;
        }
        public void setPassword(String password) {
            this.password = password;
        }
        public int getInitialSize() {
            return initialSize;
        }
        public void setInitialSize(int initialSize) {
            this.initialSize = initialSize;
        }
        public int getMinIdle() {
            return minIdle;
        }
        public void setMinIdle(int minIdle) {
            this.minIdle = minIdle;
        }
        public int getMaxActive() {
            return maxActive;
        }
        public void setMaxActive(int maxActive) {
            this.maxActive = maxActive;
        }
        public long getTimeBetweenEvictionRunsMillis() {
            return timeBetweenEvictionRunsMillis;
        }
        public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) {
            this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
        }
        public long getMinEvictableIdleTimeMillis() {
            return minEvictableIdleTimeMillis;
        }
        public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) {
            this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
        }
        public String getValidationQuery() {
            return validationQuery;
        }
        public void setValidationQuery(String validationQuery) {
            this.validationQuery = validationQuery;
        }
        public boolean isTestWhileIdle() {
            return testWhileIdle;
        }
        public void setTestWhileIdle(boolean testWhileIdle) {
            this.testWhileIdle = testWhileIdle;
        }
        public boolean isTestOnBorrow() {
            return testOnBorrow;
        }
        public void setTestOnBorrow(boolean testOnBorrow) {
            this.testOnBorrow = testOnBorrow;
        }
        public boolean isTestOnReturn() {
            return testOnReturn;
        }
        public void setTestOnReturn(boolean testOnReturn) {
            this.testOnReturn = testOnReturn;
        }
        public boolean isPoolPreparedStatements() {
            return poolPreparedStatements;
        }
        public void setPoolPreparedStatements(boolean poolPreparedStatements) {
            this.poolPreparedStatements = poolPreparedStatements;
        }
        public int getMaxPoolPreparedStatementPerConnectionSize() {
            return maxPoolPreparedStatementPerConnectionSize;
        }
        public void setMaxPoolPreparedStatementPerConnectionSize(
                int maxPoolPreparedStatementPerConnectionSize) {
            this.maxPoolPreparedStatementPerConnectionSize = maxPoolPreparedStatementPerConnectionSize;
        }
        public String getFilters() {
            return filters;
        }
        public void setFilters(String filters) {
            this.filters = filters;
        }
        public String getConnectionProperties() {
            return connectionProperties;
        }
        public void setConnectionProperties(String connectionProperties) {
            this.connectionProperties = connectionProperties;
        }
         
    }
    

    定时任务配置代码:

    package com.bfxy.springboot.config.task;
     
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
     
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.SchedulingConfigurer;
    import org.springframework.scheduling.config.ScheduledTaskRegistrar;
     
    @Configuration
    @EnableScheduling
    public class TaskSchedulerConfig implements SchedulingConfigurer {
     
        @Override
        public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
            taskRegistrar.setScheduler(taskScheduler());
        }
         
        @Bean(destroyMethod="shutdown")
        public Executor taskScheduler(){
            return Executors.newScheduledThreadPool(100);
        }
     
    }
    
    

    常量类

    package com.bfxy.springboot.constant;
     
    public final class Constants {
     
        public static final String ORDER_SENDING = "0"; //发送中
         
        public static final String ORDER_SEND_SUCCESS = "1"; //成功
         
        public static final String ORDER_SEND_FAILURE = "2"; //失败
         
        public static final int ORDER_TIMEOUT = 1; /*分钟超时单位:min*/
    }
    
    

    消息记录表核心业务:

    package com.bfxy.springboot.mapper;
     
    import java.util.Date;
     
    import org.apache.ibatis.annotations.Param;
     
    import com.bfxy.springboot.entity.BrokerMessageLog;
    import com.sun.tools.javac.util.List;
     
    public interface BrokerMessageLogMapper {
        /**
         * 查询消息状态为0(发送中) 且已经超时的消息集合
         * @return
         */
        List<BrokerMessageLog> query4StatusAndTimeoutMessage();
         
        /**
         * 重新发送统计count发送次数 +1
         * @param messageId
         * @param updateTime
         */
        void update4ReSend(

    对应的SQL代码:

     <select id="query4StatusAndTimeoutMessage" resultMap="BaseResultMap">
              <![CDATA[  
              select message_id, message, try_count, status, next_retry, create_time, update_time
                  from broker_message_log bml 
                  where status = '0'
                  and next_retry <= sysdate() 
              ]]> 
        </select>
       
      <update id="update4ReSend" >
        update broker_message_log bml
        set bml.try_count = bml.try_count + 1,
          bml.update_time = #{updateTime, jdbcType=TIMESTAMP}
        where bml.message_id = #{messageId,jdbcType=VARCHAR}
      </update>
       
      <update id="changeBrokerMessageLogStatus" >
        update broker_message_log bml
        set bml.status = #{status,jdbcType=VARCHAR},
              bml.update_time = #{updateTime, jdbcType=TIMESTAMP}
        where bml.message_id = #{messageId,jdbcType=VARCHAR}  
      </update>
    
    

    核心发送代码:orderService

    package com.bfxy.springboot.service;
     
    import java.util.Date;
     
    import org.apache.commons.lang3.time.DateUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
     
    import com.bfxy.springboot.constant.Constants;
    import com.bfxy.springboot.entity.BrokerMessageLog;
    import com.bfxy.springboot.entity.Order;
    import com.bfxy.springboot.mapper.BrokerMessageLogMapper;
    import com.bfxy.springboot.mapper.OrderMapper;
    import com.bfxy.springboot.producer.RabbitOrderSender;
    import com.bfxy.springboot.utils.FastJsonConvertUtil;
     
    @Service
    public class OrderService {
     
        @Autowired
        private OrderMapper orderMapper;
         
        @Autowired
        private BrokerMessageLogMapper brokerMessageLogMapper;
         
        @Autowired
        private RabbitOrderSender rabbitOrderSender;
         
        public void createOrder(Order order) throws Exception {
            // 使用当前时间当做订单创建时间(为了模拟一下简化)
            Date orderTime = new Date();
            // 插入业务数据
            orderMapper.insert(order);
            // 插入消息记录表数据
            BrokerMessageLog brokerMessageLog = new BrokerMessageLog();
            // 消息唯一ID
            brokerMessageLog.setMessageId(order.getMessageId());
            // 保存消息整体 转为JSON 格式存储入库
            brokerMessageLog.setMessage(FastJsonConvertUtil.convertObjectToJSON(order));
             // 设置消息状态为0 表示发送中
            brokerMessageLog.setStatus("0");
             // 设置消息未确认超时时间窗口为 一分钟 
            brokerMessageLog.setNextRetry(DateUtils.addMinutes(orderTime, Constants.ORDER_TIMEOUT));
            brokerMessageLog.setCreateTime(new Date());
            brokerMessageLog.setUpdateTime(new Date());
            brokerMessageLogMapper.insert(brokerMessageLog);
            // 发送消息
            rabbitOrderSender.sendOrder(order);
        }
         
    }
    MQ消息发送核心代码:
    
    package com.bfxy.springboot.producer;
     
    import java.util.Date;
     
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
     
    import com.bfxy.springboot.constant.Constants;
    import com.bfxy.springboot.entity.Order;
    import com.bfxy.springboot.mapper.BrokerMessageLogMapper;
    import com.bfxy.springboot.mapper.OrderMapper;
     
    @Component
    public class RabbitOrderSender {
     
        //自动注入RabbitTemplate模板类
        @Autowired
        private RabbitTemplate rabbitTemplate;  
         
        @Autowired
        private BrokerMessageLogMapper brokerMessageLogMapper;
         
        //回调函数: confirm确认
        final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.err.println("correlationData: " + correlationData);
                String messageId = correlationData.getId();
                if(ack){
                    //如果confirm返回成功 则进行更新
                    brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
                } else {
                    //失败则进行具体的后续操作:重试 或者补偿等手段
                    System.err.println("异常处理...");
                }
            }
        };
         
        //发送消息方法调用: 构建自定义对象消息
        public void sendOrder(Order order) throws Exception {
            rabbitTemplate.setConfirmCallback(confirmCallback);
            //消息唯一ID
            CorrelationData correlationData = new CorrelationData(order.getMessage_id());
            rabbitTemplate.convertAndSend("order-exchange", "order.ABC", order, correlationData);
        }
         
    }
    

    消息重试、最大努力尝试策略(定时任务):

    package com.bfxy.springboot.task;
     
    import java.util.Date;
    import java.util.List;
     
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
     
    import com.bfxy.springboot.constant.Constants;
    import com.bfxy.springboot.entity.BrokerMessageLog;
    import com.bfxy.springboot.entity.Order;
    import com.bfxy.springboot.mapper.BrokerMessageLogMapper;
    import com.bfxy.springboot.producer.RabbitOrderSender;
    import com.bfxy.springboot.utils.FastJsonConvertUtil;
     
    @Component
    public class RetryMessageTasker {
     
         
        @Autowired
        private RabbitOrderSender rabbitOrderSender;
         
        @Autowired
        private BrokerMessageLogMapper brokerMessageLogMapper;
         
        @Scheduled(initialDelay = 5000, fixedDelay = 10000)
        public void reSend(){
            //pull status = 0 and timeout message 
            List<BrokerMessageLog> list = brokerMessageLogMapper.query4StatusAndTimeoutMessage();
            list.forEach(messageLog -> {
                if(messageLog.getTryCount() >= 3){
                    //update fail message 
                    brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLog.getMessageId(), Constants.ORDER_SEND_FAILURE, new Date());
                } else {
                    // resend 
                    brokerMessageLogMapper.update4ReSend(messageLog.getMessageId(),  new Date());
                    Order reSendOrder = FastJsonConvertUtil.convertJSONToObject(messageLog.getMessage(), Order.class);
                    try {
                        rabbitOrderSender.sendOrder(reSendOrder);
                    } catch (Exception e) {
                        e.printStackTrace();
                        System.err.println("-----------异常处理-----------");
                    }
                }            
            });
        }
    }
    
    

    测试订单发送

     
     

    代码如下

     @Autowired
        private RabbitOrderSender rabbitOrderSender;
         
        @Test
        public void testSender2() throws Exception {
             Order order = new Order();
             order.setId("2018080400000001");
             order.setName("测试订单");
             order.setMessage_id(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());
             rabbitOrderSender.sendOrder(order);
        }
    
    

    监控台查看消息

     
     

     

    发送成功! 现在测试 发送订单并且入库(业务库和消息记录库)

     @Autowired
        private OrderService orderService;
         
        @Test
        public void testCreateOrder() throws Exception {
             Order order = new Order();
             order.setId("2018080400000002");
             order.setName("测试创建订单");
             order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());
            orderService.createOrder(order);
        }
    
    
    • 发送成功 并且入库OK:业务表 和 消息记录表均有数据 且status状态=1 为成功!

    • 业务表:


       
       
    • 消息记录表


       
       
    • 测试失败情况:修改路由KEY为 无法路由即可!


       
       
    • 这样消息就算失败的情况了。然后ACK的时候就会走异常处理,消息记录表如下:


       
       
    • 最后我们测试重试策略:直接启动生产者应用,开启定时任务,重试几次后,库表信息变化如下:


       
       
    • 最终重试3次 失败结果更新 status = 2

     
     
    1人点赞
     
    JaveEE
     
     


    作者:__method__
    链接:https://www.jianshu.com/p/9feddd4af8ee
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
    如果有来生,要做一片树叶。 春天恋上枝,炎夏恋上水。 深秋恋上土,东来化作泥。 润物细无声,生生世世恋红尘。
  • 相关阅读:
    哈夫曼树
    MongoDB
    Node.js 搭建Web
    Node.js
    HDFS编程
    scp
    MapRecude
    级数
    (转)MySQL百万级数据库优化
    ssh
  • 原文地址:https://www.cnblogs.com/shujiying/p/12697577.html
Copyright © 2011-2022 走看看