zoukankan      html  css  js  c++  java
  • RabbitMQ 死信队列DLX

    死信队列的简单介绍

    利用dlx,当消息在一个队列中变成死信之后,它能被重新publish到另一个exchange,这个exchange就是dlx
    消息变成死信的以下几种情况
    消息被拒绝,并且requeue= false
    消息ttl过期
    队列达到最大的长度
    dlx也是一个正常的exchange,和一般的exchange没什么区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
    当这个队列中有死信时,rabbitmq就会自动的将这个消息重新发布到设置的exchange上,进而被路由到另一个队列。
    可以监听这个队列中消息做相应的处理,这个特性可以弥补rabbitMq3.0以前支持的immediate参数的功能。

    消费端:

    package com.flying.rabbitmq.api.dlx;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    public class Consumer {
    
        
        public static void main(String[] args) throws Exception {
            
            
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            
            // 这就是一个普通的交换机 和 队列 以及路由
            String exchangeName = "test_dlx_exchange";
            String routingKey = "dlx.#";
            String queueName = "test_dlx_queue";
            
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            
            Map<String, Object> agruments = new HashMap<String, Object>();
            agruments.put("x-dead-letter-exchange", "dlx.exchange");
            //这个agruments属性,要设置到声明队列上
            channel.queueDeclare(queueName, true, false, false, agruments);
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //要进行死信队列的声明:
            channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
            channel.queueDeclare("dlx.queue", true, false, false, null);
            channel.queueBind("dlx.queue", "dlx.exchange", "#");
            
            channel.basicConsume(queueName, true, new MyConsumer(channel));
            
            
        }
    }

    自定义消费端:

    package com.flying.rabbitmq.api.dlx;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    public class MyConsumer extends DefaultConsumer {
    
    
        public MyConsumer(Channel channel) {
            super(channel);
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.err.println("-----------consume message----------");
            System.err.println("consumerTag: " + consumerTag);
            System.err.println("envelope: " + envelope);
            System.err.println("properties: " + properties);
            System.err.println("body: " + new String(body));
        }
    
    
    }

    生产端:

    package com.flying.rabbitmq.api.dlx;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Producer {
    
        
        public static void main(String[] args) throws Exception {
            
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            
            String exchange = "test_dlx_exchange";
            String routingKey = "dlx.save";
            
            String msg = "Hello RabbitMQ DLX Message";
            
            for(int i =0; i<1; i ++){
                
                AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF-8")
                        .expiration("10000")
                        .build();
                channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
            }
            
        }
    }
  • 相关阅读:
    .net日期类与UNIX时间戳的相互转换,长数字
    钉钉的生日模块在哪
    js判断手机是苹果(IOS)还是安卓(android) H5手机端自适应宽高
    .net网站部署winserver2008R2 IIS只列出目录 浏览只显示目录浏览
    ajax有时请求不到数据 后台,有时收不到返回值的解决办法
    overflow不超出时不显示滚动条 属性解决内容未超出依然显示滚动条轨道的问题
    PB取datawindow生成的语句。要在datawindow的sqlpreview事件
    电脑C盘缓存路径在哪,清理C盘哪个文件夹可以删
    PB里执行写SQL语句
    SQL SERVER合并行。将多行数据合并成一行,字符串拼接
  • 原文地址:https://www.cnblogs.com/lflying/p/11107435.html
Copyright © 2011-2022 走看看