zoukankan      html  css  js  c++  java
  • RabbitMQ exchange交换机类型

    Exchange:交互机,根据路由键转发消息到绑定的队列。

    Direct Exchange:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,如果相等,则发送到该Binding对应的Queue中。

    消费者

    package com.flying.rabbitmq.api.exchange.direct;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    public class Consumer4DirectExchange {
    
        public static void main(String[] args) throws Exception {
            
            
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
            
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
            
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "test_direct_exchange";
            String exchangeType = "direct";
            String queueName = "test_direct_queue";
            String routingKey = "test.direct111";
            
            //表示声明了一个交换机
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            //表示声明了一个队列
            channel.queueDeclare(queueName, false, false, false, null);
            //建立一个绑定关系:
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);  
            //循环获取消息  
            while(true){  
                //获取消息,如果没有消息,这一步将会一直阻塞  
                Delivery delivery = consumer.nextDelivery();  
                String msg = new String(delivery.getBody());    
                System.out.println("收到消息:" + msg);  
            } 
        }
    }

    生产者

    package com.flying.rabbitmq.api.exchange.direct;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Producer4DirectExchange {
    
        
        public static void main(String[] args) throws Exception {
            
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "test_direct_exchange";
            String routingKey = "test.direct111";
            //5 发送
            
            String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
            channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());         
            
        }
        
    }

    运行结果

    若 routingKey不相同 则消费者接受不到消息 消息将被丢弃

    Topic Exchange:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行对比,如果匹配上了,则发送到该Binding对应的Queue中。

    消费者:

    package com.flying.rabbitmq.api.exchange.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class Consumer4TopicExchange {
    
        public static void main(String[] args) throws Exception {
            
            
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
            
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
            
            Channel channel = connection.createChannel();
            //4 声明
            String exchangeName = "test_topic_exchange";
            String exchangeType = "topic";
            String queueName = "test_topic_queue";
            String routingKey = "user.*"; //user.*只能接受 user.XXX    user.# 可接受任意 user.**.**
            //声明交换机
            channel.exchangeDeclare(exchangeName,exchangeType,false,false,null);
            //声明队列
            channel.queueDeclare(queueName,false,false,true,null);
            //建立交换机和队列的绑定关系
            channel.queueBind(queueName,exchangeName,routingKey);
            //durable是否持久化消息
            QueueingConsumer consumer=new QueueingConsumer(channel);
            channel.basicConsume(queueName,true,consumer);
            while (true){
                QueueingConsumer.Delivery delivery=consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("收到消息:" + msg);
            }
        }
    }

    生产者

    package com.flying.rabbitmq.api.exchange.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * topic  key进行模式匹配,
     */
    public class Producer4TopicExchange {
    
        
        public static void main(String[] args) throws Exception {
            
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();  
            //4.声明
            String exchangeName="test_topic_exchange";
            String routingKey1="user.save";
            String routingKey2="user.update";
            String routingKey3="user.delete.abc";
            //5 发送
            String msg="hello Rabbitmq topic exchange message";
            channel.basicPublish(exchangeName,routingKey1,null,msg.getBytes());
            channel.basicPublish(exchangeName,routingKey2,null,msg.getBytes());
            channel.basicPublish(exchangeName,routingKey3,null,msg.getBytes());
            channel.close();
            connection.close();
        }
        
    }

    运行结果

    只接受到两条消息  user.delete.abc 发送的消息并没接收到

    Fanout Exchange:直接将消息转发到所有binding的对应queue中,这种exchange在路由转发的时候,忽略Routing key

    消费者

    package com.flying.rabbitmq.api.exchange.fanout;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    /**
     * fanout 直连 不用绑定routingKey
     */
    public class Consumer4FanoutExchange {
    
        public static void main(String[] args) throws Exception {
            
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
            
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
            
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "test_fanout_exchange";
            String exchangeType = "fanout";
            String queueName = "test_fanout_queue";
            String routingKey = "";    //不设置路由键
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            channel.queueDeclare(queueName, false, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer); 
            //循环获取消息  
            while(true){  
                //获取消息,如果没有消息,这一步将会一直阻塞  
                Delivery delivery = consumer.nextDelivery();  
                String msg = new String(delivery.getBody());    
                System.out.println("收到消息:" + msg);  
            } 
        }
    }

    生产者

    package com.flying.rabbitmq.api.exchange.fanout;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Producer4FanoutExchange {
    
        
        public static void main(String[] args) throws Exception {
            
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "test_fanout_exchange";
            //5 发送
            for(int i = 0; i < 10; i ++) {
                String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
                channel.basicPublish(exchangeName, "", null , msg.getBytes());             
            }
            channel.close();  
            connection.close();  
        }
        
    }
  • 相关阅读:
    mysql 查看表注解
    oracle 相关
    sql version control
    ccna
    msql 清库
    mybatisplus,application.properties 配置数据库密码加密
    驱动开发print无法输出问题
    bochs帮助
    以虚御虚用虚拟机调试vt程式
    ssm整合
  • 原文地址:https://www.cnblogs.com/lflying/p/11107220.html
Copyright © 2011-2022 走看看