zoukankan      html  css  js  c++  java
  • [心得体会]RabbitMQ

    • RabbitMQ是什么? 

    消息队列, 基于AMQP(高级消息队列), 使用Erlang语言编写, 收发消息使用

    • 有什么用? 有什么应用场景?

    1. 任务异步处理
    2. 应用程序解耦

    • 为什么使用RabbitMQ?

    1. 使用简单
    2. 基于AMQP
    3. 社区活动
    4, 高并发性能好(Erlang语言)
    5. springboot默认集成RabbitMQ

    • AMQP是什么?

    AMQP,即Advanced Message Queuing Protocol,
    一个提供统一消息服务的应用层标准高级消息队列协议,
    是应用层协议的一个开放标准,为面向消息的中间件设计。
    基于此协议的客户端与消息中间件可传递消息,
    并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
    5
     
    1
    AMQP,即Advanced Message Queuing Protocol,
    2
    一个提供统一消息服务的应用层标准高级消息队列协议,
    3
    是应用层协议的一个开放标准,为面向消息的中间件设计。
    4
    基于此协议的客户端与消息中间件可传递消息,
    5
    并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

    • AMQP的应用场景

    存储转发(多个消息发送者,单个消息接收者)。

    分布式事务(多个消息发送者,多个消息接收者)。

    发布订阅(多个消息发送者,多个消息接收者)。

    基于内容的路由(多个消息发送者,多个消息接收者)。

    文件传输队列(多个消息发送者,多个消息接收者)。

    点对点连接(单个消息发送者,单个消息接收者)。






    • RabbitMQ安装
    • Linux安装

    rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el7.centos.x86_64.rpm
    1
     
    1
    rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el7.centos.x86_64.rpm
    rpm -Uvh http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.6/rabbitmq-server-3.5.6-1.noarch.rpm
    1
     
    1
    rpm -Uvh http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.6/rabbitmq-server-3.5.6-1.noarch.rpm
    rpm -qa|grep rabbitmq
    1
     
    1
    rpm -qa|grep rabbitmq
    $ sudo chkconfig rabbitmq-server on  # 添加开机启动RabbitMQ服务
    $ sudo /sbin/service rabbitmq-server start # 启动服务
    $ sudo /sbin/service rabbitmq-server status  # 查看服务状态
    $ sudo /sbin/service rabbitmq-server stop   # 停止服务
    
    # 查看当前所有用户
    $ sudo rabbitmqctl list_users
    
    # 查看默认guest用户的权限
    $ sudo rabbitmqctl list_user_permissions guest
    
    # 由于RabbitMQ默认的账号用户名和密码都是guest。为了安全起见, 先删掉默认用户
    $ sudo rabbitmqctl delete_user guest
    
    # 添加新用户
    $ sudo rabbitmqctl add_user username password
    
    # 设置用户tag
    $ sudo rabbitmqctl set_user_tags username administrator
    
    # 赋予用户默认vhost的全部操作权限
    $ sudo rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
    
    # 查看用户的权限
    $ sudo rabbitmqctl list_user_permissions username
    
    26
     
    1
    $ sudo chkconfig rabbitmq-server on  # 添加开机启动RabbitMQ服务
    2
    $ sudo /sbin/service rabbitmq-server start # 启动服务
    3
    $ sudo /sbin/service rabbitmq-server status  # 查看服务状态
    4
    $ sudo /sbin/service rabbitmq-server stop   # 停止服务
    5
    6
    # 查看当前所有用户
    7
    $ sudo rabbitmqctl list_users
    8
    9
    # 查看默认guest用户的权限
    10
    $ sudo rabbitmqctl list_user_permissions guest
    11
    12
    # 由于RabbitMQ默认的账号用户名和密码都是guest。为了安全起见, 先删掉默认用户
    13
    $ sudo rabbitmqctl delete_user guest
    14
    15
    # 添加新用户
    16
    $ sudo rabbitmqctl add_user username password
    17
    18
    # 设置用户tag
    19
    $ sudo rabbitmqctl set_user_tags username administrator
    20
    21
    # 赋予用户默认vhost的全部操作权限
    22
    $ sudo rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
    23
    24
    # 查看用户的权限
    25
    $ sudo rabbitmqctl list_user_permissions username
    26

    开启web管理接口

    如果只从命令行操作RabbitMQ,多少有点不方便。幸好RabbitMQ自带了web管理界面,只需要启动插件便可以使用。

    $ sudo rabbitmq-plugins enable rabbitmq_management
    1
     
    1
    $ sudo rabbitmq-plugins enable rabbitmq_management

    然后通过浏览器访问

    http://localhost:15672

    输入用户名和密码访问web管理界面了。

    配置RabbitMQ

    关于RabbitMQ的配置,可以下载RabbitMQ的配置文件模板/etc/rabbitmq/rabbitmq.config, 然后按照需求更改即可。
    关于每个配置项的具体作用,可以参考官方文档
    更新配置后,别忘了重启服务哦!

    开启用户远程访问

    默认情况下,RabbitMQ的默认的guest用户只允许本机访问, 如果想让guest用户能够远程访问的话,只需要将配置文件中的loopback_users列表置为空即可,如下:

    {loopback_users, []}
    1
     
    1
    {loopback_users, []}

    另外关于新添加的用户,直接就可以从远程访问的,如果想让新添加的用户只能本地访问,可以将用户名添加到上面的列表, 如只允许admin用户本机访问。

    {loopback_users, ["admin"]}
    1
     
    1
    {loopback_users, ["admin"]}
    
    

    更新配置后,别忘了重启服务哦!

    • window安装方法

    1. 安装erlang

    创建 ERLANG_HOME=D:Program Fileserl9.3 和 %ERLANG_HOME%/bin 环境变量

    2. 安装RabbitMQ

    进入rabbitMQ安装目录, 执行bat批处理文件

    注意事项

    怎么没HelloWorld??? 别急, 在后面

    rabbitMQ消息确认

    1) 生产者发送消息到消息队列后, 消费者收到消息后, 消费者将会返回一个Ack进行响应, 确认消息是否传达完毕, 但是如果消费者意外关闭了, 生产者无法收取到Ack, 则RabbitMQ将会将这个消息保留下来并传达给另外的一个目标消费者
    2) 消息确认必须在相同的一个通道上
    channel.basicQos(1); // accept only one unack-ed message at a time (see below)
    
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
      String message = new String(delivery.getBody(), "UTF-8");
    
      System.out.println(" [x] Received '" + message + "'");
      try {
        doWork(message);
      } finally {
        System.out.println(" [x] Done");
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
      }
    };
    boolean autoAck = false; // 手动Ack功能开始
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

    发现消息队列中的消息过多的话, 需要删除
    window 
    rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
    linux
    sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

    消息持久化

    将消息队列中的消息持久化到磁盘中, 但是它还是有可能丢失的, 当消息在消息队列中但是又没持久化到磁盘中时, RabbitMQ如果意外关闭了, 那么这个消息将丢失, 如果需要做到消息不丢失的话, 必须使用publisher confirms
    boolean durable = true;
    channel.queueDeclare("task_queue", durable, false, false, null);

    公平调度

    RabbitMQ无法知道哪个消费者是否有空, 都会按照顺序交给消费者, 如果有一个消费者太过于忙碌, RabbitMQ也不知道, 照样会发布消息给那个消费者
    解决方法: 
    int prefetchCount = 1;
    channel.basicQos(prefetchCount);
    prefetchCount = 1, 调用basicQos方法这个消费者就会只拿出一个消息进行操作, 其他的消息它都拒绝掉, 拒绝的消息将被滞留带消息队列中, 但是这个消息将会丢给新的消费者
    新的问题: 
    所有的消费者都忙碌了, 那么生产者产生的消息一直滞留在消息队列中, 这样消息队列会出现更多的消息, 直达消息队列爆了
    下面是第一阶段学习完成的最终代码: 
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    public class NewTask {
    
      private static final String TASK_QUEUE_NAME = "task_queue";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    
            String message = String.join(" ", argv);
    
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
      }
    
    }


    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    public class Worker {
    
      private static final String TASK_QUEUE_NAME = "task_queue";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
    
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
        channel.basicQos(1);
    
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
    
            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
      }
    
      private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
      }
    }

    第一阶段学习完成了, 现阶段, 我们总结下:

    1. 消息确认

    RabbitMQ 存在消息确认机制, 防止消息轻易丢失, 详细过程是:
    生产者标记生产的消息需要消费者ack(应答)的消息, 将消息体发送到消息队列中, 消费者收到消息后, 发现这个消息需要手动应答的消息后, 关闭自动消息应答, 将消息应答的权利交给程序员手动调用, 应答或者拒绝, 拒绝的话, 有两种情况, 一种是拒绝并将消息丢弃或者拒绝将消息重新放入消息队列中, 如果消费者在应答的时候突然崩溃, 则生产者不会讲该消息销毁掉, 而是重新进入消息队列中, 传达给其他消费者, 如果生产者突然崩了, 则需要靠持久化来解决

    2. 消息的持久化

    RabbitMQ为了防止消息在消息队列中丢失, 所以使用了, 消息持久化, 但是它不是实时持久化的, 有时间间隙还是容易丢失消息, 后面有解决方案

    3. 公平调度原则

    RabbitMQ为了防止一些消费者过于忙碌而另一些消费者过于空闲的情况, 引入了公平调度原则, 你忙着就不给你排任务了, 将任务丢给消息队列, 让闲着的人去那任务, 但是导致了新的问题, 就是消息队列中的消息一直滞留消息队列中, 导致消息队列过于庞大, 导致RabbitMQ崩溃, 解决方案引入新的消费者或者使用TTL

    至于啥是TTL???

    想打人, 又是专业又简约的单词.......................... 尽给我瞎整些专业名词...................
    TTL === Time-To-Live and Expiration
    人话就是: 给消息/队列加上寿命

    1. 如何给消息加上TTL?

    * 为消息添加TTL时间的方式有两种
    * (1) 为添加进入队列的每个消息都加上TTL
    * (2) 在消息即将公布的时候, 给消息添加上消息的ttl

    (1) 为添加进入队列的每个消息都加上TTL

    Map<String, Object> arg = new HashMap<>();
    arg.put("x-message-ttl", 60000);
    channel.queueDeclare(QUEUE, true, false, false, arg);

    (2) 在消息即将公布的时候, 给消息添加上消息的ttl

    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("600000").build();
    channel.basicPublish("my-exchange", QUEUE, properties, message.getBytes());

    • 同一个消息在不同消息队列中是否会同时寿终正寝?

    发送到多个队列中的同一个消息可能在不同的时刻消失, 一个消息队列中的消息死亡不会影响其他队列中的这个消息队列, TTL的时间单位是毫秒, 且这个参数的值不能是非负数的
    • 如何修改TTL的时间? 

    linux: 
    rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues
    window:
    rabbitmqctl set_policy TTL ".*" "{""message-ttl"":60000}" --apply-to queues
    上面的命令将是 60 秒
    还可以使用
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-message-ttl", 60000);
    channel.queueDeclare("myqueue", false, false, false, args);
    临时修改60秒时间

    需要注意: 

    ①如果这个时候, 队列被加上TTL时间, 则不影响原本处于队列的那个消息的TTL失效时间
    ②如果同时给队列和消息加上TTL时间时, 则选择TTL较小的那个
    • 在消息即将公布的时候, 给消息添加上ttl时间

    2. 如何给队列加上TTL?

    (1) 给队列加上TTL的话, 只有队列在空闲或者消费者没有连接到队列的时候才会开始使用TTL的时间
    (2) 使用x-expires和expires可以设置队列的TTL
    (3) 单位是毫秒
    (4) 具体使用的方法
    linux: 
    rabbitmqctl set_policy expiry ".*" '{"expires":1800000}' --apply-to queues
    window:
    rabbitmqctl.bat set_policy expiry ".*" "{""expires"":1800000}" --apply-to queues
    也可以在消息队列声明的时候为队列加上TTL
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-expires", 1800000);
    channel.queueDeclare("myqueue", false, false, false, args);

    // 为添加进入队列的每个消息都加上TTL
    // Map<String, Object> arg = new HashMap<>();
    // arg.put("x-message-ttl", 60000);
    // 为队列添加TTL , 半个小时的时间
    Map<String, Object> arg = new HashMap<>();
    arg.put("x-expires", 1800000);
    channel.queueDeclare(QUEUE, true, false, false, arg);
    String message = "Hello World";
    // 在消息即将公布的时候, 给消息添加上消息的ttl
    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("600000").build();
    channel.basicPublish("my-exchange", QUEUE, properties, message.getBytes());


    下面是我写的第一阶段的代码, 不包含TTL
    package com.xuecheng.manage_cms.rabbit;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;

    /**
    * @author zhengwei
    * @version 1.0.0
    * @date 2019/12/4 20:29
    * @msg
    **/
    public class Producer01 {

    private final static String QUEUE = "helloworld";

    public static void main(String[] args) throws Exception {
    Connection connection = null;
    Channel channel = null;
    try {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    connection = factory.newConnection();
    channel = connection.createChannel();
    channel.queueDeclare(QUEUE, true, false, false, null);
    String message = "Hello World";
    // 将生产者的通道设置为持久化状态
    channel.basicPublish("", QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    System.out.println("[x] Sent'" + message + "'");
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    if (channel != null) {
    channel.close();
    }
    if (connection != null) {
    connection.close();
    }
    }
    }

    }

    package com.xuecheng.manage_cms.rabbit;

    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    /**
    * @author zhengwei
    * @version 1.0.0
    * @date 2019/12/4 20:50
    * @msg
    **/
    public class Consumer01 {

    private static final String QUEUE = "helloworld";

    public static void main(String[] args) throws Exception {
    func1();
    }

    public static void func1() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE, true, false, false, null);
    channel.basicQos(1);
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    System.out.println("receive message..." + message);
    try {
    doWork(message);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    System.out.println(" [x] Done");
    // 应答
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
    };
    // 开启消费者的自动 Ack 功能
    boolean autoAck = false;
    channel.basicConsume(QUEUE, autoAck, deliverCallback, (consumerTag) -> {
    System.out.println("consumerTag: " + consumerTag);
    });
    }

    public static void doWork(String task) throws InterruptedException {
    for (char ch : task.toCharArray()) {
    if (ch == '.') {
    Thread.sleep(1000);
    }
    }
    }

    public static void func() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE, true, false, false, null);
    DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String exchange = envelope.getExchange();
    String routingKey = envelope.getRoutingKey();
    long deliveryTag = envelope.getDeliveryTag();
    String msg = new String(body, "utf-8");
    System.out.println("receive message..." + msg);
    channel.basicAck(envelope.getDeliveryTag(), false);
    //true if the rejected message should be requeued rather than discarded/dead-lettered
    // 如果拒绝的方式是 true 则这个消息不被丢弃, false的话直接丢弃消息
    channel.basicReject(envelope.getDeliveryTag(), true);
    }
    };
    channel.basicConsume(QUEUE, true, consumer);
    }
    }


    第二阶段学习

    前言, 了解学习第二阶段的目标是什么???

    上面第一阶段的学习, 我们最终创建并完成了, 将一个消息通过消息队列传递给另一个消费者, 接下来, 我们将学习的目标是, 将一个消息, 传递给多个消费者, 也就是传说中的消息的发布和订阅

    1. 发布和订阅

    1. Exchanges交换器

    (1) rabbitMQ的生产者不知道自己发送的消息是否成功, 它将自己的消息发送到exchanges交换器上, 交换器负责接收生产者的所有消息, 然后由他分配消息将传递给哪个队列并创建管道发送给消费者
    (2) 列出服务器上的所有交换器
    sudo rabbitmqctl list_exchanges
    exchanges和Queue之间使用routingKey进行路由选取哪个队列
    (3) 发布命名交换器
    channel.basicPublish( "logs", "", null, message.getBytes());

    2. Temporary queues

    是什么?

    临时队列就是一个非持久的、独占的、自动删除的队列, 

    作用 

    获取当前最新的消息, 而不是包括旧的信息的消息

    3. Bindings

    (1) 用于交换机和队列之间的绑定

    (2) 如何绑定:

    channel.queueBind(queueName, "logs", "");

    (3) 查看先有的绑定

    rabbitmqctl list_bindings

    4. Putting it all together


    public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    Connection connection = null;
    Channel channel = null;
    try {
    connection = factory.newConnection();
    channel = connection.createChannel();
    // 声明一个扇形交换机, 向所有的消费者发布消息
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String message = "zhazha Hello World";
    // 指定 logs(EXCHANGE_NAME) 交换器, 将消息发布出去
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    if (channel != null) {
    channel.close();
    }
    if (connection != null) {
    connection.close();
    }
    }
    }

    }


    public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");
    DeliverCallback deliverCallback = (consumerTag, message) -> {
    System.out.println("message = " + new String(message.getBody(), StandardCharsets.UTF_8));
    };
    channel.basicConsume(queueName, true, deliverCallback, (consumerTag) -> {
    });
    }

    }

    路由选择




    直接交换

    1. 是什么?
    是一个算法, 消息发送到与其binding key与消息routing key完成匹配的队列
    绑定键就是图中的orange, black, green, 直接交换就是图中的 direct 就是那个 X 
    核心: 关注routing key, 这个就是将消息传递到这个绑定键的东西
    多重绑定

    一个绑定键可以和多个queue进行绑定

    发出日志

    emmmmmmmmmmmm
    订阅

    绑定键 == 路由键 + 某些操作
    可以说绑定键 ≈ 路由键




    总结

    也就是说, 路由键相当于频道, 生产者就是新闻工作者, 消费者就是观众

    public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String routingKey = "info";
    String message = "zhazha";
    channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
    System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
    }

    }


    public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
    String[] argv = new String[]{"info", "warning", "error"};
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();
    for (String routingKey : argv) {
    channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
    });
    }

    }

    主题 -- Topics

    1. 主题交换机
    主题交换的 routing_key 不能随便命名, 可以是以quick.rabbit这种  "."的方式作为routing_key
    且这个routing_key还能使用 * 和 # 进行配对, * 是匹配一个单词使用, # 表示一群单词和字符匹配

    设置为“ quick.orange.rabbit”的路由键的消息将被传递到两个队列。 信息“ lazy.orange.elephant”也会发给他们两个。 另一方面,“ quick.orange.fox”只会出现在第一个队列中,而“ lazy.brown.fox”只出现在第二个队列中。 “ lazy.pink.rabbit”只会被传递到第二个队列一次,即使它匹配两个绑定。 “ quick.brown.fox”不符合任何装订,所以它会被丢弃。

    其他不匹配的全部丢弃










































  • 相关阅读:
    180322
    20180317
    C语言编译器
    PAT甲级_PAT Advanced Level 1002. A+B for Polynomials (25) C
    java中的装箱与拆箱
    PAT甲级_PAT Advanced Level 1002. A+B for Polynomials (25)
    Java(3)_Ideal 使用指南
    4.jmeter参数化实战
    1.jmeter搭建环境
    Python_异常机制及日志
  • 原文地址:https://www.cnblogs.com/bangiao/p/12418392.html
Copyright © 2011-2022 走看看