zoukankan      html  css  js  c++  java
  • 二、RabbitMQ之延时消息(2)

    rabbitmq安装延时插件 rabbitmq_delayed_message_exchange

    1.到官网https://www.rabbitmq.com/community-plugins.html,下载对应版本的rabbitmq_delayed_message_exchange

    2.将插件拷贝到rabbitmq的plugins目录下,我本地使用的docker启动的rabbitmq服务,使用命令 docker cp rabbitmq_delayed_message_exchange-3.8.0.ez 容器ID:/plugins

    3.进入容器内部,docker exec -it 5af /bin/bash, 进入plugins目录,查看是否拷贝成功 cd /plugins

    4.启用延时插件,执行命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange

     5.打开web页面的Exchange模块,可以看见多了一种类型(PS:如果没有看见,可以重启一下服务)

     6.测试

    import org.springframework.amqp.core.*;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
     
    @Configuration
    public class MQExchangeConfig {
    
        public static final String CUSTOM_EXCHANGE_NAME = "custom-Exchange";
       
        public static final String QUEUEA_NAME = "queueA";
        public static final String QUEUEB_NAME = "queueB";
        public static final String ROUTING_KEY_A_NAME = "routingKeyA";
        public static final String ROUTING_KEY_B_NAME = "routingKeyB";
    
    
        @Bean
        CustomExchange customExchange() {
            Map<String, Object> args = new HashMap<String, Object>();
            //路由策略,必填项,参考ExchangeTypes
            args.put("x-delayed-type", "direct");
            return new CustomExchange(CUSTOM_EXCHANGE_NAME, "x-delayed-message", true, false, args);
        }
    
        @Bean
        Queue queueA() {
            return new Queue(QUEUEA_NAME);
        }
    
        @Bean
        Queue queueB() {
            return new Queue(QUEUEB_NAME);
        }
    
        @Bean
        Binding bindingAC(Queue queueA, CustomExchange customExchange) {
            return BindingBuilder.bind(queueA).to(customExchange).with(ROUTING_KEY_A_NAME).noargs();
        }
    
        @Bean
        Binding bindingBC(Queue queueB, CustomExchange customExchange) {
            return BindingBuilder.bind(queueB).to(customExchange).with(ROUTING_KEY_B_NAME).noargs();
        }
    
        //先初始化队列
        @Bean
        @ConditionalOnBean(Queue.class)
        MQExchangeConsumer mqExchangeConsumer() {
            return new MQExchangeConsumer();
        }
    }
    //发送消息,注意:延时时间设置:Delay > 0, Delay =< ?ERL_MAX_T (In Erlang a timer can be set up to (2^32)-1 milliseconds in the future).
     rabbitTemplate.convertSendAndReceive(CUSTOM_EXCHANGE_NAME, ROUTING_KEY_A_NAME, msg + "a", message -> {
                        // 本质还是设置header的x-delay=10000,可以参考日志信息
                        message.getMessageProperties().setDelay(10000);
                        return message;
                    });
                    rabbitTemplate.convertSendAndReceive(CUSTOM_EXCHANGE_NAME, ROUTING_KEY_B_NAME, msg + "b", message -> {
                        message.getMessageProperties().setHeader("x-delay", 20000);
                        return message;
                    });

    //结果

    7.停用延时插件

    执行命令:rabbitmq-plugins disable rabbitmq_delayed_message_exchange。

    注:停用后延时未分发的消息将会丢失

    8.其他

    消息分发前是存储在节点下的Mnesia table中,通过计时器调度实现分发,官网写到:这个插件的设计并不适合大量延迟消息的情况(例如100s数千条或数百万条)。因为随着mnesia数据库的增长,延迟消息的延时时间变得难以控制,就很难达到预期的效果

  • 相关阅读:
    Python 15 爬虫(一)
    Python 14 Mysql数据库(二)
    Python 13 JQuery&Bootstrp
    Python 12 CSS&JavaScript&DOOM
    Python 11 HTML
    Python 10 MySQL数据库(一)
    Python 9 Redis
    Python 8 协程
    Python 7 并发编程
    SNMP协议详解
  • 原文地址:https://www.cnblogs.com/Hleaves/p/13594278.html
Copyright © 2011-2022 走看看