转自:http://www.cnblogs.com/4----/p/6593486.html
1.简介
上一章介绍了direct类型的exchange,并用它实现了一个伪广播(Queue绑定多个routingKey)日志系统,现在,考虑另一个问题,我们的日志系统不仅要分级别级别(error,info)记录日志,还需要通过发送日志的系统来匹配,比如说有一个“核心”系统,它发出的所有级别日志,都需要记录到硬盘,其他系统只需要把error级别的日志记录到硬盘。
如果用上一章的direct该怎么做呢?
- 给routingKey分层,变成类似这样的字符串:核心.info,核心.error,其他.info,其他.error
- Q1绑定routingKey:核心.info,核心.error,其他.error,记录所有核心日志,记录其他error日志
- Q2绑定routingKey:核心.info,其他.info,打印所有info日志
需求实现了,这时,项目经理说,两个日志级别太不好管理了,我们加个debug级别吧!
你的内心这样的
是时候学习一下Topic Exchange了
2.Topic Exchange
topic exchange对routingKey是有要求的,必须是一个关键字的列表才能发挥正常作用,用“.”分割每个关键字,你可以定义任意的层级,唯一的限制是最大长度为255字节。
上述需求,我们可以把routingKey的规则定义为 “<系统>.<日志级别>”,这个规则是抽象的,也就是说,是在你脑子里的,并没有地方去具体的把它绑定到exchange上,发送消息和绑定队列完全可以不按这个规则来,只会影响消息是否能分发到对应的队列上。
用“.”分割同样不是强制要求,routingKey里不包含这个字符也不会报错,“.”只会影响topic中对routingKey的分层解析,果不用它,那么topic的表现和direct一致
topic与direct的重要区别就是,它有两个关键字
- “*”星号,代表一个词,比如上述规则:*.error 表示所有系统的error级别的日志
- “#”井号,代表零个或多个词,比如上述规则: *.# 表示所有系统的所有消息,与单独一个#是等效的,core.# 表示核心系统的所有日志,它和 core.* 的区别是,即使以后规则改为 <系统>.<日志级别>.<其他条件>.<其他条件>.……,core.# 依然可以完成匹配,而 core.* 则无法匹配 core.info.xxx.xxx
第一条很好理解,第二条有点长,不会是骗人的吧?我们来实验一下
首先把把logs的type声明成topic,注意在控制台把上一章的direct类型的logs删除掉
channel.exchangeDeclare("logs","topic");
把Consumer的routingKey提取出来,方便后面测试
1 /** 2 * 获取一个临时队列,并绑定到相应的routingKey上,消费消息 3 */ 4 public void consume(String routingKey) { 5 try { 6 String queueName = channel.queueDeclare().getQueue(); 7 //临时队列绑定的routingKey是外部传过来的 8 channel.queueBind(queueName, "logs", routingKey); 9 Consumer consumer = new DefaultConsumer(channel) { 10 @Override 11 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 12 byte[] body) throws IOException { 13 String message = new String(body, "UTF-8"); 14 logger.debug(" [D] 打印日志:"+message); 15 } 16 }; 17 //这里自动确认为true,接收到消息后该消息就销毁了 18 channel.basicConsume(queueName, true, consumer); 19 } catch (IOException e) { 20 e.printStackTrace(); 21 } 22 }
测试1
Consumer的临时队列绑定到logs上,routingKey设置为 a.#
1 public static void main( String[] args ) 2 { 3 LogConsumer consumer = new LogConsumer(); 4 consumer.consume("a.#"); 5 }
Sender的发送到logs上,routingKey设置为 a.b
1 public static void main( String[] args ) throws InterruptedException{ 2 LogSender sender = new LogSender(); 3 while(true){ 4 String routingKey = "a.b"; 5 sender.sendMessage(" message: "+System.currentTimeMillis(), routingKey); 6 Thread.sleep(1000); 7 } 8 }
运行,开始发送消息
成功消费
测试2
Consumer的临时队列绑定到logs上,routingKey设置为 a.*
1 public static void main( String[] args ) 2 { 3 LogConsumer consumer = new LogConsumer(); 4 consumer.consume("a.*"); 5 }
Sender不变
运行,开始发送消息
消费成功
测试3
Consumer继续绑定到a.*
Sender的发送到logs上,routingKey设置为 a.b.c
1 public static void main( String[] args ) throws InterruptedException{ 2 LogSender sender = new LogSender(); 3 while(true){ 4 String routingKey = "a.b.c"; 5 sender.sendMessage(" message: "+System.currentTimeMillis(), routingKey); 6 Thread.sleep(1000); 7 } 8 }
运行,开始发送消息
a.*监听不到消息
测试4
Consumer改为监听 a.#
1 public static void main( String[] args ) 2 { 3 LogConsumer consumer = new LogConsumer(); 4 consumer.consume("a.#"); 5 }
继续往a.b.c发消息
a.# 消费成功
测试5
下面测点特殊的
Consumer绑定到 a.b
public static void main( String[] args ) { LogConsumer consumer = new LogConsumer(); consumer.consume("a.b"); }
Sender发送到a.*
1 public static void main( String[] args ) throws InterruptedException{ 2 LogSender sender = new LogSender(); 3 while(true){ 4 String routingKey = "a.*"; 5 sender.sendMessage(" message: "+System.currentTimeMillis(), routingKey); 6 Thread.sleep(1000); 7 } 8 }
发送成功
消费不到
测试6
Sender发送到a.#,Consumer还是监听 a.b
1 public static void main( String[] args ) throws InterruptedException{ 2 LogSender sender = new LogSender(); 3 while(true){ 4 String routingKey = "a.#"; 5 sender.sendMessage(" message: "+System.currentTimeMillis(), routingKey); 6 Thread.sleep(1000); 7 } 8 }
发送成功
a.#也收不到消息
测试结果
通过以上六个测试,我们发现,topic中的通配符,只有在Queue绑定的时候才能起到通配符的作用,如果在发布消息的时候使用通配符,将作为普通的字符处理,发送的routingKey=a.* 并不能把消息发送到routingKey=a.b的Queue上,a.#同理,也不能把消息发送到routingKey=a.b.c的Queue上
3.实战
为了体现出#的作用,我们给一开始的需求增加一点难度,规则定位三层,<系统>.<级别>.<类型>,其中类型有两种,common和important
需求是,Q1监听core系统的所有日志和其他系统所有所有级别的important类型的日志以及error级的日志;Q2监听所有系统的info日志
首先改造一下Sender,让它可以通过多线程发送不同系统的日志消息
1 package com.liyang.ticktock.rabbitmq; 2 3 import java.util.HashMap; 4 import java.util.Map; 5 import java.util.Random; 6 7 public class App { 8 9 //声明一个类型MAP 10 private static Map<Integer, String> typeMap; 11 //声明一个Random 12 private static Random random; 13 14 //在静态代码块中初始化 15 static { 16 typeMap = new HashMap<>(); 17 typeMap.put(0, "important"); 18 typeMap.put(1, "common"); 19 random = new Random(System.currentTimeMillis()); 20 } 21 22 /** 23 * 获取一个随机类型的routingKey 24 * @param system 系统名 25 * @param level 日志级别 26 * @return routingKey 27 */ 28 public static String getRoutingKey(String system, String level) { 29 return new StringBuilder() 30 .append(system).append(".") 31 .append(level).append(".") 32 .append(typeMap.get(random.nextInt(2))) 33 .toString(); 34 } 35 36 /** 37 * 新建一个线程,发送指定系统的消息 38 * @param system 39 */ 40 public static void createSender(final String system){ 41 new Thread(new Runnable() { 42 //new一个Sender 43 private LogSender sender = new LogSender(); 44 45 @Override 46 public void run() { 47 while(true){ 48 long now = System.currentTimeMillis(); 49 //通过当前时间生成错误级别 50 boolean info = now % 2 == 0; 51 //生成routingKey 52 String routingKey = getRoutingKey(system, info?"info":"error"); 53 //发送消息 54 String msg = routingKey+"["+now+"]"; 55 sender.sendMessage(msg, routingKey); 56 try { 57 //随机睡500-1000毫秒 58 int sleepTime = random.nextInt(1000); 59 Thread.sleep(sleepTime<500?500:sleepTime); 60 } catch (InterruptedException e) { 61 e.printStackTrace(); 62 } 63 } 64 } 65 }).start(); 66 } 67 68 public static void main(String[] args) throws InterruptedException { 69 //开始发送core系统消息 70 createSender("core"); 71 //开始发送biz系统消息 72 createSender("biz"); 73 } 74 75 }
上述代码可以用两个线程发送两个系统的随机日志消息
下面实现消费者
Q1监听core系统的所有日志和其他系统所有级别的important类型的日志以及error级的日志
把这句话拆分一下
- core系统所有的日志,core.#
- 其他系统所有级别的important类型的日志,*.*.important
- 其他系统所有error级的日志,*.error.*
那么,我们只要给Q1绑定这三个routingKey就可以了,绑定多个routingKey我们在上一张已经验证过了
1 package com.liyang.ticktock.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 9 import com.rabbitmq.client.AMQP; 10 import com.rabbitmq.client.Channel; 11 import com.rabbitmq.client.Connection; 12 import com.rabbitmq.client.ConnectionFactory; 13 import com.rabbitmq.client.Consumer; 14 import com.rabbitmq.client.DefaultConsumer; 15 import com.rabbitmq.client.Envelope; 16 17 public class LogConsumer { 18 19 private Logger logger = LoggerFactory.getLogger(LogConsumer.class); 20 private ConnectionFactory factory; 21 private Connection connection; 22 private Channel channel; 23 24 /** 25 * 在构造函数中获取连接 26 */ 27 public LogConsumer() { 28 super(); 29 try { 30 factory = new ConnectionFactory(); 31 factory.setHost("127.0.0.1"); 32 connection = factory.newConnection(); 33 channel = connection.createChannel(); 34 // 声明exchange,防止生产者没启动,exchange不存在 35 channel.exchangeDeclare("logs","topic"); 36 } catch (Exception e) { 37 logger.error(" [X] INIT ERROR!", e); 38 } 39 } 40 41 /** 42 * 提供个关闭方法,现在并没有什么卵用 43 * 44 * @return 45 */ 46 public boolean closeAll() { 47 try { 48 this.channel.close(); 49 this.connection.close(); 50 } catch (IOException | TimeoutException e) { 51 logger.error(" [X] CLOSE ERROR!", e); 52 return false; 53 } 54 return true; 55 } 56 57 /** 58 * 获取一个临时队列,并绑定到相应的routingKey上,消费消息 59 */ 60 public void consume() { 61 try { 62 String queueName = channel.queueDeclare().getQueue(); 63 //core系统所有的日志 64 channel.queueBind(queueName, "logs", "core.#"); 65 //其他系统所有级别的important类型的日志 66 channel.queueBind(queueName, "logs", "*.*.important"); 67 //其他系统所有error级的日志 68 channel.queueBind(queueName, "logs", "*.error.*"); 69 70 Consumer consumer = new DefaultConsumer(channel) { 71 @Override 72 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 73 byte[] body) throws IOException { 74 String message = new String(body, "UTF-8"); 75 logger.debug(" [Q1] 打印日志:"+message); 76 } 77 }; 78 //这里自动确认为true,接收到消息后该消息就销毁了 79 channel.basicConsume(queueName, true, consumer); 80 } catch (IOException e) { 81 e.printStackTrace(); 82 } 83 } 84 }
Q2监听所有系统的info日志
Q2应该绑定 *.info.*
1 //所有系统的info日志 2 channel.queueBind(queueName, "logs", "*.info.*");
把Q1 Q2打包成可执行jar,运行结果如下
可以看到需求已经正确实现了
4.结束语
到这里,RabbitMQ中四中exchagne类型:direct、topic、fanout、headers已经介绍了三种最常用的
headers不是很常用,放到Alternate Exchanges中一起介绍,后面还会介绍RabbitMQ的其他特性,如:TTL、Lazy Queue、Exchange To Exchange、Dead Lettering等,敬请期待