zoukankan      html  css  js  c++  java
  • RabbitMQ的消费限流

    高并发场景下生产者产生大量的消息,对于巨量的消息瞬间推送消费端无法处理

    rabbitmq提供了服务质量保障功能,即在非自动确认消息的前提下,如果一定数目的消息未被确认,不进行消费新的消息。

    使用 basicqos方法。在消费端进行使用。 0 1 false
    prefetSize:0
    prefetCount:这个值一般在设置为非自动ack的情况下生效,一般大小为1
    global: true是channel级别, false是消费者级别
    注意:我们要使用非自动ack

    消费者:

    package com.flying.rabbitmq.api.limit;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Consumer {
    
        
        public static void main(String[] args) throws Exception {
            
            
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            
            
            String exchangeName = "test_qos_exchange";
            String queueName = "test_qos_queue";
            String routingKey = "qos.#";
            
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //1 限流方式  第一件事就是 autoAck设置为 false
            
            channel.basicQos(0, 1, false);
            
            channel.basicConsume(queueName, false, new MyConsumer(channel));
            
            
        }
    }

    自定义消费监听

    package com.flying.rabbitmq.api.limit;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    
    public class MyConsumer extends DefaultConsumer {
    
    
        private Channel channel ;
        
        public MyConsumer(Channel channel) {
            super(channel);
            this.channel = channel;
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.err.println("-----------consume message----------");
            System.err.println("consumerTag: " + consumerTag);
            System.err.println("envelope: " + envelope);
            System.err.println("properties: " + properties);
            System.err.println("body: " + new String(body));
    
    
            // TODO 第二个参数设置为false,因为消费者设置的为prefetCount1
            //2手工签收
            channel.basicAck(envelope.getDeliveryTag(), false);
            
        }
    
    
    }

    生产者:

    package com.flying.rabbitmq.api.limit;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Producer {
    
        
        public static void main(String[] args) throws Exception {
            
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            
            String exchange = "test_qos_exchange";
            String routingKey = "qos.save";
            
            String msg = "Hello RabbitMQ QOS Message";
            
            for(int i =0; i<5; i ++){
                channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
            }
            
        }
    }
  • 相关阅读:
    一张900w的数据表,16s执行的SQL优化到300ms?
    webpack学习收集
    集合对象的string类型字段进行排序
    react 项目中使用antd的select组件placeholder不生效的解决方法
    React Hook做页面跳转以及携带参数,并且获取携带的值
    eclipse jar包 Source not found
    细说Redis分布式锁🔒
    Spring Boot中有多个@Async异步任务时,记得做好线程池的隔离!
    HDFS基本命令
    斐波那契数(Java)
  • 原文地址:https://www.cnblogs.com/lflying/p/11107380.html
Copyright © 2011-2022 走看看