zoukankan      html  css  js  c++  java
  • RabbitMQ死信队列

    摘自:https://www.cnblogs.com/toov5/p/10288260.html

    关于RabbitMQ死信队列

    死信队列 听上去像 消息“死”了     其实也有点这个意思,死信队列  是 当消息在一个队列 因为下列原因:

    消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false

    消息超期 (rabbitmq  Time-To-Live -> messageProperties.setExpiration())

    队列超载


    变成了 “死信” 后    被重新投递(publish)到另一个Exchange   该Exchange 就是DLX     然后该Exchange 根据绑定规则 转发到对应的 队列上  监听该队列  就可以重新消费     说白了 就是  没有被消费的消息  换个地方重新被消费

    生产者   -->  消息 --> 交换机  --> 队列  --> 变成死信  --> DLX交换机 -->队列 --> 消费者


    什么是死信呢?什么样的消息会变成死信呢?

    消息被拒绝(basic.reject或basic.nack)并且requeue=false.

    消息TTL过期

    队列达到最大长度(队列满了,无法再添加数据到mq中)


    应用场景分析

    在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了

    死信队列 听上去像 消息“死”了 ,其实也有点这个意思,
    死信队列 是 当消息在一个队列 因为下列原因:
    1.消息被拒绝(basic.reject或basic.nack)并且requeue=false.
    2.消息TTL过期
    3.队列达到最大长度(队列满了,无法再添加数据到mq中)
    应用场景分析
    在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了

    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丢弃消息


    如何使用死信交换机呢?

    定义业务(普通)队列的时候指定参数

    x-dead-letter-exchange: 用来设置死信后发送的交换机

    x-dead-letter-routing-key:用来设置死信的routingKey

    如果高并发情况到来  某一个队列比如邮件队列满了 或者异常  或者消息过期 或者消费者拒绝消息

    邮件队列 绑定一个死信交换机  一旦邮件队列满了的情况下  为了防止数据丢失情况   消息不再邮件队列存放了 放到死信交换机 然后交给死信邮件队列  最终交给 死信消费者


     步骤:

     1、 创建 死信交换机  死信队列  以及绑定

        之前的队列没有绑定死信队列和死信交换机 不能做更改绑定死信交互机

        之前创建好的邮件队列 删除掉  已经创建好的队列不能做更改  交换机也清理掉

       config:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    import java.util.HashMap;
    import java.util.Map;
     
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
     
    //Fanout 类型 发布订阅模式
    @Component
    public class FanoutConfig {
     
        /**
         * 定义死信队列相关信息
         */
        public final static String deadQueueName = "dead_queue";
        public final static String deadRoutingKey = "dead_routing_key";
        public final static String deadExchangeName = "dead_exchange";
        /**
         * 死信队列 交换机标识符
         */
        public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
        /**
         * 死信队列交换机绑定键标识符
         */
        public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
     
        // 邮件队列
        private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
     
        // 短信队列
        private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
        // fanout 交换机
        private String EXCHANGE_NAME = "fanoutExchange";
     
        // 1.定义邮件队列
        @Bean
        public Queue fanOutEamilQueue() {
            // 将普通队列绑定到死信队列交换机上
            Map<String, Object> args = new HashMap<>(2);
            args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
            args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
            Queue queue = new Queue(FANOUT_EMAIL_QUEUE, true, false, false, args);
            return queue;
        }
     
        // 2.定义短信队列
        @Bean
        public Queue fanOutSmsQueue() {
            return new Queue(FANOUT_SMS_QUEUE);
        }
     
        // 2.定义交换机
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange(EXCHANGE_NAME);
        }
     
        // 3.队列与交换机绑定邮件队列
        @Bean
        Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
        }
     
        // 4.队列与交换机绑定短信队列
        @Bean
        Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
        }
     
        /**
         * 创建配置死信邮件队列
         *
         * @return
         */
        @Bean
        public Queue deadQueue() {
            Queue queue = new Queue(deadQueueName, true);
            return queue;
        }
       /*
        * 创建死信交换机
        */
        @Bean
        public DirectExchange deadExchange() {
            return new DirectExchange(deadExchangeName);
        }
       /*
        * 死信队列与死信交换机绑定
        */
        @Bean
        public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
            return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
        }
     
    }

     


      

    生产者  timestamp 设置为0 

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    package com.itmayiedu.rabbitmq;
     
    import java.util.UUID;
     
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageBuilder;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
     
    import com.alibaba.fastjson.JSONObject;
     
    @Component
    public class FanoutProducer {
        @Autowired
        private AmqpTemplate amqpTemplate;
     
        public void send(String queueName) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("email", "xx@163.com");
            jsonObject.put("timestamp", 0);
            String jsonString = jsonObject.toJSONString();
            System.out.println("jsonString:" + jsonString);
            // 设置消息唯一id 保证每次重试消息id唯一 
            Message message = MessageBuilder.withBody(jsonString.getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
                    .setMessageId(UUID.randomUUID() + "").build(); //消息id设置在请求头里面 用UUID做全局ID
            amqpTemplate.convertAndSend(queueName, message);
        }
    }

      

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    @RabbitListener(queues = "fanout_email_queue")
        public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
            // 获取消息Id
            String messageId = message.getMessageProperties().getMessageId();
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("邮件消费者获取生产者消息msg:"+msg+",消息id"+messageId);
             
            JSONObject jsonObject = JSONObject.parseObject(msg);
            Integer timestamp = jsonObject.getInteger("timestamp");
             
            try {
                int result  = 1/timestamp;
                System.out.println("result"+result);
                // // 手动ack
                Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
                // 手动签收
                channel.basicAck(deliveryTag, false);
            } catch (Exception e) {
                //拒绝消费消息(丢失消息) 给死信队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            }
             
            System.out.println("执行结束....");
        }

      


    添加死信队列的消费者,并启动后:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    package com.itmayiedu.rabbitmq;
     
    import java.util.Map;
     
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.stereotype.Component;
     
    import com.rabbitmq.client.Channel;
     
    //死信队列
    @Component
    public class FanoutDeadEamilConsumer {   
         
        @RabbitListener(queues = "dead_queue")
        public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
            // 获取消息Id
            String messageId = message.getMessageProperties().getMessageId();
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("死信邮件消费者获取生产者消息msg:"+msg+",消息id"+messageId);
            // // 手动ack
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
           // 手动签收
           channel.basicAck(deliveryTag, false);
             
            System.out.println("执行结束....");
        }   
         
    }

      

  • 相关阅读:
    适配器和外观模式
    命令模式
    单件模式
    工厂模式
    装饰者模式
    观察者模式(发布--订阅模式)
    设计模式之策略模式
    C#学习笔记15
    C#学习笔记14
    lucky的时光助理-2017.02
  • 原文地址:https://www.cnblogs.com/tiancai/p/13161166.html
Copyright © 2011-2022 走看看