zoukankan      html  css  js  c++  java
  • RabbitMQ之交换机

    1. 交换机类型

      rabbitmq常见有四种交换机类型: direct, topic, fanout, headers.

      一般headers都不用,工作中用得较多的是fanout,它会将消息推送到所有绑定在此交换机上的队列中,效率也是这几种交换机中最高的。

      交换机是啥? 感觉跟网关差不多,就是路由、转发消息.

      下面具体说说几种交换机的使用

    2. 交换机的使用

    2.1 direct 交换机

      direct: 直连   相当于 1 对 1.  

      生产者----> direct exchange ---> queue ----> 消费者

      

    // 生产端代码
    public static void main(String[] args) throws Exception {
            
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "direct_exchange";
                    // 5. 路由key, 消费端必须与其一致
            String routingKey = "direct.routingKey";
            //5 发送
            
            String msg = "this is direct exchange test... ";
            channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());         
            
        }
    // 消费端代码
    public static void main(String[] args) throws Exception {
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
            
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
            
            Channel channel = connection.createChannel();  
            //4 声明交换机名称 ,注意它必须与生产端一致
            String exchangeName = "direct_exchange";
            String exchangeType = "direct";
            // 队列名称随便取,意思意思 ,绑定就行
            String queueName = "direct_queue";
            // 必须与生产端一致
            String routingKey = "direct.routingKey";
            
            //表示声明了一个交换机
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            //表示声明了一个队列
            channel.queueDeclare(queueName, false, false, false, null);
            //建立一个绑定关系:
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);  
            //循环获取消息  
            while(true){  
                //获取消息,如果没有消息,这一步将会一直阻塞  
                Delivery delivery = consumer.nextDelivery();  
                String msg = new String(delivery.getBody());    
                System.out.println("收到消息:" + msg);  
            } 
        }

       消息只能路由到一个 队列中。。。。

    2.2 topic 交换机

      direct 类型的交换机只能一对一的传递消息,而topic类型的交换机就牛逼了, 它支持糊糊匹配,啥意思呢?

      在direct类型的交换机中,生产端和消息端的routingKey必须一样一样的,否则就不能拿到消息。 而topic类型就不一样了,譬如: 生产端的routingKey是 zheng.qin.feng,消息端的routingKey可以弄成以下这些都行: 

      zheng.#

      #.feng

      zheng.qin.*

      总之,对于topic类型的交换机而言,一切都是看routingKey,如果消息端队列与交换机绑定时routingKey同生产端与交换机绑定的routingKey有一定龌蹉关系,那么消息最终就会投递到该队列中。

      注意: 消息都是存储在队列中的,交换机只会转发消息,不会存储

          # : 任意词匹配

          * : 单词匹配

      

        /**
         * 生产端
         */ 
        public static void main(String[] args) throws Exception {
            
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "topic_exchange";
            String routingKey1 = "user.mmp";
            String routingKey2 = "user.exchange";
            String routingKey3 = "user.test";
            //5 发送
            String msg = "abc lfkdfjdlfjdlfkdlkfdlf";
            channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
            channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());     
            channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
            channel.close();  
            connection.close();  
        }
        
    /**
         *  消费者
         */
        public static void main(String[] args) throws Exception {
            
            
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
            
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
            
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "topic_exchange";
            String exchangeType = "topic";
            String queueName = "topic_queue";
            String routingKey = "user.*";
            // 1 声明交换机 
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // 2 声明队列
            channel.queueDeclare(queueName, false, false, false, null);
            // 3 建立交换机和队列的绑定关系:
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);  
            //循环获取消息  
            while(true){  
                //获取消息,如果没有消息,这一步将会一直阻塞  
                Delivery delivery = consumer.nextDelivery();  
                String msg = new String(delivery.getBody());    
                System.out.println("收到消息:" + msg);  
            } 
        }

       消息根据routingKey来决定路由到哪些队列中。。。。。

    2.3 fanout交换机

      这个没啥说的,跟routingKey木有关系,只要是绑定到fanout类型交换机上的队列,都能拿到消息。

      简单的讲: 就是1 对多 的关系,, 比 topic类型的交换机还是滥情,毕竟topic交换机看不上丑女,fanout呢,是个女人都行。

        // 生产者
        public static void main(String[] args) throws Exception {
            
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "test_fanout_exchange";
            //5 发送
            for(int i = 0; i < 10; i ++) {
                String msg = "hahahah";
                channel.basicPublish(exchangeName, "", null , msg.getBytes());             
            }
            channel.close();  
            connection.close();  
        }
    // 消息者
        public static void main(String[] args) throws Exception {
            
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
            
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
            
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "fanout_exchange";
            String exchangeType = "fanout";
            String queueName = "test_fanout_queue";
            String routingKey = "";    //不设置路由键,设置也可以,无所谓呀, 反正都行
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            channel.queueDeclare(queueName, false, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer); 
            //循环获取消息  
            while(true){  
                Delivery delivery = consumer.nextDelivery();  
                String msg = new String(delivery.getBody());    
                System.out.println("收到消息:" + msg);  
            } 
        }

       尼玛,只要绑定到此交换机上的队列,都会被路由。。。。

        

      

      

      

  • 相关阅读:
    《Excel效率手册:早做完,不加班》
    《“胡”说IC——菜鸟工程师完美进阶》
    《UNIX环境高级编程(第3版)》
    《学习R》
    《Netty权威指南》
    《啊哈!算法》
    《流程的永恒之道》
    《算法帝国》
    Guava-Optional可空类型
    guava之Joiner 和 Splitter
  • 原文地址:https://www.cnblogs.com/z-qinfeng/p/11575651.html
Copyright © 2011-2022 走看看