zoukankan      html  css  js  c++  java
  • RabbitMQ 发布/订阅

      我们会做一些改变,就是把一个消息发给多个消费者,这种模式称之为发布/订阅(类似观察者模式)。

          为了验证这种模式,我们准备构建一个简单的日志系统。这个系统包含两类程序,一类程序发动日志,另一类程序接收和处理日志。

          在我们的日志系统中,每一个运行的接收者程序都会收到日志。然后我们实现,一个接收者将接收到的数据写到硬盘上,与此同时,另一个接收者把接收到的消息展现在屏幕上。本质上来说,就是发布的日志消息会转发给所有的接收者。

      1、转发器(Exchanges)

      RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列。

      相反的,生产者只能发送消息给转发器(Exchange)。转发器是非常简单的,一边接收从生产者发来的消息,另一边把消息推送到队列中。转发器必须清楚的知道消息如何处理它收到的每一条消息。是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过转发器的类型进行定义。

      下面列出一些可用的转发器类型:

      Direct

      Topic

      Headers

      Fanout

      目前我们关注最后一个fanout,声明转发器类型的代码:

    1 channel.exchangeDeclare("logs","fanout");

      fanout类型转发器特别简单,把所有它介绍到的消息,广播到所有它所知道的队列。不过这正是我们前述的日志系统所需要的。

      2、匿名转发器(nameless exchange)

      前面说到生产者只能发送消息给转发器(Exchange),我们仍然可以发送和接收消息。这是因为我们使用了一个默认的转发器,它的标识符为””。之前发送消息的代码:

    1 channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

      第一个参数为转发器的名称,我们设置为”” : 如果存在routingKey(第二个参数),消息由routingKey决定发送到哪个队列。

      现在我们可以指定消息发送到的转发器:

    1 channel.basicPublish( "logs","", null, message.getBytes());

      3、临时队列(Temporary queues)

      前面的博客中我们都为队列指定了一个特定的名称。能够为队列命名对我们来说是很关键的,我们需要指定消费者为某个队列。当我们希望在生产者和消费者间共享队列时,为队列命名是很重要的。
      不过,对于我们的日志系统我们并不关心队列的名称。我们想要接收到所有的消息,而且我们也只对当前正在传递的数据的感兴趣。为了满足我们的需求,需要做两件事:
      第一, 无论什么时间连接到Rabbit我们都需要一个新的空的队列。为了实现,我们可以使用随机数创建队列,或者更好的,让服务器给我们提供一个随机的名称。
      第二, 一旦消费者与Rabbit断开,消费者所接收的那个队列应该被自动删除。
      Java中我们可以使用queueDeclare()方法,不传递任何参数,来创建一个非持久的、唯一的、自动删除的队列且队列名称由服务器随机产生。

    1 String queueName = channel.queueDeclare().getQueue();

      一般情况这个名称与amq.gen-JzTY20BRgKO-HjmUJj0wLg 类似。

      4、绑定(Bindings)

       

      我们已经创建了一个fanout转发器和队列,我们现在需要通过binding告诉转发器把消息发送给我们的队列。
      channel.queueBind(queueName, “logs”, ””)参数1:队列名称 ;参数2:转发器名称

      5、完整的例子
      
      发送端:
     1 public class EmitLog  
     2 {  
     3     private final static String EXCHANGE_NAME = "ex_log";  
     4   
     5     public static void main(String[] args) throws IOException  
     6     {  
     7         // 创建连接和频道  
     8         ConnectionFactory factory = new ConnectionFactory();  
     9         factory.setHost("localhost");  
    10         Connection connection = factory.newConnection();  
    11         Channel channel = connection.createChannel();  
    12         // 声明转发器和类型  
    13         channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );  
    14           
    15         String message = new Date().toLocaleString()+" : log something";  
    16         // 往转发器上发送消息  
    17         channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());  
    18   
    19         System.out.println(" [x] Sent '" + message + "'");  
    20   
    21         channel.close();  
    22         connection.close();  
    23   
    24     }  
    25   
    26 }  

      接收端1:

     1 public class ReceiveLogsToSave  
     2 {  
     3     private final static String EXCHANGE_NAME = "ex_log";  
     4   
     5     public static void main(String[] argv) throws java.io.IOException,  
     6             java.lang.InterruptedException  
     7     {  
     8         // 创建连接和频道  
     9         ConnectionFactory factory = new ConnectionFactory();  
    10         factory.setHost("localhost");  
    11         Connection connection = factory.newConnection();  
    12         Channel channel = connection.createChannel();  
    13   
    14         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
    15         // 创建一个非持久的、唯一的且自动删除的队列  
    16         String queueName = channel.queueDeclare().getQueue();  
    17         // 为转发器指定队列,设置binding  
    18         channel.queueBind(queueName, EXCHANGE_NAME, "");  
    19   
    20         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
    21   
    22         QueueingConsumer consumer = new QueueingConsumer(channel);  
    23         // 指定接收者,第二个参数为自动应答,无需手动应答  
    24         channel.basicConsume(queueName, true, consumer);  
    25   
    26         while (true)  
    27         {  
    28             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
    29             String message = new String(delivery.getBody());  
    30   
    31             print2File(message);  
    32         }  
    33   
    34     }  
    35   
    36     private static void print2File(String msg)  
    37     {  
    38         try  
    39         {  
    40             String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();  
    41             String logFileName = new SimpleDateFormat("yyyy-MM-dd")  
    42                     .format(new Date());  
    43             File file = new File(dir, logFileName+".txt");  
    44             FileOutputStream fos = new FileOutputStream(file, true);  
    45             fos.write((msg + "
    ").getBytes());  
    46             fos.flush();  
    47             fos.close();  
    48         } catch (FileNotFoundException e)  
    49         {  
    50             e.printStackTrace();  
    51         } catch (IOException e)  
    52         {  
    53             e.printStackTrace();  
    54         }  
    55     }  
    56 }  

      随机创建一个队列,然后将队列与转发器绑定,然后将消费者与该队列绑定,然后写入日志文件。

     1 public class ReceiveLogsToConsole  
     2 {  
     3     private final static String EXCHANGE_NAME = "ex_log";  
     4   
     5     public static void main(String[] argv) throws java.io.IOException,  
     6             java.lang.InterruptedException  
     7     {  
     8         // 创建连接和频道  
     9         ConnectionFactory factory = new ConnectionFactory();  
    10         factory.setHost("localhost");  
    11         Connection connection = factory.newConnection();  
    12         Channel channel = connection.createChannel();  
    13   
    14         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
    15         // 创建一个非持久的、唯一的且自动删除的队列  
    16         String queueName = channel.queueDeclare().getQueue();  
    17         // 为转发器指定队列,设置binding  
    18         channel.queueBind(queueName, EXCHANGE_NAME, "");  
    19   
    20         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
    21   
    22         QueueingConsumer consumer = new QueueingConsumer(channel);  
    23         // 指定接收者,第二个参数为自动应答,无需手动应答  
    24         channel.basicConsume(queueName, true, consumer);  
    25   
    26         while (true)  
    27         {  
    28             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
    29             String message = new String(delivery.getBody());  
    30             System.out.println(" [x] Received '" + message + "'");  
    31   
    32         }  
    33   
    34     }  
    35   
    36 }  

      随机创建一个队列,然后将队列与转发器绑定,然后将消费者与该队列绑定,然后打印到控制台。

        现在把两个接收端运行,然后运行3次发送端:

      输出结果:

      发送端:

     [x] Sent '2014-7-10 16:04:54 : log something'

     [x] Sent '2014-7-10 16:04:58 : log something'

     [x] Sent '2014-7-10 16:05:02 : log something'

      接收端1:

      接收端2:

     [*] Waiting for messages. To exit press CTRL+C
     [x] Received '2014-7-10 16:04:54 : log something'
     [x] Received '2014-7-10 16:04:58 : log something'
     [x] Received '2014-7-10 16:05:02 : log something'

      这个例子实现了我们文章开头所描述的日志系统,利用了转发器的类型:fanout。

      参考文档:http://blog.csdn.net/lmj623565791/article/details/37657225

  • 相关阅读:
    HDU 1009 FatMouse' Trade
    HDU 2602 (简单的01背包) Bone Collector
    LA 3902 Network
    HDU 4513 吉哥系列故事——完美队形II
    LA 4794 Sharing Chocolate
    POJ (Manacher) Palindrome
    HDU 3294 (Manacher) Girls' research
    HDU 3068 (Manacher) 最长回文
    Tyvj 1085 派对
    Tyvj 1030 乳草的入侵
  • 原文地址:https://www.cnblogs.com/lcngu/p/5932204.html
Copyright © 2011-2022 走看看