zoukankan      html  css  js  c++  java
  • RabbitMQ消费端限流策略

    消息限流处理

    如果 RabbitMQ 一次性将所有消息都发送给消费端,有很大几率会导致消费端崩掉,所以需要进行限流操作。让 RabbitMQ 每次最多发送指定数量的消息,一般情况下都设置数量为1。

    通过调用 channel.basicQos(0, 1, false); 方法实现限流

    实例

    public class Produce {
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1.创建连接工厂并进行配置相关信息
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("111.231.83.100");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            // 2.通过连接工厂获取一个连接对象
            Connection connection = connectionFactory.newConnection();
    
            // 3.通过连接对象获取数据通信信道对象
            Channel channel = connection.createChannel();
    
            // 4.循环发送消息
            String exchange = "test_qos_exchange";
            String routingKey = "qos.save";
            for (int i = 0; i < 10; i++) {
                String msg = "Hello RabbitMQ! ";
                msg += i;
                channel.basicPublish(exchange, routingKey, null, msg.getBytes());
            }
    
            // 5.关闭资源
            channel.close();
            connection.close();
            connectionFactory.clone();
        }
    }
    

    消费端:

    public class Consumer {
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            // 1.创建连接工厂并进行配置相关信息
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("111.231.83.100");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
    
            // 2.通过连接工厂获取一个连接对象
            Connection connection = connectionFactory.newConnection();
    
            // 3.通过连接对象创建一个通信信道对象
            Channel channel = connection.createChannel();
    
            String exchangeName = "test_qos_exchange";
            String queueName = "test_qos_queue";
            String routingKey = "qos.#";
    
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
    
            // 限流,autoAck设置为 false
            channel.basicQos(0, 1, false);
            channel.basicConsume(queueName, false, new MyConsumer(channel));
        }
    }
    

    自定义消费处理:

    public class MyConsumer extends DefaultConsumer {
    
        private Channel channel ;
    
        public MyConsumer(Channel channel) {
            super(channel);
            this.channel = channel;
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            System.err.println("body: " + new String(body));
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
    

    控制台输出:

    body: Hello RabbitMQ! 0
    body: Hello RabbitMQ! 1
    body: Hello RabbitMQ! 2
    body: Hello RabbitMQ! 3
    body: Hello RabbitMQ! 4
    body: Hello RabbitMQ! 5
    body: Hello RabbitMQ! 6
    body: Hello RabbitMQ! 7
    body: Hello RabbitMQ! 8
    body: Hello RabbitMQ! 9
    
  • 相关阅读:
    Python paramiko模块
    前端基础:JavaScript介绍
    前端基础:CSS属性操作
    前端基础:CSS样式选择器
    前端基础:HTML标签(下)
    20181120-10 Beta阶段第2周/共2周 Scrum立会报告+燃尽图 7
    beta阶段贡献分配实施
    Beta发布—美工+文案
    20181120-8 Beta阶段第2周/共2周 Scrum立会报告+燃尽图 06
    20181120-8 Beta阶段第2周/共2周 Scrum立会报告+燃尽图 05
  • 原文地址:https://www.cnblogs.com/markLogZhu/p/13268005.html
Copyright © 2011-2022 走看看