zoukankan      html  css  js  c++  java
  • RabbitMQ 延迟队列实现

    1、延迟队列,可以通过rabbitmq自带机制实现:TTL+死信队列(通过设置消息或者队列的TTL,过期后进行消息的投递,从而达到delay的效果)。

      但存在问题:

        1)设置队列TTL:同一个队列的所有消息从入队列到TTL的时间,过期后会投递到相应死信交换机。这样如果消息的过期时间不尽相同,会创建n个不同TTL的队列。

        2)设置消息TTL:虽然每个消息的TTL不同,但是投递到相同队列,队列的先进先出原则,可能排在后边的消息早就过期了。

    2、插件: rabbitmq_delayed_message_exchange(好像3.5.7版本以上支持)

       原理:将消息发送到延迟交换机中,消息达到自己的延迟时间,则会被投递到相应队列。

      使用:mq安装插件rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange

      代码层面:

        1、配置:

          @Slf4j@Configuration

    public class DelayedConfig {
    
        public static final String DELAYED_QUEUE="delayed.queue";
        public static final String DELAYED_ROUTING_KEY="delayed.key";
        public static final String DELAYED_EXCHANGE="delayed.exchange";
    
    
    
        @Bean
        public Queue delayedQueue(){
            return new Queue(DELAYED_QUEUE,true,false,false,null);
        }
    
        @Bean
        public Exchange delayedExchange(){
            Map<String, Object> agruments = new HashMap<String, Object>();
            agruments.put("x-delayed-type", ExchangeTypes.DIRECT);
         //1、创建CustomExchange
         //2、类型:x-delayed-message
         //3、属性配置:x-delayed-type
    return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,agruments); } @Bean public Binding delayedBinding(){ return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs(); } }

      2、客户端发送

      

      @GetMapping("/mq")
        public MessageResult mq() {
            Plan plan=new Plan();
            plan.setTs(new Date());
            plan.setName("asdasd");
            plan.setChooseids(Arrays.asList("1","2"));
         //将延迟时间放到消息头中
         //也可以构建MessageProperties实现
    rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE,DelayedConfig.DELAYED_ROUTING_KEY, JSONObject.toJSONString(plan),msg->{
                msg.getMessageProperties().setHeader("x-delay",10000);
                return msg;
            });
            return new MessageResult("保存成功", null, "");
        }
    Don’t hurry say have no choice, perhaps, next intersection will meet hope.
  • 相关阅读:
    学习笔记-Python基础19-结构化文件存储-xml
    学习笔记-Python基础18-asyncio异步、aiohttp、concurrent并发
    学习笔记-Python基础17-迭代器、生成器、协成
    学习笔记-Python基础16-多进程
    学习笔记-Python基础16-多线程
    Java中BigDecimal的一个除法异常
    JUnit单元测试入门
    Location search 属性
    struts2表单提交的乱码的问题的解决
    出现java.lang.NoSuchFieldException resourceEntries错误的解决方法
  • 原文地址:https://www.cnblogs.com/volare/p/14298409.html
Copyright © 2011-2022 走看看