zoukankan      html  css  js  c++  java
  • RabbitMQ 五种工作模式

    官网介绍:https://www.rabbitmq.com/getstarted.html

    五种工作模式的主要特点

    1. 简单模式:一个生产者,一个消费者
    2. work模式:一个生产者,多个消费者,每个消费者获取到的消息唯一(消费者彼此竞争成为接收者)。
    3. 订阅模式:一个生产者发送的消息会被多个消费者获取。
    4. 路由模式:发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key
    5. topic模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词。

    简单模式(一个生产者,一个消费者)

    这种模式下不需要将Exchange进行任何绑定(binding)操作

        public static final String QUEUE_NAME= "myqueue";
    
        public static void test() throws Exception {
            //定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            //设置Virtual Host
            factory.setVirtualHost("/ld");
            factory.setUsername("ld");
            factory.setPassword("aaa");
            //通过工厂获取连接
            Connection connection = factory.newConnection();
    
    
            //创建队列,发送消息
            public void producer () {
                //创建通道
                Channel channel = connection.createChannel();
                //声明创建队列
                /** 
                    队列名
                    是否持久化
                    是否排外  即只允许该channel访问该队列   一般等于true的话用于一个队列只能有一个消费者来消费的场景
                    是否自动删除(自动删除的前提是: 至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会 自动删除。)
                    其他属性
                */
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //消息内容
                String message = "Hello World!";
                //发布消息
                /**
                    交换机
                    队列名
                    其他属性  路由
                    消息body
                */
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                //关闭连接和通道
                channel.close();
                connection.close();
            }
            //消费者消费消息
            public void consumer () {
                //创建通道
                Channel channel = connection.createChannel();
                //声明通道
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //定义消费者
                QueueingConsumer consumer = new QueueingConsumer(channel);
                //监听队列
                //autoAck 是否自动确认消息,true自动确认
                channel.basicConsume(QUEUE_NAME, true, consumer);
                while (true) {
                    //这个方法会阻塞住,直到获取到消息
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String message = new String(delivery.getBody());
                    System.out.println("接收到消息:" + message);
                }
            }
        }
    

    work模式(一个生产者,一个队列,多个消费者,每个消费者获取到的消息唯一)

        public static final String QUEUE_NAME= "myqueue";
    
        //消息生产者
        public void producer{
            //获取连接和通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "";
            for (int i = 0; i < 100; i++) {
                message = "" + i;
                channel.basicPublish("",QUEUE_NAME, null, message.getBytes());
                Thread.sleep(i);
            }
            channel.close();
            connection.close();
        }
    
    
        //消费者1
        public void consumer1{
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //同一时刻服务器只发送一条消息给消费端
            channel.basicQos(1);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //false:手动确认
            channel.basicConsume(QUEUE_NAME, false, consumer);
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("recive1:" + message);
                Thread.sleep(100);
                //消息消费完给服务器返回确认状态,表示该消息已被消费
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    

    channel.basicPublish

    channel.basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
    
    mandatory:
        true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,
        那么会调用basic.return方法将消息返还给生产者。
    
        false:出现上述情形broker会直接将消息扔掉
    
    immediate:
        true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。
        当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
    
    BasicProperties :
        需要注意的是BasicProperties.deliveryMode:
            0:不持久化 1:持久化 
        这里指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留
    

    fanout订阅模式(一个生产者,多个队列,多个消费者)

    这种模式需要提前将Exchange与Queue进行绑定,
    一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。

    ​ 一个生产者发送的消息会被多个消费者获取。
    
    生产者:可以将消息发送到队列或者是交换机。
    消费者:只能从队列中获取消息。
    
    如果消息发送到没有队列绑定的交换机上,那么消息将丢失。
    
        public static final String EXCHANGE_NAME = "exchange_fanout";
    
        //生产者
        public void producer() {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            //声明交换机 fanout
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            String message = "Hello World!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    
            channel.close();
            connection.close();
        }
    
    
        //消费者1
        public final static String QUEUE_NAME = "queue_fanout_1";
        public void consumer1() {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //绑定队列到交换机上
            channel.queueBind(QUEUE_NAME, Send.EXCHANGE_NAME, "");
            channel.basicQos(1);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, true, consumer);
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(message);
            }
        }
    

    direct路由模式(完全匹配、单播的模式)

    1. 发送消息到交换机并且要指定路由key
    
    2. 消费者将队列绑定到交换机时需要指定路由key
    
    3. 完全匹配,只有匹配到的消费者才能消费消息
    
    4. 一个队列可以绑定多个路由
    
        public static final String EXCHANGE_NAME = "exchange_direct";
    
        //生产者
        public void producer() {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            //声明交换机 direct
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            String message = "Hello World!";
            channel.basicPublish(EXCHANGE_NAME, "key", null, message.getBytes()); 
            channel.close();
            connection.close();
        }
    
        //消费者1
        public final static String QUEUE_NAME = "queue_direct_1";
    
        public void consumer1() {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //绑定队列到交换机上,并制定路由键为"key"
            channel.queueBind(QUEUE_NAME, com.bw.rabbitmq.routing.Send.EXCHANGE_NAME, "key");
            channel.basicQos(1);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, true, consumer);
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(message);
            }
        }
    

    topic通配符模式

    两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配一个单词
    
        //生产者
        public static final String EXCHANGE_NAME = "exchange_topic";
    
        public void producer() {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            //声明交换机 topic:交换机类型
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            String message = "Hello World!";
            channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());
            System.out.println(message);
            channel.close();
            connection.close();
        }
    
        //消费者1
        public final static String QUEUE_NAME = "queue_topic_1";
    
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //绑定队列到交换机上,并制定路由键匹配规则为"key.*"
            channel.queueBind(QUEUE_NAME, com.bw.rabbitmq.topics.Send.EXCHANGE_NAME, "key.*");
            channel.basicQos(1);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, true, consumer);
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(message);
            }
        }
    
  • 相关阅读:
    剑指Offer 30 包含min函数的栈
    剑指Offer 29 顺时针打印矩阵
    剑指Offer 27 二叉树的镜像
    13张动图助你彻底看懂马尔科夫链、PCA和条件概率!
    一位ML工程师构建深度神经网络的实用技巧
    IBM沃森会成为第一个被抛弃的AI技术吗?
    中国最强AI超级服务器问世,每秒提供AI计算2000万亿次
    SAP WM 有无保存WM Level历史库存的Table?
    SAP MM 按采购订单查询付款信息的报表?
    机器学习项目失败的9个原因
  • 原文地址:https://www.cnblogs.com/loveer/p/11423173.html
Copyright © 2011-2022 走看看