zoukankan      html  css  js  c++  java
  • 【RabbitMQ】 Routing

    Routing

    之前的章节里我们构建了一个简单的日志系统。我们可以广播所有的日志消息给所有的接收端。

    本节我们将给它添加一个新特性 - 我们将允许只订阅一个消息的子集。例如,我们只将关键的错误消息定位到文件中(以节省磁盘空间),同时仍然可以在控制台输出所有日志消息。

    Bindings

    在前面的例子中我们已经创建了绑定关系。回想代码如下:

    channel.queueBind(queueName, EXCHANGE_NAME, "");

    一个绑定是指一个交换机和一个队列之间的关系。可以简单的理解为:队列对交换机中的消息感兴趣。

    绑定需要一个额外的routingKey参数。为了避免和basic_publish中的参数混淆,我们现在称它为binding key。创建一个带有binding key的绑定:

    channel.queueBind(queueName, EXCHANGE_NAME, "black");

    binding key的意义跟交换机的类型有关。我们之前使用的fanout交换机就直接忽略了binding key。

    Direct交换机

    我们之前的日志系统广播所有的消息给所有的消费者。我们希望扩展它,以便它可以根据消息严重级别来过滤。例如我们可能想要一个程序仅将关键错误写入磁盘,从而避免写入警告或信息导致磁盘空间的浪费。

    我们曾经使用fanout交换机,它没有给我们很多的灵活性,仅仅就是无情的广播。

    我们将使用一个direct交换机替代它。direct交换机的路由算法很简单 - 一个将要进入队列的消息的routing key必须和这个队列的binding key完全吻合。

    为了说明这一点,请考虑一下设置:

    可以看到direct交换机绑定了两个队列。第一个队列绑定了一个叫做orange的binding key。第二个队列有两个绑定,一个black一个green。

    在这种设置下,以routing key为orange的消息将会被路由到队列Q1。而routing key为black和green的消息会被路由到Q2。所有其它消息会被丢弃。

    多绑定

    使用相同的binding key绑定多个队列是合法的。在我们的例子中,我们可以添加一个X和Q1之间的绑定,使用black作为binding key。这种情况下,direct交换机会像fanout一样广播消息给所有的匹配队列。一个带有routing key为black的消息将会被同时发送到Q1和Q2。

    发送日志

    我们将应用这种模型到我们的日志系统中。与fanout不同的是,我们将发送消息到一个direct交换机。需要提供日志级别作为routing key。这样接收程序就可以选择它希望接收的日志级别。首先先关注日志的发布:

    首先创建一个交换机:

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");

    准备发送消息:

    channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

    为了简化,我们假设severity可以是info,warning,error等。

    订阅

    接收消息就像前几节中讲的类似,但有一个例外 - 我们需要为每一个感兴趣的级别创建一个新的绑定。

    String queueName = channel.queueDeclare().getQueue();
    
    for(String severity : argv){    
      channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }

    Putting it all together

    EmitLogDirect.java

    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    
    public class EmitLogDirect {
    
      private static final String EXCHANGE_NAME = "direct_logs";
    
      public static void main(String[] argv) throws Exception {
    
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    
        String severity = getSeverity(argv);
        String message = getMessage(argv);
    
        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
    
        channel.close();
        connection.close();
      }
    
      private static String getSeverity(String[] strings){
        if (strings.length < 1)
                return "info";
        return strings[0];
      }
    
      private static String getMessage(String[] strings){
        if (strings.length < 2)
                return "Hello World!";
        return joinStrings(strings, " ", 1);
      }
    
      private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0 ) return "";
        if (length < startIndex ) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
      }
    }

    ReceiveLogsDirect.java

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class ReceiveLogsDirect {
    
      private static final String EXCHANGE_NAME = "direct_logs";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
    
        if (argv.length < 1){
          System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
          System.exit(1);
        }
    
        for(String severity : argv){
          channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
        Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
          }
        };
        channel.basicConsume(queueName, true, consumer);
      }
    }
  • 相关阅读:
    C#使用表达式树实现对象复制
    用vbs将字符串复制到剪贴板
    C# 动态获取程序集信息
    关于ftp的主动模式(Active Mode)和被动模式(Passive Mode)
    %userprofile%\Local Settings文件夹拒绝访问怎么办
    在Winform框架的多文档界面中实现双击子窗口单独弹出或拖出及拽回的处理
    ABP VNext框架基础知识介绍(1)框架基础类继承关系
    ABP开发框架中分页查询排序的实现处理
    Vue&Element开发框架中增加工作流处理,工作流的各个管理页面的界面处理
    基于ABP开发框架的技术点分析和项目快速开发实现
  • 原文地址:https://www.cnblogs.com/shiyu404/p/6253289.html
Copyright © 2011-2022 走看看