zoukankan      html  css  js  c++  java
  • SpringBoot整合RabbitMQ-消息可靠性投递

    本系列是学习SpringBoot整合RabbitMQ的练手,包含服务安装,RabbitMQ整合SpringBoot2.x,消息可靠性投递实现等三篇博客。

      学习路径:https://www.imooc.com/learn/1042 RabbitMQ消息中间件极速入门与实战 

      项目源码:https://github.com/ZbLeaning/Boot-RabbitMQ 


     

    设计一个消息可靠性投递方案,服务结构如下:

     组成:

      Sender+Confirm Listener :组成消息的生产者

      MQ Broker:消息的消费者,包含具体的MQ服务

      BIZ DB:业务数据数据库

      MSG DB:消息日志记录数据库(0:发送中、1:发送成功、2:发送失败)

    思路:

      以最常见的创建订单业务来举例,假设订单创建成功后需要去发短信通知用户

      1、先完成订单业务数据的存储,并记录这条操作日志(发送中)

      2、生产者发送一条消息到消费者(异步)

      3、消费者成功消费后给给Confirm listener发送应答

      4、监听收到消息确认成功后,对消息日志表操作,修改之前的日志状态(发送成功)

      5、在消费端返回应答的过程中,可能发生网络异常导致生产者未收到应答消息,因此需要一个定时任务去捞取状态是发送中并已经超时的消息集合

      6、将捞取到的日志对应的消息,进行重发

      7、定时任务判断设置的消息最大重投次数,大于最大重投次数就判断消息发送失败,更新日志记录状态(发送失败)


     项目搭建

      Durid数据源配置文件

    //druid.properties
    ##下面为连接池的补充设置,应用到上面所有数据源中
    #初始化大小,最小,最大
    druid.initialSize=5
    druid.minIdle=10
    druid.maxActive=300
    #配置获取连接等待超时的时间
    druid.maxWait=60000
    #配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
    druid.timeBetweenEvictionRunsMillis=60000
    #配置一个连接在池中最小生存的时间,单位是毫秒
    druid.minEvictableIdleTimeMillis=300000
    druid.validationQuery=SELECT 1 FROM DUAL
    druid.testWhileIdle=true
    druid.testOnBorrow=false
    druid.testOnReturn=false
    #打开PSCache,并且指定每个连接上PSCache的大小
    druid.poolPreparedStatements=true
    druid.maxPoolPreparedStatementPerConnectionSize=20
    #配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
    druid.filters=stat,wall,log4j
    #通过connectProperties属性来打开mergeSql功能;慢SQL记录
    druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
    #合并多个DruidDataSource的监控数据
    druid.useGlobalDataSourceStat=true

      添加相应的数据源配置类、定时任务配置类、常量类

    package com.imooc.mq.config.database;
    
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.PropertySource;
    import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
    import org.springframework.stereotype.Component;
    
    /**
     * @Title: DruidDataSourceSettings
     * @Description: Druid数据源读取配置
     * @date 2019/1/2214:31
     */
    @Component
    @ConfigurationProperties(prefix = "spring.datasource")
    @PropertySource("classpath:druid.properties")
    public class DruidDataSourceSettings {
        private String driverClassName;
        private String url;
        private String username;
        private String password;
    
        @Value("${druid.initialSize}")
        private int initialSize;
    
        @Value("${druid.minIdle}")
        private int minIdle;
    
        @Value("${druid.maxActive}")
        private int maxActive;
    
        @Value("${druid.timeBetweenEvictionRunsMillis}")
        private long timeBetweenEvictionRunsMillis;
    
        @Value("${druid.minEvictableIdleTimeMillis}")
        private long minEvictableIdleTimeMillis;
    
        @Value("${druid.validationQuery}")
        private String validationQuery;
    
        @Value("${druid.testWhileIdle}")
        private boolean testWhileIdle;
    
        @Value("${druid.testOnBorrow}")
        private boolean testOnBorrow;
    
        @Value("${druid.testOnReturn}")
        private boolean testOnReturn;
    
        @Value("${druid.poolPreparedStatements}")
        private boolean poolPreparedStatements;
    
        @Value("${druid.maxPoolPreparedStatementPerConnectionSize}")
        private int maxPoolPreparedStatementPerConnectionSize;
    
        @Value("${druid.filters}")
        private String filters;
    
        @Value("${druid.connectionProperties}")
        private String connectionProperties;
    
        @Bean
        public static PropertySourcesPlaceholderConfigurer properdtyConfigure(){
            return new PropertySourcesPlaceholderConfigurer();
        }
    
        public String getDriverClassName() {
            return driverClassName;
        }
        public void setDriverClassName(String driverClassName) {
            this.driverClassName = driverClassName;
        }
        public String getUrl() {
            return url;
        }
        public void setUrl(String url) {
            this.url = url;
        }
        public String getUsername() {
            return username;
        }
        public void setUsername(String username) {
            this.username = username;
        }
        public String getPassword() {
            return password;
        }
        public void setPassword(String password) {
            this.password = password;
        }
        public int getInitialSize() {
            return initialSize;
        }
        public void setInitialSize(int initialSize) {
            this.initialSize = initialSize;
        }
        public int getMinIdle() {
            return minIdle;
        }
        public void setMinIdle(int minIdle) {
            this.minIdle = minIdle;
        }
        public int getMaxActive() {
            return maxActive;
        }
        public void setMaxActive(int maxActive) {
            this.maxActive = maxActive;
        }
        public long getTimeBetweenEvictionRunsMillis() {
            return timeBetweenEvictionRunsMillis;
        }
        public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) {
            this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
        }
        public long getMinEvictableIdleTimeMillis() {
            return minEvictableIdleTimeMillis;
        }
        public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) {
            this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
        }
        public String getValidationQuery() {
            return validationQuery;
        }
        public void setValidationQuery(String validationQuery) {
            this.validationQuery = validationQuery;
        }
        public boolean isTestWhileIdle() {
            return testWhileIdle;
        }
        public void setTestWhileIdle(boolean testWhileIdle) {
            this.testWhileIdle = testWhileIdle;
        }
        public boolean isTestOnBorrow() {
            return testOnBorrow;
        }
        public void setTestOnBorrow(boolean testOnBorrow) {
            this.testOnBorrow = testOnBorrow;
        }
        public boolean isTestOnReturn() {
            return testOnReturn;
        }
        public void setTestOnReturn(boolean testOnReturn) {
            this.testOnReturn = testOnReturn;
        }
        public boolean isPoolPreparedStatements() {
            return poolPreparedStatements;
        }
        public void setPoolPreparedStatements(boolean poolPreparedStatements) {
            this.poolPreparedStatements = poolPreparedStatements;
        }
        public int getMaxPoolPreparedStatementPerConnectionSize() {
            return maxPoolPreparedStatementPerConnectionSize;
        }
        public void setMaxPoolPreparedStatementPerConnectionSize(
                int maxPoolPreparedStatementPerConnectionSize) {
            this.maxPoolPreparedStatementPerConnectionSize = maxPoolPreparedStatementPerConnectionSize;
        }
        public String getFilters() {
            return filters;
        }
        public void setFilters(String filters) {
            this.filters = filters;
        }
        public String getConnectionProperties() {
            return connectionProperties;
        }
        public void setConnectionProperties(String connectionProperties) {
            this.connectionProperties = connectionProperties;
        }
    }
    package com.imooc.mq.config.database;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
    import org.springframework.jdbc.datasource.DataSourceTransactionManager;
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.annotation.EnableTransactionManagement;
    
    import javax.sql.DataSource;
    import java.sql.SQLException;
    
    import com.alibaba.druid.pool.DruidDataSource;
    /**
     * @Title: DruidDataSourceConfig
     * @Description: Druid数据源初始化
     *
     * EnableTransactionManagement 开启事务
     * @date 2019/1/2214:35
     */
    
    @Configuration
    @EnableTransactionManagement
    public class DruidDataSourceConfig {
        private static Logger logger = LoggerFactory.getLogger(com.imooc.mq.config.database.DruidDataSourceConfig.class);
        //注入数据源配置信息
        @Autowired
        private DruidDataSourceSettings druidSettings;
    
        public static String DRIVER_CLASSNAME;
    
        @Bean
        public static PropertySourcesPlaceholderConfigurer propertyConfigure() {
            return new PropertySourcesPlaceholderConfigurer();
        }
    
        /**
         * 创建DataSource
         * @return
         * @throws SQLException
         */
        @Bean
        public DataSource dataSource() throws SQLException {
            DruidDataSource ds = new DruidDataSource();
            ds.setDriverClassName(druidSettings.getDriverClassName());
            DRIVER_CLASSNAME = druidSettings.getDriverClassName();
            ds.setUrl(druidSettings.getUrl());
            ds.setUsername(druidSettings.getUsername());
            ds.setPassword(druidSettings.getPassword());
            ds.setInitialSize(druidSettings.getInitialSize());
            ds.setMinIdle(druidSettings.getMinIdle());
            ds.setMaxActive(druidSettings.getMaxActive());
            ds.setTimeBetweenEvictionRunsMillis(druidSettings.getTimeBetweenEvictionRunsMillis());
            ds.setMinEvictableIdleTimeMillis(druidSettings.getMinEvictableIdleTimeMillis());
            ds.setValidationQuery(druidSettings.getValidationQuery());
            ds.setTestWhileIdle(druidSettings.isTestWhileIdle());
            ds.setTestOnBorrow(druidSettings.isTestOnBorrow());
            ds.setTestOnReturn(druidSettings.isTestOnReturn());
            ds.setPoolPreparedStatements(druidSettings.isPoolPreparedStatements());
            ds.setMaxPoolPreparedStatementPerConnectionSize(druidSettings.getMaxPoolPreparedStatementPerConnectionSize());
            ds.setFilters(druidSettings.getFilters());
            ds.setConnectionProperties(druidSettings.getConnectionProperties());
            logger.info(" druid datasource config : {} ", ds);
            return ds;
        }
    
        /**
         * 开启事务
         * @return
         * @throws Exception
         */
        @Bean
        public PlatformTransactionManager transactionManager() throws Exception {
            DataSourceTransactionManager txManager = new DataSourceTransactionManager();
            txManager.setDataSource(dataSource());
            return txManager;
        }
    }
    package com.imooc.mq.config.database;
    
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.mybatis.spring.SqlSessionFactoryBean;
    import org.mybatis.spring.SqlSessionTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
    import org.springframework.core.io.support.ResourcePatternResolver;
    
    import javax.sql.DataSource;
    
    /**
     * @Title: MybatisDataSourceConfig
     * @Description: 整合mybatis和Druid
     * @date 2019/1/2214:39
     */
    @Configuration
    public class MybatisDataSourceConfig {
        @Autowired
        private DataSource dataSource;
    
        @Bean(name="sqlSessionFactory")
        public SqlSessionFactory sqlSessionFactoryBean() {
            SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
            bean.setDataSource(dataSource);
            // 添加XML目录
            ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
            try {
                bean.setMapperLocations(resolver.getResources("classpath:mapping/*.xml"));
                SqlSessionFactory sqlSessionFactory = bean.getObject();
                sqlSessionFactory.getConfiguration().setCacheEnabled(Boolean.TRUE);
    
                return sqlSessionFactory;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    
        @Bean
        public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
            return new SqlSessionTemplate(sqlSessionFactory);
        }
    }
    package com.imooc.mq.config.database;
    
    import org.mybatis.spring.mapper.MapperScannerConfigurer;
    import org.springframework.boot.autoconfigure.AutoConfigureAfter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    /**
     * @Title: MybatisMapperScanerConfig
     * @Description: 扫码Mybatis
     * @AutoConfigureAfter(MybatisDataSourceConfig.class) 先加载数据源类,再加载该类
     * @date 2019/1/2214:43
     */
    @Configuration
    @AutoConfigureAfter(MybatisDataSourceConfig.class)
    public class MybatisMapperScanerConfig {
        @Bean
        public MapperScannerConfigurer mapperScannerConfigurer() {
            MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();
            mapperScannerConfigurer.setSqlSessionFactoryBeanName("sqlSessionFactory");
            mapperScannerConfigurer.setBasePackage("com.imooc.mq.mapper");
            return mapperScannerConfigurer;
        }
    }
    package com.imooc.mq.config.task;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.SchedulingConfigurer;
    import org.springframework.scheduling.config.ScheduledTaskRegistrar;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    /**
     * @Title: TaskSchedulerConfig
     * @Description: 定时任务配置
     * @date 2019/1/2214:46
     */
    @Configuration
    @EnableScheduling //启动定时任务
    public class TaskSchedulerConfig  implements SchedulingConfigurer {
        @Override
        public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
            taskRegistrar.setScheduler(taskScheduler());
        }
    
        /**
         * 定时任务线程池
         * @return
         */
        @Bean(destroyMethod = "shutdown")
        public Executor taskScheduler(){
            return Executors.newScheduledThreadPool(100);
        }
    }
    package com.imooc.mq.constant;
    
    /**
     * @Title: Constans
     * @Description: 常量
     * @date 2019/1/2214:50
     */
    public class Constans {
        /**
         * 发送中
         */
        public static final String ORDER_SENDING = "0";
    
        /**
         * 发送成功
         */
        public static final String ORDER_SEND_SUCCESS = "1";
    
        /**
         * 发送失败
         */
        public static final String ORDER_SEND_FAILURE = "2";
        /**
         * 分钟超时单位:min
         */
        public static final int ORDER_TIMEOUT = 1;
    }

     相应的mapper接口和mapper.xml文件配置

    package com.imooc.mq.mapper;
    
    import com.imooc.mq.entity.BrokerMessageLog;
    import org.apache.ibatis.annotations.Param;
    import org.springframework.stereotype.Repository;
    
    import java.util.Date;
    import java.util.List;
    
    /**
     * @Title: BrokerMessageLogMapper
     * @Description: 消息记录接口
     * @date 2019/1/2214:45
     */
    @Repository
    public interface BrokerMessageLogMapper {
        /**
         * 查询消息状态为0(发送中) 且已经超时的消息集合
         * @return
         */
        List<BrokerMessageLog> query4StatusAndTimeoutMessage();
    
        /**
         * 重新发送统计count发送次数 +1
         * @param messageId
         * @param updateTime
         */
        void update4ReSend(@Param("messageId")String messageId, @Param("updateTime") Date updateTime);
        /**
         * 更新最终消息发送结果 成功 or 失败
         * @param messageId
         * @param status
         * @param updateTime
         */
        void changeBrokerMessageLogStatus(@Param("messageId")String messageId, @Param("status")String status, @Param("updateTime")Date updateTime);
    
        int insertSelective(BrokerMessageLog record);
    }
    ------------------------------------------------------------------
    package com.imooc.mq.mapper;
    
    import com.imooc.mq.entity.Order;
    import org.springframework.stereotype.Repository;
    
    /**
     * @Title: OrderMapper
     * @Description: 订单接口
     * @date 2019/1/2214:45
     */
    @Repository
    public interface OrderMapper {
        int insert(Order record);
        int deleteByPrimaryKey(Integer id);
        int insertSelective(Order record);
        Order selectByPrimaryKey(Integer id);
        int updateByPrimaryKeySelective(Order record);
        int updateByPrimaryKey(Order record);
    }
    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
    <mapper namespace="com.imooc.mq.mapper.BrokerMessageLogMapper" >
        <resultMap id="BaseResultMap" type="com.imooc.mq.entity.BrokerMessageLog" >
            <id column="message_id" property="messageId" jdbcType="VARCHAR" />
            <result column="message" property="message" jdbcType="VARCHAR" />
            <result column="try_count" property="tryCount" jdbcType="INTEGER" />
            <result column="status" property="status" jdbcType="VARCHAR" />
            <result column="next_retry" property="nextRetry" jdbcType="TIMESTAMP" />
            <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
            <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
        </resultMap>
        <sql id="Base_Column_List" >
        message_id, message, try_count, status, next_retry, create_time, update_time
      </sql>
    
        <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
            select
            <include refid="Base_Column_List" />
            from broker_message_log
            where message_id = #{messageId,jdbcType=VARCHAR}
        </select>
        <delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
        delete from broker_message_log
        where message_id = #{messageId,jdbcType=VARCHAR}
      </delete>
        <insert id="insert" parameterType="com.imooc.mq.entity.BrokerMessageLog" >
        insert into broker_message_log (message_id, message, try_count,
          status, next_retry, create_time,
          update_time)
        values (#{messageId,jdbcType=VARCHAR}, #{message,jdbcType=VARCHAR}, #{tryCount,jdbcType=INTEGER},
          #{status,jdbcType=VARCHAR}, #{nextRetry,jdbcType=TIMESTAMP}, #{createTime,jdbcType=TIMESTAMP},
          #{updateTime,jdbcType=TIMESTAMP})
      </insert>
        <insert id="insertSelective" parameterType="com.imooc.mq.entity.BrokerMessageLog" >
            insert into broker_message_log
            <trim prefix="(" suffix=")" suffixOverrides="," >
                <if test="messageId != null" >
                    message_id,
                </if>
                <if test="message != null" >
                    message,
                </if>
                <if test="tryCount != null" >
                    try_count,
                </if>
                <if test="status != null" >
                    status,
                </if>
                <if test="nextRetry != null" >
                    next_retry,
                </if>
                <if test="createTime != null" >
                    create_time,
                </if>
                <if test="updateTime != null" >
                    update_time,
                </if>
            </trim>
            <trim prefix="values (" suffix=")" suffixOverrides="," >
                <if test="messageId != null" >
                    #{messageId,jdbcType=VARCHAR},
                </if>
                <if test="message != null" >
                    #{message,jdbcType=VARCHAR},
                </if>
                <if test="tryCount != null" >
                    #{tryCount,jdbcType=INTEGER},
                </if>
                <if test="status != null" >
                    #{status,jdbcType=VARCHAR},
                </if>
                <if test="nextRetry != null" >
                    #{nextRetry,jdbcType=TIMESTAMP},
                </if>
                <if test="createTime != null" >
                    #{createTime,jdbcType=TIMESTAMP},
                </if>
                <if test="updateTime != null" >
                    #{updateTime,jdbcType=TIMESTAMP},
                </if>
            </trim>
        </insert>
        <update id="updateByPrimaryKeySelective" parameterType="com.imooc.mq.entity.BrokerMessageLog" >
            update broker_message_log
            <set >
                <if test="message != null" >
                    message = #{message,jdbcType=VARCHAR},
                </if>
                <if test="tryCount != null" >
                    try_count = #{tryCount,jdbcType=INTEGER},
                </if>
                <if test="status != null" >
                    status = #{status,jdbcType=VARCHAR},
                </if>
                <if test="nextRetry != null" >
                    next_retry = #{nextRetry,jdbcType=TIMESTAMP},
                </if>
                <if test="createTime != null" >
                    create_time = #{createTime,jdbcType=TIMESTAMP},
                </if>
                <if test="updateTime != null" >
                    update_time = #{updateTime,jdbcType=TIMESTAMP},
                </if>
            </set>
            where message_id = #{messageId,jdbcType=VARCHAR}
        </update>
        <update id="updateByPrimaryKey" parameterType="com.imooc.mq.entity.BrokerMessageLog" >
        update broker_message_log
        set message = #{message,jdbcType=VARCHAR},
          try_count = #{tryCount,jdbcType=INTEGER},
          status = #{status,jdbcType=VARCHAR},
          next_retry = #{nextRetry,jdbcType=TIMESTAMP},
          create_time = #{createTime,jdbcType=TIMESTAMP},
          update_time = #{updateTime,jdbcType=TIMESTAMP}
        where message_id = #{messageId,jdbcType=VARCHAR}
      </update>
    
    
        <select id="query4StatusAndTimeoutMessage" resultMap="BaseResultMap">
              <![CDATA[
              select message_id, message, try_count, status, next_retry, create_time, update_time
                  from broker_message_log bml
                  where status = '0'
                  and next_retry <= sysdate()
              ]]>
        </select>
    
        <update id="update4ReSend" >
        update broker_message_log bml
        set bml.try_count = bml.try_count + 1,
          bml.update_time = #{updateTime, jdbcType=TIMESTAMP}
        where bml.message_id = #{messageId,jdbcType=VARCHAR}
      </update>
    
        <update id="changeBrokerMessageLogStatus" >
        update broker_message_log bml
        set bml.status = #{status,jdbcType=VARCHAR},
              bml.update_time = #{updateTime, jdbcType=TIMESTAMP}
        where bml.message_id = #{messageId,jdbcType=VARCHAR}
      </update>
    
    </mapper>
    -------------------------------------------------------------
    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
    <mapper namespace="com.imooc.mq.mapper.OrderMapper" >
        <resultMap id="BaseResultMap" type="com.imooc.mq.entity.Order" >
            <id column="id" property="id" jdbcType="INTEGER" />
            <result column="name" property="name" jdbcType="VARCHAR" />
            <result column="message_id" property="messageId" jdbcType="VARCHAR" />
        </resultMap>
        <sql id="Example_Where_Clause" >
            <where >
                <foreach collection="oredCriteria" item="criteria" separator="or" >
                    <if test="criteria.valid" >
                        <trim prefix="(" suffix=")" prefixOverrides="and" >
                            <foreach collection="criteria.criteria" item="criterion" >
                                <choose >
                                    <when test="criterion.noValue" >
                                        and ${criterion.condition}
                                    </when>
                                    <when test="criterion.singleValue" >
                                        and ${criterion.condition} #{criterion.value}
                                    </when>
                                    <when test="criterion.betweenValue" >
                                        and ${criterion.condition} #{criterion.value} and #{criterion.secondValue}
                                    </when>
                                    <when test="criterion.listValue" >
                                        and ${criterion.condition}
                                        <foreach collection="criterion.value" item="listItem" open="(" close=")" separator="," >
                                            #{listItem}
                                        </foreach>
                                    </when>
                                </choose>
                            </foreach>
                        </trim>
                    </if>
                </foreach>
            </where>
        </sql>
        <sql id="Update_By_Example_Where_Clause" >
            <where >
                <foreach collection="example.oredCriteria" item="criteria" separator="or" >
                    <if test="criteria.valid" >
                        <trim prefix="(" suffix=")" prefixOverrides="and" >
                            <foreach collection="criteria.criteria" item="criterion" >
                                <choose >
                                    <when test="criterion.noValue" >
                                        and ${criterion.condition}
                                    </when>
                                    <when test="criterion.singleValue" >
                                        and ${criterion.condition} #{criterion.value}
                                    </when>
                                    <when test="criterion.betweenValue" >
                                        and ${criterion.condition} #{criterion.value} and #{criterion.secondValue}
                                    </when>
                                    <when test="criterion.listValue" >
                                        and ${criterion.condition}
                                        <foreach collection="criterion.value" item="listItem" open="(" close=")" separator="," >
                                            #{listItem}
                                        </foreach>
                                    </when>
                                </choose>
                            </foreach>
                        </trim>
                    </if>
                </foreach>
            </where>
        </sql>
        <sql id="Base_Column_List" >
        id, name, message_id
      </sql>
    
        <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
            select
            <include refid="Base_Column_List" />
            from t_order
            where id = #{id,jdbcType=INTEGER}
        </select>
        <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer" >
        delete from t_order
        where id = #{id,jdbcType=INTEGER}
      </delete>
    
        <insert id="insert" parameterType="com.imooc.mq.entity.Order" >
        insert into t_order (id, name, message_id
          )
        values (#{id,jdbcType=INTEGER}, #{name,jdbcType=VARCHAR}, #{messageId,jdbcType=VARCHAR}
          )
      </insert>
        <insert id="insertSelective" parameterType="com.imooc.mq.entity.Order" >
            insert into t_order
            <trim prefix="(" suffix=")" suffixOverrides="," >
                <if test="id != null" >
                    id,
                </if>
                <if test="name != null" >
                    name,
                </if>
                <if test="messageId != null" >
                    message_id,
                </if>
            </trim>
            <trim prefix="values (" suffix=")" suffixOverrides="," >
                <if test="id != null" >
                    #{id,jdbcType=INTEGER},
                </if>
                <if test="name != null" >
                    #{name,jdbcType=VARCHAR},
                </if>
                <if test="messageId != null" >
                    #{messageId,jdbcType=VARCHAR},
                </if>
            </trim>
        </insert>
    
        <update id="updateByExampleSelective" parameterType="map" >
            update t_order
            <set >
                <if test="record.id != null" >
                    id = #{record.id,jdbcType=INTEGER},
                </if>
                <if test="record.name != null" >
                    name = #{record.name,jdbcType=VARCHAR},
                </if>
                <if test="record.messageId != null" >
                    message_id = #{record.messageId,jdbcType=VARCHAR},
                </if>
            </set>
            <if test="_parameter != null" >
                <include refid="Update_By_Example_Where_Clause" />
            </if>
        </update>
        <update id="updateByExample" parameterType="map" >
            update t_order
            set id = #{record.id,jdbcType=INTEGER},
            name = #{record.name,jdbcType=VARCHAR},
            message_id = #{record.messageId,jdbcType=VARCHAR}
            <if test="_parameter != null" >
                <include refid="Update_By_Example_Where_Clause" />
            </if>
        </update>
        <update id="updateByPrimaryKeySelective" parameterType="com.imooc.mq.entity.Order" >
            update t_order
            <set >
                <if test="name != null" >
                    name = #{name,jdbcType=VARCHAR},
                </if>
                <if test="messageId != null" >
                    message_id = #{messageId,jdbcType=VARCHAR},
                </if>
            </set>
            where id = #{id,jdbcType=INTEGER}
        </update>
        <update id="updateByPrimaryKey" parameterType="com.imooc.mq.entity.Order" >
        update t_order
        set name = #{name,jdbcType=VARCHAR},
          message_id = #{messageId,jdbcType=VARCHAR}
        where id = #{id,jdbcType=INTEGER}
      </update>
    </mapper>

    package com.imooc.mq.entity;
    
    import java.util.Date;
    
    /**
     * @Title: BrokerMessageLog
     * @Description: 消息记录
     * @date 2019/1/2214:29
     */
    public class BrokerMessageLog {
        private String messageId;
    
        private String message;
    
        private Integer tryCount;
    
        private String status;
    
        private Date nextRetry;
    
        private Date createTime;
    
        private Date updateTime;
    
        public BrokerMessageLog() {
        }
    
        public BrokerMessageLog(String messageId, String message, Integer tryCount, String status, Date nextRetry, Date createTime, Date updateTime) {
            this.messageId = messageId;
            this.message = message;
            this.tryCount = tryCount;
            this.status = status;
            this.nextRetry = nextRetry;
            this.createTime = createTime;
            this.updateTime = updateTime;
        }
    
        public String getMessageId() {
            return messageId;
        }
    
        public void setMessageId(String messageId) {
            this.messageId = messageId;
        }
    
        public String getMessage() {
            return message;
        }
    
        public void setMessage(String message) {
            this.message = message;
        }
    
        public Integer getTryCount() {
            return tryCount;
        }
    
        public void setTryCount(Integer tryCount) {
            this.tryCount = tryCount;
        }
    
        public String getStatus() {
            return status;
        }
    
        public void setStatus(String status) {
            this.status = status;
        }
    
        public Date getNextRetry() {
            return nextRetry;
        }
    
        public void setNextRetry(Date nextRetry) {
            this.nextRetry = nextRetry;
        }
    
        public Date getCreateTime() {
            return createTime;
        }
    
        public void setCreateTime(Date createTime) {
            this.createTime = createTime;
        }
    
        public Date getUpdateTime() {
            return updateTime;
        }
    
        public void setUpdateTime(Date updateTime) {
            this.updateTime = updateTime;
        }
    }
    --------------------------------------------------------------
    package com.imooc.mq.entity;
    
    import java.io.Serializable;
    
    /**
     * @Title: Order
     * @Description: 订单
     * @date 2019/1/2210:18
     */
    public class Order implements Serializable {
        private String id;
        private String name;
        //存储消息发送的唯一标识
        private String messageId;
    
        public Order() {
        }
    
        public Order(String id, String name, String messageId) {
            this.id = id;
            this.name = name;
            this.messageId = messageId;
        }
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getMessageId() {
            return messageId;
        }
    
        public void setMessageId(String messageId) {
            this.messageId = messageId;
        }
    
    }

    现在开始按照设计思路写实现代码:

      1、首先我们把最核心了生产者写好,生产者组成有基本的消息投递,和监听

    package com.imooc.mq.producer;
    
    
    import com.imooc.mq.constant.Constans;
    import com.imooc.mq.entity.Order;
    import com.imooc.mq.mapper.BrokerMessageLogMapper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     * @Title: RabbitOrderSender
     * @Description: 消息发送
     * @date 2019/1/2214:52
     */
    @Component
    public class RabbitOrderSender {
        private static Logger logger = LoggerFactory.getLogger(RabbitOrderSender.class);
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Autowired
        private BrokerMessageLogMapper brokerMessageLogMapper;
    
        /**
         * Broker应答后,会调用该方法区获取应答结果
         */
        final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                logger.info("correlationData:"+correlationData);
                String messageId = correlationData.getId();
                if (ack){
                    //如果返回成功,则进行更新
                    brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constans.ORDER_SEND_SUCCESS,new Date());
                }else {
                    //失败进行操作:根据具体失败原因选择重试或补偿等手段
                    logger.error("异常处理"+cause);
                }
            }
        };
    
        /**
         * 发送消息方法调用: 构建自定义对象消息
         * @param order
         * @throws Exception
         */
        public void sendOrder(Order order) throws Exception {
            // 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
            rabbitTemplate.setConfirmCallback(confirmCallback);
            //消息唯一ID
            CorrelationData correlationData = new CorrelationData(order.getMessageId());
            rabbitTemplate.convertAndSend("order-exchange1", "order.ABC", order, correlationData);
        }
    }

      2、将定时任务逻辑写好

    package com.imooc.mq.task;
    
    import com.imooc.mq.constant.Constans;
    import com.imooc.mq.entity.BrokerMessageLog;
    import com.imooc.mq.entity.Order;
    import com.imooc.mq.mapper.BrokerMessageLogMapper;
    import com.imooc.mq.producer.RabbitOrderSender;
    import com.imooc.mq.utils.FastJsonConvertUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    import java.util.List;
    
    /**
     * @Title: RetryMessageTasker
     * @Description: 定时任务
     * @date 2019/1/2215:45
     */
    @Component
    public class RetryMessageTasker {
        private static Logger logger = LoggerFactory.getLogger(RetryMessageTasker.class);
        @Autowired
        private RabbitOrderSender rabbitOrderSender;
    
        @Autowired
        private BrokerMessageLogMapper brokerMessageLogMapper;
    
        /**
         * 定时任务
         */
        @Scheduled(initialDelay = 5000, fixedDelay = 10000)
        public void reSend(){
            logger.info("-----------定时任务开始-----------");
            //抽取消息状态为0且已经超时的消息集合
            List<BrokerMessageLog> list = brokerMessageLogMapper.query4StatusAndTimeoutMessage();
            list.forEach(messageLog -> {
                //投递三次以上的消息
                if(messageLog.getTryCount() >= 3){
                    //更新失败的消息
                    brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLog.getMessageId(), Constans.ORDER_SEND_FAILURE, new Date());
                } else {
                    // 重试投递消息,将重试次数递增
                    brokerMessageLogMapper.update4ReSend(messageLog.getMessageId(),  new Date());
                    Order reSendOrder = FastJsonConvertUtil.convertJSONToObject(messageLog.getMessage(), Order.class);
                    try {
                        rabbitOrderSender.sendOrder(reSendOrder);
                    } catch (Exception e) {
                        e.printStackTrace();
                        logger.error("-----------异常处理-----------");
                    }
                }
            });
        }
    
    }

      3、写好消费者的逻辑,直接用上一篇中的消费者代码,修改对应的exchange、queue、路由key就好

    package com.imooc.mq.consumer;
    
    import com.imooc.mq.entity.Order;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    
    /**
     * @Title: OrderReceiver
     * @Description: 消费
     * @date 2019/1/2211:03
     */
    @Component
    public class OrderReceiver {
        /**
         * @RabbitListener 消息监听,可配置交换机、队列、路由key
         * 该注解会创建队列和交互机 并建立绑定关系
         * @RabbitHandler 标识此方法如果有消息过来,消费者要调用这个方法
         * @Payload 消息体
         * @Headers 消息头
         * @param order
         */
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "order-queue1",declare = "true"),
                exchange = @Exchange(name = "order-exchange1",declare = "true",type = "topic"),
                key = "order.ABC"
        ))
        @RabbitHandler
        public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers,
                                   Channel channel) throws Exception{
            //消费者操作
            System.out.println("------收到消息,开始消费------");
            System.out.println("订单ID:"+order.getId());
    
            Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
            //现在是手动确认消息 ACK 
            channel.basicAck(deliveryTag,false);
        }
    }

      4、业务逻辑

    package com.imooc.mq.service;
    
    import com.imooc.mq.constant.Constans;
    import com.imooc.mq.entity.BrokerMessageLog;
    import com.imooc.mq.entity.Order;
    import com.imooc.mq.mapper.BrokerMessageLogMapper;
    import com.imooc.mq.mapper.OrderMapper;
    import com.imooc.mq.producer.RabbitOrderSender;
    import com.imooc.mq.utils.DateUtils;
    import com.imooc.mq.utils.FastJsonConvertUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.util.Date;
    
    /**
     * @Title: OrderService
     * @Description: 业务实现
     * @date 2019/1/2215:41
     */
    @Service
    public class OrderService {
        private static Logger logger = LoggerFactory.getLogger(OrderService.class);
        @Autowired
        private OrderMapper orderMapper;
    
        @Autowired
        private BrokerMessageLogMapper brokerMessageLogMapper;
    
        @Autowired
        private RabbitOrderSender rabbitOrderSender;
    
        public void createOrder(Order order)  {
            try {
                // 使用当前时间当做订单创建时间(为了模拟一下简化)
                Date orderTime = new Date();
                // 插入业务数据
                orderMapper.insert(order);
                // 插入消息记录表数据
                BrokerMessageLog brokerMessageLog = new BrokerMessageLog();
                // 消息唯一ID
                brokerMessageLog.setMessageId(order.getMessageId());
                // 保存消息整体 转为JSON 格式存储入库
                brokerMessageLog.setMessage(FastJsonConvertUtil.convertObjectToJSON(order));
                // 设置消息状态为0 表示发送中
                brokerMessageLog.setStatus("0");
                // 设置消息未确认超时时间窗口为 一分钟
                brokerMessageLog.setNextRetry(DateUtils.addMinutes(orderTime, Constans.ORDER_TIMEOUT));
                brokerMessageLog.setCreateTime(new Date());
                brokerMessageLog.setUpdateTime(new Date());
                brokerMessageLogMapper.insertSelective(brokerMessageLog);
                // 发送消息
                rabbitOrderSender.sendOrder(order);
            } catch (Exception e) {
                logger.error("订单业务异常{}",e);
            }
        }
    }

      5、测试

     /**
         * 测试订单创建
         */
        @Test
        public void createOrder(){
            Order order = new Order();
            order.setId("201901228");
            order.setName("测试订单");
            order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());
            try {
                orderService.createOrder(order);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

      先启动消费者服务、再启动生产者服务让定时任务跑起来,最后启动测试方法。消息被消费成功后,日志记录状态被修改为1。测试消息重投的话需要制造一些异常情况,比如修改消费者端跟exchange,生产者找不到该交互机,拿不到回调,就会重试投递。

  • 相关阅读:
    git 镜像地址
    IntelliJ IDEA 2019 控制台中文乱码问题
    LINUX配置本地YUM源
    动态添加js的代码
    Java 多线程
    Java I/O系统
    Java 中的容器 Collection 和 Map
    Java 数组
    javaweb的四大作用域
    三层 转自http://git.oschina.net/tzhsweet/superui
  • 原文地址:https://www.cnblogs.com/zhangbLearn/p/10304976.html
Copyright © 2011-2022 走看看