zoukankan      html  css  js  c++  java
  • RocketMQ重试机制

    重试机制

      由于MQ经常处于复杂的分布式系统中,考虑网络波动,服务宕机,程序异常因素,很有可能出现消息发送或者消费失败的问题。因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点。如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响。所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好的支持。

      RocketMQ为了使用者封装了消息重试的处理流程,无需开发人员手动处理。RocketMQ支持了生产端和消费端两类重试机制。

    模拟生产端重试

      Consumer端消息消费两种状态:

    package com.alibaba.rocketmq.client.consumer.listener;
    
    public enum ConsumeConcurrentlyStatus {
        CONSUME_SUCCESS,
        RECONSUME_LATER;
    
        private ConsumeConcurrentlyStatus() {
        }
    }

      一个是成功(CONSUME_SUCCESS),一个是失败&重试(RECONSUME_LATER);

      Consumer为了保证消息消费成功,只有使用方明确表示消费成功,返回CONSUME_SUCCESS,RocketMQ才会认为消息消费成功。

      如果消息消费失败,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为消息消费失败了,需要重新投递。

      1.出现异常

    package com.wn.consumer;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class MQConsumer {
        public static void main(String[] args) throws MQClientException {
            //创建消费者
            DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
            //设置NameServer地址
            consumer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876");
            //设置消费者实例名称
            consumer.setInstanceName("consumer");
            //订阅topic
            consumer.subscribe("wn02","TagA");
            //监听消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    //获取消息
                    for (MessageExt msg:list){
                        System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
                    }
    
                    try {
                        int i=1/0;
                    }catch (Exception e){
                        e.printStackTrace();
                        //需要重试
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
    //消息成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started..."); } }

      2.网络延迟

    package com.wn.consumer;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class MQConsumer {
        public static void main(String[] args) throws MQClientException {
            //创建消费者
            DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
            //设置NameServer地址
            consumer.setNamesrvAddr("192.168.138.187:9876;192.168.138.188:9876");
            //设置消费者实例名称
            consumer.setInstanceName("consumer");
            //订阅topic
            consumer.subscribe("wn03","TagA");
            //监听消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    //获取消息
                    for (MessageExt msg:list){
                        System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
                    }
    //网络延迟 try { Thread.sleep(600000); } catch (InterruptedException e) { e.printStackTrace(); } //消息成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started..."); } }
  • 相关阅读:
    Linux下的sleep()和sched_yield()(转)
    各种字符串Hash函数(转)
    linux 实时监控网速脚本(转)
    linux安装chrome及chromedriver(转)
    Couldn't open file /etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7 解决办法(转)
    linux 运行时限制CPU核数、内存、读写速度
    C语言函数sscanf()的用法-从字符串中读取与指定格式相符的数据(转)
    golang在线学习与编译网站
    电子书转换网站推荐
    入门级网站经典 w3cschool
  • 原文地址:https://www.cnblogs.com/wnwn/p/12325851.html
Copyright © 2011-2022 走看看