zoukankan      html  css  js  c++  java
  • RabbitMQ---5、qos内存溢出+prefetch消息堵塞问题

    1、prefetch消息堵塞问题

    mq是实现代码扩展的有利手段,个人喜欢用概念来学习新知识,介绍堵塞问题的之前,先来段概念的学习。

    ConnectionFactory:创建connection的工厂类

    Connection: 简单理解为socket

    Channel:和mq交互的接口,定义queue、exchange和绑定queue、exhange等接口都是它。

    接下来就是和mq的交互类

    exchange:简单地看成路由,类型不是重点,看看官网即可

    queue:客户端监听的是queue,而不是exchange,但是使用queue的前提要先将exchange和queue绑定。用过java queue工具类应该很容易上手,queue分为写和读,各自可以有自己频率,写得快读得慢,容易堵塞;写得慢读得快又容易造成消费者的空闲。

    Prefetc:一个重要却容易被忽略的指标,也是这次遇到的问题。 
    prefetch与消息投递

    prefetch是指单一消费者最多能消费的unacked messages数目。

    如何理解呢?

    mq为每一个 consumer设置一个缓冲区,大小就是prefetch。每次收到一条消息,MQ会把消息推送到缓存区中,然后再推送给客户端。当收到一个ack消息时(consumer 发出baseack指令),mq会从缓冲区中空出一个位置,然后加入新的消息。但是这时候如果缓冲区是满的,MQ将进入堵塞状态。

    更具体点描述,假设prefetch值设为10,共有两个consumer。也就是说每个consumer每次会从queue中预抓取 10 条消息到本地缓存着等待消费。同时该channel的unacked数变为20。而Rabbit投递的顺序是,先为consumer1投递满10个message,再往consumer2投递10个message。如果这时有新message需要投递,先判断channel的unacked数是否等于20,如果是则不会将消息投递到consumer中,message继续呆在queue中。之后其中consumer对一条消息进行ack,unacked此时等于19,Rabbit就判断哪个consumer的unacked少于10,就投递到哪个consumer中。

    我遇到的问题是一个粗心的程序员,在编写代码的时候,他对某些消息处理方式是这样的

      if (success) {
                            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                        } else {
                            logger.error("######### The message is not delete from queue : {}", body);
                        }

    首先他讲ack机制设置为手动的,然后他的理解是如果处理成功的消息,就ack给MQ,期望MQ就可以删除完成的数据。不然,保留数据再次被处理。

    这里的误区就是就是对ack的理解,失败的时候,如果需要让程序继续处理,应该使用basicNack,并告诉mq将消息再次放入队列

        if (success) {
                                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                            } else {
                                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                            }

    对于客户端意外宕机的情况,没有ack服务器确实不会删除掉数据,但是consumer重启以后,对于服务器就是一个新的消费者了,也就是它的缓冲区又被重置为原来的n-prefetch,所以这个问题被粗心的小哥想当然地测试通过了。

    prefetch的大小应该为多少

    这篇文章给了很好的建议,我简单地说一下我的理解。

    理想状况下,计算MQ SERVER 从缓冲区中拿到消息并推送到消费端,加上消费端处理完ack消息到MQ server,的时间,假设为100ms,其中消费端处理业务话费了10ms。

    这里可以得出我们 prefetch = 100ms / 10ms = 10,也就是消息来回的总时间/业务处理的时间,这里要求我们 prefetch >= 10。一般计算这个时间不会太准确只能毛姑姑的,所以prefetch一般要大一点。但是这个值也不能太大,不然消费端就一只处于空闲状态了。

    所以如果你保证所有的消息都ack了,但是还是出现比较长时间的堵塞,你就或者加大一点prefetch,或者多加一些机器,或者减少业务处理的时间了。一开始建议采用或者,使用一个线程池来处理这些业务逻辑。

    2、qos内存溢出----队列超长QueueingConsumer导致JVM内存溢出

    我们的服务器使用RabbitMQ作为消息中转的容器。某天我怀疑RabbitMQ队列是否都能及时消化。于是用命令查询了下:rabbitmqctllist_vhosts | grep -P ".*.host" | xargs -i rabbitmqctl list_queues-p {} | grep "queue"。 

    不查不知道,一查吓一跳:大多数服务器的队列基本上都是空的,但是有些服务器的某个队列竟然有超过500W条的记录。一般RabbitMQ进程占用内存不过100M-200M,这些队列超长的RabbitMQ进程可以占用超过2G的内存。
         显然消息队列的消费者出现了问题。开发查看日志发现作为该队列消费者的Java服务的日志也卡住了,重启服务后(这点做得不对,应该用jstat、jstack进行排查,而不是直接重启)又很快卡住。这时候他才想起来用jstat,通过jstat发现JVM内存都耗尽了,之后进入无尽的FullGC,所以当然不会处理队列消息和输出日志信息了。jstat的输出如下:
    --------------------------------------------------------------------------
    [root@mail ~]# jstat -gcutil 29389
      S0     S1    E     O     P    YGC     YGCT   FGC   FGCT     GCT  
    100.00   0.00 100.00 100.00 59.59   1639   2.963 219078 99272.246 99275.209
    --------------------------------------------------------------------------
         使用jmap导出这时候的Java堆栈,命令:jmap-dump:format=b,file=29389.hprof 29389。将得到的dump文件放到MAT(EclipseMemory Analyzer)里进行分析,发现很明显是QueueingConsumer持有大量对象导致JVM内存溢出,截图如下:
         解决RabbitMQ队列超长QueueingConsumer导致JVM内存溢出的问题
        上网搜索了下,发现有人遇到了类似的问题:RabbitMQ QueueingConsumer possible memoryleak 。解决办法是调用Channel的basicQos方法,设置临时内存中最多保存的消息数。这个数值的设置建议参考 《Some queuing theory: throughput, latency andbandwidth》 权衡决定。
         拍脑袋将basicQos设置为16后重启服务,队列终于开始消化了。用jstat观察JVM内存使用情况,也没有再出现剧增溢出的现象。
         总结:使用RabbitMQ的时候,一定要合理地设置QoS参数。我感觉RabbitMQ的默认做法其实是很脆弱的,容易导致雪崩。“Youhave a queue in Rabbit. You have some clients consuming from thatqueue. If you don't set a QoS setting at all (basic.qos), thenRabbit will push all the queue's messages to the clients as fast asthe network and the clients will allow.“。这样如果由于某些原因,队列中堆积了比较多的消息,就可能导致Comsumer内存溢出卡死,于是发生恶性循环,队列消息不断堆积得不到消化,彻底地悲剧了。
  • 相关阅读:
    牛客网 二叉树的镜像 JAVA
    牛客网 反转链表 JAVA
    牛客网 调整数组顺序使奇数位于偶数前面 JAVA
    Integer to Roman LeetCode Java
    Valid Number leetcode java
    Longest Common Prefix
    Wildcard Matching leetcode java
    Regular Expression Matching
    Longest Palindromic Substring
    Add Binary LeetCode Java
  • 原文地址:https://www.cnblogs.com/xiaohua19920/p/9584658.html
Copyright © 2011-2022 走看看