zoukankan      html  css  js  c++  java
  • RabbitMQ死信队列

    摘自:https://www.cnblogs.com/toov5/p/10288260.html

    关于RabbitMQ死信队列

    死信队列 听上去像 消息“死”了     其实也有点这个意思,死信队列  是 当消息在一个队列 因为下列原因:

    消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false

    消息超期 (rabbitmq  Time-To-Live -> messageProperties.setExpiration())

    队列超载


    变成了 “死信” 后    被重新投递(publish)到另一个Exchange   该Exchange 就是DLX     然后该Exchange 根据绑定规则 转发到对应的 队列上  监听该队列  就可以重新消费     说白了 就是  没有被消费的消息  换个地方重新被消费

    生产者   -->  消息 --> 交换机  --> 队列  --> 变成死信  --> DLX交换机 -->队列 --> 消费者


    什么是死信呢?什么样的消息会变成死信呢?

    消息被拒绝(basic.reject或basic.nack)并且requeue=false.

    消息TTL过期

    队列达到最大长度(队列满了,无法再添加数据到mq中)


    应用场景分析

    在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了

    死信队列 听上去像 消息“死”了 ,其实也有点这个意思,
    死信队列 是 当消息在一个队列 因为下列原因:
    1.消息被拒绝(basic.reject或basic.nack)并且requeue=false.
    2.消息TTL过期
    3.队列达到最大长度(队列满了,无法再添加数据到mq中)
    应用场景分析
    在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了

    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丢弃消息


    如何使用死信交换机呢?

    定义业务(普通)队列的时候指定参数

    x-dead-letter-exchange: 用来设置死信后发送的交换机

    x-dead-letter-routing-key:用来设置死信的routingKey

    如果高并发情况到来  某一个队列比如邮件队列满了 或者异常  或者消息过期 或者消费者拒绝消息

    邮件队列 绑定一个死信交换机  一旦邮件队列满了的情况下  为了防止数据丢失情况   消息不再邮件队列存放了 放到死信交换机 然后交给死信邮件队列  最终交给 死信消费者


     步骤:

     1、 创建 死信交换机  死信队列  以及绑定

        之前的队列没有绑定死信队列和死信交换机 不能做更改绑定死信交互机

        之前创建好的邮件队列 删除掉  已经创建好的队列不能做更改  交换机也清理掉

       config:

    import java.util.HashMap;
    import java.util.Map;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    //Fanout 类型 发布订阅模式
    @Component
    public class FanoutConfig {
    
        /**
         * 定义死信队列相关信息
         */
        public final static String deadQueueName = "dead_queue";
        public final static String deadRoutingKey = "dead_routing_key";
        public final static String deadExchangeName = "dead_exchange";
        /**
         * 死信队列 交换机标识符
         */
        public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
        /**
         * 死信队列交换机绑定键标识符
         */
        public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
    
        // 邮件队列
        private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
    
        // 短信队列
        private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
        // fanout 交换机
        private String EXCHANGE_NAME = "fanoutExchange";
    
        // 1.定义邮件队列
        @Bean
        public Queue fanOutEamilQueue() {
            // 将普通队列绑定到死信队列交换机上
            Map<String, Object> args = new HashMap<>(2);
            args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
            args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
            Queue queue = new Queue(FANOUT_EMAIL_QUEUE, true, false, false, args);
            return queue;
        }
    
        // 2.定义短信队列
        @Bean
        public Queue fanOutSmsQueue() {
            return new Queue(FANOUT_SMS_QUEUE);
        }
    
        // 2.定义交换机
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange(EXCHANGE_NAME);
        }
    
        // 3.队列与交换机绑定邮件队列
        @Bean
        Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
        }
    
        // 4.队列与交换机绑定短信队列
        @Bean
        Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
        }
    
        /**
         * 创建配置死信邮件队列
         * 
         * @return
         */
        @Bean
        public Queue deadQueue() {
            Queue queue = new Queue(deadQueueName, true);
            return queue;
        }
       /*
        * 创建死信交换机
        */
        @Bean
        public DirectExchange deadExchange() {
            return new DirectExchange(deadExchangeName);
        }
       /*
        * 死信队列与死信交换机绑定
        */
        @Bean
        public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
            return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
        }
    
    }
    

     


      

    生产者  timestamp 设置为0 

    package com.itmayiedu.rabbitmq;
    
    import java.util.UUID;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageBuilder;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.alibaba.fastjson.JSONObject;
    
    @Component
    public class FanoutProducer {
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void send(String queueName) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("email", "xx@163.com");
            jsonObject.put("timestamp", 0);
            String jsonString = jsonObject.toJSONString();
            System.out.println("jsonString:" + jsonString);
            // 设置消息唯一id 保证每次重试消息id唯一  
            Message message = MessageBuilder.withBody(jsonString.getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
                    .setMessageId(UUID.randomUUID() + "").build(); //消息id设置在请求头里面 用UUID做全局ID 
            amqpTemplate.convertAndSend(queueName, message);
        }
    }
    

      

    @RabbitListener(queues = "fanout_email_queue")
        public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
            // 获取消息Id
            String messageId = message.getMessageProperties().getMessageId();
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("邮件消费者获取生产者消息msg:"+msg+",消息id"+messageId);
            
            JSONObject jsonObject = JSONObject.parseObject(msg);
            Integer timestamp = jsonObject.getInteger("timestamp");
            
            try {
                int result  = 1/timestamp;
                System.out.println("result"+result);
                // // 手动ack
                Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
                // 手动签收
                channel.basicAck(deliveryTag, false);
            } catch (Exception e) {
                //拒绝消费消息(丢失消息) 给死信队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            }
            
            System.out.println("执行结束....");
        }
    

      


    添加死信队列的消费者,并启动后:

    package com.itmayiedu.rabbitmq;
    
    import java.util.Map;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.stereotype.Component;
    
    import com.rabbitmq.client.Channel;
    
    //死信队列
    @Component
    public class FanoutDeadEamilConsumer {    
        
        @RabbitListener(queues = "dead_queue")
        public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
            // 获取消息Id
            String messageId = message.getMessageProperties().getMessageId();
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("死信邮件消费者获取生产者消息msg:"+msg+",消息id"+messageId);
            // // 手动ack
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
           // 手动签收
           channel.basicAck(deliveryTag, false);
            
            System.out.println("执行结束....");
        }    
        
    }
    

      

  • 相关阅读:
    AcWing 1027. 方格取数 dp
    AcWing 1014. 登山 dp
    acwing 482. 合唱队形 dp
    LeetCode 1463. 摘樱桃II dp
    LeetCode 100. 相同的树 树的遍历
    LeetCode 336. 回文对 哈希
    LeetCode 815. 公交路线 最短路 哈希
    算法问题实战策略 DARPA大挑战 二分
    算法问题实战策略 LUNCHBOX 贪心
    AcWing 1100. 抓住那头牛 BFS
  • 原文地址:https://www.cnblogs.com/xinruyi/p/11220525.html
Copyright © 2011-2022 走看看