zoukankan      html  css  js  c++  java
  • RabbitMQ 消息确认

    参考:https://blog.csdn.net/qq_29914837/article/details/93376741

    消息确认方式:

    1、消息发送确认

    a、通过AMQP(高级消息队列协议)事务确认

    (1)txSelect用于将当前channel设置成transaction模式,通过调用tx.select方法开启事务模式。
    (2)txCommit用于提交事务。当开启了事务模式后,只有当一个消息被所有的镜像队列保存完毕后,RabbitMQ才会调用tx.commit-ok返回给客户端。
    (3)txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了

    channel.txSelect();
     //ConfirmConfig.exchangeName(交换机名称)
     //ConfirmConfig.routingKey(路由键)
     //message (消息内容)
    channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message));
    channel.txCommit();
    

    b、由rabbitmq实现的confirm跟return机制

    生产者通过confirm机制确认消息是否送达。producer端消息确认分两步,一时确认是否到达交换机,二是确认是否到达队列。

    confirm实现原理
    生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。

    前提:需要开启publisher-confirms: true
    对于ConfirmCallback来说:
    如果消息没有到exchange,则confirm回调,ack=false
    如果消息到达exchange,则confirm回调,ack=true

    对于ReturnCallback来说:
    exchange到queue成功,则不回调return
    exchange到queue失败,则回调return(需设置mandatory=true,否则不会回调,消息就丢了)
    比如路由不到队列时触发回调

    2、消息接收确认

    acknowledge机制说明:
    a、通过 ACK机制(消息确认机制)确认消息是否被正确接收,每个 Message 都要被确认(acknowledged),可以手动去 ACK 或自动 ACK
    b、默认情况下,一条消息被消费者正确消费就会从队列中移除
    c、自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端处理逻辑抛出异常,也就是消费端没有成功处理这条消息,那么就相当于丢失了消息
    d、如果消息已经被处理,但后续代码抛出异常,使用 Spring 进行管理的话消费端业务逻辑会进行回滚,这也同样造成了实际意义的消息丢失
    e、如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作
    f、如果某个服务忘记确认 ACK 了,则 RabbitMQ 不会再发送此消息数据给它,只要程序还在运行,没确认的消息就一直是 Unacked 状态,无法被 RabbitMQ 重新投递。
    g、ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟

    acknowledge的三种模式
    a、none:默认情况下消息消费者是NONE模式,默认所有消息消费成功,会不断的向消费者推送消息。因为rabbitMq认为所有消息都被消费成功,所以队列中不在存有消息,消息存在丢失的危险
    b、auto:在自动确认模式下,消息发送后即被认为成功投递,不管消费者端是否成功处理本次投递
    c、mandal:消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功

    注:使用下面手动签收消息的方式需要将Acknowledge设置为manual
    channel.basicAck(deliveryTag, multiple);
    deliveryTag:deliveryTag(唯一标识 ID):当一个消费者向RabbitMQ注册后,会建立起一个Channel,RabbitMQ会用basic.deliver方法向消费者推送消息,这个方法携带了一个delivery tag,它代表了RabbitMQ向该Channel投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag的范围仅限于Channel
    multiple:false表示ack当前消息;true表示将一次性ack所有小于deliveryTag的消息。

    channel.basicNack(deliveryTag, multiple, requeue);
    requeue:false表示丢弃消息,true表示将消息重新入队

    channel.basicReject(deliveryTag, requeue);
    requeue:false表示丢弃消息,true表示将消息重新入队
    basicNack()与basicReject()的区别在于basicReject方法只处理一条消息。

    —转载请注明出处
  • 相关阅读:
    CocoaPods 的安装和使用介绍
    C 语言宏快速入门
    C中的预编译宏定义
    __block 与 __weak的区别理解
    Xcode7 项目转 Xcode6 时 出现问题
    css margin 参数
    第三方框架 INTULocationManager 定位的一些方法
    ios 定位 监听是否跨入某个指定的区域
    ios 指南针
    ios 定位 航向检测
  • 原文地址:https://www.cnblogs.com/landiss/p/14575798.html
Copyright © 2011-2022 走看看