如果是高并发下,rabbitmq服务器上收到成千上万条消息,那么当打开消费端时,这些消息必定喷涌而来,导致消费端消费不过来甚至挂掉都有可能。
在非自动确认的模式下,可以采用限流模式,rabbitmq 提供了服务质量保障qos机制来控制一次消费消息数量。
下面直接上代码:
生产端:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import com.rabbitmq.client.AMQP; 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.ConfirmListener; 9 import com.rabbitmq.client.Connection; 10 import com.rabbitmq.client.ConnectionFactory; 11 import com.rabbitmq.client.ReturnListener; 12 import com.rabbitmq.client.AMQP.BasicProperties; 13 14 public class Producter { 15 16 public static void main(String[] args) throws IOException, TimeoutException { 17 // TODO Auto-generated method stub 18 ConnectionFactory factory = new ConnectionFactory(); 19 factory.setHost("192.168.10.110"); 20 factory.setPort(5672); 21 factory.setUsername("guest"); 22 factory.setPassword("guest"); 23 factory.setVirtualHost("/"); 24 Connection conn = factory.newConnection(); 25 Channel channel = conn.createChannel(); 26 String exchange001 = "exchange_001"; 27 String queue001 = "queue_001"; 28 String routingkey = "mq.topic"; 29 String body = "hello rabbitmq!===============限流策略"; 30 // 开启确认模式 31 channel.confirmSelect(); 32 // 循环发送多条消息 33 for(int i = 0 ;i<10;i++){ 34 channel.basicPublish(exchange001, routingkey, null, body.getBytes()); 35 } 36 37 // 添加一个返回监听========消息返回模式重要添加 38 channel.addConfirmListener(new ConfirmListener() { 39 40 @Override 41 public void handleNack(long deliveryTag, boolean multiple) throws IOException { 42 System.out.println("===========NACK============"); 43 44 } 45 46 @Override 47 public void handleAck(long deliveryTag, boolean multiple) throws IOException { 48 System.out.println("===========ACK============"); 49 50 } 51 }); 52 } 53 54 }
消费端:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 10 public class Receiver { 11 12 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { 13 // TODO Auto-generated method stub 14 ConnectionFactory factory = new ConnectionFactory(); 15 factory.setHost("192.168.10.110"); 16 factory.setPort(5672); 17 factory.setUsername("guest"); 18 factory.setPassword("guest"); 19 factory.setVirtualHost("/"); 20 Connection conn = factory.newConnection(); 21 Channel channel = conn.createChannel(); 22 String exchange001 = "exchange_001"; 23 String queue001 = "queue_001"; 24 String routingkey = "mq.*"; 25 channel.exchangeDeclare(exchange001, "topic", true, false, null); 26 channel.queueDeclare(queue001, true, false, false, null); 27 channel.queueBind(queue001, exchange001, routingkey); 28 // 设置限流策略 29 // channel.basicQos(获取消息最大数[0-无限制], 依次获取数量, 作用域[true作用于整个channel,false作用于具体消费者]); 30 channel.basicQos(0, 2, false); 31 // 自定义消费者 32 MyConsumer myConsumer = new MyConsumer(channel); 33 // 进行消费,签收模式一定要为手动签收 34 Thread.sleep(3000); 35 channel.basicConsume(queue001, false, myConsumer); 36 } 37 38 }
自定义消费者:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.AMQP.BasicProperties; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.DefaultConsumer; 8 import com.rabbitmq.client.Envelope; 9 10 /** 11 * 可以继承,可以实现,实现的话要覆写的方法比较多,所以这里用了继承 12 * 13 */ 14 public class MyConsumer extends DefaultConsumer{ 15 private Channel channel; 16 public MyConsumer(Channel channel) { 17 super(channel); 18 // TODO Auto-generated constructor stub 19 this.channel=channel; 20 } 21 22 @Override 23 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) 24 throws IOException { 25 System.out.println("消费标签:"+consumerTag); 26 System.out.println("envelope.getDeliveryTag():==="+envelope.getDeliveryTag()); 27 System.out.println("envelope.getExchange():==="+envelope.getExchange()); 28 System.out.println("envelope.getRoutingKey():==="+envelope.getRoutingKey()); 29 System.out.println("body:==="+new String(body)); 30 // 手动签收,一定要有消费者签收,如果没有如下代码,则限流模式下,仅能打印出来channel.basicQos(0, 2, false);第二参数的2条信息 31 channel.basicAck(envelope.getDeliveryTag(), false); 32 } 33 34 35 }
重回队列模式,是当投递消息失败时,让该消息重新回到队列的模式,该模式需要手动签收,并需要在消费者中进行判断,调用重回队列的确认模式
代码如下
生产端:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.HashMap; 5 import java.util.Map; 6 import java.util.concurrent.TimeoutException; 7 8 import org.springframework.amqp.core.Message; 9 10 import com.rabbitmq.client.AMQP.BasicProperties; 11 import com.rabbitmq.client.Channel; 12 import com.rabbitmq.client.ConfirmListener; 13 import com.rabbitmq.client.Connection; 14 import com.rabbitmq.client.ConnectionFactory; 15 import com.rabbitmq.client.ReturnListener; 16 17 public class Producter { 18 19 public static void main(String[] args) throws IOException, TimeoutException { 20 // TODO Auto-generated method stub 21 ConnectionFactory factory = new ConnectionFactory(); 22 factory.setHost("192.168.10.110"); 23 factory.setPort(5672); 24 factory.setUsername("guest"); 25 factory.setPassword("guest"); 26 factory.setVirtualHost("/"); 27 Connection conn = factory.newConnection(); 28 Channel channel = conn.createChannel(); 29 String exchange001 = "exchange_001"; 30 String queue001 = "queue_001"; 31 String routingkey = "mq.topic"; 32 33 // 循环发送多条消息 34 for(int i = 0 ;i<5;i++){ 35 String body = "hello rabbitmq!===============ACK&重回队列,第"+i+"条"; 36 Map<String,Object> head = new HashMap<>(); 37 head.put("n", i); 38 BasicProperties properties = new BasicProperties(null, "utf-8", head, 2, 1, null, null, null, null, null, null, null, null, null); 39 40 channel.basicPublish(exchange001, routingkey, properties, body.getBytes()); 41 } 42 43 } 44 45 }
消费端:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 10 public class Receiver { 11 12 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { 13 // TODO Auto-generated method stub 14 ConnectionFactory factory = new ConnectionFactory(); 15 factory.setHost("192.168.10.110"); 16 factory.setPort(5672); 17 factory.setUsername("guest"); 18 factory.setPassword("guest"); 19 factory.setVirtualHost("/"); 20 Connection conn = factory.newConnection(); 21 Channel channel = conn.createChannel(); 22 String exchange001 = "exchange_001"; 23 String queue001 = "queue_001"; 24 String routingkey = "mq.*"; 25 channel.exchangeDeclare(exchange001, "topic", true, false, null); 26 channel.queueDeclare(queue001, true, false, false, null); 27 channel.queueBind(queue001, exchange001, routingkey); 28 // 自定义消费者 29 MyConsumer myConsumer = new MyConsumer(channel); 30 // 进行消费,签收模式一定要为手动签收 31 channel.basicConsume(queue001, false, myConsumer); 32 } 33 34 }
自定义消费者:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.AMQP.BasicProperties; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.DefaultConsumer; 8 import com.rabbitmq.client.Envelope; 9 10 /** 11 * 可以继承,可以实现,实现的话要覆写的方法比较多,所以这里用了继承 12 * 13 */ 14 public class MyConsumer extends DefaultConsumer{ 15 private Channel channel; 16 public MyConsumer(Channel channel) { 17 super(channel); 18 // TODO Auto-generated constructor stub 19 this.channel=channel; 20 } 21 22 @Override 23 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) 24 throws IOException { 25 System.out.println("消费标签:"+consumerTag); 26 System.out.println("envelope.getDeliveryTag():==="+envelope.getDeliveryTag()); 27 System.out.println("envelope.getExchange():==="+envelope.getExchange()); 28 System.out.println("envelope.getRoutingKey():==="+envelope.getRoutingKey()); 29 System.out.println("body:==="+new String(body)); 30 System.out.println("===================休眠以便查看==============="); 31 try { 32 Thread.sleep(2000); 33 } catch (InterruptedException e) { 34 // TODO Auto-generated catch block 35 e.printStackTrace(); 36 } 37 // 手动签收 38 Integer i = (Integer) properties.getHeaders().get("n"); 39 System.out.println("iiiiiiiiiiiiiiiii======================================================"+i); 40 if(i==1) { 41 channel.basicNack(envelope.getDeliveryTag(),false, true);//第三个参数为是否重返队列 42 }else { 43 channel.basicAck(envelope.getDeliveryTag(), false); 44 } 45 } 46 47 48 }
下面是重回队列执行结果,可以看到当消费完后第一条不断的被扔回队列然后消费再扔回。
1 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 2 envelope.getDeliveryTag():===1 3 envelope.getExchange():===exchange_001 4 envelope.getRoutingKey():===mq.topic 5 body:===hello rabbitmq!===============ACK&重回队列,第0条 6 ===================休眠以便查看=============== 7 iiiiiiiiiiiiiiiii======================================================0 8 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 9 envelope.getDeliveryTag():===2 10 envelope.getExchange():===exchange_001 11 envelope.getRoutingKey():===mq.topic 12 body:===hello rabbitmq!===============ACK&重回队列,第1条 13 ===================休眠以便查看=============== 14 iiiiiiiiiiiiiiiii======================================================1 15 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 16 envelope.getDeliveryTag():===3 17 envelope.getExchange():===exchange_001 18 envelope.getRoutingKey():===mq.topic 19 body:===hello rabbitmq!===============ACK&重回队列,第2条 20 ===================休眠以便查看=============== 21 iiiiiiiiiiiiiiiii======================================================2 22 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 23 envelope.getDeliveryTag():===4 24 envelope.getExchange():===exchange_001 25 envelope.getRoutingKey():===mq.topic 26 body:===hello rabbitmq!===============ACK&重回队列,第3条 27 ===================休眠以便查看=============== 28 iiiiiiiiiiiiiiiii======================================================3 29 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 30 envelope.getDeliveryTag():===5 31 envelope.getExchange():===exchange_001 32 envelope.getRoutingKey():===mq.topic 33 body:===hello rabbitmq!===============ACK&重回队列,第4条 34 ===================休眠以便查看=============== 35 iiiiiiiiiiiiiiiii======================================================4 36 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 37 envelope.getDeliveryTag():===6 38 envelope.getExchange():===exchange_001 39 envelope.getRoutingKey():===mq.topic 40 body:===hello rabbitmq!===============ACK&重回队列,第1条 41 ===================休眠以便查看=============== 42 iiiiiiiiiiiiiiiii======================================================1 43 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 44 envelope.getDeliveryTag():===7 45 envelope.getExchange():===exchange_001 46 envelope.getRoutingKey():===mq.topic 47 body:===hello rabbitmq!===============ACK&重回队列,第1条 48 ===================休眠以便查看=============== 49 iiiiiiiiiiiiiiiii======================================================1 50 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 51 envelope.getDeliveryTag():===8 52 envelope.getExchange():===exchange_001 53 envelope.getRoutingKey():===mq.topic 54 body:===hello rabbitmq!===============ACK&重回队列,第1条 55 ===================休眠以便查看=============== 56 iiiiiiiiiiiiiiiii======================================================1 57 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 58 envelope.getDeliveryTag():===9 59 envelope.getExchange():===exchange_001 60 envelope.getRoutingKey():===mq.topic 61 body:===hello rabbitmq!===============ACK&重回队列,第1条 62 ===================休眠以便查看===============