RabbitMQ Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了。下面分别以实例的方式对这几种exchange进行讲解。
direct
首先我们以路由的方式对消息进行过滤,代码如下:
生产者
1 public class RoutingSendDirect { 2 3 private static final String EXCHANGE_NAME = "direct_test"; 4 5 private static final String[] routingKeys = new String[]{"info" ,"warning", "error"}; 6 7 public static void main(String[] args) throws IOException, TimeoutException { 8 ConnectionFactory connectionFactory = new ConnectionFactory(); 9 connectionFactory.setHost("localhost"); 10 Connection connection = connectionFactory.newConnection(); 11 Channel channel = connection.createChannel(); 12 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); 13 for(String key : routingKeys){ 14 String message = "RoutingSendDirect Send the message level:" + key; 15 channel.basicPublish(EXCHANGE_NAME,key,null,message.getBytes()); 16 System.out.println("RoutingSendDirect Send"+key +"':'" + message); 17 } 18 channel.close(); 19 connection.close(); 20 } 21 }
消费者
1 public class ReceiveDirect1 { 2 // 交换器名称 3 private static final String EXCHANGE_NAME = "direct_test"; 4 // 路由关键字 5 private static final String[] routingKeys = new String[]{"info" ,"warning"}; 6 7 public static void main(String[] args) throws IOException, TimeoutException { 8 ConnectionFactory connectionFactory = new ConnectionFactory(); 9 connectionFactory.setHost("localhost"); 10 Connection connection = connectionFactory.newConnection(); 11 Channel channel = connection.createChannel(); 12 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); 13 //获取匿名队列名称 14 String queueName=channel.queueDeclare().getQueue(); 15 for(String key : routingKeys){ 16 channel.queueBind(queueName,EXCHANGE_NAME,key); 17 System.out.println("ReceiveDirect1 exchange:"+EXCHANGE_NAME+"," + 18 " queue:"+queueName+", BindRoutingKey:" + key); 19 } 20 21 System.out.println("ReceiveDirect1 Waiting for messages"); 22 Consumer consumer = new DefaultConsumer(channel){ 23 @Override 24 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 25 String msg = new String(body,"UTF-8"); 26 System.out.println("ReceiveDirect1 Received '" + envelope.getRoutingKey() + "':'" + msg + "'"); 27 } 28 }; 29 30 channel.basicConsume(queueName, true, consumer); 31 } 32 }
1 public class ReceiveDirect2 { 2 // 交换器名称 3 private static final String EXCHANGE_NAME = "direct_test"; 4 // 路由关键字 5 private static final String[] routingKeys = new String[]{"error"}; 6 7 public static void main(String[] args) throws IOException, TimeoutException { 8 ConnectionFactory connectionFactory = new ConnectionFactory(); 9 connectionFactory.setHost("localhost"); 10 Connection connection = connectionFactory.newConnection(); 11 Channel channel = connection.createChannel(); 12 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); 13 //获取匿名队列名称 14 String queueName=channel.queueDeclare().getQueue(); 15 for(String key : routingKeys){ 16 channel.queueBind(queueName,EXCHANGE_NAME,key); 17 System.out.println("ReceiveDirect2 exchange:"+EXCHANGE_NAME+"," + 18 " queue:"+queueName+", BindRoutingKey:" + key); 19 } 20 21 System.out.println("ReceiveDirect2 Waiting for messages"); 22 Consumer consumer = new DefaultConsumer(channel){ 23 @Override 24 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 25 String msg = new String(body,"UTF-8"); 26 System.out.println("ReceiveDirect2 Received '" + envelope.getRoutingKey() + "':'" + msg + "'"); 27 } 28 }; 29 30 channel.basicConsume(queueName, true, consumer); 31 } 32 }
运行结果如下:
1 RoutingSendDirect Sendinfo':'RoutingSendDirect Send the message level:info 2 RoutingSendDirect Sendwarning':'RoutingSendDirect Send the message level:warning 3 RoutingSendDirect Senderror':'RoutingSendDirect Send the message level:error 4 5 ReceiveDirect1 exchange:direct_test, queue:amq.gen-HsUrzbjzto-K7HeigXSEfQ, BindRoutingKey:info 6 ReceiveDirect1 exchange:direct_test, queue:amq.gen-HsUrzbjzto-K7HeigXSEfQ, BindRoutingKey:warning 7 ReceiveDirect1 Waiting for messages 8 ReceiveDirect1 Received 'info':'RoutingSendDirect Send the message level:info' 9 ReceiveDirect1 Received 'warning':'RoutingSendDirect Send the message level:warning' 10 11 ReceiveDirect2 exchange:direct_test, queue:amq.gen-i3NY12l3DqWjGapaBOCdwQ, BindRoutingKey:error 12 ReceiveDirect2 Waiting for messages 13 ReceiveDirect2 Received 'error':'RoutingSendDirect Send the message level:error'
fanout
fanout和别的MQ的发布/订阅模式类似,实例如下:
生产者
1 public class Pub { 2 private static final String EXCHANGE_NAME = "logs"; 3 public static void main(String[] args) throws IOException, TimeoutException { 4 ConnectionFactory factory=new ConnectionFactory(); 5 factory.setHost("localhost"); 6 Connection connection=factory.newConnection(); 7 Channel channel=connection.createChannel(); 8 //fanout表示分发,所有的消费者得到同样的队列信息 9 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); 10 //分发信息 11 for (int i=0;i<5;i++){ 12 String message="Hello World"+i; 13 channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); 14 System.out.println("Pub Sent '" + message + "'"); 15 } 16 channel.close(); 17 connection.close(); 18 } 19 }
消费者
public class Sub { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //产生一个随机的队列名称 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "");//对队列进行绑定 System.out.println("Sub Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Sub Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer);//队列会自动删除 } }
Topics
topic 交换器按模式对路由键进行模糊匹配:路由键由以“.”分隔的若干个词组成,绑定键中的 * 恰好匹配一个词,# 匹配零个或多个词。下面继续通过代码来理解。
生产者
1 public class TopicSend { 2 private static final String EXCHANGE_NAME = "topic_logs"; 3 4 public static void main(String[] args) throws IOException, TimeoutException { 5 Connection connection = null; 6 Channel channel = null; 7 try{ 8 ConnectionFactory factory=new ConnectionFactory(); 9 factory.setHost("localhost"); 10 connection=factory.newConnection(); 11 channel=connection.createChannel(); 12 13 //声明一个匹配模式的交换机 14 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); 15 //待发送的消息 16 String[] routingKeys=new String[]{ 17 "quick.orange.rabbit", 18 "lazy.orange.elephant", 19 "quick.orange.fox", 20 "lazy.brown.fox", 21 "quick.brown.fox", 22 "quick.orange.male.rabbit", 23 "lazy.orange.male.rabbit" 24 }; 25 //发送消息 26 for(String severity :routingKeys){ 27 String message = "From "+severity+" routingKey' s message!"; 28 channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); 29 System.out.println("TopicSend Sent '" + severity + "':'" + message + "'"); 30 } 31 }catch (Exception e){ 32 e.printStackTrace(); 33 if (connection!=null){ 34 channel.close(); 35 connection.close(); 36 } 37 }finally { 38 if (connection!=null){ 39 channel.close(); 40 connection.close(); 41 } 42 } 43 } 44 }
消费者
1 public class ReceiveLogsTopic1 { 2 private static final String EXCHANGE_NAME = "topic_logs"; 3 4 public static void main(String[] args) throws IOException, TimeoutException { 5 ConnectionFactory factory = new ConnectionFactory(); 6 factory.setHost("localhost"); 7 Connection connection = factory.newConnection(); 8 Channel channel = connection.createChannel(); 9 10 //声明一个匹配模式的交换机 11 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 12 String queueName = channel.queueDeclare().getQueue(); 13 //路由关键字 14 String[] routingKeys = new String[]{"*.orange.*"}; 15 //绑定路由 16 for (String routingKey : routingKeys) { 17 channel.queueBind(queueName, EXCHANGE_NAME, routingKey); 18 System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + routingKey); 19 } 20 System.out.println("ReceiveLogsTopic1 Waiting for messages"); 21 22 Consumer consumer = new DefaultConsumer(channel) { 23 @Override 24 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 25 String message = new String(body, "UTF-8"); 26 System.out.println("ReceiveLogsTopic1 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); 27 } 28 }; 29 channel.basicConsume(queueName, true, consumer); 30 } 31 }
1 public class ReceiveLogsTopic2 { 2 private static final String EXCHANGE_NAME = "topic_logs"; 3 4 public static void main(String[] argv) throws IOException, TimeoutException { 5 ConnectionFactory factory = new ConnectionFactory(); 6 factory.setHost("localhost"); 7 Connection connection = factory.newConnection(); 8 Channel channel = connection.createChannel(); 9 // 声明一个匹配模式的交换器 10 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 11 String queueName = channel.queueDeclare().getQueue(); 12 // 路由关键字 13 String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"}; 14 // 绑定路由关键字 15 for (String bindingKey : routingKeys) { 16 channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); 17 System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey); 18 } 19 20 System.out.println("ReceiveLogsTopic2 Waiting for messages"); 21 22 Consumer consumer = new DefaultConsumer(channel) { 23 @Override 24 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { 25 String message = new String(body, "UTF-8"); 26 System.out.println("ReceiveLogsTopic2 Received '" + envelope.getRoutingKey() + "':'" + message + "'"); 27 } 28 }; 29 channel.basicConsume(queueName, true, consumer); 30 } 31 }