zoukankan      html  css  js  c++  java
  • RabbitMQ-从基础到实战(4)— 消息的交换(中)

    转载请注明出处

    0.目录

    RabbitMQ-从基础到实战(1)— Hello RabbitMQ

    RabbitMQ-从基础到实战(2)— 防止消息丢失

    RabbitMQ-从基础到实战(3)— 消息的交换(上)

    RabbitMQ-从基础到实战(5)— 消息的交换(下)

    RabbitMQ-从基础到实战(6)— 与Spring集成

    1.简介

    本章节和官方教程相似度较高,英文好的可以移步官方教程

    在上一章的例子中,我们创建了一个消费者,生产日志消息,广播给两个消费者,对消息进行不同的处理。这一节,我们将对它进行扩展,实现一些更加高级的功能,例如:使消费者A只接受error级别的日志保存到硬盘,消费者B接收所有级别的消息进行打印。

    本文中涉及到的所有概念(包括前面几章),都将摒弃个人经验,以官方文档为基础进行讲解,在书写本文的同时,也是我对RabbitMQ的重新学习。

    2.绑定

    回顾一下上一章的队列绑定代码

    // 把刚刚获取的队列绑定到logs这个交换中心上,
    channel.queueBind(queueName, "logs", "");

    这段代码在消费者中,为什么生产者没有?因为在RabbitMQ中消息是发送到交换中心(exchange)的,这在上一张已经重点强调过。

    上述代码可以理解成,queueName这个队列对logs这个exchange中的消息感兴趣,routingKey是""

    在发送消息的basicPublish方法中,也有一个参数叫做routingKey,没错,他们是有关联的,下面会介绍

    在不同的exchange类型中,routingKey扮演的角色也相应的不同,比如上一章我们使用的fanout(扇出,多贴切的名字,想象一下WOW中盗贼的刀扇)将忽略routingKey,所有绑定在fanout类型的exchange上的队列,都将接收到该exchange上的所有消息。

    3.Direct Exchange

    fanout类型的exchange没有给我们太多的灵活性,direct类型的echange非常简单,会匹配消息发布时的routingKey和queue的routingKey,完全相等则把消息放入该队列。

    image

    如上图,Q1绑定了orange,Q2绑定了black和green,就可以实现不同级别的日志用不同的消费者进行处理

    我们看到Q2绑定了两个routingKey,难道第二次绑定不会把第一次绑定覆盖掉吗?

    实践出真正,我们来试一下

    image

    1. 声明一个名为logs的exchange,类型换为direct,让它通过routingKey的完全匹配去分发消息
    2. 然后把消息发送到名为logs的exchange上,routingKey是外面传进来的

    改造一下发送方法,轮流发送info和error信息

    image

    给Consummer队列绑定两个routingKey

    image

    激动人心的时刻到来了,跑一把

    Duang,报错了

    image

    报错信息:

    inequivalent arg 'type' for exchange 'logs' in vhost '/': received 'direct' but current is 'fanout', class-id=40, method-id=10)

    大意就是logs exchange已经被声明称fanout了,不能再声明成direct类型,RabbitMQ的队列声明方法和exchange声明方法都是幂等的,如果没有,就创建,如果有,参数相同,就不管,如果有了还用不同的参数重新声明,就报错

    进入RabbitMQ控制台把logs删除,重新执行

    image

    逆袭成功,消费一下看看

    image

    成功了,一个队列可以绑定多个routingKey,这里注意先启动消费者,因为前面的代码里我们用的是临时队列,断开连接后,队列就删除了,如果先启动生产者,exchange接到消息后发现没有队列对它感兴趣,就任性的把消息给丢掉了。

    一个队列可以绑定多个routingKey,反之,一个routingKey也可以绑定多个队列,如下图,感兴趣的朋友可以自己试一下

    image

    如果绑定在一个direct类型的exchange上的队列都使用同一个routingKey,那它就是一个fanout

    4.实战

    要实现本章的需求,即Q1只接收error级别的日志写到硬盘上,Q2接收error和info级别的日志打印出来

    用direct类型的exchange来实现这个需求非常简单,Q1绑定error,Q2绑定error和info即可,缺点是Q2需要绑定N个routingKey,N=日志级别数量,我们可以用一些编程的技巧来规避它

    Sender的代码上面已经改好了,把exchange换为direct,注意删除原exchange,不再赘述

    Q2绑定所有日志级别,我们用一个Enum来规避手动绑定

    定义一个Enum

    1 public enum LogType{
    2     error,info;
    3 }

    用foreach语法糖进行循环绑定

    1 //绑定所有类型
    2 for(LogType logType: LogType.values()){
    3     channel.queueBind(queueName, "logs", logType.name());
    4 }

    foreach是单线程的,这里也可以装个逼用一下JAVA8的lambda,由于lambda是并行处理,所以外围的try catch无效,需要在内层重新抓取异常,而且不能抛出,反而显得代码很不美好,装逼失败

    1 IntStream.range(0, LogType.values().length).forEach(n->{
    2     try {
    3         channel.queueBind(queueName, "logs", LogType.values()[n].name());
    4     } catch (IOException e) {
    5         e.printStackTrace();
    6     }
    7 });

    把另外一个Consumer改成只绑定error队列

    1 channel.queueBind(queueName, "logs", LogType.error.name());

    然后,改造一下发送消息的地方,一开始我们用了一个while还有一组if else,看起来比较挫,别人看你代码的时候,就不会觉得你很厉害,这样和你不写博客一样,对你的工作是没有好处的,我们把它改的高端一点

    1 while(true){
    2     boolean info = ++i%2==0;
    3     String type = info?LogType.info.name():LogType.error.name();
    4     sender.sendMessage(type +" message: "+i, type);
    5     Thread.sleep(1000);
    6 }

    对比一下

    image

    是不是觉得自己厉害了很多?这就是编码的艺术(得意脸)

    好了,这一章没有太多内容,跑一下看看结果

    左边的Consumer1,消费了info和error级别的日志,右边的Consumer2,只消费了error级别的日志

    5.结束语

    这一章主要是介绍了RabbitMQ中direct类型的exchange,下一章将跟着官方教程的进度继续介绍topic类型的exchange,以及下下章介绍用RabbitMQ实现RPC调用。之后则会介绍RabbitMQ与Spring的集成等与真实开发环境更相关的技术。

  • 相关阅读:
    python中的编码问题
    CVPR2018 Tutorial 之 Visual Recognition and Beyond
    hdu 1376 Octal Fractions
    hdu 1329 Hanoi Tower Troubles Again!
    hdu 1309 Loansome Car Buyer
    hdu 1333 Smith Numbers
    hdu 1288 Hat's Tea
    hdu 1284 钱币兑换问题
    hdu 1275 两车追及或相遇问题
    hdu 1270 小希的数表
  • 原文地址:https://www.cnblogs.com/4----/p/6590459.html
Copyright © 2011-2022 走看看