zoukankan      html  css  js  c++  java
  • rabbitmq消息ACK确认机制及发送失败处理

    rabbitmq为确保消息发送和接收成功,采用ack机制。
    (1)生产者producter发送消息到mq时,mq会发送ack给producter告知消息是否投递成功;
    (2)消费者consumer接收处理消息后,consumer会发送ack给mq告知消息是否处理成功;
    通过ack机制,确保消息能够被producter成功发送和consumer成功接收处理,保证消息不丢失。

    1、消息发送
    rabbitmq消息发送分为两个阶段:
    (1)producter将消息发送到broker,即发送到exchage交换机;
    (2)消息通过交换机exchange被路由到队列queue;
    消息只有被正确投递到队列queue中,才算发送成功。

    消息发送代码:


        public boolean send(String queueName, String json, String msgId){
            Message message = MessageBuilder.withBody(json.getBytes()).setCorrelationId(msgId).build();
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//设置消息持久化
            CorrelationDataExt correlationData = new CorrelationDataExt();
            correlationData.setId(msgId);
            correlationData.setData(json);
            rabbitTemplate.setEncoding("UTF-8");
            rabbitTemplate.setMandatory(true);//设置手工ack确认
            rabbitTemplate.setConfirmCallback(this);//ack回调
            rabbitTemplate.setReturnCallback(this);//回退回调
            rabbitTemplate.convertAndSend(queueName, message, correlationData);
            return true;
        }



    在消息发送之前,我们要设置ack机制相关参数:
    setMandatory:设置手工确认ack;
    setConfirmCallback:设置消息发送到exchange结果回调;
    setReturnCallback:设置消息投递到queue失败回退时回调;

    通过上述两个回调方法,我们能够对发送失败的消息进行重发处理,确保消息不丢失。

    2、消息发送失败
    根据rabbitmq发送过程,消息发送失败的有三种情况会出现:
    (1)producter连接mq失败,消息没有发送到mq
    (2)producter连接mq成功,但是发送到exchange失败
    (3)消息发送到exchange成功,但是路由到queue失败;

    3、发送失败处理
    (1)producter连接mq失败,消息没有发送到mq
    这种情况下,在发送消息时可以通过捕捉AmqpException异常,将消息保存db中后续进行重发处理。

            try{
                rabbitTemplate.convertAndSend(queueName, message, correlationData);
            }catch (Exception e){
                logger.error("连接MQ失败", e);
                //todo 存储到db中进行重发
            }



    (2)producter连接mq成功,但是发送到exchange失败
    通过实现ConfirmCallback接口,对发送结果进行处理。

        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String msgId = correlationData.getId();
            if(ack){
                //发送成功
                logger.debug("ack,消息投递到exchange成功,msgId:{}",msgId);
            }else{
                //发送失败,重试
                logger.error("ack,消息投递exchange失败,msgId:{},原因{}" ,msgId, cause);
               
            }
        }

    confirm方法有3个参数,correlationData是消息发送时携带的数据对象,ack消息是否成功发送到exchange,cause是发送失败时的原因。
    通过ack我们可以判断发送到exchange是否成功,如果ack=false,则我们进行失败处理。
    但是这里存在一个问题,correlationData里面只有一个id属性,没有关于消息内容的属性,对于数据失败处理非常不方便。
    为解决此问题,我们可以自定义一个CorrelationData扩展对象,继承CorrelationData,并添加自己想要保存数据的属性,在消息发送时,携带相关数据在该对象上即可。

    自定义CorrelationData对象:

    /**
     * CorrelationData的自定义实现,用于拿到消息内容
     */
    public class CorrelationDataExt extends CorrelationData {
        //数据
        private volatile Object data;
        //队列
        private String queueName;

        public Object getData() {
            return data;
        }

        public void setData(Object data) {
            this.data = data;
        }

        public String getQueueName() {
            return queueName;
        }

        public void setQueueName(String queueName) {
            this.queueName = queueName;
        }
    }


    重写发送方法,使用CorrelationDataExt对象携带数据:

        public boolean send(String queueName, String json, String msgId){
            Message message = MessageBuilder.withBody(json.getBytes()).setCorrelationId(msgId).build();
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//设置消息持久化

            //使用自定义的数据对象
            CorrelationDataExt correlationData = new CorrelationDataExt();
            correlationData.setId(msgId);
            correlationData.setData(json);
            correlationData.setQueueName(queueName);

            rabbitTemplate.setEncoding("UTF-8");
            rabbitTemplate.setMandatory(true);//设置手工ack确认
            rabbitTemplate.setConfirmCallback(this);//设置发送成功回调
            rabbitTemplate.setReturnCallback(this);//设置消息回退回调
            try{
                rabbitTemplate.convertAndSend(queueName, message, correlationData);//使用amqp default exchange direct
            }catch (Exception e){
                logger.error("MQ连接失败,请联系管理员处理!!!!");
                //保存到db重发
                saveToDB(msgId, json, queueName, "90");
            }
            return true;
        }

    重写confirm方法,对CorrelationData进行处理:


        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String msgId = correlationData.getId();
            if(ack){
                //发送成功
                logger.debug("ack,消息投递到exchange成功,msgId:{}",msgId);
            }else{
                //发送失败,重试
                logger.error("ack,消息投递exchange失败,msgId:{},原因{}" ,msgId, cause);
                if(correlationData instanceof CorrelationDataExt){
                    CorrelationDataExt correlationDataExt = (CorrelationDataExt) correlationData;
                    String message = (String) correlationDataExt.getData();
                    String queueName = ((CorrelationDataExt) correlationData).getQueueName();
                    saveToDB(msgId, message, queueName, "91");
                }else{
                    logger.info("correlationData对象不包含数据");
                }
            }
        }

     

    (3)消息发送到exchange成功,但是路由到queue失败
    通过实现ReturnCallback接口,对回退消息进行重发处理。

        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            logger.error("消息发送失败-消息回退,应答码:{},原因:{},交换机:{},路由键:{}", replyCode, replyText, exchange, routingKey);
            String msgId = message.getMessageProperties().getCorrelationId();
            String data = new String(message.getBody());
            saveToDB(msgId, data, routingKey, "92");
        }



    关于对失败消息的处理,我这里是统一保存到DB中,后续通过定时任务进行重发处理的。

    通过以上3个方面对失败消息的处理,可以确保消息能够成功发送到mq,确保不丢失。

  • 相关阅读:
    Css进阶
    Css布局
    遇到的小问题
    MySQL 8.017连接Navicat中出现的问题
    ConcurrentHashMap图文源码解析
    HashMap图文源码解析
    接口和抽象类
    dependencies 和 devDependencies
    2020.7.7第二天
    2020.7.6第一天
  • 原文地址:https://www.cnblogs.com/tiancai/p/13322220.html
Copyright © 2011-2022 走看看