一.消费端限流场景
如果RabbitMQ服务上堆积了成千上万条未处理的消息,然后随便打开一个消费者客户端,巨量的消息瞬间被推送过来,但是单个客户端无法同时处理这么多消息,可能会导致服务器宕机,产生线上故障。
所以RabbitMQ提供了一种qos功能(服务质量保证),即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。
二.BasicQos方法
void BasicQos(int prefetchSize,int prefetchCount,boolean global)
prefetchSize:消费端一般设置为0
prefetchCount:消费者同时接收消息的个数
global:true/false 是否将上面的设置应用于channel级别(是channel级别还是consumer级别)
prefetchCount和global这两项,rabbitmq没有实现,即在自动应答情况下这两个值是不生效的。
消费端
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.10.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//创建连接
Connection connection = connectionFactory.newConnection();
//通过连接创建一个Channel
Channel channel = connection.createChannel();
//创建一个队列
String queueName = "qos";
channel.queueDeclare(queueName,true,false,false,null);
//限流策略,第一件事要设置autoAck为false,即下面basicConsume方法的第二个参数
channel.basicQos(0,1,false);
//设置Channel
channel.basicConsume(queueName,false,new MyConsumer(channel));
}
}
自定义consumer:
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println(consumerTag);
System.out.println(envelope);
System.out.println(properties);
System.out.println(new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
二.消费端ACK与重回队列
消费端ACK使用场景:
1.消费端进行消费的时候,如果由于业务异常我们可以进行日志记录,然后进行补偿。
2.由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功。
生产端:
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.10.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//创建连接
Connection connection = connectionFactory.newConnection();
//通过连接创建一个Channel
Channel channel = connection.createChannel();
//通过Channel发送数据
// 在这里要设置Mandatory(第三个参数)为true,否则broker会自动删除消息
for(int i=0;i<10;i++){
Map<String ,Object> hearders = new HashMap<>();
hearders.put("ack","ok"+i);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(hearders)
.build();
channel.basicPublish("","ack",properties,"hello world".getBytes());
}
channel.close();
connection.close();
}
消费端:
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.10.132");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//创建连接
Connection connection = connectionFactory.newConnection();
//通过连接创建一个Channel
Channel channel = connection.createChannel();
//创建一个队列
String queueName = "ack";
channel.queueDeclare(queueName,true,false,false,null);
//设置Channel
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
Object ack = properties.getHeaders().get("ack");
System.out.println(ack.toString());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if("ok0".equals(ack.toString())){
//表示消息处理失败了,设置重回队列,Broker端就会将没有成功处理的消息重新发送,并且位于队列底端。
//参数3:requeue 是否重回队列(实际生产会设置false)
channel.basicNack(envelope.getDeliveryTag(),false,true);
}else{
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
//手工签收,必须关闭autoAck(false)
channel.basicConsume(queueName,false,consumer);
}
打印结果:
