zoukankan      html  css  js  c++  java
  • 轻松搞定RabbitMQ(四)——发布/订阅

    转自 http://blog.csdn.net/xiaoxian8023/article/details/48729479

     翻译地址:http://www.rabbitmq.com/tutorials/tutorial-three-java.html

           在前面的教程中,我们创建了一个工作队列,都是假设一个任务只交给一个消费者。这次我们做一些完全不同的事儿——将消息发送给多个消费者。这种模式叫做“发布/订阅”。

           为了说明这个模式,我们将构建一个简单日志系统。它包含2段程序:第一个将发出日志消息,第二个接受并打印消息。

           如果在日志系统中每一个接受者(订阅者)都会的得到消息的拷贝。那样的话,我们可以运行一个接受者(订阅者)程序,直接把日志记录到硬盘。同时运行另一个接受者(订阅者)程序,打印日志到屏幕上。

           说白了,发表日志消息将被广播给所有的接收者。


    Exchanges(转发器)

           前面的博文汇总,我们都是基于一个队列发送和接受消息。现在介绍一下完整的消息传递模式。

           RabbitMQ消息模式的核心理念是:生产者没有直接发送任何消费到队列。实际上,生产者都不知道这个消费是发送给哪个队列的。

           相反,生产者只能发送消息给转发器,转发器是非常简单的。一方面它接受生产者的消息,另一方面向队列推送消息。转发器必须清楚的知道如何处理接收到的消息。附加一个特定的队列吗?附加多个队列?或者是否丢弃?这些规则通过转发器的类型进行定义。

           

           类型有:Direct、Topic、Headers和Fanout。我们关注最后一个。现在让我们创建一个该类型的转发器,定义如下:

    [java] view plain copy
     
    1. channel.exchangeDeclare("logs", "fanout");  

           fanout转发器非常简单,从名字就可以看出,它是广播接受到的消息给所有的队列。而这正好符合日志系统的需求。

    Nameless exchange(匿名转发)

           之前我们对转换器一无所知,却可以将消息发送到队列,那是可能是我们用了默认的转发器,转发器名为空字符串""。之前我们发布消息的代码是:

    [java] view plain copy
     
    1. channel.basicPublish("", "hello", null, message.getBytes());  

           第一个参数就是转发器的名字,空字符串表示模式或者匿名的转发器。消息通过队列的routingKey路由到指定的队列中去,如果存在的话。

           现在我们可以指定转发器的名字了:

    [java] view plain copy
     
    1. channel.basicPublish( "logs", "", null, message.getBytes());  


    Temporary queues(临时队列)

           你可能还记得之前我们用队列时,会指定一个名字。队列有名字对我们来说是非常重要的——我们需要为消费者指定同一个队列。

           但这并不是我们的日志系统所关心的。我们要监听所有日志消息,而不仅仅是一类日志。我们只对对当前流动的消息感兴趣。解决这些问题,我盟需要完成两件事。

           首先,每当我盟连接到RabbitMQ时,需要一个新的空队列。为此我们需要创建一个随机名字的空队列,或者更好的,让服务器选好年则一个随机名字的空队列给我们。

           其次,一旦消费者断开连接,队列将自动删除。

    我们提供一个无参的queueDeclare()方法,创建一个非持久化、独立的、自动删除的队列,且名字是随机生成的。

    [java] view plain copy
     
    1. String queueName = channel.queueDeclare().getQueue();  

    queueName是一个随机队列名。看起来会像amq.gen-JzTY20BRgKO-HjmUJj0wLg。


    Bindings(绑定)

           

           我们已经创建了一个广播的转发器和一个随机队列。现在需要告诉转发器转发消息到队列。这个关联转发器和队列的我们叫它Binding。

    [java] view plain copy
     
    1. channel.queueBind(queueName, "logs", "");  

    这样,日志转发器将附加到日志队列上去。


    完整的例子:

    发送端代码(生产者)EmitLog.java

    [java] view plain copy
     
    1. public class EmitLog {  
    2.     private final static String EXCHANGE_NAME = "logs";  
    3.   
    4.     public static void main(String[] args) throws IOException {  
    5.         /** 
    6.          * 创建连接连接到MabbitMQ 
    7.          */  
    8.         ConnectionFactory factory = new ConnectionFactory();  
    9.         // 设置MabbitMQ所在主机ip或者主机名  
    10.         factory.setHost("127.0.0.1");  
    11.         // 创建一个连接  
    12.         Connection connection = factory.newConnection();  
    13.         // 创建一个频道  
    14.         Channel channel = connection.createChannel();  
    15.         // 指定转发——广播  
    16.         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
    17.   
    18.         for(int i=0;i<3;i++){  
    19.             // 发送的消息  
    20.             String message = "Hello World!";  
    21.             channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());  
    22.             System.out.println(" [x] Sent '" + message + "'");  
    23.         }  
    24.   
    25.         // 关闭频道和连接  
    26.         channel.close();  
    27.         connection.close();  
    28.     }  
    29. }  

    消费者1 ReceiveLogs2Console.java

    [java] view plain copy
     
    1. public class ReceiveLogs2Console {  
    2.     private static final String EXCHANGE_NAME = "logs";  
    3.   
    4.     public static void main(String[] argv) throws IOException, InterruptedException {  
    5.         ConnectionFactory factory = new ConnectionFactory();  
    6.         factory.setHost("127.0.0.1");  
    7.         // 打开连接和创建频道,与发送端一样  
    8.         Connection connection = factory.newConnection();  
    9.         final Channel channel = connection.createChannel();  
    10.   
    11.         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
    12.         // 声明一个随机队列  
    13.         String queueName = channel.queueDeclare().getQueue();  
    14.         channel.queueBind(queueName, EXCHANGE_NAME, "");  
    15.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
    16.           
    17.         // 创建队列消费者  
    18.         final Consumer consumer = new DefaultConsumer(channel) {  
    19.               @Override  
    20.               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
    21.                 String message = new String(body, "UTF-8");  
    22.                 System.out.println(" [x] Received '" + message + "'");  
    23.               }  
    24.             };  
    25.             channel.basicConsume(queueName, true, consumer);  
    26.     }  
    27. }  

    消费者2 ReceiveLogs2File.java

    [java] view plain copy
     
    1. public class ReceiveLogs2File {  
    2.     private static final String EXCHANGE_NAME = "logs";  
    3.   
    4.     public static void main(String[] argv) throws IOException, InterruptedException {  
    5.         ConnectionFactory factory = new ConnectionFactory();  
    6.         factory.setHost("127.0.0.1");  
    7.         // 打开连接和创建频道,与发送端一样  
    8.         Connection connection = factory.newConnection();  
    9.         final Channel channel = connection.createChannel();  
    10.   
    11.         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
    12.         // 声明一个随机队列  
    13.         String queueName = channel.queueDeclare().getQueue();  
    14.         channel.queueBind(queueName, EXCHANGE_NAME, "");  
    15.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
    16.           
    17.         // 创建队列消费者  
    18.         final Consumer consumer = new DefaultConsumer(channel) {  
    19.               @Override  
    20.               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
    21.                 String message = new String(body, "UTF-8");  
    22.                 print2File(message);  
    23. //              System.out.println(" [x] Received '" + message + "'");  
    24.               }  
    25.             };  
    26.             channel.basicConsume(queueName, true, consumer);  
    27.     }  
    28.       
    29.     private static void print2File(String msg) {  
    30.         try {  
    31.             String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();  
    32.             String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());  
    33.             File file = new File(dir, logFileName + ".log");  
    34.             FileOutputStream fos = new FileOutputStream(file, true);  
    35.             fos.write(((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + " ").getBytes());  
    36.             fos.flush();  
    37.             fos.close();  
    38.         } catch (FileNotFoundException e) {  
    39.             e.printStackTrace();  
    40.         } catch (IOException e) {  
    41.             e.printStackTrace();  
    42.         }  
    43.     }    
    44. }  

           可以看到我们1个生产者用于发送log消息,2个消费者,一个用于显示,一个用于记录文件。

           生产者声明了一个广播模式的转换器,订阅这个转换器的消费者都可以收到每一条消息。可以看到在生产者中,没有声明队列。这也验证了之前说的。生产者其实只关心exchange,至于exchange会把消息转发给哪些队列,并不是生产者关心的。

           2个消费者,一个打印日志,一个写入文件,除了这2个地方不一样,其他地方一模一样。也是声明一下广播模式的转换器,而队列则是随机生成的,消费者实例启动后,会创建一个随机实例,这个在管理页面可以看到(如图)。而实例关闭后,随机队列也会自动删除。最后将队列与转发器绑定。


           注:运行的时候要先运行2个消费者实例,然后在运行生产者实例。否则获取不到实例。

           看看最终的结果吧:

  • 相关阅读:
    linux中的等待队列
    MapReduce中的作业调度
    hdfs: 数据流(二)
    hdfs: 一个分布式文件系统(一)
    记住这一天
    Partitioning, Shuffle and sort
    从wordcount 开始 mapreduce (C++hadoop streaming模式)
    iOS9 请求出现App Transport Security has blocked a cleartext HTTP (http://)
    Xcode7 下iphone6、6s进行屏幕适配
    隐藏系统的uitabbar
  • 原文地址:https://www.cnblogs.com/Damon-Luo/p/7837659.html
Copyright © 2011-2022 走看看