Topic exchange(主题转发器)
发送给主题转发器的消息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符。这些标识符可以是随意,但是通常跟消息的某些特性相关联。一些合法的路由选择键比如“socket.usd.nyse”,"nyse.vmw","quick.orange.rabbit",你愿意用多少单词都可以,只要不超过上限的255个字节。
绑定键也必须以相同的格式。主题转发器的逻辑类似于direct类型的转发器。消息通过一个特定的路由键发送到所有与绑定键匹配的队列中。需要注意的是,关于绑定键有两种特殊的情况:*(星号)可以代替一个任意标识符 ;#(井号)可以代替零个或多个标识符。如下图:
MQ工厂连接类Connection

1 package com.mmr.rabbitmq.util; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Connection; 6 import com.rabbitmq.client.ConnectionFactory; 7 8 public class ConnectionUtils { 9 /** 10 * @desc 获取Mq 的链接 11 * @author zp 12 * @throws IOException 13 * @date 2018-7-19 14 */ 15 public static Connection getConnection() throws IOException { 16 // 1.定义一个链接工厂 17 ConnectionFactory factroy = new ConnectionFactory(); 18 19 // 2.设置服务地址 20 factroy.setHost("127.0.0.1"); 21 22 // 3.设置端口号 23 factroy.setPort(5672); 24 25 // 4.vhost 设置数据库 26 factroy.setVirtualHost("vhtest"); 27 28 // 5.设置用户名 29 factroy.setUsername("jerry"); 30 31 // 6. 设置密码 32 factroy.setPassword("123456"); 33 34 // 7.返回链接 35 return factroy.newConnection(); 36 } 37 }
MQ消息生产者类Send

1 package com.mmr.rabbitmq.topic; 2 3 import java.io.IOException; 4 5 import com.mmr.rabbitmq.util.ConnectionUtils; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 9 public class Send { 10 private static final String EXCHANGE_TOPIC_NAME = "test_exchange_topic"; 11 public static void main(String[] args) throws IOException { 12 // 创建连接 13 Connection connection = ConnectionUtils.getConnection(); 14 15 // 获取通道 16 Channel channel = connection.createChannel(); 17 18 // 设置路由键 19 channel.exchangeDeclare(EXCHANGE_TOPIC_NAME, "topic"); 20 21 // 发送消息 22 String msg = "商品......."; 23 24 // String routingKey = "goods.delete"; 25 26 channel.basicPublish(EXCHANGE_TOPIC_NAME, "goods.add", null, msg.getBytes()); 27 System.out.println("send ---"+msg); 28 channel.close(); 29 connection.close(); 30 31 } 32 }
MQ消息消费者类Recv1只处理good.add good.edit ;Recv2处理good.#

1 package com.mmr.rabbitmq.topic; 2 3 import java.io.IOException; 4 5 import com.mmr.rabbitmq.util.ConnectionUtils; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.Consumer; 9 import com.rabbitmq.client.DefaultConsumer; 10 import com.rabbitmq.client.Envelope; 11 import com.rabbitmq.client.AMQP.BasicProperties; 12 13 public class Recv1 { 14 private static final String QUEUE_NAME="test_queue_topic1"; 15 private static final String EXCHANGE_TOPIC_NAME = "test_exchange_topic"; 16 public static void main(String[] args) throws IOException { 17 // 创建连接 18 Connection connection = ConnectionUtils.getConnection(); 19 20 // 创建通道 21 final Channel channel = connection.createChannel(); 22 23 // 队列声明 24 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 25 26 // 绑定交换机 27 channel.queueBind(QUEUE_NAME, EXCHANGE_TOPIC_NAME, "goods.add"); 28 channel.queueBind(QUEUE_NAME, EXCHANGE_TOPIC_NAME, "goods.edit"); 29 // 每次 30 channel.basicQos(1); 31 32 // 消费者 33 Consumer consumer = new DefaultConsumer(channel){ 34 @Override 35 public void handleDelivery(String consumerTag, Envelope envelope, 36 BasicProperties properties, byte[] body) throws IOException { 37 // TODO Auto-generated method stub 38 // 接收消息 39 String msg = new String(body,"utf-8"); 40 System.out.println("【1】Recv1 msg"+msg); 41 try { 42 Thread.sleep(2000); 43 } catch (Exception e) { 44 // TODO: handle exception 45 e.printStackTrace(); 46 }finally{ 47 System.out.print("【1】Recv1 done"); 48 //回执单 49 channel.basicAck(envelope.getDeliveryTag(), false); 50 } 51 } 52 }; 53 boolean autoAck = false; 54 channel.basicConsume(QUEUE_NAME, autoAck,consumer ); 55 } 56 }

1 package com.mmr.rabbitmq.topic; 2 3 import java.io.IOException; 4 5 import com.mmr.rabbitmq.util.ConnectionUtils; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.Consumer; 9 import com.rabbitmq.client.DefaultConsumer; 10 import com.rabbitmq.client.Envelope; 11 import com.rabbitmq.client.AMQP.BasicProperties; 12 13 public class Recv2 { 14 private static final String QUEUE_NAME="test_queue_topic2"; 15 private static final String EXCHANGE_TOPIC_NAME = "test_exchange_topic"; 16 public static void main(String[] args) throws IOException { 17 // 创建连接 18 Connection connection = ConnectionUtils.getConnection(); 19 20 // 创建通道 21 final Channel channel = connection.createChannel(); 22 23 // 队列声明 24 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 25 26 // 绑定交换机 27 channel.queueBind(QUEUE_NAME, EXCHANGE_TOPIC_NAME, "goods.#"); 28 29 // 每次 30 channel.basicQos(1); 31 32 // 消费者 33 Consumer consumer = new DefaultConsumer(channel){ 34 @Override 35 public void handleDelivery(String consumerTag, Envelope envelope, 36 BasicProperties properties, byte[] body) throws IOException { 37 // TODO Auto-generated method stub 38 // 接收消息 39 String msg = new String(body,"utf-8"); 40 System.out.println("【2】Recv2 msg"+msg); 41 try { 42 Thread.sleep(2000); 43 } catch (Exception e) { 44 // TODO: handle exception 45 e.printStackTrace(); 46 }finally{ 47 System.out.println("【2】Recv2 done"); 48 //回执单 49 channel.basicAck(envelope.getDeliveryTag(), false); 50 } 51 } 52 }; 53 boolean autoAck = false; 54 channel.basicConsume(QUEUE_NAME, autoAck,consumer ); 55 } 56 }
运行上述代码,可以获得 当消息产生发送过来是good.add/good.edit 那么消息消费者1和消费者2都会接收到消息并且处理,但是当消息生产者发送过来的是good.delete或者good.select等的时候只有消费者2(Recv2)接收到消息处理通知。