zoukankan      html  css  js  c++  java
  • RabbitMQ之路由(Routing)【译】

    目录

    绑定(Binding)
    直接交换(Direct exchange)
    多个绑定(Multiple bindings)
    分发日志(Emitting logs)
    订阅(Subscribing)
    代码整合

    在上一节中,我们创建了一个简单的日志系统,可以广播消息到很多接收者。
    这一节,我们将在上一节的基础上加一个功能——订阅部分消息。例如,我们只将严重错误信息写入到日志文件保存在磁盘上,同时我们能将所有的日志都打印到屏幕上。

    绑定(Binding)

    我们之前已经创建了一个绑定:

    channel.queueBind(queueName, EXCHANGE_NAME, "");

    绑定是exchange和队列之间的联系,我们可以简单的理解为:队列对这个exchange中的消息感兴趣。
    绑定可以采取额外的routingKey参数,为了避免与basic_publish的参数混淆,我们称之为binding key,创建过程如下:

    channel.queueBind(queueName, EXCHANGE_NAME, "black");

    binding key的含义取决于exchange的类型,上一节中的fanout exchange忽略了这个值。

    直接交换(Direct exchange)

    上一节中我们广播所有的消息到所有的消费者,现在我们想根据消息的严重程度来筛选消息。我需要一个程序来将日志写入到磁盘,而不会在警告和信息日志上浪费磁盘空间。
    我们之前用的是fanout exchange,它并不是很灵活——唯一能做的事情就是盲目的广播。
    我们将使用一个direct exchange来代替,direct exchange背后的路由算法很简单——根据binding key和routing key的匹配,一个消息进入到指定队列。
    为了说明,请参考下图:


    从上图,我们可以看到,direct exchange X绑定了两个队列,第一个队列的绑定key是orange,第二个有两个绑定,一个black,一个green。
    这样我们发布一个routing key为orange的exchange,将会被路由到Q1这个队列,routing key为black或者green的消息将会进入到Q2,所有其他消息将会被丢弃。

    多个绑定(Multiple bindings)

    用相同的key绑定多个队列这是合法的,在我们的例子里面我们可以在X和Q1之间绑定一个key为black,这样的话,direct exchange将会和fanout一样,将消息广播给所有匹配的Key,一个路由key为black的消息将会被分发给Q1和Q2。

    分发日志(Emitting logs)

    我们将这种模式运用到我们的日志系统中,我们发送消息到direct exchange,利用routing key支持日志的级别,这种方式接收程序将能够选择它想接收的严重性。
    与往常一样,我们需要创建一个exchange:

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");

    为发送消息做准备:

    channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

    为了简单起见,我们假设“严总”

    订阅(Subscribing)

    接收消息的程序和之前差不多,有一个点不一样——我们将为每一个我们感兴趣的严重性建立一个新的绑定。

    String queueName = channel.queueDeclare().getQueue();
    
    for(String severity : argv){    
      channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }

    代码整合

    总体的架构图:


    EmitLogDirect.java

    public class EmitLogDirect {
    
        private static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] argv)
                      throws java.io.IOException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    
            String severity = getSeverity(argv);
            String message = getMessage(argv);
    
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
    
            channel.close();
            connection.close();
        }
        //..
    }

    ReceiveLogsDirect.java

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class ReceiveLogsDirect {
    
      private static final String EXCHANGE_NAME = "direct_logs";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
    
        if (argv.length < 1){
          System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
          System.exit(1);
        }
    
        for(String severity : argv){
          channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
        Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
          }
        };
        channel.basicConsume(queueName, true, consumer);
      }
    }

    原文地址:https://www.rabbitmq.com/tutorials/tutorial-four-java.html

    代码地址:https://github.com/aheizi/hi-mq

    相关:
    1.RabbitMQ之HelloWorld
    2.RabbitMQ之任务队列
    3.RabbitMQ之发布订阅
    4.RabbitMQ之路由(Routing)
    5.RabbitMQ之主题(Topic)
    6.RabbitMQ之远程过程调用(RPC)

  • 相关阅读:
    (转)CentOS 和 Ubuntu 下的网络配置
    love 的Python 表示
    python mysqlLdb ImportError: DLL load failed: 找不到指定的模块
    elasticsearch7.11.1安装及使用小记
    python多进程代码示例
    在c++项目中使用高性能的rapidjson作为json处理库
    使用kenlm进行文本纠错
    供應商主檔建立流程
    SAP系統自帶范例
    内部订单作业流程
  • 原文地址:https://www.cnblogs.com/xieyulin/p/7070242.html
Copyright © 2011-2022 走看看