zoukankan      html  css  js  c++  java
  • SpringBoot RabbitMQ 延迟队列代码实现

    场景

    用户下单后,如果30min未支付,则删除该订单,这时候就要可以用延迟队列

    准备

    利用rabbitmq_delayed_message_exchange插件;

    首先下载该插件:https://www.rabbitmq.com/community-plugins.html

    然后把该插件放到rabbitmq安装目录plugins下;

    进入到sbin目录下,执行"rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange";

    关闭RabbitMQ服务,然后再启动(直接重启该插件可能会不生效)。

    SpringBoot RabbitMQ代码

    application.properties配置文件

    spring.application.name=spring-boot-rabbitmq
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=nut
    spring.rabbitmq.password=nut

    配置类

    注意这里的"x-delayed-type"和"x-delayed-message"

    /**
     * 延迟队列配置exchange
     */
    @Configuration
    public class DelayQueueConfig {
    
        public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";
        public static final String DELAY_QUEUE = "DELAY_QUEUE";
        public static final String DELAY_ROUTING_KEY = "DELAY_ROUTING_KEY";
    
        @Bean("delayExchange")
        public Exchange delayExchange() {
            Map<String, Object> args = new HashMap<>(1);
    //       x-delayed-type    声明 延迟队列Exchange的类型
            args.put("x-delayed-type", "direct");
            return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message",true, false,args);
        }
    
        @Bean("delayQueue")
        public Queue delayQueue() {
            return QueueBuilder.durable(DELAY_QUEUE).build();
        }
    
        /**
         * 将延迟队列通过routingKey绑定到延迟交换器
         *
         * @return
         */
        @Bean
        public Binding delayQueueBindExchange() {
            return new Binding(DELAY_QUEUE, Binding.DestinationType.QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY, null);
        }
    
    }

    生产者

    发送消息时,指定延迟的毫秒

    /**
     * 延迟队列发送者
     */
    @Component
    @Slf4j
    public class DelayQueueSender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void sendDelayQueue(int number) {
            log.warn("延迟队列发送 : {} milliseconds", number);
            // 这里的Exchange可以是业务的Exchange,为了方便测试这里直接往死信Exchange里投递消息
            rabbitTemplate.convertAndSend(
                    DelayQueueConfig.DELAY_EXCHANGE,
                    DelayQueueConfig.DELAY_ROUTING_KEY,
                    number, (message) -> {
                        // 设置延迟的毫秒数
                        message.getMessageProperties().setDelay(number);
                        log.info("Now : {}", ZonedDateTime.now());
                        return message;
                    });
        }
    }

    消费者

    /**
     * 延迟队列消费者
     */
    @Component
    @Slf4j
    @RabbitListener(queues = DelayQueueConfig.DELAY_QUEUE)
    public class DelayQueueConsumer {
    
        @RabbitHandler
        public void receiveDelayMessage(Integer milliseconds){
            log.warn("DelayQueueConsumer Time : {}, and the millis : {}", ZonedDateTime.now(), milliseconds);
    
        }
    
    }

    测试

    先启动项目;

    然后在测试类中发送消息;

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitmqApplicationTests {
    
        @Autowired
        private DelayQueueSender delayQueueSender;
    
    
        @Test
        public void testDelayQueueSender(){
            delayQueueSender.sendDelayQueue(5000);
        }
    }

    发送消息窗口:

    消费者受到消息:

    时间间隔证明延迟队列发送完成!

    参考:

    https://blog.csdn.net/linsongbin1/article/details/80178122

    https://blog.csdn.net/youjin/article/details/82586888

    https://docs.spring.io/spring-amqp/docs/2.0.0.M2/reference/htmlsingle/#delayed-message-exchange

    https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/

  • 相关阅读:
    扑克牌顺子
    反转字符串
    左旋转字符串
    和为S的两个数
    C++中substr()详解
    STL库中的equal_range()
    和为S的连续正序列
    数组中只出现一次的数
    二叉树的深度
    mysql找安装路经,更改密码
  • 原文地址:https://www.cnblogs.com/theRhyme/p/10986409.html
Copyright © 2011-2022 走看看