zoukankan      html  css  js  c++  java
  • 【RabbitMQ】4、三种Exchange模式——订阅、路由、通配符模式

    前两篇博客介绍了两种队列模式,这篇博客介绍订阅、路由和通配符模式,之所以放在一起介绍,是因为这三种模式都是用了Exchange交换机,消息没有直接发送到队列,而是发送到了交换机,经过队列绑定交换机到达队列。

           

    一、订阅模式(Fanout Exchange):

       一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消 息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的exchange上面,那么该消息将会 丢失,这是因为在rabbitMQ中exchange不具备存储消息的能力,只有队列具备存储消息的能力。

          

             

     

    示例代码:

    生产者:

    public class Send {
    private final static String EXCHANGE_NAME = "test_exchange_fanout";
    public static void main(String[] argv) throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    //从连接中创建通道
    Channel channel = connection.createChannel();
    // 声明exchange
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    // 消息内容
    String message = "商品已经新增,id = 1000";
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    channel.close();
    connection.close();
    }
    }

    消费者1:

    public class Recv {
    private final static String QUEUE_NAME = "test_queue_fanout_1";
    private final static String EXCHANGE_NAME = "test_exchange_fanout";
    public static void main(String[] argv) throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME, true, consumer);
    // 获取消息
    while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" 前台系统: '" + message + "'");
    Thread.sleep(10);
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
    }
    }


      消费者2的代码和消费者1的代码大致相同,只是队列的名称不一样,这样两个消费者有自己的队列,都可以接收到生产者发送的消息


      但是如果生产者有新增商品,修改商品,删除商品的消息,消费者包快前台系统和搜索系统,要求前台系统接收修改和删除商品的消息,搜索系统接收新增商品、修改商品和删除商品的消息。所以使用这种订阅模式实现商品数据的同步并不合理。因此我们介绍下一种模式:路由模式。

     

    二、路由模式(Direct Exchange)

      这种模式添加了一个路由键,生产者发布消息的时候添加路由键,消费者绑定队列到交换机时添加键值,这样就可以接收到需要接收的消息。

           


     

    示例代码:

    生产者:
    public class Send {
    private final static String EXCHANGE_NAME = "test_exchange_direct";
    public static void main(String[] argv) throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明exchange
    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    // 消息内容
    String message = "删除商品, id = 1001";
    channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    channel.close();
    connection.close();
    }
    }

    消费者1:接收更新和删除消息

    public class Recv {
    private final static String QUEUE_NAME = "test_queue_direct_1";
    private final static String EXCHANGE_NAME = "test_exchange_direct";
    public static void main(String[] argv) throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME, false, consumer);
    // 获取消息
    while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" 前台系统: '" + message + "'");
    Thread.sleep(10);
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
    }
    }

     

    消费者2:接收insert,update,delete的消息

    public class Recv2 {
    private final static String QUEUE_NAME = "test_queue_direct_2";
    private final static String EXCHANGE_NAME = "test_exchange_direct";
    public static void main(String[] argv) throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME, false, consumer);
    // 获取消息
    while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" 搜索系统: '" + message + "'");
    Thread.sleep(10);
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
    }
    }

      如果生产者发布了insert消息,那么消费者2可以收到,消费者 1收不到,如果发布了update或者delete消息,两个消费者都可以收到。如果发布ABC消息两个消费者都收不到,因为没有绑定这个键值。这种模式 基本满足了我们的需求,但是还不够灵活,下面介绍另外一个模式。

     

    三、通配符模式(Topic Exchange)

       基本思想和路由模式是一样的,只不过路由键支持模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词

           

    示例代码:

    生产者:

    public class Send {
    private final static String EXCHANGE_NAME = "test_exchange_topic";
    public static void main(String[] argv) throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明exchange
    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    // 消息内容
    String message = "删除商品,id = 1001";
    channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    channel.close();
    connection.close();
    }
    }


    消费者1:

    public class Recv {
    private final static String QUEUE_NAME = "test_queue_topic_1";
    private final static String EXCHANGE_NAME = "test_exchange_topic";
    public static void main(String[] argv) throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME, false, consumer);
    // 获取消息
    while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" 前台系统: '" + message + "'");
    Thread.sleep(10);
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
    }
    }



    消费者2:

    public class Recv2 {
    private final static String QUEUE_NAME = "test_queue_topic_2";
    private final static String EXCHANGE_NAME = "test_exchange_topic";
    public static void main(String[] argv) throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");
    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME, false, consumer);
    // 获取消息
    while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" 搜索系统: '" + message + "'");
    Thread.sleep(10);
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
    }
    }


      消费者1是按需索取,并没有使用通配符模式,而是用的完全匹配,消费者2使用通配符模式,这样以item.开头的消息都会全部接收。

     

    小结:

      1.与简单模式和work模式对比,前面两种同一个消息只能被一个消费者获取,而今天的这三种模式,可以实现一个消息被多个消费者 获取。

      2.fanout这种模式没有加入路由器,队列与exchange绑定后,就会接收到所有的消息,其余两种增加了路由键,并且第三种增加通配符,更加便利。

  • 相关阅读:
    oracle 数据库服务名怎么查
    vmware vsphere 6.5
    vSphere虚拟化之ESXi的安装及部署
    ArcMap中无法添加ArcGIS Online底图的诊断方法
    ArcGIS中字段计算器(高级计算VBScript、Python)
    Bad habits : Putting NOLOCK everywhere
    Understanding the Impact of NOLOCK and WITH NOLOCK Table Hints in SQL Server
    with(nolock) or (nolock)
    What is “with (nolock)” in SQL Server?
    Changing SQL Server Collation After Installation
  • 原文地址:https://www.cnblogs.com/eastday/p/9692451.html
Copyright © 2011-2022 走看看