zoukankan      html  css  js  c++  java
  • RocketMq(三、重试机制)

    rocketmq的重试策略一般分为两种:一种producer发送给MQ的重试,一种MQ发送给consumer的重试。

    一、生产者的重试

    package com.wk.test.rocketmqTest;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    public class Producer {
        public static void main(String[] args) {
            //定义生产者名称
            DefaultMQProducer producer = new DefaultMQProducer("quickstart_product");
            //连接rocketMQ的namesrv地址(这里是集群)
            producer.setNamesrvAddr("10.32.16.179:9876");
            //发送失败重试3次
            producer.setRetryTimesWhenSendFailed(3);
            try {
                producer.start();
                //1.主题,一般在服务器设置好,不能从代码中新建。2.标签。3.发送内容。
                Message message = new Message("TopicQuickStart","Tag1",("生产者重试").getBytes());
                //发送设置超时时间10秒
                SendResult sendResult = producer.send(message,10000);
                System.out.println(sendResult);
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            }finally {
                producer.shutdown();
            }
        }
    }

    10秒内没有发送成功,最大重试次数为3次

    二、消费者的重试

    消费者的重试又分为两种情况:1.消费者接收到消息抛出exception异常。2.消费者没有收到消息,MQ发送超时。

    情况一

    package com.wk.test.rocketmqTest;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            //定义消费者名称,MQ往消费者推送
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
            //连接rocketMQ的namesrv地址(此次为集群)
            consumer.setNamesrvAddr("10.32.16.179:9876");
            //新订阅组第一次启动,从头消费到尾,后续从上次的消费进度继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //订阅的主题和标签(*代表所有标签)
            consumer.subscribe("TopicQuickStart", "Tag1 || Tag2");
            //消费者监听
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                MessageExt msg = msgs.get(0);
                try {
                    String topic = msg.getTopic();
                    String msgbody = new String(msg.getBody(), "UTF-8");
                    String tag = msg.getTags();
                    System.out.println("topic:" + topic + " msgbody:" + msgbody + " tag:" + tag);
                    //dosomething...业务处理
                } catch (Exception e) {
                    e.printStackTrace();
                    //重试3次扔不成功则不继续重试
                    if(msg.getReconsumeTimes() == 3){
                        //记录日志或进行持久化操作。
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    //MQ发送失败重试机制,1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                //消息处理成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
        }
    }

    接收到消息后,消费端进行处理,若抛出异常,重试三次不成功则对该消息进行记录。

    情况二

    comsumer为集群的情况下,当MQ发送消息给c1时,若c1接收到消息后宕机了,则同一条消息会发送给c2进行处理。前提是c1和c2必须为同一个消费者组,即

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");

    要相同。

    而这里会出现一个问题,就是如果c1之后又进行了重启,则可能同一条消息发送给c2的同时,又被c1所处理,这就出现了同一条消息被多个消费者消费的情况,这种情况就需要我们保持业务的幂等性。

    解决方案在网上搜索有:

    1、消费端处理消息的业务逻辑保持幂等性
    2、保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

    第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。

    我们可以看到第1条的解决方式,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率不一定大,且由消息系统实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。

    RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。

    生产者与消费者重试机制要点

    在测试生产者的重试机制的时候,发现了个问题,发现消费者端不管重试次数设置为多少,重试的时间都差不多。

    网上搜索源代码如下:

     private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) {
            
            //1、获取当前时间
            long beginTimestampFirst = System.currentTimeMillis();
            long beginTimestampPrev ;
            //2、去服务器看下有没有主题消息
            TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
            if (topicPublishInfo != null && topicPublishInfo.ok()) {
                boolean callTimeout = false;
                //3、通过这里可以很明显看出 如果不是同步发送消息 那么消息重试只有1次
                int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
                //4、根据设置的重试次数,循环再去获取服务器主题消息
                for (times = 0; times < timesTotal; times++) {
                    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                    beginTimestampPrev = System.currentTimeMillis();
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    //5、前后时间对比 如果前后时间差 大于 设置的等待时间 那么直接跳出for循环了 这就说明连接超时是不进行多次连接重试的
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
    
                    }
                    //6、如果超时直接报错
                    if (callTimeout) {
                        throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
                    }
            }
        }

    由上可以看出:

    1.异步发送重试只有1次。

    2.只要连接时间超过设置的超时时间,则不会去重试。

    3.producer重试是一个for循环重试的,是立即执行的,而consumer是有时间间隔执行的。

    PS:producer默认重试2次,consumer默认重试16次。

  • 相关阅读:
    Python安装
    solr集群solrCloud的搭建
    redis单机及其集群的搭建
    maven实现tomcat热部署
    maven发布时在不同的环境使用不同的配置文件
    nexus 的使用及maven的配置
    java 自定义注解以及获得注解的值
    Jenkins学习之——(4)Email Extension Plugin插件的配置与使用
    Jenkins学习之——(3)将项目发送到tomcat
    注意Tengine(Nginx) proxy_pass之后的"/"
  • 原文地址:https://www.cnblogs.com/Unlimited-Blade-Works/p/12395647.html
Copyright © 2011-2022 走看看