zoukankan      html  css  js  c++  java
  • RabbitMQ交换机规则实例

      RabbitMQ Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了。下面分别以实例的方式对这几种exchange进行讲解。

    direct

      首先我们以路由的方式对消息进行过滤,代码如下:

    生产者

     1 public class RoutingSendDirect {
     2 
     3     private static final String EXCHANGE_NAME = "direct_test";
     4 
     5     private static final String[] routingKeys = new String[]{"info" ,"warning", "error"};
     6 
     7     public static void main(String[] args) throws IOException, TimeoutException {
     8         ConnectionFactory connectionFactory = new ConnectionFactory();
     9         connectionFactory.setHost("localhost");
    10         Connection connection = connectionFactory.newConnection();
    11         Channel channel = connection.createChannel();
    12         channel.exchangeDeclare(EXCHANGE_NAME,"direct");
    13         for(String key : routingKeys){
    14             String message = "RoutingSendDirect Send the message level:" + key;
    15             channel.basicPublish(EXCHANGE_NAME,key,null,message.getBytes());
    16             System.out.println("RoutingSendDirect Send"+key +"':'" + message);
    17         }
    18         channel.close();
    19         connection.close();
    20     }
    21 }

    消费者

     1 public class ReceiveDirect1 {
     2     // 交换器名称
     3     private static final String EXCHANGE_NAME = "direct_test";
     4     // 路由关键字
     5     private static final String[] routingKeys = new String[]{"info" ,"warning"};
     6 
     7     public static void main(String[] args) throws IOException, TimeoutException {
     8         ConnectionFactory connectionFactory = new ConnectionFactory();
     9         connectionFactory.setHost("localhost");
    10         Connection connection = connectionFactory.newConnection();
    11         Channel channel = connection.createChannel();
    12         channel.exchangeDeclare(EXCHANGE_NAME,"direct");
    13         //获取匿名队列名称
    14         String queueName=channel.queueDeclare().getQueue();
    15         for(String key : routingKeys){
    16             channel.queueBind(queueName,EXCHANGE_NAME,key);
    17             System.out.println("ReceiveDirect1 exchange:"+EXCHANGE_NAME+"," +
    18                     " queue:"+queueName+", BindRoutingKey:" + key);
    19         }
    20 
    21         System.out.println("ReceiveDirect1  Waiting for messages");
    22         Consumer consumer = new DefaultConsumer(channel){
    23             @Override
    24             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    25                 String msg = new String(body,"UTF-8");
    26                 System.out.println("ReceiveDirect1 Received '" + envelope.getRoutingKey() + "':'" + msg + "'");
    27             }
    28         };
    29 
    30         channel.basicConsume(queueName, true, consumer);
    31     }
    32 }
     1 public class ReceiveDirect2 {
     2     // 交换器名称
     3     private static final String EXCHANGE_NAME = "direct_test";
     4     // 路由关键字
     5     private static final String[] routingKeys = new String[]{"error"};
     6 
     7     public static void main(String[] args) throws IOException, TimeoutException {
     8         ConnectionFactory connectionFactory = new ConnectionFactory();
     9         connectionFactory.setHost("localhost");
    10         Connection connection = connectionFactory.newConnection();
    11         Channel channel = connection.createChannel();
    12         channel.exchangeDeclare(EXCHANGE_NAME,"direct");
    13         //获取匿名队列名称
    14         String queueName=channel.queueDeclare().getQueue();
    15         for(String key : routingKeys){
    16             channel.queueBind(queueName,EXCHANGE_NAME,key);
    17             System.out.println("ReceiveDirect2 exchange:"+EXCHANGE_NAME+"," +
    18                     " queue:"+queueName+", BindRoutingKey:" + key);
    19         }
    20 
    21         System.out.println("ReceiveDirect2  Waiting for messages");
    22         Consumer consumer = new DefaultConsumer(channel){
    23             @Override
    24             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    25                 String msg = new String(body,"UTF-8");
    26                 System.out.println("ReceiveDirect2 Received '" + envelope.getRoutingKey() + "':'" + msg + "'");
    27             }
    28         };
    29 
    30         channel.basicConsume(queueName, true, consumer);
    31     }
    32 }

    运行结果如下:

     1 RoutingSendDirect Sendinfo':'RoutingSendDirect Send the message level:info
     2 RoutingSendDirect Sendwarning':'RoutingSendDirect Send the message level:warning
     3 RoutingSendDirect Senderror':'RoutingSendDirect Send the message level:error
     4 
     5 ReceiveDirect1 exchange:direct_test, queue:amq.gen-HsUrzbjzto-K7HeigXSEfQ, BindRoutingKey:info
     6 ReceiveDirect1 exchange:direct_test, queue:amq.gen-HsUrzbjzto-K7HeigXSEfQ, BindRoutingKey:warning
     7 ReceiveDirect1  Waiting for messages
     8 ReceiveDirect1 Received 'info':'RoutingSendDirect Send the message level:info'
     9 ReceiveDirect1 Received 'warning':'RoutingSendDirect Send the message level:warning'
    10 
    11 ReceiveDirect2 exchange:direct_test, queue:amq.gen-i3NY12l3DqWjGapaBOCdwQ, BindRoutingKey:error
    12 ReceiveDirect2  Waiting for messages
    13 ReceiveDirect2 Received 'error':'RoutingSendDirect Send the message level:error'

    fanout

      fanout和别的MQ的发布/订阅模式类似,实例如下:

    生产者  

     1 public class Pub {
     2     private static final String EXCHANGE_NAME = "logs";
     3     public static void main(String[] args) throws IOException, TimeoutException {
     4         ConnectionFactory factory=new ConnectionFactory();
     5         factory.setHost("localhost");
     6         Connection connection=factory.newConnection();
     7         Channel channel=connection.createChannel();
     8         //fanout表示分发,所有的消费者得到同样的队列信息
     9         channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
    10         //分发信息
    11         for (int i=0;i<5;i++){
    12             String message="Hello World"+i;
    13             channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
    14             System.out.println("Pub Sent '" + message + "'");
    15         }
    16         channel.close();
    17         connection.close();
    18     }
    19 }

    消费者

    public class Sub {
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            //产生一个随机的队列名称
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, "");//对队列进行绑定
    
            System.out.println("Sub Waiting for messages");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Sub Received '" + message + "'");
                }
            };
            channel.basicConsume(queueName, true, consumer);//队列会自动删除
        }
    }

    Topics

    这种应该属于模糊匹配,* :可以替代一个词,#:可以替代0或者更多的词,现在我们继续看看代码来理解

    生产者  

     1 public class TopicSend {
     2     private static final String EXCHANGE_NAME = "topic_logs";
     3 
     4     public static void main(String[] args) throws IOException, TimeoutException {
     5         Connection connection = null;
     6         Channel channel = null;
     7         try{
     8             ConnectionFactory factory=new ConnectionFactory();
     9             factory.setHost("localhost");
    10             connection=factory.newConnection();
    11             channel=connection.createChannel();
    12 
    13             //声明一个匹配模式的交换机
    14             channel.exchangeDeclare(EXCHANGE_NAME,"topic");
    15             //待发送的消息
    16             String[] routingKeys=new String[]{
    17                     "quick.orange.rabbit",
    18                     "lazy.orange.elephant",
    19                     "quick.orange.fox",
    20                     "lazy.brown.fox",
    21                     "quick.brown.fox",
    22                     "quick.orange.male.rabbit",
    23                     "lazy.orange.male.rabbit"
    24             };
    25             //发送消息
    26             for(String severity :routingKeys){
    27                 String message = "From "+severity+" routingKey' s message!";
    28                 channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
    29                 System.out.println("TopicSend Sent '" + severity + "':'" + message + "'");
    30             }
    31         }catch (Exception e){
    32             e.printStackTrace();
    33             if (connection!=null){
    34                 channel.close();
    35                 connection.close();
    36             }
    37         }finally {
    38             if (connection!=null){
    39                 channel.close();
    40                 connection.close();
    41             }
    42         }
    43     }
    44 }

    消费者 

     1 public class ReceiveLogsTopic1 {
     2     private static final String EXCHANGE_NAME = "topic_logs";
     3 
     4     public static void main(String[] args) throws IOException, TimeoutException {
     5         ConnectionFactory factory = new ConnectionFactory();
     6         factory.setHost("localhost");
     7         Connection connection = factory.newConnection();
     8         Channel channel = connection.createChannel();
     9 
    10         //声明一个匹配模式的交换机
    11         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    12         String queueName = channel.queueDeclare().getQueue();
    13         //路由关键字
    14         String[] routingKeys = new String[]{"*.orange.*"};
    15         //绑定路由
    16         for (String routingKey : routingKeys) {
    17             channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
    18             System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + routingKey);
    19         }
    20         System.out.println("ReceiveLogsTopic1 Waiting for messages");
    21 
    22         Consumer consumer = new DefaultConsumer(channel) {
    23             @Override
    24             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    25                 String message = new String(body, "UTF-8");
    26                 System.out.println("ReceiveLogsTopic1 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
    27             }
    28         };
    29         channel.basicConsume(queueName, true, consumer);
    30     }
    31 }
     1  public class ReceiveLogsTopic2 {
     2     private static final String EXCHANGE_NAME = "topic_logs";
     3 
     4     public static void main(String[] argv) throws IOException, TimeoutException {
     5         ConnectionFactory factory = new ConnectionFactory();
     6         factory.setHost("localhost");
     7         Connection connection = factory.newConnection();
     8         Channel channel = connection.createChannel();
     9 //      声明一个匹配模式的交换器
    10         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    11         String queueName = channel.queueDeclare().getQueue();
    12         // 路由关键字
    13         String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"};
    14 //      绑定路由关键字
    15         for (String bindingKey : routingKeys) {
    16             channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
    17             System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey);
    18         }
    19 
    20         System.out.println("ReceiveLogsTopic2 Waiting for messages");
    21 
    22         Consumer consumer = new DefaultConsumer(channel) {
    23             @Override
    24             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException  {
    25                 String message = new String(body, "UTF-8");
    26                 System.out.println("ReceiveLogsTopic2 Received '" + envelope.getRoutingKey() + "':'" + message + "'");
    27             }
    28         };
    29         channel.basicConsume(queueName, true, consumer);
    30     }
    31 }
  • 相关阅读:
    Nodejs与ES6系列3:generator对象
    Nodejs与ES6系列2:Promise对象
    Nodejs与ES6系列1:变量声明
    Nodejs与ES6系列4:ES6中的类
    angular单元测试与自动化UI测试实践
    javascript 异步模块加载 简易实现
    javascript模式 (3)——工厂模式和装饰模式
    TP手册学习第四内置天
    TP手册学习第三天
    tp5命令行基础
  • 原文地址:https://www.cnblogs.com/senlinyang/p/8440627.html
Copyright © 2011-2022 走看看