zoukankan      html  css  js  c++  java
  • 5、RabbitMQ-订阅模式 Publish/Subscribe

    http://www.rabbitmq.com/tutorials/tutorial-three-java.html

    1、模型图

    我们之前学习的都是一个消息只能被一个消费者消费,那么如果我想发一个消息 能被多
    个消费者消费,这时候怎么办? 这时候我们就得用到了消息中的发布订阅模型

    在前面的教程中,我们创建了一个工作队列,都是一个任务只交给一个消费者。
    这次我们做 将消息发送给多个消费者。这种模式叫做“发布/订阅”。
    举列:
    类似微信订阅号 发布文章消息 就可以广播给所有的接收者。(订阅者)
    解读:
    1、1 个生产者,多个消费者
    2、每一个消费者都有自己的一个队列
    3、生产者没有将消息直接发送到队列,而是发送到了交换机(转发器)
    4、每个队列都要绑定到交换机
    5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的

     2、代码实践

    生产者

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.util.ConnectionUtils;
    public class Send {
         private static final String  EXCHANGE_NAME="test_exchange_fanout";
         
         public static void main(String[] args) throws IOException,  TimeoutException {
               
               Connection conn = ConnectionUtils.getConnection();
               
               Channel channel = conn.createChannel();
               
               //声明交换机
               channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
               
               //发送信息
               String msg = "hello";
               
               channel.basicPublish(EXCHANGE_NAME, "", null,  msg.getBytes());
               
               channel.close();
               conn.close();
         }
    }

     

    但是这个发送的消息到哪了呢? 
    消息丢失了!!!因为交换机没有存储消息的能力,在 rabbitmq 中只有队列存储消息的
    能力.因为这时还没有队列,所以就会丢失;
    小结:消息发送到了一个没有绑定队列的交换机时,消息就会丢失!

    消费者1

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.util.ConnectionUtils;
    public class Receive {
         
         private static final String QUEUE_NAME="test_queue1";
         private static final String  EXCHANGE_NAME="test_exchange_fanout";
         
         public static void main(String[] args) throws IOException,  TimeoutException {
               
               Connection conn = ConnectionUtils.getConnection();
               
               Channel channel = conn.createChannel();
               
               //队列声明
               channel.queueDeclare(QUEUE_NAME, false, false, false,  null);
               
               //绑定队列到交换机转发器
               channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
               
                         //定义一个消费者
                         Consumer consumer = new  DefaultConsumer(channel){
                               //收到消息就会触发这个方法
                               @Override
                               public void handleDelivery(String  consumerTag, Envelope envelope, BasicProperties properties,  byte[] body)
                                         throws IOException {
                                    String msg = new  String(body,"utf-8");
                                    System.out.println("消费者1接收到的消息" + msg);
                                    
                                    try {
                                         Thread.sleep(1500);
                                    } catch (InterruptedException e)  {
                                         e.printStackTrace();
                                    }finally{
                                         System.out.println("消费者1处理完成!");
                                         //手动回执
                                         channel.basicAck(envelope.getDeliveryTag(), false);
                                    }
                                    
                               }
                         };
                         //监听队列
                         //自动应答false
                         boolean autoAck = false;
                         channel.basicConsume(QUEUE_NAME, autoAck,  consumer);
         }
    }

    消费者2

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.util.ConnectionUtils;
    
    public class Receive2 {
        
        private static final String QUEUE_NAME="test_queue";
        private static final String EXCHANGE_NAME="test_exchange_fanout";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            
            Connection conn = ConnectionUtils.getConnection();
            
            Channel channel = conn.createChannel();
            
            //队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            //绑定队列到交换机转发器
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            
            
                    
                    //定义一个消费者
                    Consumer consumer = new DefaultConsumer(channel){
                        //收到消息就会触发这个方法
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                                throws IOException {
                            String msg = new String(body,"utf-8");
                            System.out.println("消费者2接收到的消息" + msg);
                            
                            try {
                                Thread.sleep(1500);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }finally{
                                System.out.println("消费者2处理完成!");
                                //手动回执
                                channel.basicAck(envelope.getDeliveryTag(), false);
                            }
                            
                        }
                    };
                    //监听队列
                    //自动应答false
                    boolean autoAck = false;
                    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    
    }

    一个消息 可以被多个消费者

     

     后台进行查看:

  • 相关阅读:
    Google pagespeed优化首页加载速度详解
    juqery 学习之三 选择器<子元素><表单>
    juqery 学习之五 文档处理<包裹、替换、删除、复制>
    juqery 学习之四 筛选<过滤>
    关于鼠标点击其他地方隐藏层的实例(要引入jquery包哦)
    JAVASCRIPT 对象 用法(offset screen scroll client)
    [分享]NopCommerce支付插件开发步骤(页 1) 扩展开发
    juqery 学习之五 文档处理<插入>
    juqery 学习之六 CSS<css、位置、宽高>
    th scope="row"和th scope="col"什么意思??
  • 原文地址:https://www.cnblogs.com/Mrchengs/p/10531050.html
Copyright © 2011-2022 走看看