zoukankan      html  css  js  c++  java
  • Rabbitmq之高级特性——实现消费端限流&NACK重回队列

      如果是高并发下,rabbitmq服务器上收到成千上万条消息,那么当打开消费端时,这些消息必定喷涌而来,导致消费端消费不过来甚至挂掉都有可能。

    在非自动确认的模式下,可以采用限流模式,rabbitmq 提供了服务质量保障qos机制来控制一次消费消息数量。

    下面直接上代码:

    生产端:

     1 package com.zxy.demo.rabbitmq;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import com.rabbitmq.client.AMQP;
     7 import com.rabbitmq.client.Channel;
     8 import com.rabbitmq.client.ConfirmListener;
     9 import com.rabbitmq.client.Connection;
    10 import com.rabbitmq.client.ConnectionFactory;
    11 import com.rabbitmq.client.ReturnListener;
    12 import com.rabbitmq.client.AMQP.BasicProperties;
    13 
    14 public class Producter {
    15 
    16     public static void main(String[] args) throws IOException, TimeoutException {
    17         // TODO Auto-generated method stub
    18         ConnectionFactory factory = new ConnectionFactory();
    19         factory.setHost("192.168.10.110");
    20         factory.setPort(5672);
    21         factory.setUsername("guest");
    22         factory.setPassword("guest");
    23         factory.setVirtualHost("/");
    24         Connection conn = factory.newConnection();
    25         Channel channel = conn.createChannel();
    26         String exchange001 = "exchange_001";
    27         String queue001 = "queue_001";
    28         String routingkey = "mq.topic";
    29         String body = "hello rabbitmq!===============限流策略";
    30 //        开启确认模式
    31         channel.confirmSelect();
    32 //        循环发送多条消息        
    33         for(int i = 0 ;i<10;i++){
    34         channel.basicPublish(exchange001, routingkey, null, body.getBytes());
    35     }
    36         
    37 //        添加一个返回监听========消息返回模式重要添加
    38         channel.addConfirmListener(new ConfirmListener() {
    39             
    40             @Override
    41             public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    42                 System.out.println("===========NACK============");
    43                 
    44             }
    45             
    46             @Override
    47             public void handleAck(long deliveryTag, boolean multiple) throws IOException {
    48                 System.out.println("===========ACK============");
    49                 
    50             }
    51         });
    52     }
    53 
    54 }

    消费端:

     1 package com.zxy.demo.rabbitmq;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 import com.rabbitmq.client.ConnectionFactory;
     9 
    10 public class Receiver {
    11 
    12     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    13         // TODO Auto-generated method stub
    14         ConnectionFactory factory = new ConnectionFactory();
    15         factory.setHost("192.168.10.110");
    16         factory.setPort(5672);
    17         factory.setUsername("guest");
    18         factory.setPassword("guest");
    19         factory.setVirtualHost("/");
    20         Connection conn = factory.newConnection();
    21         Channel channel = conn.createChannel();
    22         String exchange001 = "exchange_001";
    23         String queue001 = "queue_001";
    24         String routingkey = "mq.*";
    25         channel.exchangeDeclare(exchange001, "topic", true, false, null);
    26         channel.queueDeclare(queue001, true, false, false, null);
    27         channel.queueBind(queue001, exchange001, routingkey);
    28 //        设置限流策略
    29 //        channel.basicQos(获取消息最大数[0-无限制], 依次获取数量, 作用域[true作用于整个channel,false作用于具体消费者]);
    30         channel.basicQos(0, 2, false);
    31 //        自定义消费者
    32         MyConsumer myConsumer = new MyConsumer(channel);
    33 //        进行消费,签收模式一定要为手动签收
    34         Thread.sleep(3000);
    35         channel.basicConsume(queue001, false, myConsumer);
    36     }
    37 
    38 }

    自定义消费者:

     1 package com.zxy.demo.rabbitmq;
     2 
     3 import java.io.IOException;
     4 
     5 import com.rabbitmq.client.AMQP.BasicProperties;
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.DefaultConsumer;
     8 import com.rabbitmq.client.Envelope;
     9 
    10 /**
    11  * 可以继承,可以实现,实现的话要覆写的方法比较多,所以这里用了继承
    12  *
    13  */
    14 public class MyConsumer extends DefaultConsumer{
    15     private Channel channel;
    16     public MyConsumer(Channel channel) {
    17         super(channel);
    18         // TODO Auto-generated constructor stub
    19         this.channel=channel;
    20     }
    21 
    22     @Override
    23     public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
    24             throws IOException {
    25         System.out.println("消费标签:"+consumerTag);
    26         System.out.println("envelope.getDeliveryTag():==="+envelope.getDeliveryTag());
    27         System.out.println("envelope.getExchange():==="+envelope.getExchange());
    28         System.out.println("envelope.getRoutingKey():==="+envelope.getRoutingKey());
    29         System.out.println("body:==="+new String(body));
    30 //        手动签收,一定要有消费者签收,如果没有如下代码,则限流模式下,仅能打印出来channel.basicQos(0, 2, false);第二参数的2条信息
    31         channel.basicAck(envelope.getDeliveryTag(), false);
    32     }
    33     
    34 
    35 }

     重回队列模式,是当投递消息失败时,让该消息重新回到队列的模式,该模式需要手动签收,并需要在消费者中进行判断,调用重回队列的确认模式

    代码如下

    生产端:

     1 package com.zxy.demo.rabbitmq;
     2 
     3 import java.io.IOException;
     4 import java.util.HashMap;
     5 import java.util.Map;
     6 import java.util.concurrent.TimeoutException;
     7 
     8 import org.springframework.amqp.core.Message;
     9 
    10 import com.rabbitmq.client.AMQP.BasicProperties;
    11 import com.rabbitmq.client.Channel;
    12 import com.rabbitmq.client.ConfirmListener;
    13 import com.rabbitmq.client.Connection;
    14 import com.rabbitmq.client.ConnectionFactory;
    15 import com.rabbitmq.client.ReturnListener;
    16 
    17 public class Producter {
    18 
    19     public static void main(String[] args) throws IOException, TimeoutException {
    20         // TODO Auto-generated method stub
    21         ConnectionFactory factory = new ConnectionFactory();
    22         factory.setHost("192.168.10.110");
    23         factory.setPort(5672);
    24         factory.setUsername("guest");
    25         factory.setPassword("guest");
    26         factory.setVirtualHost("/");
    27         Connection conn = factory.newConnection();
    28         Channel channel = conn.createChannel();
    29         String exchange001 = "exchange_001";
    30         String queue001 = "queue_001";
    31         String routingkey = "mq.topic";
    32         
    33 //        循环发送多条消息        
    34         for(int i = 0 ;i<5;i++){
    35             String body = "hello rabbitmq!===============ACK&重回队列,第"+i+"条";
    36             Map<String,Object> head = new HashMap<>();
    37             head.put("n", i);
    38             BasicProperties properties = new BasicProperties(null, "utf-8", head, 2, 1, null, null, null, null, null, null, null, null, null);
    39             
    40         channel.basicPublish(exchange001, routingkey, properties, body.getBytes());
    41     }
    42         
    43     }
    44 
    45 }

    消费端:

     1 package com.zxy.demo.rabbitmq;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.Connection;
     8 import com.rabbitmq.client.ConnectionFactory;
     9 
    10 public class Receiver {
    11 
    12     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    13         // TODO Auto-generated method stub
    14         ConnectionFactory factory = new ConnectionFactory();
    15         factory.setHost("192.168.10.110");
    16         factory.setPort(5672);
    17         factory.setUsername("guest");
    18         factory.setPassword("guest");
    19         factory.setVirtualHost("/");
    20         Connection conn = factory.newConnection();
    21         Channel channel = conn.createChannel();
    22         String exchange001 = "exchange_001";
    23         String queue001 = "queue_001";
    24         String routingkey = "mq.*";
    25         channel.exchangeDeclare(exchange001, "topic", true, false, null);
    26         channel.queueDeclare(queue001, true, false, false, null);
    27         channel.queueBind(queue001, exchange001, routingkey);
    28 //        自定义消费者
    29         MyConsumer myConsumer = new MyConsumer(channel);
    30 //        进行消费,签收模式一定要为手动签收
    31         channel.basicConsume(queue001, false, myConsumer);
    32     }
    33 
    34 }

    自定义消费者:

     1 package com.zxy.demo.rabbitmq;
     2 
     3 import java.io.IOException;
     4 
     5 import com.rabbitmq.client.AMQP.BasicProperties;
     6 import com.rabbitmq.client.Channel;
     7 import com.rabbitmq.client.DefaultConsumer;
     8 import com.rabbitmq.client.Envelope;
     9 
    10 /**
    11  * 可以继承,可以实现,实现的话要覆写的方法比较多,所以这里用了继承
    12  *
    13  */
    14 public class MyConsumer extends DefaultConsumer{
    15     private Channel channel;
    16     public MyConsumer(Channel channel) {
    17         super(channel);
    18         // TODO Auto-generated constructor stub
    19         this.channel=channel;
    20     }
    21 
    22     @Override
    23     public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
    24             throws IOException {
    25         System.out.println("消费标签:"+consumerTag);
    26         System.out.println("envelope.getDeliveryTag():==="+envelope.getDeliveryTag());
    27         System.out.println("envelope.getExchange():==="+envelope.getExchange());
    28         System.out.println("envelope.getRoutingKey():==="+envelope.getRoutingKey());
    29         System.out.println("body:==="+new String(body));
    30         System.out.println("===================休眠以便查看===============");
    31         try {
    32             Thread.sleep(2000);
    33         } catch (InterruptedException e) {
    34             // TODO Auto-generated catch block
    35             e.printStackTrace();
    36         }
    37 //        手动签收
    38         Integer i = (Integer) properties.getHeaders().get("n");
    39         System.out.println("iiiiiiiiiiiiiiiii======================================================"+i);
    40         if(i==1) {
    41             channel.basicNack(envelope.getDeliveryTag(),false, true);//第三个参数为是否重返队列
    42         }else {
    43             channel.basicAck(envelope.getDeliveryTag(), false);    
    44         }
    45     }
    46     
    47 
    48 }

    下面是重回队列执行结果,可以看到当消费完后第一条不断的被扔回队列然后消费再扔回。

     1 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg
     2 envelope.getDeliveryTag():===1
     3 envelope.getExchange():===exchange_001
     4 envelope.getRoutingKey():===mq.topic
     5 body:===hello rabbitmq!===============ACK&重回队列,第0条
     6 ===================休眠以便查看===============
     7 iiiiiiiiiiiiiiiii======================================================0
     8 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg
     9 envelope.getDeliveryTag():===2
    10 envelope.getExchange():===exchange_001
    11 envelope.getRoutingKey():===mq.topic
    12 body:===hello rabbitmq!===============ACK&重回队列,第1条
    13 ===================休眠以便查看===============
    14 iiiiiiiiiiiiiiiii======================================================1
    15 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg
    16 envelope.getDeliveryTag():===3
    17 envelope.getExchange():===exchange_001
    18 envelope.getRoutingKey():===mq.topic
    19 body:===hello rabbitmq!===============ACK&重回队列,第2条
    20 ===================休眠以便查看===============
    21 iiiiiiiiiiiiiiiii======================================================2
    22 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg
    23 envelope.getDeliveryTag():===4
    24 envelope.getExchange():===exchange_001
    25 envelope.getRoutingKey():===mq.topic
    26 body:===hello rabbitmq!===============ACK&重回队列,第3条
    27 ===================休眠以便查看===============
    28 iiiiiiiiiiiiiiiii======================================================3
    29 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg
    30 envelope.getDeliveryTag():===5
    31 envelope.getExchange():===exchange_001
    32 envelope.getRoutingKey():===mq.topic
    33 body:===hello rabbitmq!===============ACK&重回队列,第4条
    34 ===================休眠以便查看===============
    35 iiiiiiiiiiiiiiiii======================================================4
    36 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg
    37 envelope.getDeliveryTag():===6
    38 envelope.getExchange():===exchange_001
    39 envelope.getRoutingKey():===mq.topic
    40 body:===hello rabbitmq!===============ACK&重回队列,第1条
    41 ===================休眠以便查看===============
    42 iiiiiiiiiiiiiiiii======================================================1
    43 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg
    44 envelope.getDeliveryTag():===7
    45 envelope.getExchange():===exchange_001
    46 envelope.getRoutingKey():===mq.topic
    47 body:===hello rabbitmq!===============ACK&重回队列,第1条
    48 ===================休眠以便查看===============
    49 iiiiiiiiiiiiiiiii======================================================1
    50 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg
    51 envelope.getDeliveryTag():===8
    52 envelope.getExchange():===exchange_001
    53 envelope.getRoutingKey():===mq.topic
    54 body:===hello rabbitmq!===============ACK&重回队列,第1条
    55 ===================休眠以便查看===============
    56 iiiiiiiiiiiiiiiii======================================================1
    57 消费标签:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg
    58 envelope.getDeliveryTag():===9
    59 envelope.getExchange():===exchange_001
    60 envelope.getRoutingKey():===mq.topic
    61 body:===hello rabbitmq!===============ACK&重回队列,第1条
    62 ===================休眠以便查看===============
    View Code
  • 相关阅读:
    java 大数据处理类 BigDecimal 解析
    关于纠正 C/C++ 之前在函输内改变 变量的一个错误想法。
    C++ 制作 json 数据 并 传送给服务端(Server) 的 php
    介绍一个很爽的 php 字符串特定检索函数---strpos()
    如何 判断 设备 是否 连接 上 了 wifi
    android 通过访问 php 接受 or 传送数据
    正则匹配抓取input 隐藏输入项和 <td>标签内的内容
    手把手教你Chrome扩展开发:本地存储篇
    HTML5之本地存储localstorage
    初尝CDN:什么是分布式服务节点?
  • 原文地址:https://www.cnblogs.com/xiaoyao-001/p/9608665.html
Copyright © 2011-2022 走看看