zoukankan      html  css  js  c++  java
  • RabbitMQ-从基础到实战(5)— 消息的交换(下)

    转自:http://www.cnblogs.com/4----/p/6593486.html

    1.简介

    上一章介绍了direct类型的exchange,并用它实现了一个伪广播(Queue绑定多个routingKey)日志系统,现在,考虑另一个问题,我们的日志系统不仅要分级别级别(error,info)记录日志,还需要通过发送日志的系统来匹配,比如说有一个“核心”系统,它发出的所有级别日志,都需要记录到硬盘,其他系统只需要把error级别的日志记录到硬盘。

    如果用上一章的direct该怎么做呢?

    • 给routingKey分层,变成类似这样的字符串:核心.info,核心.error,其他.info,其他.error
    • Q1绑定routingKey:核心.info,核心.error,其他.error,记录所有核心日志,记录其他error日志
    • Q2绑定routingKey:核心.info,其他.info,打印所有info日志

    需求实现了,这时,项目经理说,两个日志级别太不好管理了,我们加个debug级别吧!

    你的内心这样的

    image

    是时候学习一下Topic Exchange了

    2.Topic Exchange

    topic exchange对routingKey是有要求的,必须是一个关键字的列表才能发挥正常作用,用“.”分割每个关键字,你可以定义任意的层级,唯一的限制是最大长度为255字节。

    上述需求,我们可以把routingKey的规则定义为 “<系统>.<日志级别>”,这个规则是抽象的,也就是说,是在你脑子里的,并没有地方去具体的把它绑定到exchange上,发送消息和绑定队列完全可以不按这个规则来,只会影响消息是否能分发到对应的队列上。

    用“.”分割同样不是强制要求,routingKey里不包含这个字符也不会报错,“.”只会影响topic中对routingKey的分层解析,果不用它,那么topic的表现和direct一致

    topic与direct的重要区别就是,它有两个关键字

    1. “*”星号,代表一个词,比如上述规则:*.error 表示所有系统的error级别的日志
    2. “#”井号,代表零个或多个词,比如上述规则: *.# 表示所有系统的所有消息,与单独一个#是等效的,core.# 表示核心系统的所有日志,它和 core.* 的区别是,即使以后规则改为 <系统>.<日志级别>.<其他条件>.<其他条件>.……,core.# 依然可以完成匹配,而 core.* 则无法匹配 core.info.xxx.xxx

    第一条很好理解,第二条有点长,不会是骗人的吧?我们来实验一下

    首先把把logs的type声明成topic,注意在控制台把上一章的direct类型的logs删除掉

    channel.exchangeDeclare("logs","topic");

    把Consumer的routingKey提取出来,方便后面测试

    复制代码
     1 /**
     2  * 获取一个临时队列,并绑定到相应的routingKey上,消费消息
     3  */
     4 public void consume(String routingKey) {
     5     try {
     6         String queueName = channel.queueDeclare().getQueue();
     7         //临时队列绑定的routingKey是外部传过来的
     8         channel.queueBind(queueName, "logs", routingKey);
     9         Consumer consumer = new DefaultConsumer(channel) {
    10             @Override
    11             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
    12                     byte[] body) throws IOException {
    13                 String message = new String(body, "UTF-8");
    14                 logger.debug(" [D] 打印日志:"+message);
    15             }
    16         };
    17         //这里自动确认为true,接收到消息后该消息就销毁了
    18         channel.basicConsume(queueName, true, consumer);
    19     } catch (IOException e) {
    20         e.printStackTrace();
    21     }
    22 }
    复制代码

    测试1

    Consumer的临时队列绑定到logs上,routingKey设置为 a.#

    1 public static void main( String[] args )
    2 {
    3     LogConsumer consumer = new LogConsumer();
    4     consumer.consume("a.#");
    5 }

    Sender的发送到logs上,routingKey设置为 a.b

    复制代码
    1 public static void main( String[] args ) throws InterruptedException{
    2     LogSender sender = new LogSender();
    3     while(true){
    4         String routingKey = "a.b";
    5         sender.sendMessage(" message: "+System.currentTimeMillis(), routingKey);
    6         Thread.sleep(1000);
    7     }
    8 }
    复制代码

    运行,开始发送消息

    成功消费

    测试2

    Consumer的临时队列绑定到logs上,routingKey设置为 a.*

    1 public static void main( String[] args )
    2 {
    3     LogConsumer consumer = new LogConsumer();
    4     consumer.consume("a.*");
    5 }

    Sender不变

    运行,开始发送消息

    消费成功

    测试3

    Consumer继续绑定到a.*

    Sender的发送到logs上,routingKey设置为 a.b.c

    复制代码
    1 public static void main( String[] args ) throws InterruptedException{
    2     LogSender sender = new LogSender();
    3     while(true){
    4         String routingKey = "a.b.c";
    5         sender.sendMessage(" message: "+System.currentTimeMillis(), routingKey);
    6         Thread.sleep(1000);
    7     }
    8 }
    复制代码

    运行,开始发送消息

    a.*监听不到消息

    测试4

    Consumer改为监听 a.#

    1 public static void main( String[] args )
    2 {
    3     LogConsumer consumer = new LogConsumer();
    4     consumer.consume("a.#");
    5 }

    继续往a.b.c发消息

    a.# 消费成功

    测试5

    下面测点特殊的

    Consumer绑定到 a.b

    public static void main( String[] args )
    {
        LogConsumer consumer = new LogConsumer();
        consumer.consume("a.b");
    }

    Sender发送到a.*

    复制代码
    1 public static void main( String[] args ) throws InterruptedException{
    2     LogSender sender = new LogSender();
    3     while(true){
    4         String routingKey = "a.*";
    5         sender.sendMessage(" message: "+System.currentTimeMillis(), routingKey);
    6         Thread.sleep(1000);
    7     }
    8 }
    复制代码

    发送成功

    消费不到

    测试6

    Sender发送到a.#,Consumer还是监听 a.b

    复制代码
    1 public static void main( String[] args ) throws InterruptedException{
    2     LogSender sender = new LogSender();
    3     while(true){
    4         String routingKey = "a.#";
    5         sender.sendMessage(" message: "+System.currentTimeMillis(), routingKey);
    6         Thread.sleep(1000);
    7     }
    8 }
    复制代码

    发送成功

     

    a.#也收不到消息

    测试结果

    通过以上六个测试,我们发现,topic中的通配符,只有在Queue绑定的时候才能起到通配符的作用,如果在发布消息的时候使用通配符,将作为普通的字符处理,发送的routingKey=a.* 并不能把消息发送到routingKey=a.b的Queue上,a.#同理,也不能把消息发送到routingKey=a.b.c的Queue上

    3.实战

    为了体现出#的作用,我们给一开始的需求增加一点难度,规则定位三层,<系统>.<级别>.<类型>,其中类型有两种,common和important

    需求是,Q1监听core系统的所有日志和其他系统所有所有级别的important类型的日志以及error级的日志;Q2监听所有系统的info日志

    首先改造一下Sender,让它可以通过多线程发送不同系统的日志消息

    复制代码
     1 package com.liyang.ticktock.rabbitmq;
     2 
     3 import java.util.HashMap;
     4 import java.util.Map;
     5 import java.util.Random;
     6 
     7 public class App {
     8 
     9     //声明一个类型MAP
    10     private static Map<Integer, String> typeMap;
    11     //声明一个Random
    12     private static Random random;
    13 
    14     //在静态代码块中初始化
    15     static {
    16         typeMap = new HashMap<>();
    17         typeMap.put(0, "important");
    18         typeMap.put(1, "common");
    19         random = new Random(System.currentTimeMillis());
    20     }
    21 
    22     /**
    23      * 获取一个随机类型的routingKey
    24      * @param system 系统名
    25      * @param level 日志级别
    26      * @return routingKey
    27      */
    28     public static String getRoutingKey(String system, String level) {
    29         return new StringBuilder()
    30                 .append(system).append(".")
    31                 .append(level).append(".")
    32                 .append(typeMap.get(random.nextInt(2)))
    33                 .toString();
    34     }
    35     
    36     /**
    37      * 新建一个线程,发送指定系统的消息
    38      * @param system
    39      */
    40     public static void createSender(final String system){
    41         new Thread(new Runnable() {
    42             //new一个Sender
    43             private  LogSender sender = new LogSender();
    44             
    45             @Override
    46             public void run() {
    47                 while(true){
    48                     long now = System.currentTimeMillis();
    49                     //通过当前时间生成错误级别
    50                     boolean info = now % 2 == 0;
    51                     //生成routingKey
    52                     String routingKey = getRoutingKey(system, info?"info":"error");
    53                     //发送消息
    54                     String msg = routingKey+"["+now+"]";
    55                     sender.sendMessage(msg, routingKey);
    56                     try {
    57                         //随机睡500-1000毫秒
    58                         int sleepTime = random.nextInt(1000);
    59                         Thread.sleep(sleepTime<500?500:sleepTime);
    60                     } catch (InterruptedException e) {
    61                         e.printStackTrace();
    62                     }
    63                 }
    64             }
    65         }).start();
    66     }
    67 
    68     public static void main(String[] args) throws InterruptedException {
    69         //开始发送core系统消息
    70         createSender("core");
    71         //开始发送biz系统消息
    72         createSender("biz");
    73     }
    74     
    75 }
    复制代码

    上述代码可以用两个线程发送两个系统的随机日志消息

    下面实现消费者

    Q1监听core系统的所有日志和其他系统所有级别的important类型的日志以及error级的日志

     把这句话拆分一下

    • core系统所有的日志,core.#
    • 其他系统所有级别的important类型的日志,*.*.important
    • 其他系统所有error级的日志,*.error.*

    那么,我们只要给Q1绑定这三个routingKey就可以了,绑定多个routingKey我们在上一张已经验证过了

    复制代码
     1 package com.liyang.ticktock.rabbitmq;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import org.slf4j.Logger;
     7 import org.slf4j.LoggerFactory;
     8 
     9 import com.rabbitmq.client.AMQP;
    10 import com.rabbitmq.client.Channel;
    11 import com.rabbitmq.client.Connection;
    12 import com.rabbitmq.client.ConnectionFactory;
    13 import com.rabbitmq.client.Consumer;
    14 import com.rabbitmq.client.DefaultConsumer;
    15 import com.rabbitmq.client.Envelope;
    16 
    17 public class LogConsumer {
    18 
    19     private Logger logger = LoggerFactory.getLogger(LogConsumer.class);
    20     private ConnectionFactory factory;
    21     private Connection connection;
    22     private Channel channel;
    23 
    24     /**
    25      * 在构造函数中获取连接
    26      */
    27     public LogConsumer() {
    28         super();
    29         try {
    30             factory = new ConnectionFactory();
    31             factory.setHost("127.0.0.1");
    32             connection = factory.newConnection();
    33             channel = connection.createChannel();
    34             // 声明exchange,防止生产者没启动,exchange不存在
    35             channel.exchangeDeclare("logs","topic");
    36         } catch (Exception e) {
    37             logger.error(" [X] INIT ERROR!", e);
    38         }
    39     }
    40 
    41     /**
    42      * 提供个关闭方法,现在并没有什么卵用
    43      * 
    44      * @return
    45      */
    46     public boolean closeAll() {
    47         try {
    48             this.channel.close();
    49             this.connection.close();
    50         } catch (IOException | TimeoutException e) {
    51             logger.error(" [X] CLOSE ERROR!", e);
    52             return false;
    53         }
    54         return true;
    55     }
    56 
    57     /**
    58      * 获取一个临时队列,并绑定到相应的routingKey上,消费消息
    59      */
    60     public void consume() {
    61         try {
    62             String queueName = channel.queueDeclare().getQueue();
    63             //core系统所有的日志
    64             channel.queueBind(queueName, "logs", "core.#");
    65             //其他系统所有级别的important类型的日志
    66             channel.queueBind(queueName, "logs", "*.*.important");
    67             //其他系统所有error级的日志
    68             channel.queueBind(queueName, "logs", "*.error.*");
    69             
    70             Consumer consumer = new DefaultConsumer(channel) {
    71                 @Override
    72                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
    73                         byte[] body) throws IOException {
    74                     String message = new String(body, "UTF-8");
    75                     logger.debug(" [Q1] 打印日志:"+message);
    76                 }
    77             };
    78             //这里自动确认为true,接收到消息后该消息就销毁了
    79             channel.basicConsume(queueName, true, consumer);
    80         } catch (IOException e) {
    81             e.printStackTrace();
    82         }
    83     }
    84 }
    复制代码

    Q2监听所有系统的info日志

     Q2应该绑定 *.info.*

    1 //所有系统的info日志
    2 channel.queueBind(queueName, "logs", "*.info.*");

    把Q1 Q2打包成可执行jar,运行结果如下

    可以看到需求已经正确实现了

    4.结束语

    到这里,RabbitMQ中四中exchagne类型:direct、topic、fanout、headers已经介绍了三种最常用的

    headers不是很常用,放到Alternate Exchanges中一起介绍,后面还会介绍RabbitMQ的其他特性,如:TTL、Lazy Queue、Exchange To Exchange、Dead Lettering等,敬请期待

  • 相关阅读:
    001 云开发基础
    HttpClient 调用外部接口(简单实用)
    MD5加密(简单实现_可自行扩展)
    SAA C02考点梳理
    支配树
    CF1320E 题解
    dp套dp
    20210705模拟赛总结
    20210703模拟赛
    20210629模拟赛总结
  • 原文地址:https://www.cnblogs.com/sharpest/p/10428888.html
Copyright © 2011-2022 走看看