zoukankan      html  css  js  c++  java
  • RabbitMQ笔记-Qos与消息应答

    QOS:服务质量保证功能

    Prefetch count (预取数目)

    prefetch是指单一消费者最多能消费的unacked messages数目。
    mq为每一个 consumer设置一个缓冲区,大小就是prefetch。每次收到一条消息,MQ会把消息推送到缓存区中,然后再推送给客户端。当收到一个ack消息时(consumer 发出baseack指令),mq会从缓冲区中空出一个位置,然后加入新的消息。但是这时候如果缓冲区是满的,MQ将进入堵塞状态。
    更具体点描述,假设prefetch值设为10,共有两个consumer。也就是说每个consumer每次会从queue中预抓取 10 条消息到本地缓存着等待消费。同时该channel的unacked数变为20。而Rabbit投递的顺序是,先为consumer1投递满10个message,再往consumer2投递10个message。如果这时有新message需要投递,先判断channel的unacked数是否等于20,如果是则不会将消息投递到consumer中,message继续呆在queue中。之后其中consumer对一条消息进行ack,unacked此时等于19,Rabbit就判断哪个consumer的unacked少于10,就投递到哪个consumer中。

    前面我们提到了如果有多个消费者同时订阅同一个Quque中的消息,Quque中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能导致某些消费者一直很忙,而另一些消费者很快处理完手头上工作,并一直空闲的情况下。我们可以通过设置prefetch count=1,则Quque每次给每个消费者发送一条消息;消费者处理完这条消息后Quque会再给消费者发送一条消息。
    试想一下,如果我们单个消费者1分钟最多处理60条消息,但是生产者1分钟可能会发送300条消息,如果我们一台消费者客户端,1分钟同时要接收到300条消息,已经超过我们最大的负载,这时,就可能导致,服务器资源被耗尽,消费者客户端卡死等情况。
    RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。
    PrefetchCount:预取数量,在接收到该Consumer的ack前,他它不会将新的Message分发给它

    启用Qos和手动Ack可能导致队列阻塞:
    如果consumer接收到消息后没有ack或者unack,并且使用Qos限制了prefetch数量,挡unacked消息太多就会造成堵塞

    应答方式:
    basicAck:成功消费,消息从队列中删除
    basicNack:拒绝消息,requeue=true,消息重新进入队列,false被删除
    basicReject:等同于basicNack
    basicRecover:消息重入队列,requeue=true,发送给新的consumer,false发送给相同的consumer

    basicQoc:

    设置服务端每次发送给消费者的消息数量

    /**
    * prefetchSize:服务器传送最大内容量(以八位字节计算),如果没有限制,则为0
    * prefetchCount:服务器每次传递的最大消息数,如果没有限制,则为0;
    * global:如果为true,则当前设置将会应用于整个Channel(频道)
    **/
    void basicQos(int prefetchSize, int prefetchCount, boolean global)
    

    消息回复:

    Ack(确认)收到一个或者多个消息

    /**
    * 消费者确认收到一个或者多个消息
    * deliveryTag:服务器端向消费者推送消息,消息会携带一个deliveryTag参数,也可以成此参数为消息的唯一标识,是一个递增的正整数
    * multiple:true表示确认所有消息,包括消息唯一标识小于等于deliveryTag的消息,false只确认deliveryTag指定的消息
    **/
    void basicAck(long deliveryTag, boolean multiple)
    

    basicRecover:重新发送

    /**
    * 要求代理重新发送未确认的消息
    * requeue:如果为true,消息将会重新入队,可能会被发送给其它的消费者;如果为false,消息将会发送给相同的消费者
    **/
    Basic.basicRecover(boolean requeue)
    

    basicNack:拒绝消息

    /**
    * 拒绝接收到的一个或者多个消息
    * deliveryTag:接收到消息的唯一标识
    * multiple: true表示拒绝所有的消息,包括提供的deliveryTag;false表示仅拒绝提供的deliveryTag
    * requeue:true 表示拒绝的消息应重新入队,而不是否丢弃
    */
    void basicNack(long deliveryTag, boolean multiple, boolean requeue)
    

    basicReject:拒绝消息

    参考:https://blog.csdn.net/james_searcher/article/details/70308565

  • 相关阅读:
    redis的五种常见数据类型的常用指令
    Linux常用的命令
    moco操作
    如何使用GoEasy实现PHP与Websocket实时通信
    浅谈websocket
    nginx 配置虚拟主机访问PHP文件 502错误的解决方法
    集群/分布式环境下5种session处理策略
    nginx 集群
    使用Nginx实现反向代理
    nginx的配置和基本使用命令
  • 原文地址:https://www.cnblogs.com/fanfan-90/p/13589626.html
Copyright © 2011-2022 走看看