zoukankan      html  css  js  c++  java
  • SpringBoot RabbitMQ 延迟队列代码实现

    场景

    用户下单后,如果30min未支付,则删除该订单,这时候就要可以用延迟队列

    准备

    利用rabbitmq_delayed_message_exchange插件;

    首先下载该插件:https://www.rabbitmq.com/community-plugins.html

    然后把该插件放到rabbitmq安装目录plugins下;

    进入到sbin目录下,执行"rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange";

    关闭RabbitMQ服务,然后再启动(直接重启该插件可能会不生效)。

    SpringBoot RabbitMQ代码

    application.properties配置文件

    spring.application.name=spring-boot-rabbitmq
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=nut
    spring.rabbitmq.password=nut

    配置类

    注意这里的"x-delayed-type"和"x-delayed-message"

    /**
     * 延迟队列配置exchange
     */
    @Configuration
    public class DelayQueueConfig {
    
        public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";
        public static final String DELAY_QUEUE = "DELAY_QUEUE";
        public static final String DELAY_ROUTING_KEY = "DELAY_ROUTING_KEY";
    
        @Bean("delayExchange")
        public Exchange delayExchange() {
            Map<String, Object> args = new HashMap<>(1);
    //       x-delayed-type    声明 延迟队列Exchange的类型
            args.put("x-delayed-type", "direct");
            return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message",true, false,args);
        }
    
        @Bean("delayQueue")
        public Queue delayQueue() {
            return QueueBuilder.durable(DELAY_QUEUE).build();
        }
    
        /**
         * 将延迟队列通过routingKey绑定到延迟交换器
         *
         * @return
         */
        @Bean
        public Binding delayQueueBindExchange() {
            return new Binding(DELAY_QUEUE, Binding.DestinationType.QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY, null);
        }
    
    }

    生产者

    发送消息时,指定延迟的毫秒

    /**
     * 延迟队列发送者
     */
    @Component
    @Slf4j
    public class DelayQueueSender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void sendDelayQueue(int number) {
            log.warn("延迟队列发送 : {} milliseconds", number);
            // 这里的Exchange可以是业务的Exchange,为了方便测试这里直接往死信Exchange里投递消息
            rabbitTemplate.convertAndSend(
                    DelayQueueConfig.DELAY_EXCHANGE,
                    DelayQueueConfig.DELAY_ROUTING_KEY,
                    number, (message) -> {
                        // 设置延迟的毫秒数
                        message.getMessageProperties().setDelay(number);
                        log.info("Now : {}", ZonedDateTime.now());
                        return message;
                    });
        }
    }

    消费者

    /**
     * 延迟队列消费者
     */
    @Component
    @Slf4j
    @RabbitListener(queues = DelayQueueConfig.DELAY_QUEUE)
    public class DelayQueueConsumer {
    
        @RabbitHandler
        public void receiveDelayMessage(Integer milliseconds){
            log.warn("DelayQueueConsumer Time : {}, and the millis : {}", ZonedDateTime.now(), milliseconds);
    
        }
    
    }

    测试

    先启动项目;

    然后在测试类中发送消息;

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitmqApplicationTests {
    
        @Autowired
        private DelayQueueSender delayQueueSender;
    
    
        @Test
        public void testDelayQueueSender(){
            delayQueueSender.sendDelayQueue(5000);
        }
    }

    发送消息窗口:

    消费者受到消息:

    时间间隔证明延迟队列发送完成!

    参考:

    https://blog.csdn.net/linsongbin1/article/details/80178122

    https://blog.csdn.net/youjin/article/details/82586888

    https://docs.spring.io/spring-amqp/docs/2.0.0.M2/reference/htmlsingle/#delayed-message-exchange

    https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/

  • 相关阅读:
    剑指offer——最小的K个数和数组中第K大的元素
    Leetcode刷题指南链接整理
    160. Intersection of Two Linked Lists
    100. Same Tree
    92. Reverse Linked List II
    94. Binary Tree Inorder Traversal
    79. Word Search
    78,90,Subsets,46,47,Permutations,39,40 DFS 大合集
    0x16 Tire之最大的异或对
    0x16 Tire
  • 原文地址:https://www.cnblogs.com/theRhyme/p/10986409.html
Copyright © 2011-2022 走看看