zoukankan      html  css  js  c++  java
  • RabbitMQ死信队列

    摘自:https://www.cnblogs.com/toov5/p/10288260.html

    关于RabbitMQ死信队列

    死信队列 听上去像 消息“死”了     其实也有点这个意思,死信队列  是 当消息在一个队列 因为下列原因:

    消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false

    消息超期 (rabbitmq  Time-To-Live -> messageProperties.setExpiration())

    队列超载


    变成了 “死信” 后    被重新投递(publish)到另一个Exchange   该Exchange 就是DLX     然后该Exchange 根据绑定规则 转发到对应的 队列上  监听该队列  就可以重新消费     说白了 就是  没有被消费的消息  换个地方重新被消费

    生产者   -->  消息 --> 交换机  --> 队列  --> 变成死信  --> DLX交换机 -->队列 --> 消费者


    什么是死信呢?什么样的消息会变成死信呢?

    消息被拒绝(basic.reject或basic.nack)并且requeue=false.

    消息TTL过期

    队列达到最大长度(队列满了,无法再添加数据到mq中)


    应用场景分析

    在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了

    死信队列 听上去像 消息“死”了 ,其实也有点这个意思,
    死信队列 是 当消息在一个队列 因为下列原因:
    1.消息被拒绝(basic.reject或basic.nack)并且requeue=false.
    2.消息TTL过期
    3.队列达到最大长度(队列满了,无法再添加数据到mq中)
    应用场景分析
    在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了

    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丢弃消息


    如何使用死信交换机呢?

    定义业务(普通)队列的时候指定参数

    x-dead-letter-exchange: 用来设置死信后发送的交换机

    x-dead-letter-routing-key:用来设置死信的routingKey

    如果高并发情况到来  某一个队列比如邮件队列满了 或者异常  或者消息过期 或者消费者拒绝消息

    邮件队列 绑定一个死信交换机  一旦邮件队列满了的情况下  为了防止数据丢失情况   消息不再邮件队列存放了 放到死信交换机 然后交给死信邮件队列  最终交给 死信消费者


     步骤:

     1、 创建 死信交换机  死信队列  以及绑定

        之前的队列没有绑定死信队列和死信交换机 不能做更改绑定死信交互机

        之前创建好的邮件队列 删除掉  已经创建好的队列不能做更改  交换机也清理掉

       config:

    import java.util.HashMap;
    import java.util.Map;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    //Fanout 类型 发布订阅模式
    @Component
    public class FanoutConfig {
    
        /**
         * 定义死信队列相关信息
         */
        public final static String deadQueueName = "dead_queue";
        public final static String deadRoutingKey = "dead_routing_key";
        public final static String deadExchangeName = "dead_exchange";
        /**
         * 死信队列 交换机标识符
         */
        public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
        /**
         * 死信队列交换机绑定键标识符
         */
        public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
    
        // 邮件队列
        private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
    
        // 短信队列
        private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
        // fanout 交换机
        private String EXCHANGE_NAME = "fanoutExchange";
    
        // 1.定义邮件队列
        @Bean
        public Queue fanOutEamilQueue() {
            // 将普通队列绑定到死信队列交换机上
            Map<String, Object> args = new HashMap<>(2);
            args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
            args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
            Queue queue = new Queue(FANOUT_EMAIL_QUEUE, true, false, false, args);
            return queue;
        }
    
        // 2.定义短信队列
        @Bean
        public Queue fanOutSmsQueue() {
            return new Queue(FANOUT_SMS_QUEUE);
        }
    
        // 2.定义交换机
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange(EXCHANGE_NAME);
        }
    
        // 3.队列与交换机绑定邮件队列
        @Bean
        Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
        }
    
        // 4.队列与交换机绑定短信队列
        @Bean
        Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
        }
    
        /**
         * 创建配置死信邮件队列
         * 
         * @return
         */
        @Bean
        public Queue deadQueue() {
            Queue queue = new Queue(deadQueueName, true);
            return queue;
        }
       /*
        * 创建死信交换机
        */
        @Bean
        public DirectExchange deadExchange() {
            return new DirectExchange(deadExchangeName);
        }
       /*
        * 死信队列与死信交换机绑定
        */
        @Bean
        public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
            return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
        }
    
    }
    

     


      

    生产者  timestamp 设置为0 

    package com.itmayiedu.rabbitmq;
    
    import java.util.UUID;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageBuilder;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.alibaba.fastjson.JSONObject;
    
    @Component
    public class FanoutProducer {
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void send(String queueName) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("email", "xx@163.com");
            jsonObject.put("timestamp", 0);
            String jsonString = jsonObject.toJSONString();
            System.out.println("jsonString:" + jsonString);
            // 设置消息唯一id 保证每次重试消息id唯一  
            Message message = MessageBuilder.withBody(jsonString.getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
                    .setMessageId(UUID.randomUUID() + "").build(); //消息id设置在请求头里面 用UUID做全局ID 
            amqpTemplate.convertAndSend(queueName, message);
        }
    }
    

      

    @RabbitListener(queues = "fanout_email_queue")
        public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
            // 获取消息Id
            String messageId = message.getMessageProperties().getMessageId();
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("邮件消费者获取生产者消息msg:"+msg+",消息id"+messageId);
            
            JSONObject jsonObject = JSONObject.parseObject(msg);
            Integer timestamp = jsonObject.getInteger("timestamp");
            
            try {
                int result  = 1/timestamp;
                System.out.println("result"+result);
                // // 手动ack
                Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
                // 手动签收
                channel.basicAck(deliveryTag, false);
            } catch (Exception e) {
                //拒绝消费消息(丢失消息) 给死信队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            }
            
            System.out.println("执行结束....");
        }
    

      


    添加死信队列的消费者,并启动后:

    package com.itmayiedu.rabbitmq;
    
    import java.util.Map;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.stereotype.Component;
    
    import com.rabbitmq.client.Channel;
    
    //死信队列
    @Component
    public class FanoutDeadEamilConsumer {    
        
        @RabbitListener(queues = "dead_queue")
        public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
            // 获取消息Id
            String messageId = message.getMessageProperties().getMessageId();
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("死信邮件消费者获取生产者消息msg:"+msg+",消息id"+messageId);
            // // 手动ack
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
           // 手动签收
           channel.basicAck(deliveryTag, false);
            
            System.out.println("执行结束....");
        }    
        
    }
    

      

  • 相关阅读:
    BootstrapValidator 解决多属性被同时校验问题《转》
    SSRS 浮动表头设置
    ToString(string format)输出格式简述
    配置AutoMapper映射规则《转》
    IE浏览器上传图片预览兼容(IE 7 8 9 10 11)
    SQL : IN 和 Exists 的区别
    BitArray简单例子
    Rx.net 例子——(1)基础
    C# Pinvoke判断是UEFI模式还是BIOS模式
    wpf Route Event Code Snippet
  • 原文地址:https://www.cnblogs.com/xinruyi/p/11220525.html
Copyright © 2011-2022 走看看