消息限流处理
如果 RabbitMQ 一次性将所有消息都发送给消费端,有很大几率会导致消费端崩掉,所以需要进行限流操作。让 RabbitMQ 每次最多发送指定数量的消息,一般情况下都设置数量为1。
通过调用 channel.basicQos(0, 1, false); 方法实现限流
实例
public class Produce {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂并进行配置相关信息
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("111.231.83.100");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2.通过连接工厂获取一个连接对象
Connection connection = connectionFactory.newConnection();
// 3.通过连接对象获取数据通信信道对象
Channel channel = connection.createChannel();
// 4.循环发送消息
String exchange = "test_qos_exchange";
String routingKey = "qos.save";
for (int i = 0; i < 10; i++) {
String msg = "Hello RabbitMQ! ";
msg += i;
channel.basicPublish(exchange, routingKey, null, msg.getBytes());
}
// 5.关闭资源
channel.close();
connection.close();
connectionFactory.clone();
}
}
消费端:
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1.创建连接工厂并进行配置相关信息
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("111.231.83.100");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2.通过连接工厂获取一个连接对象
Connection connection = connectionFactory.newConnection();
// 3.通过连接对象创建一个通信信道对象
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "qos.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 限流,autoAck设置为 false
channel.basicQos(0, 1, false);
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
自定义消费处理:
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
System.err.println("body: " + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
控制台输出:
body: Hello RabbitMQ! 0
body: Hello RabbitMQ! 1
body: Hello RabbitMQ! 2
body: Hello RabbitMQ! 3
body: Hello RabbitMQ! 4
body: Hello RabbitMQ! 5
body: Hello RabbitMQ! 6
body: Hello RabbitMQ! 7
body: Hello RabbitMQ! 8
body: Hello RabbitMQ! 9