zoukankan      html  css  js  c++  java
  • RabbitMQ 之 订阅模式 Publish/Subscribe

    模型图

    我们之前学习的都是一个消息只能被一个消费者消费,那么如果我想发一个消息 能被多个消费者消费,这时候怎么办? 这时候我们就得用到了消息中的发布订阅模型

    在前面的教程中,我们创建了一个工作队列,都是一个任务只交给一个消费者。这次我们做 将消息发送给多个消费者。这种模式叫做“发布/订阅”。

    举列:    

    类似微信订阅号 发布文章消息 就可以广播给所有的接收者。(订阅者)

    那么咱们来看一下图,我们学过前两种有一些不一样,work 模式 是不是同一个队列 多个消费者,而 ps 这种模式呢,是一个队列对应一个消费者,pb 模式还多了一个 X(交换机 转发器) ,这时候我们要获取消息 就需要队列绑定到交换机上,交换机把消息发送到队列 , 消费者才能获取队列的消息

    解读:
    1、1 个生产者,多个消费者
    2、每一个消费者都有自己的一个队列
    3、生产者没有将消息直接发送到队列,而是发送到了交换机(转发器)
    4、每个队列都要绑定到交换机
    5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的

    生产者

     1 package cn.wh.simple;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 
     7 import cn.wh.util.RabbitMqConnectionUtil;
     8 import com.rabbitmq.client.Channel;
     9 import com.rabbitmq.client.Connection;
    10 
    11 public class Send {
    12 
    13     private static final String  EXCHANGE_NAME="test_exchange_fanout";
    14     public static void main(String[] args) throws IOException, TimeoutException {
    15         
    16         Connection connection = RabbitMqConnectionUtil.getConnection();
    17         
    18         Channel channel = connection.createChannel();
    19         
    20         //声明交换机
    21         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//分发
    22         
    23         //发送消息
    24         String msg="hello ps";
    25         
    26         channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
    27         
    28         System.out.println("Send :"+msg);
    29         
    30         channel.close();
    31         connection.close();
    32     }
    33 }

    消费者1

     1 package cn.wh.simple;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import cn.wh.util.RabbitMqConnectionUtil;
     7 
     8 import com.rabbitmq.client.Channel;
     9 import com.rabbitmq.client.Connection;
    10 import com.rabbitmq.client.Consumer;
    11 import com.rabbitmq.client.DefaultConsumer;
    12 import com.rabbitmq.client.Envelope;
    13 import com.rabbitmq.client.AMQP.BasicProperties;
    14 
    15 public class Recv1 {
    16     
    17     private static final String QUEUE_NAME="test_queue_fanout_email";
    18     private static final String  EXCHANGE_NAME="test_exchange_fanout";
    19     public static void main(String[] args) throws IOException, TimeoutException {
    20         Connection connection = RabbitMqConnectionUtil.getConnection();
    21         final Channel channel = connection.createChannel();
    22         
    23         //队列声明
    24         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    25         
    26         //绑定队列到交换机 转发器
    27         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    28         
    29         
    30         channel.basicQos(1);//保证一次只分发一个  
    31         
    32         //定义一个消费者
    33         Consumer consumer=new DefaultConsumer(channel){
    34             //消息到达 触发这个方法
    35             @Override
    36             public void handleDelivery(String consumerTag, Envelope envelope,
    37                     BasicProperties properties, byte[] body) throws IOException {
    38              
    39                 String msg=new String(body,"utf-8");
    40                 System.out.println("[1] Recv msg:"+msg);
    41                 
    42                 try {
    43                     Thread.sleep(2000);
    44                 } catch (InterruptedException e) {
    45                     e.printStackTrace();
    46                 }finally{
    47                     System.out.println("[1] done ");
    48                     channel.basicAck(envelope.getDeliveryTag(), false);
    49                 }
    50             }
    51         };
    52         
    53         boolean autoAck=false;//自动应答 false
    54         channel.basicConsume(QUEUE_NAME,autoAck , consumer);
    55     }
    56 }

    消费者2

    package cn.wh.simple;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    
    import cn.wh.util.RabbitMqConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class Recv2 {
        
        private static final String QUEUE_NAME="test_queue_fanout_sms";
        private static final String  EXCHANGE_NAME="test_exchange_fanout";
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitMqConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();
    
            //队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            //绑定队列到交换机 转发器
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            channel.basicQos(1);//保证一次只分发一个
    
            //定义一个消费者
            Consumer consumer=new DefaultConsumer(channel){
                //消息到达 触发这个方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        BasicProperties properties, byte[] body) throws IOException {
    
                    String msg=new String(body,"utf-8");
                    System.out.println("[2] Recv msg:"+msg);
    
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally{
                        System.out.println("[2] done ");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
    
            boolean autoAck=false;//自动应答 false
            channel.basicConsume(QUEUE_NAME,autoAck , consumer);
        }
    }

     测试

    一个消息 可以被多个消费者

  • 相关阅读:
    runloop源代码
    runloop的source
    How an Event Enters a Cocoa Application
    RunLoop主要处理以下6类事件
    NSRunloop总结
    performSelector与objc_msgSend
    iOSUI显示思想
    NSPort与NSRunloop的关系是流与消息调度的关系
    Core Animation 负责将bitmap绑定提交到 GPU-[CALayer _display]
    iOS构建流畅的交互界面--CPU,GPU资源消耗的原因和解决方案
  • 原文地址:https://www.cnblogs.com/wh1520577322/p/10065914.html
Copyright © 2011-2022 走看看