zoukankan      html  css  js  c++  java
  • 轻松搞定RabbitMQ(六)——主题

    转自 http://blog.csdn.net/xiaoxian8023/article/details/48806871

       翻译地址:http://www.rabbitmq.com/tutorials/tutorial-five-java.html

           在上一篇博文中,我们进一步改良了日志系统。使用Direct类型的转换器,使得接收者有能力进行选择性的接收日志,,而非fanout那样,只能够无脑的转发,如果你还不了解,请阅读:轻松搞定RabbitMQ(四)——发布/订阅

           虽然使用Direct类型的转换器改进了日志系统。但它仍然有一定的局限性——不能根据多重条件进行路由选择。

           在我们的日志系统中,我们可能不仅仅根据日志严重性订阅日志,也想根据发送源订阅。你可能从unix工具syslog了解过这个概念,它可以根据严重性(info/warning/crit…)和设备(auth/cron/kern…)转发日志。

           这将给我们更多的灵活性——我们可以订阅来自元‘cron’的致命错误日志,同时也可以订阅‘kern’的所有日志。

           为了在我们的日志系统中实现上述需求,我们需要了解更复杂的主题类型的转发器——Topic Exchange。


    Topic exchange(主题转发器)

           发送给主题转发器的消息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符。这些标识符可以是随意,但是通常跟消息的某些特性相关联。一些合法的路由选择键比如“socket.usd.nyse”,"nyse.vmw","quick.orange.rabbit",你愿意用多少单词都可以,只要不超过上限的255个字节。

           绑定键也必须以相同的格式。主题转发器的逻辑类似于direct类型的转发器。消息通过一个特定的路由键发送到所有与绑定键匹配的队列中。需要注意的是,关于绑定键有两种特殊的情况:*(星号)可以代替任意一个标识符 ;#(井号)可以代替零个或多个标识符。

           举一个简单例子,如下图:


           在上图例子中,我们发送描述动物的消息。消息会转发给包含3个单词(2个小数点)的路由键绑定的队列中。绑定键中的第一个单词描述的是速度,第二个是颜色,第三个是物种:“速度.颜色.物种”。

           我们创建3个绑定:Q1绑定键是“*.orange.*”,Q2绑定键是“*.*.rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。

           路由键为“quick.orange.rabbit”的消息会被路由到2个队列中去。而“lazy.orange.elephant”的消息同样会发往2个队列。另外“quick.orange.fox” 仅仅发往第一个队列,而"lazy.brown.fox"则只发往第二个队列。“quick.brown.fox”则所有的绑定键都不匹配而被丢弃。

           如果我们违反约定,发送了只带1个或者4个标识符的选择键,像“orange”或者“quick.orange.male.rabbit”,会发生什么呢?这些消息都不匹配任何绑定,所以将被丢弃。

           另外,“lazy.orange.male.rabbit”,尽管有4个标识符,但是仍然匹配最后一个绑定键,所以会发送到第二个队列中。

           注:主题类型的转发器非常强大,可以实现其他类型的转发器。当队列绑定#绑定键,可以接受任何消息,类似于fanout转发器。当特殊字符*和#不包含在绑定键中,这个主题转发器就像一个direct类型的转发器。


    完整实例

           我们将主题类型的转发器应用到日志系统中,路由格式为:“<设备>.<严重级别>”。

    发送端(EmitLogTopic.java)

    [java] view plain copy
     
    1. public class EmitLogDirect {  
    2.     private final static String EXCHANGE_NAME = "topic_logs";  
    3.   
    4.     public static void main(String[] args) throws IOException {  
    5.         /** 
    6.          * 创建连接连接到MabbitMQ 
    7.          */  
    8.         ConnectionFactory factory = new ConnectionFactory();  
    9.         // 设置MabbitMQ所在主机ip或者主机名  
    10.         factory.setHost("127.0.0.1");  
    11.         // 创建一个连接  
    12.         Connection connection = factory.newConnection();  
    13.         // 创建一个频道  
    14.         Channel channel = connection.createChannel();  
    15.         // 指定转发——广播  
    16.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");  
    17.   
    18.         //所有设备和日志级别  
    19.         String[] facilities ={"auth","cron","kern","auth.A"};  
    20.         String[] severities={"error","info","warning"};  
    21.           
    22.         for(int i=0;i<4;i++){  
    23.             for(int j=0;j<3;j++){  
    24.             //每一个设备,每种日志级别发送一条日志消息  
    25.             String routingKey = facilities[i]+"."+severities[j%3];  
    26.               
    27.             // 发送的消息  
    28.             String message =" Hello World!"+Strings.repeat(".", i+1);  
    29.             //参数1:exchange name  
    30.             //参数2:routing key  
    31.             channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());  
    32.             System.out.println(" [x] Sent [" + routingKey +"] : '"+ message + "'");  
    33.             }  
    34.         }  
    35.         // 关闭频道和连接  
    36.         channel.close();  
    37.         connection.close();  
    38.     }  
    39. }  

    消费者1(ReceiveLogs2Console.java)

    [java] view plain copy
     
    1. public class ReceiveLogs2Console {  
    2.     private static final String EXCHANGE_NAME = "topic_logs";  
    3.   
    4.     public static void main(String[] argv) throws IOException, InterruptedException {  
    5.         ConnectionFactory factory = new ConnectionFactory();  
    6.         factory.setHost("127.0.0.1");  
    7.         // 打开连接和创建频道,与发送端一样  
    8.         Connection connection = factory.newConnection();  
    9.         final Channel channel = connection.createChannel();  
    10.   
    11.         //声明exchange  
    12.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");  
    13.         // 声明一个随机队列  
    14.         String queueName = channel.queueDeclare().getQueue();  
    15.   
    16.         String[] routingKeys ={"auth.*","*.info","#.warning"};//关注所有的授权日志、所有info和waring级别的日志  
    17.         for (String routingKey : routingKeys) {  
    18.             //关注所有级别的日志(多重绑定)  
    19.             channel.queueBind(queueName, EXCHANGE_NAME, routingKey);  
    20.         }  
    21.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
    22.           
    23.         // 创建队列消费者  
    24.         final Consumer consumer = new DefaultConsumer(channel) {  
    25.               @Override  
    26.               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
    27.                 String message = new String(body, "UTF-8");  
    28.                 System.out.println(" [x] Received ["  + envelope.getRoutingKey() + "] :'" + message + "'");  
    29.               }  
    30.             };  
    31.             channel.basicConsume(queueName, true, consumer);  
    32.     }  
    33. }  

    消费者2(ReceiveLogs2File.java)

    [java] view plain copy
     
    1. public class ReceiveLogs2File {  
    2.     private static final String EXCHANGE_NAME = "topic_logs";  
    3.   
    4.     public static void main(String[] argv) throws IOException, InterruptedException {  
    5.         ConnectionFactory factory = new ConnectionFactory();  
    6.         factory.setHost("127.0.0.1");  
    7.         // 打开连接和创建频道,与发送端一样  
    8.         Connection connection = factory.newConnection();  
    9.         final Channel channel = connection.createChannel();  
    10.   
    11.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");  
    12.         // 声明一个随机队列  
    13.         String queueName = channel.queueDeclare().getQueue();  
    14.         channel.queueBind(queueName, EXCHANGE_NAME, "");  
    15.   
    16.         String severity="kern.error";//只关注核心错误级别的日志,然后记录到文件中去。  
    17.         channel.queueBind(queueName, EXCHANGE_NAME, severity);  
    18.           
    19.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
    20.           
    21.         // 创建队列消费者  
    22.         final Consumer consumer = new DefaultConsumer(channel) {  
    23.               @Override  
    24.               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
    25.                 String message = new String(body, "UTF-8");  
    26.                 //记录日志到文件:  
    27.                 print2File( "["+ envelope.getRoutingKey() + "] "+message);  
    28.               }  
    29.             };  
    30.             channel.basicConsume(queueName, true, consumer);  
    31.     }  
    32.       
    33.     private static void print2File(String msg) {  
    34.         try {  
    35.             String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();  
    36.             String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());  
    37.             File file = new File(dir, logFileName + ".log");  
    38.             FileOutputStream fos = new FileOutputStream(file, true);  
    39.             fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + " ").getBytes());  
    40.             fos.flush();  
    41.             fos.close();  
    42.         } catch (FileNotFoundException e) {  
    43.             e.printStackTrace();  
    44.         } catch (IOException e) {  
    45.             e.printStackTrace();  
    46.         }  
    47.     }    
    48. }  

           最终结果:

  • 相关阅读:
    万能清除
    CSS追加笔记
    函数追加笔记
    jQuery实现放大镜效果
    jQuery获取元素的方法
    JavaScript
    jQuery三级联动
    DOM追加笔记
    数据结构与算法之美01
    RPC架构简介与原理
  • 原文地址:https://www.cnblogs.com/Damon-Luo/p/7838120.html
Copyright © 2011-2022 走看看