zoukankan      html  css  js  c++  java
  • 官网英文版学习——RabbitMQ学习笔记(六)Routing

    有选择的接收消息。

            上一节我们使用的是fanout exchange来实现消息的发布/订阅模式,这并没有给我们带来多大的灵活性——它只能够让人盲目地进行广播。而本节我们采用direct类型的交换器来实现有选择的接收消息。直接交换器背后的路由算法很简单——消息传递到绑定键与消息的路由键完全匹配的队列。


            如上这个设置中,我们可以看到与它绑定的两个队列的直接交换X。第一个队列用绑定键橙色绑定,第二个队列有两个绑定,一个绑定键为黑色,另一个绑定键为绿色。

            在这个设置里面,一个被发布到交换器里的带有橙色路由键的消息将被路由到队列Q1中,带有黑色或绿色路由键的消息将去往队列Q2中,所有其他消息将被舍弃掉。

            下面这个是多重绑定,该绑定相似与上一节的发布/订阅模式


            使用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以在X和Q1之间添加绑定键黑。在这种情况下,直接交换将表现为fanout类型,并将消息广播给所有匹配的队列。带有black路由键的消息将被发送到Q1和Q2。

          接下来,我们将采用这个模型,代替fanout来发送消息,以这种方式进行,接收程序放可以选择性的接收消息。

    该篇与上篇区别仅在于,发布和订阅方的交换器的类型变为direct,同时设置发布消息的路由键,本篇将其设置为“error”,将订阅方其中一个队列的路由键设置为“error”,另一个设置为“bug”,则运行后,只有路由键和队列名称相同的一方能够收到消息,另一个bug路由键的收不到消息。

    代码如下(在上篇代码上改造,修改的代码加上了下划线以便区分):

    发布方:

    package com.rabbitmq.HelloWorld;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Publish {
    	
    	private static final String EXCHANGE_NAME = "exchangeB";
    
    	public static void main(String[] args) throws IOException, TimeoutException {
    		// TODO Auto-generated method stub
    //		创建工厂
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("192.168.10.185");
    		factory.setUsername("admin");
    		factory.setPassword("123456");
    		factory.setPort(5672);
    //		创建连接
    		Connection connetion = factory.newConnection();
    //		获得信道
    		Channel channel = connetion.createChannel();
    //		声明交换器(声明了一个名字位exchangeA,类型修改fanout为direct类型的交换器)
    		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    		String message = "555,2,2,33,66";
    //		发送消息,将第二项参数routingkey修改为error
    		channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes());
    		System.out.println(" [x] Sent '" + message + "'");
    		channel.close();
    		connetion.close();
    	}
    
    }
    

    订阅方一,路由键为“error”

    package com.rabbitmq.HelloWorld;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class Subscribe {
    	
    	private static final String EXCHANGE_NAME = "exchangeB";
    	private static final String QUEUE_NAME = "queueA";
    
    	public static void main(String[] args) throws IOException, TimeoutException {
    		// TODO Auto-generated method stub
    //		创建工厂
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("192.168.10.185");
    		factory.setUsername("admin");
    		factory.setPassword("123456");
    		factory.setPort(5672);
    //		创建连接
    		Connection connetion = factory.newConnection();
    //		获得信道
    		Channel channel = connetion.createChannel();
    //		声明交换器(声明了一个名字位exchangeA,类型修改fanout为direct的交换器)
    		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    //		声明一个队列,在此采用临时队列
    		String queueName = channel.queueDeclare().getQueue();
    //		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    //		队列和交换器进行绑定,并设定路由键为error
    		channel.queueBind(queueName, EXCHANGE_NAME, "error");
    		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    		Consumer consumer = new DefaultConsumer(channel){
    			@Override
    			public void handleDelivery(String consumerTag, Envelope envelope,
    					BasicProperties properties, byte[] body) throws IOException {
    				// TODO Auto-generated method stub
    				String message = new String(body,"utf-8");
    				System.out.println("[x] received'"+message+"'");
    			}
    		};
    		channel.basicConsume(queueName, consumer);
    	}
    
    }
    

    订阅方二(路由键为“bug”)

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class Subscribe {
    	
    	private static final String EXCHANGE_NAME = "exchangeB";
    	private static final String QUEUE_NAME = "queueA";
    
    	public static void main(String[] args) throws IOException, TimeoutException {
    		// TODO Auto-generated method stub
    //		创建工厂
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("192.168.10.185");
    		factory.setUsername("admin");
    		factory.setPassword("123456");
    		factory.setPort(5672);
    //		创建连接
    		Connection connetion = factory.newConnection();
    //		获得信道
    		Channel channel = connetion.createChannel();
    //		声明交换器(声明了一个名字位exchangeA,修改fanout类型为direct类型的交换器�?
    		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    //		声明�?个队列,在此采用临时队列
    		String queueName = channel.queueDeclare().getQueue();
    //		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    //		队列和交换器进行绑定,未设定路由键
    		channel.queueBind(queueName, EXCHANGE_NAME, "bug");
    		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    		Consumer consumer = new DefaultConsumer(channel){
    			@Override
    			public void handleDelivery(String consumerTag, Envelope envelope,
    					BasicProperties properties, byte[] body) throws IOException {
    				// TODO Auto-generated method stub
    				String message = new String(body,"utf-8");
    				System.out.println("[x] received'"+message+"'");
    			}
    		};
    		channel.basicConsume(queueName, consumer);
    	}
    
    }
    

    运行结果:

    下面这个是在另一个项目中,没有引进日志打印的包,红色忽略即可

       效果已经呈现了。

     

    有选择的接收消息。

            上一节我们使用的是fanout exchange来实现消息的发布/订阅模式,这并没有给我们带来多大的灵活性——它只能够让人盲目地进行广播。而本节我们采用direct类型的交换器来实现有选择的接收消息。直接交换器背后的路由算法很简单——消息传递到绑定键与消息的路由键完全匹配的队列。


            如上这个设置中,我们可以看到与它绑定的两个队列的直接交换X。第一个队列用绑定键橙色绑定,第二个队列有两个绑定,一个绑定键为黑色,另一个绑定键为绿色。

            在这个设置里面,一个被发布到交换器里的带有橙色路由键的消息将被路由到队列Q1中,带有黑色或绿色路由键的消息将去往队列Q2中,所有其他消息将被舍弃掉。

            下面这个是多重绑定,该绑定相似与上一节的发布/订阅模式


            使用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以在X和Q1之间添加绑定键黑。在这种情况下,直接交换将表现为fanout类型,并将消息广播给所有匹配的队列。带有black路由键的消息将被发送到Q1和Q2。

          接下来,我们将采用这个模型,代替fanout来发送消息,以这种方式进行,接收程序放可以选择性的接收消息。

    该篇与上篇区别仅在于,发布和订阅方的交换器的类型变为direct,同时设置发布消息的路由键,本篇将其设置为“error”,将订阅方其中一个队列的路由键设置为“error”,另一个设置为“bug”,则运行后,只有路由键和队列名称相同的一方能够收到消息,另一个bug路由键的收不到消息。

    代码如下(在上篇代码上改造,修改的代码加上了下划线以便区分):

    发布方:

    [java] view plain copy
     
    1. package com.rabbitmq.HelloWorld;  
    2.   
    3. import java.io.IOException;  
    4. import java.util.concurrent.TimeoutException;  
    5.   
    6. import com.rabbitmq.client.Channel;  
    7. import com.rabbitmq.client.Connection;  
    8. import com.rabbitmq.client.ConnectionFactory;  
    9.   
    10. public class Publish {  
    11.       
    12.     <u>private static final String EXCHANGE_NAME = "exchangeB";</u>  
    13.   
    14.     public static void main(String[] args) throws IOException, TimeoutException {  
    15.         // TODO Auto-generated method stub  
    16. //      创建工厂  
    17.         ConnectionFactory factory = new ConnectionFactory();  
    18.         factory.setHost("192.168.10.185");  
    19.         factory.setUsername("admin");  
    20.         factory.setPassword("123456");  
    21.         factory.setPort(5672);  
    22. //      创建连接  
    23.         Connection connetion = factory.newConnection();  
    24. //      获得信道  
    25.         Channel channel = connetion.createChannel();  
    26. //      <u>声明交换器(声明了一个名字位exchangeA,类型修改fanout为direct类型的交换器)  
    27.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");</u>  
    28.         String message = "555,2,2,33,66";  
    29. //      <u>发送消息,将第二项参数routingkey修改为error  
    30.         channel.basicPublish(EXCHANGE_NAME, "error"null, message.getBytes());</u>  
    31.         System.out.println(" [x] Sent '" + message + "'");  
    32.         channel.close();  
    33.         connetion.close();  
    34.     }  
    35.   
    36. }  

    订阅方一,路由键为“error”

    [java] view plain copy
     
    1. package com.rabbitmq.HelloWorld;  
    2.   
    3. import java.io.IOException;  
    4. import java.util.concurrent.TimeoutException;  
    5.   
    6. import com.rabbitmq.client.Channel;  
    7. import com.rabbitmq.client.Connection;  
    8. import com.rabbitmq.client.ConnectionFactory;  
    9. import com.rabbitmq.client.Consumer;  
    10. import com.rabbitmq.client.DefaultConsumer;  
    11. import com.rabbitmq.client.Envelope;  
    12. import com.rabbitmq.client.AMQP.BasicProperties;  
    13.   
    14. public class Subscribe {  
    15.       
    16.     <u>private static final String EXCHANGE_NAME = "exchangeB";</u>  
    17.     private static final String QUEUE_NAME = "queueA";  
    18.   
    19.     public static void main(String[] args) throws IOException, TimeoutException {  
    20.         // TODO Auto-generated method stub  
    21. //      创建工厂  
    22.         ConnectionFactory factory = new ConnectionFactory();  
    23.         factory.setHost("192.168.10.185");  
    24.         factory.setUsername("admin");  
    25.         factory.setPassword("123456");  
    26.         factory.setPort(5672);  
    27. //      创建连接  
    28.         Connection connetion = factory.newConnection();  
    29. //      获得信道  
    30.         Channel channel = connetion.createChannel();  
    31. //      <u>声明交换器(声明了一个名字位exchangeA,类型修改fanout为direct的交换器)  
    32.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");</u>  
    33. //      声明一个队列,在此采用临时队列  
    34.         String queueName = channel.queueDeclare().getQueue();  
    35. //      channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
    36. //      队列和交换器进行绑定,并设定路由键为error  
    37.         <u>channel.queueBind(queueName, EXCHANGE_NAME, "error");</u>  
    38.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
    39.         Consumer consumer = new DefaultConsumer(channel){  
    40.             @Override  
    41.             public void handleDelivery(String consumerTag, Envelope envelope,  
    42.                     BasicProperties properties, byte[] body) throws IOException {  
    43.                 // TODO Auto-generated method stub  
    44.                 String message = new String(body,"utf-8");  
    45.                 System.out.println("[x] received'"+message+"'");  
    46.             }  
    47.         };  
    48.         channel.basicConsume(queueName, consumer);  
    49.     }  
    50.   
    51. }  

    订阅方二(路由键为“bug”)

    [java] view plain copy
     
    1. import java.io.IOException;  
    2. import java.util.concurrent.TimeoutException;  
    3.   
    4. import com.rabbitmq.client.Channel;  
    5. import com.rabbitmq.client.Connection;  
    6. import com.rabbitmq.client.ConnectionFactory;  
    7. import com.rabbitmq.client.Consumer;  
    8. import com.rabbitmq.client.DefaultConsumer;  
    9. import com.rabbitmq.client.Envelope;  
    10. import com.rabbitmq.client.AMQP.BasicProperties;  
    11.   
    12. public class Subscribe {  
    13.       
    14.     <u>private static final String EXCHANGE_NAME = "exchangeB";</u>  
    15.     private static final String QUEUE_NAME = "queueA";  
    16.   
    17.     public static void main(String[] args) throws IOException, TimeoutException {  
    18.         // TODO Auto-generated method stub  
    19. //      创建工厂  
    20.         ConnectionFactory factory = new ConnectionFactory();  
    21.         factory.setHost("192.168.10.185");  
    22.         factory.setUsername("admin");  
    23.         factory.setPassword("123456");  
    24.         factory.setPort(5672);  
    25. //      创建连接  
    26.         Connection connetion = factory.newConnection();  
    27. //      获得信道  
    28.         Channel channel = connetion.createChannel();  
    29. //      <u>声明交换器(声明了一个名字位exchangeA,修改fanout类型为direct类型的交换器�?  
    30.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");</u>  
    31. //      声明�?个队列,在此采用临时队列  
    32.         String queueName = channel.queueDeclare().getQueue();  
    33. //      channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
    34. //      <u>队列和交换器进行绑定,未设定路由键  
    35.         channel.queueBind(queueName, EXCHANGE_NAME, "bug");</u>  
    36.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
    37.         Consumer consumer = new DefaultConsumer(channel){  
    38.             @Override  
    39.             public void handleDelivery(String consumerTag, Envelope envelope,  
    40.                     BasicProperties properties, byte[] body) throws IOException {  
    41.                 // TODO Auto-generated method stub  
    42.                 String message = new String(body,"utf-8");  
    43.                 System.out.println("[x] received'"+message+"'");  
    44.             }  
    45.         };  
    46.         channel.basicConsume(queueName, consumer);  
    47.     }  
    48.   
    49. }  

    运行结果:

    下面这个是在另一个项目中,没有引进日志打印的包,红色忽略即可

       效果已经呈现了。

  • 相关阅读:
    HDU-3790 最短路径问题(双重权值)
    Graph (floyd)
    POJ-3259 Wormholes(判断负环、模板)
    HDU-1317 XYZZY
    HDU-1548 A strange lift(单源最短路 或 BFS)
    最小生成树(模板 prim)
    【面试】386- JavaScript 面试 20 个核心考点
    【Koa】385- koa框架的快速入门与使用
    【小程序】384- 如何一人五天开发完复杂小程序(前端必看)
    【React】383- React Fiber:深入理解 React reconciliation 算法
  • 原文地址:https://www.cnblogs.com/xiaoyao-001/p/9193428.html
Copyright © 2011-2022 走看看