zoukankan      html  css  js  c++  java
  • 消费端限流策略

    使用场景

    首先,我们Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现如下情况:
    巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!
    Rabbitmq提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息
    (通过基于consumer或者channel设置qos的值)未被确认前,不进行消费新的消息。

    具体方法

    void BasicQos(unit prefetchSize, ushort prefetchCount, boolean global);
    prefetchSize:单条消息的大小限制,通常设置为0,意思是不做限制
    prefetchCount:消息的条数,一般设置为1条
    global:消息针对的级别,true:channel级别,false:consumer级别,通常设置为false

    注意:prefetchSize和global这两项,rabbitmq没有实现,暂且不做研究,prefetchCount在自动应答的情况下是不生效的,必须进行手动签收

    创建生产者

    package com.dwz.rabbitmq.qos;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.dwz.rabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Producer {
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            
            String exchangeName = "test_qos_exchange";
            String routingkey = "qos.save";
            String msg = "Hello rabbitmq qos message!";
            
            for(int i = 0; i < 5; i++) {
                channel.basicPublish(exchangeName, routingkey, null, msg.getBytes());
            }

          channel.close();
          connection.close();

        }
    }

    创建消费者

    package com.dwz.rabbitmq.qos;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.dwz.rabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            
            String exchangeName = "test_qos_exchange";
            String routingkey = "qos.#";
            String queueName = "test_qos_queue";
            
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingkey);
            //限流
            channel.basicQos(0, 1, false);
            
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    //手动签收(因为只传递一条数据过来,所以不用批量接收 multiple=false)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            
            //限流方式,第一件事就是autoAck设置为false
            channel.basicConsume(queueName, false, consumer);
        }
    }

    相关文章:

    RabbitMQ消费端限流策略(十)

  • 相关阅读:
    转 TClientDataSet的地位
    让Delphi的DataSnap发挥最大效率
    Delphi2010强化的反射
    Delphi的内存管理及内存泄露问题
    DataSnap 传输过滤器
    delphi dataset not in edit or insert mode
    CSS中Zoom属性的一些介绍
    Linux进程分析
    MIPS处理器介绍
    Linux内存管理
  • 原文地址:https://www.cnblogs.com/zheaven/p/11832840.html
Copyright © 2011-2022 走看看