zoukankan      html  css  js  c++  java
  • RabbitMQ入门:主题路由器(Topic Exchange)

    上一篇博文中,我们使用direct exchange 代替了fanout exchange,这次我们来看下topic exchange。

    一、Topic Exchange介绍

    topic exchange和direct exchange类似,都是通过routing key和binding key进行匹配,不同的是topic exchange可以为routing key设置多重标准。

    direct路由器类似于sql语句中的精确查询;topic 路由器有点类似于sql语句中的模糊查询。

    还记得吗?我们在《RabbitMQ入门:发布/订阅(Publish/Subscribe)》中对exchange的分类进行过介绍:

    Direct:完全根据key进行投递的,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
    Topic:对key进行模式匹配后进行投递,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。
    Fanout:不需要key,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
    Headers:我们可以不考虑它。

    下面是官网给出的工作模型(P代表生产者,X代表exhange,红色的Q代表队列,C代表消费者):

    我们来分析下这个模型。

    它发送的消息是用来描述动物的。路由键有三个单词:<speed>.<color>.<species>,第一个单词描述了速度,第二个描述了颜色,第三个描述了物种。
    有三个绑定键,Q1绑定键为*.orange.*(关注所有颜色为orange的动物); Q2的绑定键有两个,分别是*.*.rabbit(关注所有的兔子)和lazy.#(关注所有速度为lazy的动物)。

    因此,路由键为quick.orange.rabbit的消息将发送到Q1和Q2,路由键为quick.orange.fox的消息将发送到Q1,路由键为lazy.brown.fox的消息将发送到Q2。路由键为lazy.pink.rabbit的消息将发送到Q2,但是注意,它只会到达Q2一次,尽管它匹配了两个绑定键。路由键为quick.brown.fox的消息因为不和任意的绑定键匹配,所以将会被丢弃。

    如果有人手一抖发了个lazy.orange.male.rabbit这种四个单词的,这个怎么办呢? 由于它和lazy.#匹配,因此将发送到Q2。

    二、代码示例

    接下来我们看下代码

    1. 生产者
      public class LogTopicSender {
          // exchange名字
          public static String EXCHANGE_NAME = "topicExchange";
      
          public static void main(String[] args) {
              ConnectionFactory factory = new ConnectionFactory();
              factory.setHost("localhost");
              Connection connection = null;
              Channel channel = null;
              try {
                  // 1.创建连接和通道
                  connection = factory.newConnection();
                  channel = connection.createChannel();
      
                  // 2.为通道声明topic类型的exchange
                  channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
                  
                  // 3.发送消息到指定的exchange,队列指定为空,由exchange根据情况判断需要发送到哪些队列
                  String routingKey = "info";
      //            String routingKey = "log4j.error";
      //            String routingKey = "logback.error";
      //            String routingKey = "log4j.warn";
                  String msg = " hello rabbitmq, I am " + routingKey;
                  channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
                  System.out.println("product send a msg: " + msg);
              } catch (IOException e) {
                  e.printStackTrace();
              } catch (TimeoutException e) {
                  e.printStackTrace();
              } finally {
                  // 4.关闭连接
      
                  if (connection != null) {
                      try {
                          connection.close();
                      } catch (IOException e) {
                          e.printStackTrace();
                      }
                  }
              }
      
          }
      }
    2. 消费者
      public class LogTopicReciver {
      
          public static void main(String[] args) {
              ConnectionFactory factory = new ConnectionFactory();
              factory.setHost("localhost");
              Connection connection = null;
              Channel channel = null;
              try {
                  // 1.创建连接和通道
                  connection = factory.newConnection();
                  channel = connection.createChannel();
      
                  // 2.为通道声明topic类型的exchange
                  channel.exchangeDeclare(LogTopicSender.EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
                  // 3.创建随机名字的队列
                  String queueName = channel.queueDeclare().getQueue();
      
                  // 4.建立exchange和队列的绑定关系
                  String[] bindingKeys = { "#" };
      //            String[] bindingKeys = { "log4j.*", "#.error" };
      //            String[] bindingKeys = { "*.error" };
      //            String[] bindingKeys = { "log4j.warn" };
                  for (int i = 0; i < bindingKeys.length; i++) {
                      channel.queueBind(queueName, LogTopicSender.EXCHANGE_NAME, bindingKeys[i]);
                      System.out.println(" **** LogTopicReciver keep alive ,waiting for " + bindingKeys[i]);
                  }
      
                  // 5.通过回调生成消费者并进行监听
                  Consumer consumer = new DefaultConsumer(channel) {
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope,
                              com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
      
                          // 获取消息内容然后处理
                          String msg = new String(body, "UTF-8");
                          System.out.println("*********** LogTopicReciver" + " get message :[" + msg + "]");
                      }
                  };
                  // 6.消费消息
                  channel.basicConsume(queueName, true, consumer);
      
              } catch (IOException e) {
                  e.printStackTrace();
              } catch (TimeoutException e) {
                  e.printStackTrace();
              }
          }
      }
    3. 启动消费者,作为消费者1
    4. 分别将String[] bindingKeys = { "#" };改为String[] bindingKeys = { "log4j.*", "#.error" };/String[] bindingKeys = { "*.error" };/String[] bindingKeys = { "log4j.warn" };,然后启动作为消费者2、消费者3、消费者4
    5. 启动4次生产者,routing key分别为String routingKey = "info";、String routingKey = "log4j.error";、String routingKey = "logback.error";、String routingKey = "log4j.warn";
    6. 观察控制台log
      生产者:
      product send a msg:  hello rabbitmq, I am info
      product send a msg:  hello rabbitmq, I am log4j.error
      product send a msg:  hello rabbitmq, I am logback.error
      product send a msg:  hello rabbitmq, I am log4j.warn
      
      消费者1:
       **** LogTopicReciver keep alive ,waiting for #
      *********** LogTopicReciver get message :[ hello rabbitmq, I am info]
      *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error]
      *********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error]
      *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn]
      
      消费者2:
       **** LogTopicReciver keep alive ,waiting for log4j.*
       **** LogTopicReciver keep alive ,waiting for #.error
      *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error]
      *********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error]
      *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn] 消费者3: **** LogTopicReciver keep alive ,waiting for *.error *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error] *********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error] 消费者4: **** LogTopicReciver keep alive ,waiting for log4j.warn *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn]
    7. 观察RabbitMQ管理页面

  • 相关阅读:
    Python+Selenium自动化总结
    Python+Selenium自动化-定位一组元素,单选框、复选框的选中方法
    Python+Selenium自动化-模拟键盘操作
    【leetcode】1053. Previous Permutation With One Swap
    【leetcode】1052. Grumpy Bookstore Owner
    【leetcode】1034. Coloring A Border
    【leetcode】1042. Flower Planting With No Adjacent
    【leetcode】1035. Uncrossed Lines
    【leetcode】1048. Longest String Chain
    【leetcode】1047. Remove All Adjacent Duplicates In String
  • 原文地址:https://www.cnblogs.com/sam-uncle/p/9229007.html
Copyright © 2011-2022 走看看