zoukankan      html  css  js  c++  java
  • rabbitmq系列四 之路由

    1、路由

      在上一个的教程中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。

      在本次教程中,我们向该系统添加一些特性,比如,我只需要严重错误(erroe级别)的部分日志打印到磁盘文件中,但是同时仍然把所有的日志打印到控制台。

    2、绑定

      在前面的例子中。我们已经用以下的代码创建了绑定。

    1 channel.queueBind(queueName, EXCHANGE_NAME, "");
    View Code

      绑定是指交换机(exchange)与队列(queue)之间的联系,也可以理解为,当某个队列和某个交换机进行了绑定,那么该队列对该交换机的消息感应器。

      绑定的时候,我们可以提供一个额外的参数routingKey,为了避免与basic_publish的参数混淆,我们可以把它叫做绑定键(binding key),以下是如何创建带绑定键的绑定的

    1 channel.queueBind(queueName, EXCHANGE_NAME, "black");
    View Code

      绑定键的意义取决于交换机的类型,我们之前使用过的扇型交换机(fanout exchanges)会忽略这个值。

    3、直接交换机

      上一篇写的日志系统广播所有的消费者(consumers),我们打算扩展它,基于日志的严重程度进行日志消息过滤,例如我们也许只是希望将比较严重的错误(error)日志写入磁盘,以免在警告(warning)或者信息(info)日志上浪费磁盘空间。

      上一篇中,我们使用的扇型交换机(fanout exchange)是没有足够的灵活性 —— 它能做的仅仅是广播。

      在这里我们要使用直接 交换机进行代替,它的算法很简单——交换机将对绑定键和路由键进行精确匹配,然后决定将消息发送到哪个队列。模型如下图所示。

                  

      从上图列子中可以看出,直接交换机x与两个队列Q1、Q2进行绑定,第一个队列使用了orange作为绑定键,第二个队列有两个绑定键,一个使用可black作为绑定键,另外一个使用了green作为绑定键。

      这样一来,我们以orange作为路由键将消息发布到交换机x,消息就会被路由到队列Q1,以black或者green作为路由键,消息将会路由到队列Q2。所有其他的消息将被丢弃。

    4、多个绑定

                        

      多个队列使用相同的绑定键是合法的。这个例子中,我们可以添加一个X和Q1之间的绑定,使用black绑定键。这样一来,直连交换机就和扇型交换机的行为一样,会将消息广播到所有匹配的队列。带有black路由键的消息会同时发送到Q1和Q2。

    5、发送日志

      我们将在日志系统中使用这种模型,使用direct交换机代替fanout交换机。我们将用日志的级别作为路由键,这样接收日志的程序就可以根据严重级别来选择它想要处理的日志,现在我们来看看如何发送日志。

      我们先创建一个直接交换机:

    1 channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    View Code

      然后发送一个消息:

    1 channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
    View Code

      其中severity的值为info、warning或者error其中的一个。

    6、订阅

      接收消息就像前面的教程差不多,有一个例外——我们将为我们感兴趣的日志级别分别创建一个绑定。代码如下:

    1 String queueName = channel.queueDeclare().getQueue();
    2 for(String severity : argv){
    3   channel.queueBind(queueName, EXCHANGE_NAME, severity);
    4 }
    View Code

    7、代码整合

                  

    发送消息的代码 EmitLogDirect.java 代码如下。

     1 package rabbitmq.direct;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 
     9 import rabbitmq.utils.ConnectionUtils;
    10 
    11 /*
    12  * 发送错误日志生产者
    13  */
    14 public class EmitLogDirect {
    15     private static final String EXCHANGE_NAME = "direct_logs";
    16     public static void main(String[] args) throws IOException, TimeoutException {
    17         //获取连接
    18         Connection connection = ConnectionUtils.getConnection();
    19         //创建管道
    20         Channel channel = connection.createChannel();
    21         //创建直接交换器
    22         channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    23         //获取日志级别
    24         String severity = getSeverity(args);
    25         //获取消息
    26         String message = getMessage(args);
    27         //发送消息
    28         channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
    29         System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
    30         channel.close();
    31         connection.close();
    32     }
    33 }
    View Code

    接收消息ReceiveLogsDirect.java代码如下

     1 package rabbitmq.direct;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import com.rabbitmq.client.AMQP;
     7 import com.rabbitmq.client.Channel;
     8 import com.rabbitmq.client.Connection;
     9 import com.rabbitmq.client.Consumer;
    10 import com.rabbitmq.client.DefaultConsumer;
    11 import com.rabbitmq.client.Envelope;
    12 
    13 import rabbitmq.utils.ConnectionUtils;
    14 
    15 public class ReceiveLogsDirect {
    16     private static final String EXCHANGE_NAME = "direct_logs";
    17 
    18     public static void main(String[] args) throws IOException, TimeoutException {
    19         // 获取连接
    20         Connection connection = ConnectionUtils.getConnection();
    21         // 创建管道
    22         Channel channel = connection.createChannel();
    23         // 创建直接交换器
    24         channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    25         String queueName = channel.queueDeclare().getQueue();
    26         for (String severity : args) {
    27             channel.queueBind(queueName, EXCHANGE_NAME, severity);
    28         }
    29         Consumer consumer = new DefaultConsumer(channel) {
    30             @Override
    31             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
    32                     byte[] body) throws IOException {
    33                 String message = new String(body, "UTF-8");
    34                 System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
    35             }
    36         };
    37         channel.basicConsume(queueName, true, consumer);
    38     }
    39 }
    View Code

      

     

  • 相关阅读:
    【原创】编程题练习:反转字符串中的单词
    【最近的学习安排】
    【转载】判断两个链表是否相交、一个链表是否有环
    获取字符串字节长度
    如何找到GridView里的控件,建立GridViewRow对象
    Flex和.NET协同开发利器FluorineFx Flex与.NET互操作
    两款基于Visual Studio开发Flex的插件
    Mysql:向信号量添加给定计数将导致其超出它的最大计数错误
    Sql Server数据库触发器实例
    国外一些知名ASP.Net开源CMS系统
  • 原文地址:https://www.cnblogs.com/Hxinguan/p/9190476.html
Copyright © 2011-2022 走看看