zoukankan      html  css  js  c++  java
  • RabbitMQ-----死信队列

    1.什么是TTL?

    a. time to live 消息存活时间
    
    b. 如果消息在存活时间内未被消费,则会被清除
    
    c. RabbitMQ支持两种ttl设置
      -单独消息进行配置ttl
      -整个队列进行配置ttl(居多)

    2.什么是rabbitmq的死信队列?

    没有被及时消费的消息存放的队列

    3.什么是rabbitmq的死信交换机?

    Dead Letter Exchange(死信交换机,缩写:DLX)当消息成为死信后,会被重新发送到另一个交换机,这个交换机就是DLX死信交换机

    4.消息有哪几种情况成为死信?

    a. 消费者拒收消息(basic.reject/ basic.nack),并且没有重新入队 requeue=false
    b. 消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live)
    c. 队列的消息长度达到极限
    结果:消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

    5.RabbitMQ管控台消息TTL测试

    a. 队列过期时间使用参数,对整个队列消息统一过期
    x-message-ttl
    单位ms(毫秒)
    b. 消息过期时间使用参数(如果队列头部消息未过期,队列中级消息已经过期,则消息会还在队列里面) expiration 单位ms(毫秒)
    c. 两者都配置的话,时间短的先触发

    6.如图

     

    7.什么是延迟队列?

    种带有延迟功能的消息队列,Producer 将消息发送到消息队列 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息

    8.使用场景

    1. 通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息
    b. 用户登录之后5分钟给用户做分类推送、用户多少天未登录给用户做召回推送;
    c. 消息生产和消费有时间窗口要求:比如在天猫电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,
    消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略

    9.业界的一些实现方式

    a. 定时任务高精度轮训
    
    b. 采用RocketMQ自带延迟消息功能
    
    c. RabbitMQ本身是不支持延迟队列的,怎么办?
    结合死信队列的特性,就可以做到延迟消息

    10.代码

    场景:

    客户提交商品订单后,需要在30分钟内完成支付,如未完成,则发送消息提醒订单失败

    a. rabbitmq配置类代码,配置普通/死信队列和交换机

    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Rabbitmq配置类
     * 这里普通队列也可以叫它延时队列,是没有配置消费者(Listener)去监听的
     *
     * */
    @Configuration
    public class RabbitmqConfig {
    
        public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";
        public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";
        public static final String LOCK_MERCHANT_DEAD_ROUTING_KEY = "lock_merchant_dead_routing_key";
        public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";
        public static final String NEW_MERCHANT_QUEUE= "new_merchant_queue";
        public static final String NEW_MERCHANT_ROUTING_KEY = "new_merchant.#";
    
        /**
         * 死信交换机(topic模式)
         *
         * */
        @Bean
        public Exchange lockMerchantDeadExchange(){
            //durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,
            // 如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,
            // 当rabbitmq重启之后会读取该数据库
            return ExchangeBuilder.topicExchange(LOCK_MERCHANT_DEAD_EXCHANGE).durable(true).build();
        }
    
        /**
         * 死信队列
         *
         * */
        @Bean
        public Queue lockMerchantDeadQueue() {
            return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
        }
    
        /**
         * 绑定死信交换机和死信队列
         * 这里不加@Qualifier的话会报错:there is more than one bean of “xxx” type
         * 因为死信交换机和普通交换机都配置了Exchange, 无法区分哪种作为参数
         * Queue同理
         *
         * */
        @Bean
        public Binding lockMerchantDeadBinding(@Qualifier("lockMerchantDeadExchange") Exchange exchange, @Qualifier("lockMerchantDeadQueue") Queue queue){
            return BindingBuilder.bind(queue).to(exchange).with(LOCK_MERCHANT_DEAD_ROUTING_KEY).noargs();
        }
    
        /**
         * 普通交换机(topic模式)
         *
         * */
        @Bean
        public Exchange newMerchantExchange(){
            //durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,
            // 如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,
            // 当rabbitmq重启之后会读取该数据库
            return ExchangeBuilder.topicExchange(NEW_MERCHANT_EXCHANGE).durable(true).build();
        }
    
        /**
         * 普通队列
         *
         * */
        @Bean
        public Queue newMerchantQueue() {
            Map<String, Object> args = new HashMap<>();
            //消息过期后,进入死信交换机
            args.put("x-dead-letter-exchange", LOCK_MERCHANT_DEAD_EXCHANGE);
            //消息过期后,进入死信交换机的路由键
            args.put("x-dead-letter-routing-key", LOCK_MERCHANT_DEAD_ROUTING_KEY);
            //消息过期时间 单位:毫秒 消息过期后,会从普通队列转入死信队列
            //这里方便测试设置10秒后消息过期
            args.put("x-message-ttl",10000);
            return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
        }
    
        /**
         * 绑定普通交换机和普通队列
         *
         * */
        @Bean
        public Binding newMerchantBinding(){
            return new Binding(NEW_MERCHANT_QUEUE, Binding.DestinationType.QUEUE,
                    NEW_MERCHANT_EXCHANGE, NEW_MERCHANT_ROUTING_KEY, null);
        }
    }
    View Code

    b. 监听死信队列代码

    import com.rabbitmq.client.Channel;
    import com.theng.shopuser.config.RabbitmqConfig;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    /**
     * 消费者监听死信队列
     *
     */
    @Component
    @RabbitListener(queues = RabbitmqConfig.LOCK_MERCHANT_DEAD_QUEUE)
    public class OrderMQListener {
    
        /**
         * body: 接收convertAndSend(String exchange, String routingKey, Object object)的object消息
         *
         * */
        @RabbitHandler
        public void messageHandler(String body, Message message, Channel channel) throws IOException {
    
            long msgTag = message.getMessageProperties().getDeliveryTag();
            System.out.println("body: " + body);
            System.out.println("msgTag: " + msgTag);
            System.out.println("message: " + message.toString());
    
            //30分钟后,从body中获取买家信息再从数据库查询抢购到的商品订单是否处理 TODO
            //如果没有处理,则向商家发送提醒消息 TODO
    
            //告诉broker(消息队列服务器实体),消息已经被确认
            channel.basicAck(msgTag, false);
            //告诉broker,消息拒绝确认(可以拒绝多条,把比当前msgTag值小的也拒绝)
    //        channel.basicNack(msgTag, false, true);
            //告诉broker,消息拒绝确认(只能拒绝当前msgTag的这条)
    //        channel.basicReject(msgTag, true);
        }
    }
    View Code

    c. application.ym配置文件

    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: 123456
        #虚拟主机 可在http://localhost:15672管理平台进行配置
        virtual-host: /dev
        #开启消息二次确认ConfirmCallback配置
        publisher-confirms: true
        #开启ReturnCallback配置
        publisher-returns: true
        #修改交换机改投消息递到队列失败策略
        #true:交换机处理消息到队列失败,则返回给生产者
        #和publisher-returns配合使用
        template:
          mandatory: true
        #消息手工确认ack
        listener:
          simple:
            acknowledge-mode: manual
    View Code

    d. 控制器代码

    @RestController
    @RequestMapping("/user-info")
    public class UserInfoController {
    
        @Autowired
        public RedisTemplate redisTemplate;  
    
        //消息生产者
        @GetMapping("/send")
        public Object testSend(){
            //object可存储买家信息
            rabbitTemplate.convertAndSend(RabbitmqConfig.NEW_MERCHANT_EXCHANGE, "new_merchant.create", "买家抢购成功,请及时处理订单!");
    
            Map<String, Object> map = new HashMap<>();
            map.put("code", 0);
            map.put("msg", "买家抢购成功,请在30分钟内提交订单!");
            return "success";
        }
    }
    View Code

    结果:

    生产者发送消息10秒后,消息会进入死信交换机,通过死信队列将订单过期消息发送给消费者

  • 相关阅读:
    Shell编程基础
    lenovo future leaer deveolpmetn program
    求1+2+...+n
    Linux下使用qq
    判断2个线段是否相交
    java大数相加
    Django路由系统
    Django框架
    HTTP协议及Django配置
    mysql索引
  • 原文地址:https://www.cnblogs.com/tianhengblogs/p/15384506.html
Copyright © 2011-2022 走看看