zoukankan      html  css  js  c++  java
  • RabbitMQ消息中间件(第二章)第三部分-笔记-Exchange 交换机

    Exchange 交换机

    • Exchange: 接收消息,并根据路由键转发消息到所绑定的队列

      交换机属性

    • Name: 交换机名称
    • Type: 交换机类型 direct、topic、fanout、headers
    • Durability: 是否需要持久化,true为持久化
    • Auto Delete: 当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
    • Internal: 当前Exchange是否用于RabbitMQ内部使用,默认为false --一般设置为false,除非你需要使用erlang语言扩展插件就可能需要将值设置为true来使用
    • Arguments: 扩展参数,用于扩展AMQP协议自制定化使用

      Durability Exchange (直连交换机)

    • 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue

       注:Direct模式可以使用RabbitMQ自带的Exchange: defalut Exchange,所有不需要将Exchange进行任何绑定(binging)操作,消息传递时,RoueKey必须

         完全匹配才会被队列接收,否则该消息会被抛弃。

      这里说明routingkey和queues里的队列名要相一致

      以下为直连交换机的代码

    package com.cx.temp.common.rabbitmq.direct;
    
    import com.cx.temp.common.rabbitmq.quickstart.QueueingConsumer;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * 消费端(直连)
     */
    public class Consumer4DirectExchange {
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/test001");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
    
            connectionFactory.setAutomaticRecoveryEnabled(true); //是否支持自动重连
            connectionFactory.setNetworkRecoveryInterval(3000); //每3秒自动重连一次
            Connection connection = connectionFactory.newConnection();
    
            Channel channel = connection.createChannel();
            //声明
            String exchangeName = "test_direct_exchange";
            String exchangeType = "direct";
            String queueName = "test_direct_queue";
            String routingKey = "test.direct";
    
            //表示声明了一个交换机
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            //表示声明了一个队列
            channel.queueDeclare(queueName, false, false, false, null);
            //建立一个绑定关系
            channel.queueBind(queueName, exchangeName, routingKey);
    
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称,是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);
            //循环获取消息
            while (true) {
                //获取消息,如果没有消息,这一步将会一直阻塞
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("收到消息:" + msg);
            }
    
    
        }
    
    }
    package com.cx.temp.common.rabbitmq.direct;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * 生产端(直连)
     */
    public class Producter4DirectExchange {
    
        public static void main(String[] args) throws Exception {
    
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/test001");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
    
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();
            //4 声明
            String exchangeName = "test_direct_exchange";
            String routingKey = "test.direct";
            //5 发送
            String msg = "Hello World RabbitMQ 4  Direct Exchange Message ...";
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        }
    
    }

     Topic Exchange (主题交换机)

    • 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上
    • Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic

      注:可以使用通配符进行模糊匹配

    符号 “#” 匹配一个或者多个词

    符号 “*” 匹配不多不少一个词

    例如:“log.#” 能匹配到 “log.info.oa”

       “log.*” 只会匹配到 “log.erro”

     

    以下为主题交换机的代码

    package com.cx.temp.common.rabbitmq.topic;
    
    import com.cx.temp.common.rabbitmq.quickstart.QueueingConsumer;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * 主题交换机-消费端
     */
    public class Consumer4TopicExchange {
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/test001");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
    
            connectionFactory.setAutomaticRecoveryEnabled(true); //是否支持自动重连
            connectionFactory.setNetworkRecoveryInterval(3000); //每3秒自动重连一次
            Connection connection = connectionFactory.newConnection();
    
            Channel channel = connection.createChannel();
            //声明
            String exchangeName = "test_topic_exchange";
            String exchangeType = "topic";
            String queueName = "test_topic_queue";
    //        String routingKey = "user.#"; //user.save  user.update   user.delete.abc 可以接收到三个routingKey
            String routingKey = "user.*"; // user.save  user.update 可以接收到二个routingKey
    
            //表示声明了一个交换机
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            //表示声明了一个队列
            channel.queueDeclare(queueName, false, false, false, null);
            //建立一个绑定关系
            channel.queueBind(queueName, exchangeName, routingKey);
    
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称,是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);
            //循环获取消息
            while (true) {
                //获取消息,如果没有消息,这一步将会一直阻塞
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("收到消息:" + msg);
            }
    
        }
    
    }
    package com.cx.temp.common.rabbitmq.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * 主题交换机-生产端
     */
    public class Procuder4TopicExchange {
    
        public static void main(String[] args) throws Exception {
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/test001");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
    
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();
            //4 声明
            String exchangeName = "test_topic_exchange";
            String routingKey1 = "user.save";
            String routingKey2 = "user.update";
            String routingKey3 = "user.delete.abc";
            //5 发送
            String msg = "Hello World RabbitMQ 4  Topic Exchange Message ...";
            channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
            channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
            channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
    
            channel.close();
            connection.close();
        }
    
    
    }
    String routingKey = "user.#"; //user.save  user.update   user.delete.abc 可以接收到三个routingKey,执行结果

    String routingKey = "user.*"; // user.save  user.update 可以接收到二个routingKey

     

    注:这时候还收到三条消息,原因是之前使用了user.#,还在应用中,需要将其解绑

     解绑后在重新执行生产者

     Fanout Exchange (广播交换机)

    • 不处理路由键,只需要简单的将队列绑定到交换机上
    • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上、
    • Fanout交换机转发消息是最快的

     

    以下为广播交换机的代码

    package com.cx.temp.common.rabbitmq.fanout;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * 广播交换机-生产端
     */
    public class Procuder4FanoutExchange {
    
        public static void main(String[] args) throws Exception {
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/test001");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
    
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();
            //4 声明
            String exchangeName = "test_fanout_exchange";
            //5 发送
            for (int i = 0; i < 10; i++) {
                String msg = "Hello World RabbitMQ 4  Fanout Exchange Message ...";
                channel.basicPublish(exchangeName, "", null, msg.getBytes()); //这里配置成"",也可以配置成其他的,Fanout不会走路由键,所以配置了也无效
            }
    
            channel.close();
            connection.close();
        }
    
    
    }
    package com.cx.temp.common.rabbitmq.fanout;
    
    import com.cx.temp.common.rabbitmq.quickstart.QueueingConsumer;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * 广播交换机-消费端
     */
    public class Consumer4FanoutExchange {
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/test001");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
    
            connectionFactory.setAutomaticRecoveryEnabled(true); //是否支持自动重连
            connectionFactory.setNetworkRecoveryInterval(3000); //每3秒自动重连一次
            Connection connection = connectionFactory.newConnection();
    
            Channel channel = connection.createChannel();
            //声明
            String exchangeName = "test_fanout_exchange";
            String exchangeType = "fanout";
            String queueName = "test_fanout_queue";
            String routingKey = ""; // 不设置路由键
    
            //表示声明了一个交换机
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            //表示声明了一个队列
            channel.queueDeclare(queueName, false, false, false, null);
            //建立一个绑定关系
            channel.queueBind(queueName, exchangeName, routingKey);
    
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称,是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);
            //循环获取消息
            while (true) {
                //获取消息,如果没有消息,这一步将会一直阻塞
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("收到消息:" + msg);
            }
    
        }
    
    }

    执行结果

    此外还有以headers为路由的交换器,用途较少,这里就不介绍了



  • 相关阅读:
    Thymeleaf+Spring使用自己的工具类
    bootstrap 响应式布局
    bootstrap 流布局
    bootstrap 布局
    bootstrap 新建网页
    quick 定时更新函数
    acm hdoj 1157
    acm hdoj 今年暑假不ac
    quick removeTileMaptile
    quick schedule 的添加和移除
  • 原文地址:https://www.cnblogs.com/huihui-hui/p/14261607.html
Copyright © 2011-2022 走看看