zoukankan      html  css  js  c++  java
  • rabbitMq集成Spring后,消费者设置手动ack,并且在业务上控制是否ack

    1. 在这里不提如何集成rabbit mq到spring

    2. 实现功能的配置都在消费者端:

    3. 下面是步骤和说明

    (1)在消费者端的mq配置文件上添加,配置  关键代码为 acknowledeg = "manual"
    ,意为表示该消费者的ack方式为手动(此时的queue已经和生产者的exchange通过某个routeKey绑定了)

    [html] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">  
    2.     <rabbit:listener queues="queue_xxx" ref="MqConsumer"/>  
    3.     <rabbit:listener queues="queue_xxx" ref="MqConsumer2"/>  
    4. </rabbit:listener-container>  

    (2)新建一个类 MqConsumer ,并实现接口  ChannelAwareMessageListener ,实现onMessage方法,不需要指定方法。

    因为下方图所示,springAMQP中已经实现了一个功能,如果该监听器已经实现了下面2个接口,则直接调用onMessage方法


    (3)关键点在实现了ChannelAwareMessageListener的onMessage方法后,会有2个参数。

    一个是message(消息实体),一个是channel就是当前的通道

    很多地方都没有说清楚怎么去手动ack,其实手动ack就是在当前channel里面调用basicAsk的方法,并传入当前消息的tagId就可以了。


    basicAck()方法解析:



    第一个参数 deliveryTag:就是接受的消息的deliveryTag,可以通过msg.getMessageProperties().getDeliveryTag()获得

         第二个参数 multiple:如果为true,确认之前接受到的消息;如果为false,只确认当前消息。如果为true就表示连续取得多条消息才发会确认,和计算机网络的中tcp协议接受分组的累积确认十分相似,能够提高效率


    同样的,如果要nack或者拒绝消息(reject)的时候,也是调用channel里面的basicXXX方法就可以了(要指定tagId)。

    注意如果抛异常或nack(并且requeue为true),消息会重新入队列,并且会造成消费者不断从队列中读取同一条消息的假象。



    //消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息

    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    //ack返回false,并重新回到队列,api里面解释得很清楚

    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

    //拒绝消息

    channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

  • 相关阅读:
    过滤非GBK字符
    打印整数数字
    std::string 方法列表
    设计模式——单例模式(Singleton )
    编程之美二进制一的个数
    Jsoncpp试用指南
    GCC下宏扩展后的++i
    关于字节对齐的sizeof的讨论
    Notepad++ 更改和定制主题
    求子数组的最大和
  • 原文地址:https://www.cnblogs.com/chenny3/p/10226169.html
Copyright © 2011-2022 走看看