zoukankan      html  css  js  c++  java
  • Spring Boot (26) RabbitMQ延迟队列

    延迟消息就是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

    延迟队列

      订单业务: 在电商/点餐中,都有下单后30分钟内没有付款,就自动取消订单。

      短信通知:下单成功后60s之后给用户发送短信通知。

      失败重试:业务操作失败后,间隔一定的时间进行失败重试。

     这类业务的特点就是:非实时的,需要延迟处理,需要进行失败重试。一种比较笨的方法是采用定时任务,轮训数据库,方法简单好用,但性能低下,在高并发情况下容易弄死数据库,间隔时间不好设置,时间过大,影响精度,过小影响性能,而且做不到按超时的时间顺序处理。另一种就是用java中的DelayQueue位于java.util.concurrent包下,本质是由PriorityQueue和BlockingQueue实现的阻塞优先级队列,这东西的问题就是不支持分布式与持久化。

    RabbitMQ实现思路

      RabbitMQ队列本身是没有直接实现支持延迟队列的功能,但可以通过它的Time-To-Live Extensions和Dead Letter Exchange的特性模拟出延迟队列的功能。

    Time-To-Live Extensions

      RabbitMQ支持为队列或者消息设置TTL(time to live 存活时间)。TTL表明了一条消息可再队列中存活的最大时间。当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在TTL时间后死亡成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么娇小的那个值会被取用。

    Dead Letter Exchange

      死信交换机,上文中提到设置了TTL的消息或队列最终会成为Dead Letter。如果为队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新发送到Dead Letter Exchange中,然后通过Dead Letter Exchange 路由到其他队列,即可实现延迟队列的功能。

    导入依赖

      pom.xml中加入spring-boot-starter-amqp的依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    属性配置

    spring:
      rabbitmq:
        username: david
        password: 123456
        host: localhost
        port: 5672
        virtual-host: /
        listener:
          simple:
            acknowledge-mode: manual #手动ACK 不开启自动ACK模式,目的是防止报错后为正确处理消息丢失 默认为none

    定义队列

    package com.spring.boot.utils;
    
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class RabbitConfig {
    
        @Bean
        public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
            connectionFactory.setPublisherConfirms(true);
            connectionFactory.setPublisherReturns(true);
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMandatory(true);
            //设置成功回调
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> System.out.println("消息发送成功:correlationData:" + correlationData + ",ack:" + ack + ",cause:" + cause));
            //设置失败回调
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> System.out.println("消息丢失:exchange:" + exchange + ",route:" + routingKey + ",replyCode:" + replyCode + ",replyText:" + replyText + ",message:" + message));
            return rabbitTemplate;
        }
    
        //延迟队列TTL名称
        public static final String REGISTER_DELAY_QUEUE = "dev.book.register.delay.queue";
        //DLX,死信发送到的交换机
        public static final String REGISTER_DELAY_EXCHANGE = "book.exchange";
        //路由名称
        public static final String DELAY_ROUTING_KEY = "all";
    
        public static final String REGISTER_QUEUE_NAME = "dev.book.register.queue";
        public static final String REGISTER_EXCHANGE_NAME = "dev.book.register.exchange";
        public static final String ROUTING_KEY = "all";
    
        /**
         * 返回一个延迟队列
         * @return
         */
        @Bean
        public Queue delayProcessQueue(){
            Map<String,Object> parms = new HashMap<>();
            //x-dead-letter-exchange声明了队列里的死信转发到DLX交换机
            parms.put("x-dead-letter-exchange",REGISTER_EXCHANGE_NAME);
            //x-dead-letter-routing-key 声明了这些死信在转发时携带的routing-key名称
            parms.put("x-dead-letter-routing-key",ROUTING_KEY);
            return new Queue(REGISTER_DELAY_QUEUE,true,false,false,parms);
        }
    
        @Bean
        public DirectExchange delayExchange(){
            return new DirectExchange(REGISTER_DELAY_EXCHANGE);
        }
    
        /**
         * 需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配
         * 这是一个完整的匹配。如果一iGetter队列绑定到该交换机上要求路由建"dog",则只有被标记为"dog"的消息才会被转发,也不会转发dog。xxx 只会转发dog
         */
        @Bean
        public Binding dlxBinding(){
            return BindingBuilder.bind(delayProcessQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY);
        }
    
        @Bean
        public Queue registerBookQueue(){
            return new Queue(REGISTER_QUEUE_NAME,true);
        }
    
        @Bean
        public TopicExchange registerBookTopicExchange(){
            return new TopicExchange(REGISTER_EXCHANGE_NAME);
        }
    
        @Bean
        public Binding registerBookBinding(){
            return BindingBuilder.bind(registerBookQueue()).to(registerBookTopicExchange()).with(ROUTING_KEY);
        }
    
    }

    实体类

    package com.spring.boot.bean;
    
    import java.io.Serializable;
    
    public class Book implements Serializable{
        private Integer id;
        private String name;
    
        public Integer getId() {
            return id;
        }
    
        public void setId(Integer id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    }

    控制器

    创建BookController用于消息发送工作

    package com.spring.boot.controller;
    
    import com.spring.boot.bean.Book;
    import com.spring.boot.utils.RabbitConfig;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.time.LocalDateTime;
    
    @RestController
    @RequestMapping("/books")
    public class BookController {
    
        //spring boot 2.x版本推荐构造器注入 而不是属性注入
        private final RabbitTemplate rabbitTemplate;
    
        @Autowired
        public BookController(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }
    
        @GetMapping("/defaultMessage")
        public void defaultMessage() {
            Book book = new Book();
            book.setId(1);
            book.setName("hello RabbitMQ");
    
            //添加延迟队列
            this.rabbitTemplate.convertAndSend(RabbitConfig.REGISTER_DELAY_EXCHANGE,RabbitConfig.DELAY_ROUTING_KEY,book,message -> {
                //TODO 第一句是可要可不要,根据自己需要自行处理
                message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,Book.class.getName());
                //TODO 如果配置了 parms.put("x-message-ttl",5*1000);那么这一句也可以省略 根据具体的业务需要是提前声明还是发送时候自己控制时间
                message.getMessageProperties().setExpiration(5 * 1000 + "");
                return message;
            });
    
            System.out.println("发送时间 :"+LocalDateTime.now());
    
        }
    
    }

    消息消费者

      默认情况下spring-boot-data-amqp是自动ACK机制,不管是否执行成功都清除掉队列中的数据。手动ACK是为了当队列中的任务发生异常时,取消此操作,仍然保存在队列中

    package com.spring.boot.handler;
    
    import com.rabbitmq.client.Channel;
    import com.spring.boot.bean.Book;
    import com.spring.boot.utils.RabbitConfig;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.text.MessageFormat;
    import java.time.LocalDateTime;
    
    @Component
    public class BookHandler {
    
        //TODO
        @RabbitListener(queues = {RabbitConfig.REGISTER_QUEUE_NAME})
        public void listenerDelayQueue(Book book, Message message,Channel channel) throws IOException {
    
            System.out.println(MessageFormat.format("[listenerDelayQueue 监听的消息] - [消费时间] - [{0}] - [{1}]", LocalDateTime.now(), book.toString()));
    
            try{
                // TODO 通知MQ 已经被消费完成 可以ACK了
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            } catch (IOException e) {
                //TODO 处理失败,重新压入MQ
                channel.basicRecover();
                e.printStackTrace();
            }
        }
    }

    启动程序,输入地址 http://localhost:8088/books/defaultMessage 进行测试

    发送时间 :2018-06-13T15:07:43.766
    消息发送成功:correlationData:null,ack:true,cause:null

        ------5秒后------

    [listenerDelayQueue 监听的消息] - [消费时间] - [2018-06-13T15:07:48.771] - [com.spring.boot.bean.Book@2887a177]

  • 相关阅读:
    HDU dp递推 母牛的故事 *
    自制权限框架(二)注解
    自制权限框架(二)注解
    自制权限框架(二)注解
    自制权限框架(二)注解
    iOS Sprite Kit教程之申请和下载证书
    iOS Sprite Kit教程之申请和下载证书
    iOS Sprite Kit教程之申请和下载证书
    白领创业做起了小龙虾的生意,如今公司日销售额达30万元
    她将小镇上的童装生意做到了线上,这位夫妻因此获得了大丰收
  • 原文地址:https://www.cnblogs.com/baidawei/p/9177914.html
Copyright © 2011-2022 走看看