zoukankan      html  css  js  c++  java
  • rabbitmq消息ACK确认机制及发送失败处理

    rabbitmq为确保消息发送和接收成功,采用ack机制。
    (1)生产者producter发送消息到mq时,mq会发送ack给producter告知消息是否投递成功;
    (2)消费者consumer接收处理消息后,consumer会发送ack给mq告知消息是否处理成功;
    通过ack机制,确保消息能够被producter成功发送和consumer成功接收处理,保证消息不丢失。

    1、消息发送
    rabbitmq消息发送分为两个阶段:
    (1)producter将消息发送到broker,即发送到exchage交换机;
    (2)消息通过交换机exchange被路由到队列queue;
    消息只有被正确投递到队列queue中,才算发送成功。

    消息发送代码:


        public boolean send(String queueName, String json, String msgId){
            Message message = MessageBuilder.withBody(json.getBytes()).setCorrelationId(msgId).build();
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//设置消息持久化
            CorrelationDataExt correlationData = new CorrelationDataExt();
            correlationData.setId(msgId);
            correlationData.setData(json);
            rabbitTemplate.setEncoding("UTF-8");
            rabbitTemplate.setMandatory(true);//设置手工ack确认
            rabbitTemplate.setConfirmCallback(this);//ack回调
            rabbitTemplate.setReturnCallback(this);//回退回调
            rabbitTemplate.convertAndSend(queueName, message, correlationData);
            return true;
        }



    在消息发送之前,我们要设置ack机制相关参数:
    setMandatory:设置手工确认ack;
    setConfirmCallback:设置消息发送到exchange结果回调;
    setReturnCallback:设置消息投递到queue失败回退时回调;

    通过上述两个回调方法,我们能够对发送失败的消息进行重发处理,确保消息不丢失。

    2、消息发送失败
    根据rabbitmq发送过程,消息发送失败的有三种情况会出现:
    (1)producter连接mq失败,消息没有发送到mq
    (2)producter连接mq成功,但是发送到exchange失败
    (3)消息发送到exchange成功,但是路由到queue失败;

    3、发送失败处理
    (1)producter连接mq失败,消息没有发送到mq
    这种情况下,在发送消息时可以通过捕捉AmqpException异常,将消息保存db中后续进行重发处理。

            try{
                rabbitTemplate.convertAndSend(queueName, message, correlationData);
            }catch (Exception e){
                logger.error("连接MQ失败", e);
                //todo 存储到db中进行重发
            }



    (2)producter连接mq成功,但是发送到exchange失败
    通过实现ConfirmCallback接口,对发送结果进行处理。

        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String msgId = correlationData.getId();
            if(ack){
                //发送成功
                logger.debug("ack,消息投递到exchange成功,msgId:{}",msgId);
            }else{
                //发送失败,重试
                logger.error("ack,消息投递exchange失败,msgId:{},原因{}" ,msgId, cause);
               
            }
        }

    confirm方法有3个参数,correlationData是消息发送时携带的数据对象,ack消息是否成功发送到exchange,cause是发送失败时的原因。
    通过ack我们可以判断发送到exchange是否成功,如果ack=false,则我们进行失败处理。
    但是这里存在一个问题,correlationData里面只有一个id属性,没有关于消息内容的属性,对于数据失败处理非常不方便。
    为解决此问题,我们可以自定义一个CorrelationData扩展对象,继承CorrelationData,并添加自己想要保存数据的属性,在消息发送时,携带相关数据在该对象上即可。

    自定义CorrelationData对象:

    /**
     * CorrelationData的自定义实现,用于拿到消息内容
     */
    public class CorrelationDataExt extends CorrelationData {
        //数据
        private volatile Object data;
        //队列
        private String queueName;

        public Object getData() {
            return data;
        }

        public void setData(Object data) {
            this.data = data;
        }

        public String getQueueName() {
            return queueName;
        }

        public void setQueueName(String queueName) {
            this.queueName = queueName;
        }
    }


    重写发送方法,使用CorrelationDataExt对象携带数据:

        public boolean send(String queueName, String json, String msgId){
            Message message = MessageBuilder.withBody(json.getBytes()).setCorrelationId(msgId).build();
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//设置消息持久化

            //使用自定义的数据对象
            CorrelationDataExt correlationData = new CorrelationDataExt();
            correlationData.setId(msgId);
            correlationData.setData(json);
            correlationData.setQueueName(queueName);

            rabbitTemplate.setEncoding("UTF-8");
            rabbitTemplate.setMandatory(true);//设置手工ack确认
            rabbitTemplate.setConfirmCallback(this);//设置发送成功回调
            rabbitTemplate.setReturnCallback(this);//设置消息回退回调
            try{
                rabbitTemplate.convertAndSend(queueName, message, correlationData);//使用amqp default exchange direct
            }catch (Exception e){
                logger.error("MQ连接失败,请联系管理员处理!!!!");
                //保存到db重发
                saveToDB(msgId, json, queueName, "90");
            }
            return true;
        }

    重写confirm方法,对CorrelationData进行处理:


        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String msgId = correlationData.getId();
            if(ack){
                //发送成功
                logger.debug("ack,消息投递到exchange成功,msgId:{}",msgId);
            }else{
                //发送失败,重试
                logger.error("ack,消息投递exchange失败,msgId:{},原因{}" ,msgId, cause);
                if(correlationData instanceof CorrelationDataExt){
                    CorrelationDataExt correlationDataExt = (CorrelationDataExt) correlationData;
                    String message = (String) correlationDataExt.getData();
                    String queueName = ((CorrelationDataExt) correlationData).getQueueName();
                    saveToDB(msgId, message, queueName, "91");
                }else{
                    logger.info("correlationData对象不包含数据");
                }
            }
        }

     

    (3)消息发送到exchange成功,但是路由到queue失败
    通过实现ReturnCallback接口,对回退消息进行重发处理。

        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            logger.error("消息发送失败-消息回退,应答码:{},原因:{},交换机:{},路由键:{}", replyCode, replyText, exchange, routingKey);
            String msgId = message.getMessageProperties().getCorrelationId();
            String data = new String(message.getBody());
            saveToDB(msgId, data, routingKey, "92");
        }



    关于对失败消息的处理,我这里是统一保存到DB中,后续通过定时任务进行重发处理的。

    通过以上3个方面对失败消息的处理,可以确保消息能够成功发送到mq,确保不丢失。

  • 相关阅读:
    新一代MQ apache pulsar的架构与核心概念
    Flutter使用fluwx实现微信分享
    BZOJ3622 已经没有什么好害怕的了 动态规划 容斥原理 组合数学
    NOIP2016提高组Day1T2 天天爱跑步 树链剖分 LCA 倍增 差分
    Codeforces 555C Case of Chocolate 其他
    NOIP2017提高组Day2T3 列队 洛谷P3960 线段树
    NOIP2017提高组Day2T2 宝藏 洛谷P3959 状压dp
    NOIP2017提高组Day1T3 逛公园 洛谷P3953 Tarjan 强连通缩点 SPFA 动态规划 最短路 拓扑序
    Codeforces 873F Forbidden Indices 字符串 SAM/(SA+单调栈)
    Codeforces 873E Awards For Contestants ST表
  • 原文地址:https://www.cnblogs.com/tiancai/p/13322220.html
Copyright © 2011-2022 走看看