zoukankan      html  css  js  c++  java
  • Native RabbitMQ Backup Exchange

    备用交换器,在消息无法路由的时候可以使用失败者通知,还可以使用备用交换器来路由消息。如果备用交换器和mandatory一起开启,那么不会有失败者通知了,因为消息还是可以路由的。

    生产者

    /**
     * @author zhangjianbing
     * @since 2020/9/7
     */
    @SuppressWarnings("Duplicates")
    public class BeProducer {
    
        public final static String EXCHANGE_NAME = "normal_exchange";
    
        public final static String BACKUP_EXCHANGE_NAME = "backup_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            // 创建连接工厂
            ConnectionFactory connectionFactory = RabbitConnectionFactory.getFactory();
            // 创建连接
            Connection connection = connectionFactory.newConnection();
            // 创建信道
            Channel channel = connection.createChannel();
    
            // 声明备用交换器
            Map<String, Object> argsMap = new HashMap<>();
            argsMap.put("alternate-exchange", BACKUP_EXCHANGE_NAME);
            // 声明成FANOUT交换器,接收所有消息
            channel.exchangeDeclare(BACKUP_EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true, false, null);
    
            // 声明主交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, argsMap);
    
            // 随机队列
            // String queueName = channel.queueDeclare().getQueue();
            // 声明一个普通的队列
            String queueName = "NORMAL.CALLBACK.QUEUE";
            channel.queueDeclare(queueName, true, false, false, null);
    
            // 声明备用队列
            String backupQueueName = "BACKUP.CALLBACK.QUEUE";
            channel.queueDeclare(backupQueueName, true, false, false, null);
    
            // 设置路由键,将队列和交换器通过路由键绑定在一起
            String routeKey = "normal_routeKey";
            channel.queueBind(queueName, EXCHANGE_NAME, routeKey);
    
            // 备用交换器binding
            channel.queueBind(backupQueueName, BACKUP_EXCHANGE_NAME, "");
    
            // 消息
            String message = "hello rabbit message queue";
            // 发送消息(其中两条不能路由)
            channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes());
            channel.basicPublish(EXCHANGE_NAME, "zhangsan", null, message.getBytes());
            channel.basicPublish(EXCHANGE_NAME, "lisi", null, message.getBytes());
            System.out.println("消息发送成功。。。。。。");
    
            // 关闭信道和连接
            channel.close();
            connection.close();
    
        }
    
    }
    

    消费者

    /**
     * @author zhangjianbing
     * @since 2020/9/7
     */
    @SuppressWarnings("Duplicates")
    public class BeConsumer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接工厂
            ConnectionFactory connectionFactory = RabbitConnectionFactory.getFactory();
            // 创建连接
            Connection connection = connectionFactory.newConnection();
            // 创建信道
            Channel channel = connection.createChannel();
    
            System.out.println("正在等待消息。。。。。。");
    
            // 声明一个消费者
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                    System.out.println("【路由键】:" + envelope.getRoutingKey() + "【消息内容】:" + new String(body, StandardCharsets.UTF_8));
                }
            };
    
            // 指定队列去消费
            String queueName = "NORMAL.CALLBACK.QUEUE";
            // 消费者正式开始在指定队列上消费[队列名称、自动提交、消费者]
            channel.basicConsume(queueName, true, consumer);
    
        }
    
    }
    

    备用消费者

    /**
     * @author zhangjianbing
     * @since 2020/9/7
     */
    @SuppressWarnings("Duplicates")
    public class BackupConsumer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接工厂
            ConnectionFactory connectionFactory = RabbitConnectionFactory.getFactory();
            // 创建连接
            Connection connection = connectionFactory.newConnection();
            // 创建信道
            Channel channel = connection.createChannel();
    
            System.out.println("正在等待消息。。。。。。");
    
            // 声明一个消费者
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                    System.out.println("【路由键】:" + envelope.getRoutingKey() + "【消息内容】:" + new String(body, StandardCharsets.UTF_8));
                }
            };
    
            // 指定队列去消费
            String queueName = "BACKUP.CALLBACK.QUEUE";
            // 消费者正式开始在指定队列上消费[队列名称、自动提交、消费者]
            channel.basicConsume(queueName, true, consumer);
    
        }
    
    }
    

    测试结果

    生产者:
    消息发送成功。。。。。。
    
    消费者:
    正在等待消息。。。。。。
    【路由键】:normal_routeKey【消息内容】:hello rabbit message queue
    
    备用消费者:
    正在等待消息。。。。。。
    【路由键】:zhangsan【消息内容】:hello rabbit message queue
    【路由键】:lisi【消息内容】:hello rabbit message queue
    
  • 相关阅读:
    SharePoint 2010 网站备份还原简单介绍
    SharePoint 2010 常用技巧及方法总结
    Javascript 中的window.parent ,window.top,window.self 详解
    SharePoint2013 此产品的试用期已结束
    SharePoint 2013 showModalDialog 弹出模式窗口
    SharePoint文档库,如何在新窗口打开中的文件
    sharepoint2013- Office web app server2013详细的安装和部署
    SharePoint 2013 入门教程之入门手册
    SharePoint 解决打开浏览器自动登录
    解决IIS进程回收后第一次访问慢的问题
  • 原文地址:https://www.cnblogs.com/zhangjianbing/p/13629711.html
Copyright © 2011-2022 走看看