zoukankan      html  css  js  c++  java
  • Native RabbitMQ mandatory

    mandatorytrue表示开启失败通知机制,在交换器无法将消息路由到队列的时候会发生失败通知回调,如果声明了备用交换器以后,失败通知回调机制则会失效,是因为消息由备用交换器路由到了其它队列。

    生产者

    注意:这里第三个参数需要设置为true,channel.basicPublish

    /**
     * 发布者失败通知机制
     *
     * @author zhangjianbing
     * time 2020/09/02
     */
    @Slf4j
    public class ProducerMandatory {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("1.1.1.1");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("beijing");
            connectionFactory.setPassword("123456");
            connectionFactory.setVirtualHost("beijing");
            // 创建连接
            Connection connection = connectionFactory.newConnection();
            // 创建信道
            Channel channel = connection.createChannel();
            // 在信道中设置交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 设置路由键
            String routeKey = "mandatory";
            // 消息
            String message = "hello rabbit message queue";
            // 发送消息(开启mandatory,第三个参数)
            channel.basicPublish(EXCHANGE_NAME, routeKey, true, null, message.getBytes());
    
            // 失败通知回调函数
            channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
                log.info("【失败原因:】", replyText);
                log.info("【发送的消息:】", new String(body));
            });
    
            // 关闭信道和连接
            channel.close();
            connection.close();
        }
    
    }
    

    消费者

    /**
     * 原生消费者:
     *          仅仅关心去哪个队列消费
     *
     * create by zhangjianbing
     * time 2020年9月1日
     */
    @SuppressWarnings("Duplicates")
    public class ConsumerMandatory {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("10.1.3.37");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("beijing");
            connectionFactory.setPassword("123456");
            connectionFactory.setVirtualHost("beijing");
            // 创建连接
            Connection connection = connectionFactory.newConnection();
            // 创建信道
            Channel channel = connection.createChannel();
    
            System.out.println("正在等待消息。。。。。。");
    
            // 声明一个消费者
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                    System.out.println("【路由键】:" + envelope.getRoutingKey() + "【消息内容】:" + new String(body, StandardCharsets.UTF_8));
                }
            };
    
            // 指定队列去消费
            String queueName = "MANDATORY.CALLBACK.QUEUE";
            // 消费者正式开始在指定队列上消费[队列名称、自动提交、消费者]
            channel.basicConsume(queueName, true, consumer);
    
        }
    
    }
    
  • 相关阅读:
    Json的转换
    Object类型的转为String类型
    如何获取实体类中的属性和属性值
    Collections.sort 的日期排序
    idea 报错 :error:java:Compilation failed:internal java compiler error
    System.nanoTime与System.currentTimeMillis比较
    Java中instanceof和isInstance区别详解
    避免实例化特有工具类
    加载Properties文件工具类:LoadConfig
    详解SVN 的使用
  • 原文地址:https://www.cnblogs.com/zhangjianbing/p/13629319.html
Copyright © 2011-2022 走看看