zoukankan      html  css  js  c++  java
  • RocketMQ 重试机制

    消息重试分为两种:Producer发送消息的重试 和 Consumer消息消费的重试。
    一、Producer端重试

    Producer端重试是指: Producer往MQ上发消息没有发送成功,比如网络原因导致生产者发送消息到MQ失败。

    部分源码解析:

    /**
    * 说明 抽取部分代码
    */
    private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) {

    //1、获取当前时间
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev ;
    //2、去服务器看下有没有主题消息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
    boolean callTimeout = false;
    //3、通过这里可以很明显看出 如果不是同步发送消息 那么消息重试只有1次
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    //4、根据设置的重试次数,循环再去获取服务器主题消息
    for (times = 0; times < timesTotal; times++) {
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    beginTimestampPrev = System.currentTimeMillis();
    long costTime = beginTimestampPrev - beginTimestampFirst;
    //5、前后时间对比 如果前后时间差 大于 设置的等待时间 那么直接跳出for循环了 这就说明连接超时是不进行多次连接重试的
    if (timeout < costTime) {
    callTimeout = true;
    break;

    }
    //6、如果超时直接报错
    if (callTimeout) {
    throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
    }
    }
    }

    通过这段源码很明显可以看出以下几点

    如果是异步发送 那么重试次数只有1次
    对于同步而言,超时异常也是不会再去重试。
    如果发生重试是在一个for 循环里去重试,所以它是立即重试而不是隔一段时间去重试。

    实践出真知!!!
    二、 Consumer端重试

    消费端比较有意思,而且在实际开发过程中,我们也更应该考虑的是消费端的重试。

    消费者端的失败主要分为2种情况,Exception 和 Timeout。
    1、Exception

    @Slf4j
    @Component
    public class Consumer {
    /**
    * 消费者实体对象
    */
    private DefaultMQPushConsumer consumer;
    /**
    * 消费者组
    */
    public static final String CONSUMER_GROUP = "test_consumer";
    /**
    * 通过构造函数 实例化对象
    */
    public Consumer() throws MQClientException {
    consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
    consumer.setNamesrvAddr("47.99.203.55:9876;47.99.203.55:9877");
    //订阅topic和 tags( * 代表所有标签)下信息
    consumer.subscribe("topic_family", "*");
    //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    //1、获取消息
    Message msg = msgs.get(0);
    try {
    //2、消费者获取消息
    String body = new String(msg.getBody(), "utf-8");
    //3、获取重试次数
    int count = ((MessageExt) msg).getReconsumeTimes();
    log.info("当前消费重试次数为 = {}", count);
    //4、这里设置重试大于3次 那么通过保存数据库 人工来兜底
    if (count >= 2) {
    log.info("该消息已经重试3次,保存数据库。topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body);
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    //直接抛出异常
    throw new Exception("=======这里出错了============");
    //return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
    e.printStackTrace();
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    });
    //启动监听
    consumer.start();
    }
    }

    这里的代码意思很明显: 主动抛出一个异常,然后如果超过3次,那么就不继续重试下去,而是将该条记录保存到数据库由人工来兜底。

    看下运行结果

    img

    注意 消费者和生产者的重试还是有区别的,主要有两点

    1、默认重试次数:Product默认是2次,而Consumer默认是16次。

    2、重试时间间隔:Product是立刻重试,而Consumer是有一定时间间隔的。它照1S,5S,10S,30S,1M,2M····2H进行重试。
    3、Product在异步情况重试失效,而对于Consumer在广播情况下重试失效。
    2、Timeout

    说明 这里的超时异常并非真正意义上的超时,它指的是指获取消息后,因为某种原因没有给RocketMQ返回消费的状态,即没有return ConsumeConcurrentlyStatus.CONSUME_SUCCESS 或 return ConsumeConcurrentlyStatus.RECONSUME_LATER。

    那么 RocketMQ会认为该消息没有发送,会一直发送。因为它会认为该消息根本就没有发送给消费者,所以肯定没消费。

    做这个测试很简单。

    //1、消费者获得消息
    String body = new String(msg.getBody(), "utf-8");
    //2、获取重试次数
    int count = ((MessageExt) msg).getReconsumeTimes();
    log.info("当前消费重试次数为 = {}", count);
    //3、这里睡眠60秒
    Thread.sleep(60000);
    log.info("休眠60秒 看还能不能走到这里。topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body);
    //返回成功
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

    当获得 当前消费重试次数为 = 0 后 , 关掉该进程。再重新启动该进程,那么依然能够获取该条消息

    consumer消费者 当前消费重试次数为 = 0
    休眠60秒 看还能不能走到这里。topic=topic_family,keys=1a2b3c4d5f,msg=小小今年3岁

    1
    2

    其他理解

    首先,我们需要明确,只有当消费模式为 MessageModel.CLUSTERING(集群模式) 时,Broker 才会自动进行重试,对于广播消息是不会重试的。

    集群消费模式下,当消息消费失败,RocketMQ 会通过消息重试机制重新投递消息,努力使该消息消费成功。

    当消费者消费该重试消息后,需要返回结果给 broker,告知 broker 消费成功(ConsumeConcurrentlyStatus.CONSUME*SUCCESS)或者需要重新消费(ConsumeConcurrentlyStatus.RECONSUME*LATER)

    这里有个问题,如果消费者业务本身故障导致某条消息一直无法消费成功,难道要一直重试下去吗?

    答案是显而易见的,并不会一直重试。

    事实上,对于一直无法消费成功的消息,RocketMQ 会在达到最大重试次数之后,将该消息投递至死信队列。然后我们需要关注死信队列,并对该死信消息业务做人工的补偿操作。

    那如何返回消息消费失败呢?

    RocketMQ 规定,以下三种情况统一按照消费失败处理并会发起重试。

    业务消费方返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
    业务消费方返回null
    业务消费方主动/被动抛出异常

    前两种情况较容易理解,当返回 ConsumeConcurrentlyStatus.RECONSUME_LATER或者 null时,broker 会知道消费失败,后续就会发起消息重试,重新投递该消息。

    注意 对于抛出异常的情况,只要我们在业务逻辑中显式抛出异常或者非显式抛出异常,broker 也会重新投递消息,如果业务对异常做了捕获,那么该消息将不会发起重试。因此对于需要重试的业务,消费方在捕获异常的时候要注意返回 ConsumeConcurrentlyStatus.RECONSUME*LATER 或 null 并输出异常日志,打印当前重试次数。(推荐返回ConsumeConcurrentlyStatus.RECONSUME*LATER)
    死信的业务处理方式

    默认的处理机制中,如果我们只对消息做重复消费,达到最大重试次数之后消息就进入死信队列了。

    我们也可以根据业务的需要,定义消费的最大重试次数,每次消费的时候判断当前消费次数是否等于最大重试次数的阈值。

    如:重试三次就认为当前业务存在异常,继续重试下去也没有意义了,那么我们就可以将当前的这条消息进行提交,返回 broker 状态ConsumeConcurrentlyStatus.CONSUME_SUCCES,让消息不再重发,同时将该消息存入我们业务自定义的死信消息表,将业务参数入库,相关的运营通过查询死信表来进行对应的业务补偿操作。

    RocketMQ 的处理方式为将达到最大重试次数(16 次)的消息标记为死信消息,将该死信消息投递到 DLQ 死信队列中,业务需要进行人工干预。实现的逻辑在 SendMessageProcessor的 consumerSendMsgBack方法中,大致思路为首先判断重试次数是否超过 16 或者消息发送延时级别是否小于 0,如果已经超过 16 或者发送延时级别小于 0,则将消息设置为新的死信。死信 topic 为:%DLQ%+consumerGroup

    我们接着看一下死信的源码实现机制。

    private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ConsumerSendMsgBackRequestHeader requestHeader =
    (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

    ......

    // 0.首先判断重试次数是否大于等于 16,或者消息延迟级别是否小于 0
    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
    || delayLevel < 0) {
    // 1. 如果满足判断条件,设置死信队列 topic= %DLQ%+consumerGroup
    newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
    queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

    topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
    DLQ_NUMS_PER_GROUP,
    PermName.PERM_WRITE, 0
    );
    if (null == topicConfig) {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("topic[" + newTopic + "] not exist");
    return response;
    }
    } else {
    // 如果延迟级别为 0,则设置下一次延迟级别为 3+当前重试消费次数,达到时间衰减效果
    if (0 == delayLevel) {
    delayLevel = 3 + msgExt.getReconsumeTimes();
    }

    msgExt.setDelayTimeLevel(delayLevel);
    }

    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(newTopic);
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

    msgInner.setQueueId(queueIdInt);
    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

    String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
    MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

    // 3.死信消息投递到死信队列中并落盘
    PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    ......
    return response;
    }

    我们总结一下死信的处理逻辑:

    首先判断消息当前重试次数是否大于等于 16,或者消息延迟级别是否小于 0
    只要满足上述的任意一个条件,设置新的 topic(死信 topic)为:%DLQ%+consumerGroup
    进行前置属性的添加
    将死信消息投递到上述步骤 2 建立的死信 topic 对应的死信队列中并落盘,使消息持久化。

    PS:

    延迟级别有18级,启动定时任务默认先延迟1S后立马执行,因此延迟队列实际为17个 对应第2级到第18级

    消费重试直接从第3级开始,一共16次 即10S到2H
    ————————————————
    版权声明:本文为CSDN博主「Apple_Web」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/belongtocode/article/details/104310781

  • 相关阅读:
    Django +xadmin+Linux 使用Uwsgi部署xadmin后台系统
    Django+uwsgi+linux 报错: no python application found, check your startup logs for errors ---解决方案
    python程序中执行sql语句报错:columns = [col_desc[0] for col_desc in cursor.description] TypeError: 'NoneType' object is not iterable 解决方案
    配置android studio jdk要用1.8 不然会出现各种报错
    Python请求某一接口,请求头里面已经加了Cookie,但是请求之后参数返回提示:未登录,遇到这种情况解决方案
    Django 执行python3 manage.py makegrations --empty App和python3 manage.py migrate之后,数据库里面没有成功创建表解决方案
    在虚拟机VMware上安装win7时出现operating system not found报错解决方案
    将mp4格式视频转换为m3u8格式报错: Codec 'hevc' (173) is not supported by the bitstream filter 'h264_mp4toannexb'. Supported codecs are: h264 (27) 解决方案
    【.NET】Swagger 允许接口重名
    (C#) HTTP Authentication (Basic, NTLM, Digest, Negotiate/Kerberos)
  • 原文地址:https://www.cnblogs.com/tudachui/p/15132908.html
Copyright © 2011-2022 走看看