zoukankan      html  css  js  c++  java
  • RabbitMQ延迟队列

    1.  延迟队列的使用场景

    • 延迟消费
      (a)未按时支付的订单,30分钟过期之后取消订单
      (b)给活跃度低的用户间隔N天推送消息
      (c)新注册用户1分钟后发送邮件
    • 延迟重试
      消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试(设想支付宝的支付回调是怎么实现的)

    2.  实现延迟队列的方式有两种

    1. 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;
    2. 使用rabbitmq-delayed-message-exchange插件实现延迟功能(注意: 延迟插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上的版本才支持的,依赖Erlang/OPT 18.0及以上运行环境。

     本文采用第二种延迟插件方式,安装过程见https://www.cnblogs.com/yangjiming/p/11024072.html

    1.  配置队列

    @Configuration
    public class DelayRabbitConfig {
    
        public final static String QUEUE_NAME = "delayed.goods.order";
        public final static String EXCHANGE_NAME = "delayedExchange";
        public final static Long DELAY_TIME=5000L;
        @Bean
        public Queue delayQueue() {
            return new Queue(DelayRabbitConfig.QUEUE_NAME);
        }
    
        // 配置默认的交换机
        @Bean
        public CustomExchange delayedExchange() {
            Map<String, Object> args = new HashMap<>(16);
            args.put("x-delayed-type", "direct");
            //参数二为类型:必须是x-delayed-message
            return new CustomExchange(DelayRabbitConfig.EXCHANGE_NAME, "x-delayed-message",true, false, args);
        }
    
        // 绑定队列到交换器
        @Bean
        public Binding binding() {
            return BindingBuilder
                    .bind(delayQueue())
                    .to((delayedExchange()))
                    .with(DelayRabbitConfig.QUEUE_NAME)
                    .noargs();
        }
    }

    2.  发送消息

    @Component
    public class DelaySender {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void sendMsg(String msg) {
            LocalDateTime localDateTime = LocalDateTime.now();
    
            rabbitTemplate.convertAndSend(DelayRabbitConfig.EXCHANGE_NAME,
                    DelayRabbitConfig.QUEUE_NAME,
                    "helo "+msg+" 发送时间"+localDateTime,
                    message ->  {
                        message.getMessageProperties().setHeader("x-delay", DelayRabbitConfig.DELAY_TIME);
                        return message;
                    }
            );
        }
    }

    3. 消费消息

    @Component
    @RabbitListener(queues = DelayRabbitConfig.QUEUE_NAME)
    public class DelayReceiver {
    
        @RabbitHandler
        public void helloConsumer(String content) throws IOException {
            LocalDateTime localDateTime = LocalDateTime.now();
            System.out.println("接收时间:" + localDateTime);
            System.out.println("消息内容:" + content);
        }
    }

    4.  测试类

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class DelayTest {
    
        @Autowired
        private DelaySender delaySender;
        @Test
        public void hello() throws Exception {
            delaySender.sendMsg("麦粒肿");
            Thread.sleep(5000);
        }
    }
  • 相关阅读:
    TASK1
    CSS再学
    Html再学
    Python的hasattr() getattr() setattr() 函数使用方法详解
    GET/POST/g和钩子函数(hook)
    cookie和session
    SQLAlchemy外键的使用
    jquery树形菜单插件treeView
    linux设置防火墙
    linux解压命令
  • 原文地址:https://www.cnblogs.com/yangjiming/p/11024069.html
Copyright © 2011-2022 走看看