zoukankan      html  css  js  c++  java
  • RabbitMQ核心API

    一、Exchange 交换机

    2.交换机类型

       (一) Direct Exchange 直连

      2)Direct 生产者

        生产者只关心 Exchange

    package com.bfxy.rabbitmq.api.exchange.direct;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * Producer for the direct-exchange demo: publishes a single text message to
     * {@code test_direct_exchange} with the routing key {@code test_direct_routingKey}.
     *
     * <p>A producer only addresses an exchange; it does not declare the exchange or any
     * queue here, so the exchange must already exist on the broker (for example because
     * the consumer declared it).
     */
    public class Sender4DirectExchange {

        /**
         * Connects to the broker, publishes one message and releases the connection.
         *
         * @param args unused
         * @throws Exception if connecting, publishing or closing fails
         */
        public static void main(String[] args) throws Exception {

            // 1. Configure the ConnectionFactory.
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.71");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");

            // 2. Open the Connection.
            Connection connection = connectionFactory.newConnection();
            try {
                // 3. Open a Channel.
                Channel channel = connection.createChannel();

                // 4. Names of the exchange to publish to and the routing key to use.
                String exchangeName = "test_direct_exchange";
                String routingKey = "test_direct_routingKey";

                // 5. Publish. Encode explicitly as UTF-8 so the bytes on the wire do not
                //    depend on the platform default charset.
                String msg = "Hello World RabbitMQ 4  Direct Exchange Message ... ";
                byte[] body = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                channel.basicPublish(exchangeName, routingKey, null, body);

                channel.close();
            } finally {
                // Closing the connection also closes its channels; without this the
                // connection stays open and keeps the JVM running after main returns.
                connection.close();
            }
        }

    }
    View Code

      3)Direct 消费者

       消费者只关心队列

    package com.bfxy.rabbitmq.api.exchange.direct;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    /**
     * Consumer for the direct-exchange demo: declares {@code test_direct_exchange} and
     * {@code test_direct_queue}, binds them with {@code test_direct_routingKey}, and then
     * prints every message delivered to the queue.
     *
     * <p>A consumer only reads from a queue; the exchange and binding are set up here so
     * that published messages have somewhere to be routed.
     */
    public class Receiver4DirectExchange {

        /**
         * Connects to the broker, sets up the topology and consumes messages forever.
         *
         * @param args unused
         * @throws Exception if connecting, declaring or consuming fails
         */
        public static void main(String[] args) throws Exception {

            // 1. Configure the ConnectionFactory.
            ConnectionFactory connectionFactory = new ConnectionFactory();

            connectionFactory.setHost("192.168.11.71");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");

            // Reconnect automatically after a network failure, retrying every 3000 ms.
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);

            // 2. Open the Connection.
            Connection connection = connectionFactory.newConnection();

            // 3. Open a Channel.
            Channel channel = connection.createChannel();

            // 4. Declare the exchange and the queue, then bind them.
            String exchangeName = "test_direct_exchange";
            String exchangeType = "direct";
            String queueName = "test_direct_queue";
            String routingKey = "test_direct_routingKey";
            // Arguments: exchange, type, durable, autoDelete, internal, arguments.
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // Arguments: queue, durable, exclusive, autoDelete, arguments.
            // durable=false: the queue definition is not kept across a broker restart.
            channel.queueDeclare(queueName, false, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);

            // 5. Register a consumer that buffers deliveries in memory.
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // Arguments: queue name, autoAck, consumer.
            channel.basicConsume(queueName, true, consumer);

            // 6. Consume forever.
            while (true) {
                // Blocks until a message is available.
                Delivery delivery = consumer.nextDelivery();
                // Decode explicitly as UTF-8 instead of the platform default charset.
                String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("收到消息:" + msg);
            }
        }
    }
    View Code

     4)控制台显示

      

      (二)Topic Exchange

         1、主题

      2、  生产者

          

    package com.bfxy.rabbitmq.api.exchange.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * Producer for the topic-exchange demo: publishes the same text message to
     * {@code test_topic_exchange} three times, once with each of the routing keys
     * {@code user.save}, {@code user.update} and {@code user.delete.abc}.
     *
     * <p>The exchange is not declared here, so it must already exist on the broker.
     */
    public class Sender4TopicExchange {

        /**
         * Connects to the broker, publishes the three messages and releases the connection.
         *
         * @param args unused
         * @throws Exception if connecting, publishing or closing fails
         */
        public static void main(String[] args) throws Exception {

            // 1. Configure the ConnectionFactory.
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.71");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");

            // 2. Open the Connection.
            Connection connection = connectionFactory.newConnection();
            try {
                // 3. Open a Channel.
                Channel channel = connection.createChannel();

                // 4. Exchange name and the routing keys to publish with.
                String exchangeName = "test_topic_exchange";
                String routingKey1 = "user.save";
                String routingKey2 = "user.update";
                String routingKey3 = "user.delete.abc";

                // 5. Publish. Encode once, explicitly as UTF-8, so the payload does not
                //    depend on the platform default charset.
                String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
                byte[] body = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                channel.basicPublish(exchangeName, routingKey1, null, body);
                channel.basicPublish(exchangeName, routingKey2, null, body);
                channel.basicPublish(exchangeName, routingKey3, null, body);

                channel.close();
            } finally {
                // Runs even when publishing fails, so the connection is never leaked.
                connection.close();
            }
        }

    }
    View Code

    3、消费者1

      

    package com.bfxy.rabbitmq.api.exchange.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    /**
     * First consumer for the topic-exchange demo: declares {@code test_topic_exchange}
     * and {@code test_topic_queue}, binds them with the pattern {@code user.#}, and
     * prints every delivered message together with its routing key.
     */
    public class Receiver4TopicExchange1 {

        /**
         * Connects to the broker, sets up the topology and consumes messages forever.
         *
         * @param args unused
         * @throws Exception if connecting, declaring or consuming fails
         */
        public static void main(String[] args) throws Exception {

            // 1. Configure the ConnectionFactory.
            ConnectionFactory connectionFactory = new ConnectionFactory();

            connectionFactory.setHost("192.168.11.71");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");

            // Reconnect automatically after a network failure, retrying every 3000 ms.
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);

            // 2. Open the Connection.
            Connection connection = connectionFactory.newConnection();

            // 3. Open a Channel.
            Channel channel = connection.createChannel();

            // 4. Declare the exchange and the queue, then bind them.
            String exchangeName = "test_topic_exchange";
            String exchangeType = "topic";
            String queueName = "test_topic_queue";
            // "#" matches zero or more dot-separated words, so "user.#" also matches
            // "user.delete.abc". Use "user.*" instead to match exactly one word.
            String routingKey = "user.#";
            // Arguments: exchange, type, durable, autoDelete, internal, arguments.
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // Arguments: queue, durable, exclusive, autoDelete, arguments.
            // durable=false: the queue definition is not kept across a broker restart.
            channel.queueDeclare(queueName, false, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);

            // 5. Register a consumer that buffers deliveries in memory.
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // Arguments: queue name, autoAck, consumer.
            channel.basicConsume(queueName, true, consumer);
            System.err.println("consumer1 start.. ");

            // 6. Consume forever.
            while (true) {
                // Blocks until a message is available.
                Delivery delivery = consumer.nextDelivery();
                // Decode explicitly as UTF-8 instead of the platform default charset.
                String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("收到消息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());
            }
        }
    }
    View Code

        消费者2

    package com.bfxy.rabbitmq.api.exchange.topic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    /**
     * Second consumer for the topic-exchange demo: declares {@code test_topic_exchange}
     * and {@code test_topic_queue}, binds them with the pattern {@code user.*}, and
     * prints every delivered message together with its routing key.
     */
    public class Receiver4TopicExchange2 {

        /**
         * Connects to the broker, sets up the topology and consumes messages forever.
         *
         * @param args unused
         * @throws Exception if connecting, declaring or consuming fails
         */
        public static void main(String[] args) throws Exception {

            // 1. Configure the ConnectionFactory.
            ConnectionFactory connectionFactory = new ConnectionFactory();

            connectionFactory.setHost("192.168.11.71");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");

            // Reconnect automatically after a network failure, retrying every 3000 ms.
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);

            // 2. Open the Connection.
            Connection connection = connectionFactory.newConnection();

            // 3. Open a Channel.
            Channel channel = connection.createChannel();

            // 4. Declare the exchange and the queue, then bind them.
            String exchangeName = "test_topic_exchange";
            String exchangeType = "topic";
            // NOTE(review): this is the same queue name Receiver4TopicExchange1 uses.
            // Bindings on a queue accumulate, so an earlier "user.#" binding stays in
            // effect until it is removed - confirm whether a separate queue is intended.
            String queueName = "test_topic_queue";
            // "*" matches exactly one word, so "user.*" does not match "user.delete.abc".
            // Use "user.#" instead to match zero or more words.
            String routingKey = "user.*";
            // Arguments: exchange, type, durable, autoDelete, internal, arguments.
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // Arguments: queue, durable, exclusive, autoDelete, arguments.
            // durable=false: the queue definition is not kept across a broker restart.
            channel.queueDeclare(queueName, false, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);

            // 5. Register a consumer that buffers deliveries in memory.
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // Arguments: queue name, autoAck, consumer.
            channel.basicConsume(queueName, true, consumer);
            System.err.println("consumer2 start .. ");

            // 6. Consume forever.
            while (true) {
                // Blocks until a message is available.
                Delivery delivery = consumer.nextDelivery();
                // Decode explicitly as UTF-8 instead of the platform default charset.
                String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("收到消息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());
            }
        }
    }
    View Code

    4、Exchange 和 Queue 最好先在管理控制台中定义好,这样不容易混乱。

    (三)Fanout Exchange 广播模式

       

       2、生产者

       

    package com.bfxy.rabbitmq.api.exchange.fanout;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * Producer for the fanout-exchange demo: publishes the same text message ten times
     * to {@code test_fanout_exchange}.
     *
     * <p>The exchange is not declared here, so it must already exist on the broker.
     */
    public class Sender4FanoutExchange {

        /**
         * Connects to the broker, publishes ten messages and releases the connection.
         *
         * @param args unused
         * @throws Exception if connecting, publishing or closing fails
         */
        public static void main(String[] args) throws Exception {

            // 1. Configure the ConnectionFactory.
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.71");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");

            // 2. Open the Connection.
            Connection connection = connectionFactory.newConnection();
            try {
                // 3. Open a Channel.
                Channel channel = connection.createChannel();

                // 4. Exchange to publish to.
                String exchangeName = "test_fanout_exchange";

                // 5. Publish. The payload is identical for every message, so encode it
                //    once, explicitly as UTF-8 rather than the platform default charset.
                String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
                byte[] body = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                for (int i = 0; i < 10; i++) {
                    // A fanout exchange ignores the routing key; "11" is arbitrary.
                    channel.basicPublish(exchangeName, "11", null, body);
                }

                channel.close();
            } finally {
                // Runs even when publishing fails, so the connection is never leaked.
                connection.close();
            }
        }

    }
    View Code

     3.消费者

         消费者1

        

    package com.bfxy.rabbitmq.api.exchange.fanout;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    public class Receiver4FanoutExchange1 {
    
        public static void main(String[] args) throws Exception {
            
            
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
            
            connectionFactory.setHost("192.168.11.71");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
            
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "test_fanout_exchange";
            String exchangeType = "fanout";
            String queueName = "test_fanout_queue";
            String routingKey = "";    //    不设置路由键
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            channel.queueDeclare(queueName, false, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer); 
            System.err.println("--------------- consumer 1 --------------");
            //循环获取消息  
            while(true){  
                //获取消息,如果没有消息,这一步将会一直阻塞  
                Delivery delivery = consumer.nextDelivery();  
                String msg = new String(delivery.getBody());    
                System.out.println("收到消息:" + msg);  
            } 
        }
    }
    View Code

       消费者2

       

    package com.bfxy.rabbitmq.api.exchange.fanout;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    public class Receiver4FanoutExchange2 {
    
        public static void main(String[] args) throws Exception {
            
            
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
            
            connectionFactory.setHost("192.168.11.71");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
            
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "test_fanout_exchange";
            String exchangeType = "fanout";
            String queueName = "test_fanout_queue";
            String routingKey = "";    //    不设置路由键
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            channel.queueDeclare(queueName, false, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //durable 是否持久化消息
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);  
            System.err.println("--------------- consumer 2 --------------");
            //循环获取消息  
            while(true){  
                //获取消息,如果没有消息,这一步将会一直阻塞  
                Delivery delivery = consumer.nextDelivery();  
                String msg = new String(delivery.getBody());    
                System.out.println("收到消息:" + msg);  
            } 
        }
    }
    View Code

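    两个消费者各消费了 5 条:它们订阅的是同一个队列 test_fanout_queue,生产者发送的 10 条消息在两个消费者之间轮询分发。若要让每个消费者都收到全部 10 条消息,需要为每个消费者声明并绑定各自独立的队列。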

    二、其他核心概念

      

     Virtual Host(虚拟主机):用于逻辑隔离,类似“域”的概念。例如订单业务使用 /order,用户业务使用 /user。

  • 相关阅读:
    C#的委托
    解决.net core3.1使用docker部署在Ubuntu上连接sqlserver报error:35的问题
    【WPF学习】第三十六章 样式基础
    asp.net core 3.x 身份验证-3cookie身份验证原理
    C#设计模式学习笔记:(9)组合模式
    Asp.net Core MVC(三)UseMvc设置路由
    C#后台异步消息队列实现
    ASP.NET CORE 内置的IOC解读及使用
    VS2015发布WEB项目
    C#的冒泡排序
  • 原文地址:https://www.cnblogs.com/callbin/p/14542116.html
Copyright © 2011-2022 走看看