zoukankan      html  css  js  c++  java
  • 【RabbitMQ消息中间件】7.订阅模式

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
    本文链接:https://blog.csdn.net/u013517797/article/details/79506575上一篇我们了解了RabbitMQ的消息的确认模式,本篇我们继续讲解RabbitMQ的五大队列模式之一的“订阅模式”。在实际开发中,通常会遇到以下需一个生产者,多个消费者,同一个消息被多个消费者获取。

    对于“订阅模式”,其模式图如下所示:

    上图中,“P”为消息的生产者,而“X”是交换机(Exchange),即消息生产者将消息发送到“交换机”而不再是“队列”,然后需要承载成产者消息的队列,与该交换机进行绑定。余下就是消费者监听相关队列来获取服务端推送的消息,满足多个消费者共同获取同一个生产者的所有消息的情况。
    当消费者不需要从生产者获取消息了,将相关消息队列与交换机的绑定关系解除即可。

    所以上面的“订阅模式”具有以下特点:
    (1)一个生产者,多个消费者
    (2)每个消费者都有自己的一个队列
    (3)生产者没有将消息直接发送至队列,而是发送到了交换机
    (4)每个队列都要绑定到交换机
    (5)生产者发送的消息,经过交换机,到达队列,实现一个消息被多个消费者获取的目的。

    在“订阅模式”中,多了一个我们前面没有提到的“交换机”。具有交换机的消息传输模式如下:

    可以看到,作为交换机,当消息从生产者传递至交换机(Exchange)时,交换机会将消息复制,按照绑定规则(Bindings),分别将消息推送至绑定在该交换机(Exchange)上的队列(Queues)中。最终队列拿到交换机传递的消息后,消费者就可以通过消息队列获取生产推送的消息了,而且保证了每个连接不同消息队列的消费者拿到的数据是相同的。
    要注意,交换机的作用仅仅是用于信息交换,它本身是不具有消息存储的能力的。

    下面我们编写一下符合“订阅模式”的实例代码。还在之前创建的测试工程“RabbitMQ_Test”下新建一个生产者类:
     
    package cn.jack.rabbitmq.ps;
    import cn.jack.rabbitmq.connection.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
     
    public class Send {
    	
        private final static String EXCHANGE_NAME="test_exchange_fanout";
        
        public static void main(String[] args) throws Exception {
    		//获取到连接以及mq通道
        	Connection connection = ConnectionUtil.getConnection();
        	//从连接中创建通道
        	Channel channel = connection.createChannel();
        	
        	//声明Exchange
        	channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        	
        	//消息内容
        	String message = "Hello World!";
        	channel.basicPublish(EXCHANGE_NAME, "",null, message.getBytes());
        	System.out.println("[product] Send '"+ message +"'");
        	
        	//关闭通道和连接
        	channel.close();
        	connection.close();
    	}
    }
    

      

    与之前不同的是,之前的生产者是声明(创建)队列:
     
    //声明(创建)队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    而该生产者不再声明队列,即不再与队列直接连接,而是换为声明交换机:
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    

      

    我们这里先不编写消费者,直接运行一下生产者:

    在RabbitMQ的管理工具的“Exchange”模块下,我们可以看到刚刚创建的交换机:


    此时还没有队列绑定到交换机,那么此时消息到了哪里了呢?可以这么说,消息“丢失”了。因为当我们尝试将信息发送到一个没有绑定队列的交换机时,由于交换机无法及时将信息推送至消息队列,而交换机本身没有存储信息的能力,则会导致信息丢失。所以这里我们要注意。

    下面我们创建第一个消费者类:
     

    package cn.jack.rabbitmq.ps;
    import cn.jack.rabbitmq.connection.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
     
    public class Recv1 {
     
        private final static String QUEUE_NAME = "test_queue_ps_1";//队列名称
        
        private final static String EXCHANGE_NAME="test_exchange_fanout";//交换机名称
     
        public static void main(String[] argv) throws Exception {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
     
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
     
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
     
            // 定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
     
            // 获取消息
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [consumer1] Received '" + message + "'");
                //休眠10ms
                Thread.sleep(10);
                // 返回确认状态
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

      

    这里我们为消费者声明了一个队列:
     

    // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);

      

    然后将该队列绑定到之前生产者绑定的交换机上:
     
    //绑定队列到交换机
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    

      

    这样就可以获取生产者通过交换机推送的信息了。
    运行一下消费者,再运行一下生产者,可以发现消费者成功获取了信息:

    再创建一个Recv2的消费者类,与Recv1代码相同,除了定义的队列名称不同即可:
    private final static String QUEUE_NAME = "test_queue_ps_2";//队列名称
    获取消息后的控制台打印记得更换名称为“consumer2”:
    System.out.println(" [consumer2] Received '" + message + "'");
    此时我们同时启动生产者,以及两个消费者,可以想看到两个消费者同时获取了生产者发送的全部信息:


    最后,查看交换机与队列的绑定关系,可以点击“Exchange”模块下相关交换机的名称:

    然后点击下面的“Bindings”就可以看到交换机绑定的队列:

    可以看到目前绑定了名为“test_queue_ps_1”和“test_queue_ps_2”的队列。在下面的编辑区域还可以进行手动添加绑定队列。
    转载请注明出:http://blog.csdn.net/acmman/article/details/79506575
  • 相关阅读:
    LINQ中selectManay操作符(五)
    LINQ中select操作符(四)
    高效并发进阶-白银
    JVM回收算法
    一个类是怎么被JVM执行的
    一纸理解JVM
    单例模式
    深入理解Spring AOP思想
    深入理解Spring IOC工作原理
    HashMap扩容全过程
  • 原文地址:https://www.cnblogs.com/laosunlaiye/p/11671459.html
Copyright © 2011-2022 走看看