zoukankan      html  css  js  c++  java
  • 5、RabbitMQ-订阅模式 Publish/Subscribe

    http://www.rabbitmq.com/tutorials/tutorial-three-java.html

    1、模型图

    我们之前学习的都是一个消息只能被一个消费者消费,那么如果我想发一个消息 能被多
    个消费者消费,这时候怎么办? 这时候我们就得用到了消息中的发布订阅模型

    在前面的教程中,我们创建了一个工作队列,都是一个任务只交给一个消费者。
    这次我们做 将消息发送给多个消费者。这种模式叫做“发布/订阅”。
    举列:
    类似微信订阅号 发布文章消息 就可以广播给所有的接收者。(订阅者)
    解读:
    1、1 个生产者,多个消费者
    2、每一个消费者都有自己的一个队列
    3、生产者没有将消息直接发送到队列,而是发送到了交换机(转发器)
    4、每个队列都要绑定到交换机
    5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的

     2、代码实践

    生产者

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.util.ConnectionUtils;
    public class Send {
         private static final String  EXCHANGE_NAME="test_exchange_fanout";
         
         public static void main(String[] args) throws IOException,  TimeoutException {
               
               Connection conn = ConnectionUtils.getConnection();
               
               Channel channel = conn.createChannel();
               
               //声明交换机
               channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
               
               //发送信息
               String msg = "hello";
               
               channel.basicPublish(EXCHANGE_NAME, "", null,  msg.getBytes());
               
               channel.close();
               conn.close();
         }
    }

     

    但是这个发送的消息到哪了呢? 
    消息丢失了!!!因为交换机没有存储消息的能力,在 rabbitmq 中只有队列存储消息的
    能力.因为这时还没有队列,所以就会丢失;
    小结:消息发送到了一个没有绑定队列的交换机时,消息就会丢失!

    消费者1

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.util.ConnectionUtils;
    public class Receive {
         
         private static final String QUEUE_NAME="test_queue1";
         private static final String  EXCHANGE_NAME="test_exchange_fanout";
         
         public static void main(String[] args) throws IOException,  TimeoutException {
               
               Connection conn = ConnectionUtils.getConnection();
               
               Channel channel = conn.createChannel();
               
               //队列声明
               channel.queueDeclare(QUEUE_NAME, false, false, false,  null);
               
               //绑定队列到交换机转发器
               channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
               
                         //定义一个消费者
                         Consumer consumer = new  DefaultConsumer(channel){
                               //收到消息就会触发这个方法
                               @Override
                               public void handleDelivery(String  consumerTag, Envelope envelope, BasicProperties properties,  byte[] body)
                                         throws IOException {
                                    String msg = new  String(body,"utf-8");
                                    System.out.println("消费者1接收到的消息" + msg);
                                    
                                    try {
                                         Thread.sleep(1500);
                                    } catch (InterruptedException e)  {
                                         e.printStackTrace();
                                    }finally{
                                         System.out.println("消费者1处理完成!");
                                         //手动回执
                                         channel.basicAck(envelope.getDeliveryTag(), false);
                                    }
                                    
                               }
                         };
                         //监听队列
                         //自动应答false
                         boolean autoAck = false;
                         channel.basicConsume(QUEUE_NAME, autoAck,  consumer);
         }
    }

    消费者2

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.util.ConnectionUtils;
    
    public class Receive2 {
        
        private static final String QUEUE_NAME="test_queue";
        private static final String EXCHANGE_NAME="test_exchange_fanout";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            
            Connection conn = ConnectionUtils.getConnection();
            
            Channel channel = conn.createChannel();
            
            //队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            //绑定队列到交换机转发器
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            
            
                    
                    //定义一个消费者
                    Consumer consumer = new DefaultConsumer(channel){
                        //收到消息就会触发这个方法
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                                throws IOException {
                            String msg = new String(body,"utf-8");
                            System.out.println("消费者2接收到的消息" + msg);
                            
                            try {
                                Thread.sleep(1500);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }finally{
                                System.out.println("消费者2处理完成!");
                                //手动回执
                                channel.basicAck(envelope.getDeliveryTag(), false);
                            }
                            
                        }
                    };
                    //监听队列
                    //自动应答false
                    boolean autoAck = false;
                    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    
    }

    一个消息 可以被多个消费者

     

     后台进行查看:

  • 相关阅读:
    JDBC 复习4 批量执行SQL
    JDBC 复习3 存取Oracle大数据 clob blob
    Oracle复习
    Linux命令(1)grep
    JDBC 复习2 存取mysql 大数据
    JDBC 复习1 DBUtil
    php 环境搭建问题
    Windows 批处理 bat 开启 WiFi 菜单选项 设置ID PWD
    Bat 批处理启动和停止Oracle 服务
    docker 学习1 WSL docker ,Windows docker
  • 原文地址:https://www.cnblogs.com/Mrchengs/p/10531050.html
Copyright © 2011-2022 走看看