zoukankan      html  css  js  c++  java
  • RabbitMQ (三) 发布/订阅 -摘自网络

    这篇博客中,我们会做一些改变,就是把一个消息发给多个消费者,这种模式称之为发布/订阅(类似观察者模式)。

             为了验证这种模式,我们准备构建一个简单的日志系统。这个系统包含两类程序,一类程序发动日志,另一类程序接收和处理日志。

             在我们的日志系统中,每一个运行的接收者程序都会收到日志。然后我们实现,一个接收者将接收到的数据写到硬盘上,与此同时,另一个接收者把接收到的消息展现在屏幕上。

             本质上来说,就是发布的日志消息会转发给所有的接收者。

    1、转发器(Exchanges)

    前面的博客中我们主要的介绍都是发送者发送消息给队列,接收者从队列接收消息。下面我们会引入Exchanges,展示RabbitMQ的完整的消息模型。

    RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列。

    相反的,生产者只能发送消息给转发器(Exchange)。转发器是非常简单的,一边接收从生产者发来的消息,另一边把消息推送到队列中。转发器必须清楚的知道消息如何处理它收到的每一条消息。是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过转发器的类型进行定义。

     

    下面列出一些可用的转发器类型:

    Direct

    Topic

    Headers

    Fanout

    目前我们关注最后一个fanout,声明转发器类型的代码:

    channel.exchangeDeclare("logs","fanout");

    fanout类型转发器特别简单,把所有它介绍到的消息,广播到所有它所知道的队列。不过这正是我们前述的日志系统所需要的。

    2、匿名转发器(nameless exchange)

    前面说到生产者只能发送消息给转发器(Exchange),但是我们前两篇博客中的例子并没有使用到转发器,我们仍然可以发送和接收消息。这是因为我们使用了一个默认的转发器,它的标识符为””。之前发送消息的代码:

    channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

    第一个参数为转发器的名称,我们设置为”” : 如果存在routingKey(第二个参数),消息由routingKey决定发送到哪个队列。

    现在我们可以指定消息发送到的转发器:

    channel.basicPublish( "logs","", null, message.getBytes());

    3、临时队列(Temporary queues)

    前面的博客中我们都为队列指定了一个特定的名称。能够为队列命名对我们来说是很关键的,我们需要指定消费者为某个队列。当我们希望在生产者和消费者间共享队列时,为队列命名是很重要的。 不过,对于我们的日志系统我们并不关心队列的名称。我们想要接收到所有的消息,而且我们也只对当前正在传递的数据的感兴趣。为了满足我们的需求,需要做两件事: 第一, 无论什么时间连接到Rabbit我们都需要一个新的空的队列。为了实现,我们可以使用随机数创建队列,或者更好的,让服务器给我们提供一个随机的名称。 第二, 一旦消费者与Rabbit断开,消费者所接收的那个队列应该被自动删除。 Java中我们可以使用queueDeclare()方法,不传递任何参数,来创建一个非持久的、唯一的、自动删除的队列且队列名称由服务器随机产生。 String queueName = channel.queueDeclare().getQueue(); 一般情况这个名称与amq.gen-JzTY20BRgKO-HjmUJj0wLg 类似。

    4、绑定(Bindings)
    我们已经创建了一个fanout转发器和队列,我们现在需要通过binding告诉转发器把消息发送给我们的队列。 channel.queueBind(queueName, “logs”, ””)参数1:队列名称 ;参数2:转发器名称
    5、完整的例子
    日志发送端:
    1. package com.zhy.rabbit._03_bindings_exchanges;  
    2.   
    3. import java.io.IOException;  
    4. import java.util.Date;  
    5.   
    6. import com.rabbitmq.client.Channel;  
    7. import com.rabbitmq.client.Connection;  
    8. import com.rabbitmq.client.ConnectionFactory;  
    9.   
    10. public class EmitLog  
    11. {  
    12.     private final static String EXCHANGE_NAME = "ex_log";  
    13.   
    14.     public static void main(String[] args) throws IOException  
    15.     {  
    16.         // 创建连接和频道  
    17.         ConnectionFactory factory = new ConnectionFactory();  
    18.         factory.setHost("localhost");  
    19.         Connection connection = factory.newConnection();  
    20.         Channel channel = connection.createChannel();  
    21.         // 声明转发器和类型  
    22.         channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );  
    23.           
    24.         String message = new Date().toLocaleString()+" : log something";  
    25.         // 往转发器上发送消息  
    26.         channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());  
    27.   
    28.         System.out.println(" [x] Sent '" + message + "'");  
    29.   
    30.         channel.close();  
    31.         connection.close();  
    32.   
    33.     }  
    34.   
    35. }  
    package com.zhy.rabbit._03_bindings_exchanges;
    
    import java.io.IOException;
    import java.util.Date;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class EmitLog
    {
    	private final static String EXCHANGE_NAME = "ex_log";
    
    	public static void main(String[] args) throws IOException
    	{
    		// 创建连接和频道
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("localhost");
    		Connection connection = factory.newConnection();
    		Channel channel = connection.createChannel();
    		// 声明转发器和类型
    		channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );
    		
    		String message = new Date().toLocaleString()+" : log something";
    		// 往转发器上发送消息
    		channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    
    		System.out.println(" [x] Sent '" + message + "'");
    
    		channel.close();
    		connection.close();
    
    	}
    
    }
    
    没什么太大的改变,声明队列的代码,改为声明转发器了,同样的消息的传递也交给了转发器。
    接收端1 :ReceiveLogsToSave.java:
    1. package com.zhy.rabbit._03_bindings_exchanges;  
    2.   
    3. import java.io.File;  
    4. import java.io.FileNotFoundException;  
    5. import java.io.FileOutputStream;  
    6. import java.io.IOException;  
    7. import java.text.SimpleDateFormat;  
    8. import java.util.Date;  
    9.   
    10. import com.rabbitmq.client.Channel;  
    11. import com.rabbitmq.client.Connection;  
    12. import com.rabbitmq.client.ConnectionFactory;  
    13. import com.rabbitmq.client.QueueingConsumer;  
    14.   
    15. public class ReceiveLogsToSave  
    16. {  
    17.     private final static String EXCHANGE_NAME = "ex_log";  
    18.   
    19.     public static void main(String[] argv) throws java.io.IOException,  
    20.             java.lang.InterruptedException  
    21.     {  
    22.         // 创建连接和频道  
    23.         ConnectionFactory factory = new ConnectionFactory();  
    24.         factory.setHost("localhost");  
    25.         Connection connection = factory.newConnection();  
    26.         Channel channel = connection.createChannel();  
    27.   
    28.         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
    29.         // 创建一个非持久的、唯一的且自动删除的队列  
    30.         String queueName = channel.queueDeclare().getQueue();  
    31.         // 为转发器指定队列,设置binding  
    32.         channel.queueBind(queueName, EXCHANGE_NAME, "");  
    33.   
    34.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
    35.   
    36.         QueueingConsumer consumer = new QueueingConsumer(channel);  
    37.         // 指定接收者,第二个参数为自动应答,无需手动应答  
    38.         channel.basicConsume(queueName, true, consumer);  
    39.   
    40.         while (true)  
    41.         {  
    42.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
    43.             String message = new String(delivery.getBody());  
    44.   
    45.             print2File(message);  
    46.         }  
    47.   
    48.     }  
    49.   
    50.     private static void print2File(String msg)  
    51.     {  
    52.         try  
    53.         {  
    54.             String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();  
    55.             String logFileName = new SimpleDateFormat("yyyy-MM-dd")  
    56.                     .format(new Date());  
    57.             File file = new File(dir, logFileName+".txt");  
    58.             FileOutputStream fos = new FileOutputStream(file, true);  
    59.             fos.write((msg + " ").getBytes());  
    60.             fos.flush();  
    61.             fos.close();  
    62.         } catch (FileNotFoundException e)  
    63.         {  
    64.             e.printStackTrace();  
    65.         } catch (IOException e)  
    66.         {  
    67.             e.printStackTrace();  
    68.         }  
    69.     }  
    70. }  
    package com.zhy.rabbit._03_bindings_exchanges;
    
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class ReceiveLogsToSave
    {
    	private final static String EXCHANGE_NAME = "ex_log";
    
    	public static void main(String[] argv) throws java.io.IOException,
    			java.lang.InterruptedException
    	{
    		// 创建连接和频道
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("localhost");
    		Connection connection = factory.newConnection();
    		Channel channel = connection.createChannel();
    
    		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    		// 创建一个非持久的、唯一的且自动删除的队列
    		String queueName = channel.queueDeclare().getQueue();
    		// 为转发器指定队列,设置binding
    		channel.queueBind(queueName, EXCHANGE_NAME, "");
    
    		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
    		QueueingConsumer consumer = new QueueingConsumer(channel);
    		// 指定接收者,第二个参数为自动应答,无需手动应答
    		channel.basicConsume(queueName, true, consumer);
    
    		while (true)
    		{
    			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    			String message = new String(delivery.getBody());
    
    			print2File(message);
    		}
    
    	}
    
    	private static void print2File(String msg)
    	{
    		try
    		{
    			String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();
    			String logFileName = new SimpleDateFormat("yyyy-MM-dd")
    					.format(new Date());
    			File file = new File(dir, logFileName+".txt");
    			FileOutputStream fos = new FileOutputStream(file, true);
    			fos.write((msg + "
    ").getBytes());
    			fos.flush();
    			fos.close();
    		} catch (FileNotFoundException e)
    		{
    			e.printStackTrace();
    		} catch (IOException e)
    		{
    			e.printStackTrace();
    		}
    	}
    }
    

    随机创建一个队列,然后将队列与转发器绑定,然后将消费者与该队列绑定,然后写入日志文件。

    接收端2:ReceiveLogsToConsole.java

    1. package com.zhy.rabbit._03_bindings_exchanges;  
    2.   
    3. import com.rabbitmq.client.Channel;  
    4. import com.rabbitmq.client.Connection;  
    5. import com.rabbitmq.client.ConnectionFactory;  
    6. import com.rabbitmq.client.QueueingConsumer;  
    7.   
    8. public class ReceiveLogsToConsole  
    9. {  
    10.     private final static String EXCHANGE_NAME = "ex_log";  
    11.   
    12.     public static void main(String[] argv) throws java.io.IOException,  
    13.             java.lang.InterruptedException  
    14.     {  
    15.         // 创建连接和频道  
    16.         ConnectionFactory factory = new ConnectionFactory();  
    17.         factory.setHost("localhost");  
    18.         Connection connection = factory.newConnection();  
    19.         Channel channel = connection.createChannel();  
    20.   
    21.         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
    22.         // 创建一个非持久的、唯一的且自动删除的队列  
    23.         String queueName = channel.queueDeclare().getQueue();  
    24.         // 为转发器指定队列,设置binding  
    25.         channel.queueBind(queueName, EXCHANGE_NAME, "");  
    26.   
    27.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
    28.   
    29.         QueueingConsumer consumer = new QueueingConsumer(channel);  
    30.         // 指定接收者,第二个参数为自动应答,无需手动应答  
    31.         channel.basicConsume(queueName, true, consumer);  
    32.   
    33.         while (true)  
    34.         {  
    35.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
    36.             String message = new String(delivery.getBody());  
    37.             System.out.println(" [x] Received '" + message + "'");  
    38.   
    39.         }  
    40.   
    41.     }  
    42.   
    43. }  
    package com.zhy.rabbit._03_bindings_exchanges;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class ReceiveLogsToConsole
    {
    	private final static String EXCHANGE_NAME = "ex_log";
    
    	public static void main(String[] argv) throws java.io.IOException,
    			java.lang.InterruptedException
    	{
    		// 创建连接和频道
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("localhost");
    		Connection connection = factory.newConnection();
    		Channel channel = connection.createChannel();
    
    		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    		// 创建一个非持久的、唯一的且自动删除的队列
    		String queueName = channel.queueDeclare().getQueue();
    		// 为转发器指定队列,设置binding
    		channel.queueBind(queueName, EXCHANGE_NAME, "");
    
    		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
    		QueueingConsumer consumer = new QueueingConsumer(channel);
    		// 指定接收者,第二个参数为自动应答,无需手动应答
    		channel.basicConsume(queueName, true, consumer);
    
    		while (true)
    		{
    			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    			String message = new String(delivery.getBody());
    			System.out.println(" [x] Received '" + message + "'");
    
    		}
    
    	}
    
    }
    

    随机创建一个队列,然后将队列与转发器绑定,然后将消费者与该队列绑定,然后打印到控制台。

    现在把两个接收端运行,然后运行3次发送端:

    输出结果:

    发送端:

     [x] Sent '2014-7-10 16:04:54 : log something'

     [x] Sent '2014-7-10 16:04:58 : log something'

     [x] Sent '2014-7-10 16:05:02 : log something'

    接收端1:

    接收端2:

     [*] Waiting for messages. To exit press CTRL+C  [x] Received '2014-7-10 16:04:54 : log something'  [x] Received '2014-7-10 16:04:58 : log something'  [x] Received '2014-7-10 16:05:02 : log something'

    这个例子实现了我们文章开头所描述的日志系统,利用了转发器的类型:fanout。

    本篇说明了,生产者将消息发送至转发器,转发器决定将消息发送至哪些队列,消费者绑定队列获取消息。

  • 相关阅读:
    CSS样式表引用方式
    引入样式表(css)的四种方式
    html中有序列表标签ol,li的高级应用
    HTML 基本标签
    SEO中HTML标签权重列表
    HTML 和 XHTML 区别
    HTML相对路径和绝对路径
    Django静态博客开发_3_视图与模版(完成一个简单博客的建立)
    Django静态博客开发_2_模型层
    Django静态博客开发_1_入门
  • 原文地址:https://www.cnblogs.com/haoliansheng/p/4401056.html
Copyright © 2011-2022 走看看