zoukankan      html  css  js  c++  java
  • RabbitMQ消息中间件(第三章)第三部分-笔记

    消费端限流

    • 假设一个场景,首先,我们Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况:
    • 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!
    • RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consumer或者channel设置Qos的值)未被确认前,不进行
    • 消费新的消息。
    • void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)
    • prefetchSize 多少M, 0表示不限制
    • prefetchCount 一次最多处理多少条消息,一般配置为1,会告诉RabbitMQ不要同时给一个消费者推送多于N个消息
    • ,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
    • global truefalse 是否将上面设置应用于channel,简单点说,就是上面限制是channel级别还是consumer级别
    • prefetchSize和global这两项,rabbitmq没有实现,暂且不研究prefetch_count在no_ack=false的情况下生效,即在自动应答的情况下这两个值
    • 是不生效的。

      以下是实际代码操作

      

    package com.cx.temp.common.rabbitmq.limit;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * 消费端限流-生产端
     */
    public class Producer {
    
        public static void main(String[] args)  throws Exception {
    
            //1 创建一个ConectionFacory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/test001");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
    
            //2 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
    
            //3 通过connection创建一个Channel
            Channel channel = connection.createChannel();
    
            //4 指定我们的消息投递模式:消息的确认模式
            channel.confirmSelect();
    
            String exchangeName = "test_qos_exchange";
            String routingKey = "qos.save";
    
            //5 发送消息
            //这里使用routingKey消费端可以正常收到消息,如果用routingKeyError,由于消费端声明的路由key是"return.#",所以该消息不可达,则会被addReturnListener监听到
            String msg = "Hello RabbitMQ send QOS message!";
    
            for (int i = 0; i < 5; i++) {
                channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
            }
    
        }
    
    }
    package com.cx.temp.common.rabbitmq.limit;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    
    /**
     * 自定义消费者
     */
    public class MyConsumer extends DefaultConsumer {
    
        private Channel channel;
    
        public MyConsumer(Channel channel) {
            super(channel);
            this.channel = channel;
        }
    
        /**
         *
         * @param consumerTag 消费标签
         * @param envelope 包含一些关键信息
         * @param properties 附带扩展数据
         * @param body 消息
         * @throws IOException
         */
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.err.println("----------consume message----------");
            System.err.println("consumerTag:" + consumerTag);
            System.err.println("envelope:" + envelope);
            System.err.println("properties:" + properties);
            System.err.println("consumerTag:" + consumerTag);
            System.err.println("body:" + new String(body));
    
            //false表示不批量签收
    //        channel.basicAck(envelope.getDeliveryTag(), false);
    
        }
    }
    package com.cx.temp.common.rabbitmq.limit;
    
    import com.cx.temp.common.rabbitmq.quickstart.QueueingConsumer;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * 消费端限流-消费端
     */
    public class Consumer {
    
        public static void main(String[] args)  throws Exception {
    
            //1 创建一个ConectionFacory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/test001");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
    
            //2 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
    
            //3 通过connection创建一个Channel
            Channel channel = connection.createChannel();
    
            String exchangeName = "test_qos_exchange";
            String routingKey = "qos.#";
            String queueName = "test_qos_queue";
    
            //声明交换机和队列然后进行绑定,最后指定路由KEY
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
    
            //1 限流方式 第一件事情就是autoAck 设置为false
            // 0表示无限大小,3表示一个消费端可以同时处理1条消息,false表示应用于consumer
            channel.basicQos(0, 1, false);
    
            channel.basicConsume(queueName, false, new MyConsumer(channel));
        }
    
    }

    先执行Producer,然后在执行Consumer,由于我们配置每次消费端只能处理一条消息,等待消费端ack确认后在处理下一条数据,这时候MyConsumer的ack确认注释掉,就导致消费端消费了一条消息后持续等待

    消费端ACK与重回队列

    消费端的手工ACK与NACK

    • 消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿
    • 如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功!

    消费端的重回队列

    • 消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker!
    • 一般我们在实际应用中,都会关闭重回队列,也就是设置为false

      以下是实际代码操作

    package com.cx.temp.common.rabbitmq.ack;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * ACK与重回队列-生产端
     */
    public class Producer {
    
        public static void main(String[] args)  throws Exception {
    
            //1 创建一个ConectionFacory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/test001");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
    
            //2 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
    
            //3 通过connection创建一个Channel
            Channel channel = connection.createChannel();
    
            //4 指定我们的消息投递模式:消息的确认模式
            channel.confirmSelect();
    
            String exchangeName = "test_ack_exchange";
            String routingKey = "ack.save";
    
            //5 发送消息
            for (int i = 0; i < 5; i++) {
    
                Map<String, Object> headers = new HashMap<>();
                headers.put("num", i);
    
                AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF-8")
                        .headers(headers)
                        .build();
    
                String msg = "Hello RabbitMQ send ACK message " + i; //这里加上i的打印,更好观察哪条数据没被消费成功
                channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
            }
    
        }
    
    }
    package com.cx.temp.common.rabbitmq.ack;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    
    /**
     * 自定义消费者
     */
    public class MyConsumer extends DefaultConsumer {
    
        private Channel channel;
    
        public MyConsumer(Channel channel) {
            super(channel);
            this.channel = channel;
        }
    
        /**
         *
         * @param consumerTag 消费标签
         * @param envelope 包含一些关键信息
         * @param properties 附带扩展数据
         * @param body 消息
         * @throws IOException
         */
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.err.println("----------consume message----------");
            System.err.println("body:" + new String(body));
    
            try {
                Thread.sleep(2000); //休眠2秒中,观察变化
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            if((Integer)properties.getHeaders().get("num") == 0) {
                //long deliveryTag, boolean multiple, boolean requeue (设置为true重回队列,消费失败的队列会重回队列的尾端继续消费)
                channel.basicNack(envelope.getDeliveryTag(), false, true);
            } else {
                //false表示不批量签收
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    }
    package com.cx.temp.common.rabbitmq.ack;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * ACK与重回队列-消费端
     */
    public class Consumer {
    
        public static void main(String[] args)  throws Exception {
    
            //1 创建一个ConectionFacory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/test001");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
    
            //2 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
    
            //3 通过connection创建一个Channel
            Channel channel = connection.createChannel();
    
            String exchangeName = "test_ack_exchange";
            String routingKey = "ack.#";
            String queueName = "test_ack_queue";
    
            //声明交换机和队列然后进行绑定,最后指定路由KEY
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
    
            // 手工签收 必须要关闭autoACK = false
            channel.basicConsume(queueName, false, new MyConsumer(channel));
        }
    
    }

  • 相关阅读:
    轻松自动化---selenium-webdriver(python) (八)
    轻松自动化---selenium-webdriver(python) (七)
    轻松自动化---selenium-webdriver(python) (六)
    轻松自动化---selenium-webdriver(python) (五)
    轻松自动化---selenium-webdriver(python) (四)
    轻松自动化---selenium-webdriver(python) (三)
    轻松自动化---selenium-webdriver(python) (二)
    轻松自动化---selenium-webdriver(python) (一)
    容器在 Weave 中如何通信和隔离?- 每天5分钟玩转 Docker 容器技术(65)
    Weave 网络结构分析
  • 原文地址:https://www.cnblogs.com/huihui-hui/p/14332292.html
Copyright © 2011-2022 走看看