zoukankan      html  css  js  c++  java
  • RabbitMQ 死信队列 + TTL介绍

    一、RabbitMQ的的死信队列+ TTL

    1、什么是TTL

    • time to live 消息存活时间
    • 如果消息在存活时间内未被消费,则会被清除
    • RabbitMQ支持两种ttl设置
      • 单独消息进行配置ttl
      • 整个队列进行配置ttl(居多)

    2、什么是rabbitmq的死信队列

    • 没有被及时消费的消息存放的队列

    3、什么是rabbitmq的死信交换机

    • Dead Letter Exchange(死信交换机,缩写: DLX)当消息成为死信后,会被重新发送到另⼀个交换机,这个交换机就是DLX死信交换机。

    4、消息有哪几种情况成为死信

    • 消费者拒收消息basic.reject/ basic.nack,并且没有重新入队 requeue=false
    • 消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live)
    • 队列的消息长度达到极限
    • 结果:消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

    二、RabbitMQ管控台消息TTL测试

    队列过期时间使用参数,对整个队列消息统⼀过期

    • x-message-ttl
    • 单位ms(毫秒)

    消息过期时间使用参数(如果队列头部消息未过期,队列中级消息已经过期,消息还在队列里面)

    • expiration
    • 单位ms(毫秒)

    两者都配置的话,时间短的先触发

    1、RabbitMQ Web控制台测试

    新建死信交换机(和普通没区别)

    新建死信队列 (和普通没区别)

    死信交换机和队列绑定

     

    新建普通队列,设置过期时间、指定死信交换机

    测试:直接web控制台往product_qeueu发送消息即可,会看到消息先是在product_qeueu队列停留10秒(因为没有消费者消费),然后该消息从product_qeueu移入到dead_queue。

    三、RabbitMQ的延迟队列和应用场景

    1、什么是延迟队列

    ⼀种带有延迟功能的消息队列, Producer 将消息发送到消息队列服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某⼀个时间投递到Consumer进行消费,该消息即定时消息。

    2、使用场景

    • 通过消息触发⼀些定时任务,比如在某⼀固定时间点向用户发送提醒消息
    • 用户登录之后5分钟给用户做分类推送、用户多少天未登录给用户做召回推送;
    • 消息生产和消费有时间窗⼝要求:比如在天猫电商交易中超时未支付关闭订单的场景,在订单创建时会发送⼀条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略

    四、实战

    1、背景

    JD、淘系、天猫、拼多多电商平台,规定新注册的商家,审核通过后需要在【规定时间】内上架商品,否则冻结账号。

    2、代码开发

    死信交换机和死信队列开发,topic交换机和队列开发,绑定死信交换机

    package net.xdclass.xdclasssp.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class RabbitMQConfig {
        //死信队列
        public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";
    
        //死信交换机
        public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";
    
        //进入死信队列的路由key
        public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";
    
        //创建死信交换机
        @Bean
        public Exchange lockMerchantDeadExchange() {
            return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE, true, false);
        }
    
        //创建死信队列
        @Bean
        public Queue lockMerchantDeadQueue() {
            return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
        }
    
        //绑定死信交换机和死信队列
        @Bean
        public Binding lockMerchantBinding() {
            return new Binding(LOCK_MERCHANT_DEAD_QUEUE, Binding.DestinationType.QUEUE,
                    LOCK_MERCHANT_DEAD_EXCHANGE, LOCK_MERCHANT_ROUTING_KEY, null);
        }
    
        //普通队列,绑定的个死信交换机
        public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";
    
        //普通的topic交换机
        public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";
    
        //路由key
        public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key";
    
        //创建普通交换机
        @Bean
        public Exchange newMerchantExchange() {
            return new TopicExchange(NEW_MERCHANT_EXCHANGE, true, false);
        }
    
        //创建普通队列
        @Bean
        public Queue newMerchantQueue() {
            Map<String, Object> args = new HashMap<>(3);
            //消息过期后,进入到死信交换机
            args.put("x-dead-letter-exchange", LOCK_MERCHANT_DEAD_EXCHANGE);
            //消息过期后,进入到死信交换机的路由key
            args.put("x-dead-letter-routing-key", LOCK_MERCHANT_ROUTING_KEY);
            //过期时间,单位毫秒
            args.put("x-message-ttl", 10000);
            return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
        }
    
        //绑定交换机和队列
        @Bean
        public Binding newMerchantBinding() {
            return new Binding(NEW_MERCHANT_QUEUE, Binding.DestinationType.QUEUE,
                    NEW_MERCHANT_EXCHANGE, NEW_MERCHANT_ROUTIING_KEY, null);
        }
    }

    消息生产和消费

    • 消息生产
      • 投递到普通的topic交换机
      • 消息过期,进入死信交换机
    • 消息消费
      • 消费者监听死信交换机的队列

    MerchantAccountController 模拟请求

    @RestController
    @RequestMapping("/api/admin/merchant")
    public class MerchantAccountController {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("check")
        public Object check(){
    
            //修改数据库的商家账号状态  TODO
    
            rabbitTemplate.convertAndSend(RabbitMQConfig.NEW_MERCHANT_EXCHANGE,RabbitMQConfig.NEW_MERCHANT_ROUTIING_KEY,"商家账号通过审核");
    
            Map<String,Object> map = new HashMap<>();
            map.put("code",0);
            map.put("msg","账号审核通过,请10秒内上传1个商品");
            return map;
        }
    }

    MerchantMQListener消费

    package net.xdclass.xdclasssp.mq;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    @Component
    @RabbitListener(queues = "lock_merchant_dead_queue")
    public class MerchantMQListener {
    
        @RabbitHandler
        public void messageHandler(String body, Message message, Channel channel) throws IOException {
    
            long msgTag = message.getMessageProperties().getDeliveryTag();
            System.out.println("msgTag="+msgTag);
            System.out.println("body="+body);
            //做复杂业务逻辑  TODO
    
            //告诉broker,消息已经被确认
            channel.basicAck(msgTag,false);
        }
    }
  • 相关阅读:
    字节跳动--今日头条iOS客户端启动速度优化
    RSA加密
    几种浏览器
    Linux定时任务crontab无法执行
    Python报错ImportError: No Module Named Typing的解决
    微信小程序:A、B两个小程序相互跳转,出现点击A小程序底部导航栏菜单,第一次点击无法跳转B小程序,需要点击第二次才会触发跳转到B小程序
    c# core 生成随机图文验证码
    携程Apollo统一配置管理中心
    WPF程序中嵌入winForm窗体
    sqlserver 转 postgresql
  • 原文地址:https://www.cnblogs.com/jwen1994/p/14377549.html
Copyright © 2011-2022 走看看