zoukankan      html  css  js  c++  java
  • RabbitMq(6) 如何保证消息不丢包

    RabbitMQ一般情况很少丢失,但是不能排除意外,为了保证我们自己系统高可用,我们必须作出更好完善措施,保证系统的稳定性。

    下面来介绍下,如何保证消息的绝对不丢失的问题,下面分享的绝对干货,都是在知名互联网产品的产线中使用。

    1.消息持久化

    2.ACK确认机制

    3.设置集群镜像模式

    4.消息补偿机制

    一、消息持久化

    RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。

    所以就要对消息进行持久化处理。如何持久化,下面具体说明下:

    要想做到消息持久化,必须满足以下三个条件,缺一不可。

    1)Exchange 设置持久化

    2)Queue 设置持久化

    3)Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

     Exchange 设置持久化

        @Bean
        TopicExchange exchange() {
            return new TopicExchange("topicExchange");
        }

    默认的Topic交换机,持久化属性为true。

    改为:

    return new TopicExchange("topicExchange",false,false);  

     执行下:

    报错:

    inequivalent arg 'durable' for exchange 'topicExchange' in vhost '/': received 'false' but current is 'true'

    看下当前topic交换机属性:

    好像没找到线上切换交换机持久化属性的入口,有知道的大神可以告知下,我这里是删除掉原来的而交换机,再次执行:

    此时重启RabbitMq服务,未持久化的交换机消失了。。。。

     

    Queue 设置持久化

        @Bean
        public Queue queueArm() {
            return new Queue(TopicRabbitConfig.ARM_QUEUE);
        }

    源码:

    如果改为非持久化,也是先删除,再执行。非持久化后,重启服务,队列就没了。

     Message持久化发送

     RabbitTemplate.class

    MessageProperties.class

    默认是持久化。

    二、ACK确认机制

     多个消费者同时收取消息,比如消息接收到一半的时候,一个消费者死掉了(逻辑复杂时间太长,超时了或者消费被停机或者网络断开链接),如何保证消息不丢?

    这个使用就要使用Message acknowledgment 机制,就是消费端消费完成要通知服务端,服务端才把消息从内存删除。

    这样就解决了,及时一个消费者出了问题,没有同步消息给服务端,还有其他的消费端去消费,保证了消息不丢的case。

    demo:

    package com.example.demo.rabbitMq.ack;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitTemplateConfig {
    
        private Logger logger = LoggerFactory.getLogger(this.getClass());
    
        /**
         * 定制化amqp模板
         * ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调  即消息发送不到任何一个队列中  ack
         * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调   即消息发送到exchange  ack
         *
         * @return
         */
        @Bean("rabbitTemplateAck")
        public AmqpTemplate getabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplateAck = new RabbitTemplate(connectionFactory);
            rabbitTemplateAck.setMessageConverter(new Jackson2JsonMessageConverter());
            rabbitTemplateAck.setMandatory(true);
    
            //消息返回, 需要配置spring.rabbitmq.publisher-returns=true
            rabbitTemplateAck.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
                                            String routingKey) {
                    String correlationId = message.getMessageProperties().getCorrelationId();
                    logger.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText,
                            exchange, routingKey);
                }
            });
    
    
            // 消息确认, 需要配置 spring.rabbitmq.publisher-confirms=true
            rabbitTemplateAck.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    logger.info("消息发送到exchange成功");
                } else {
                    logger.info("消息发送到exchange失败,原因: {}", cause);
                }
            });
    
            return rabbitTemplateAck;
        }
    
    }
       @Test
        public void send4() throws Exception {
            topicSender.send4();
        }

    测试:确认发送到交换机了:

     

    修改发送路由,如下:

     rabbitTemplateAck.convertAndSend("topicExchange","1arm.gun",list);

    再次执行,结果如下:

    再看demo:

    生产者新增交换机和队列

    public static final String BYTE_QUEUE = "byte.queue";
        @Bean
        public Queue queueByte() {
            return new Queue(TopicRabbitConfig.BYTE_QUEUE);
        }
        @Bean
        Binding bindingExchangeByte(Queue queueByte, TopicExchange exchange) {
            return BindingBuilder.bind(queueByte).to(exchange).with("byte.#");
        }
    TopicSender.java
    public void send5() throws IOException {
            User user = new User();
            user.setUserName("Sender1.....");
            user.setMobile("555555555");
            byte[] body = Base64Utils.obj2byte(user);
            Message message = new Message(body,new MessageProperties());
            rabbitTemplateAck.convertAndSend("topicExchange","byte.message",message);
        }

    消费者工程:

    package com.example.demo.rabbitMq.ack;
    
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitTemplateConfig {
        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            return factory;
        }
    }
    package com.example.demo.rabbitMq.exchange.topic;
    
    import com.rabbitmq.client.Channel;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    @Component
    public class TopicReceiver5 {
    
        private Logger logger = LoggerFactory.getLogger(this.getClass());
    
       @RabbitListener(queues = TopicRabbitConstant.BYTE_QUEUE, containerFactory="rabbitListenerContainerFactory")
        public void process(Message message, Channel channel) throws IOException {
            try{
                System.out.println("Receiver5  : " + message);
                //int i = 6/0;
    
            }catch (Exception e){
                if (!message.getMessageProperties().getRedelivered()) {
                    System.out.println("消息即将再次返回队列处理...");
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                } else {
                    //requeue为是否重新回到队列
                    System.out.println("消息已重复处理失败,拒绝再次接收...");
                    //BasicReject方法第一个参数是消息的DeliveryTag,对于每个Channel来说,每个消息都会有一个DeliveryTag
                    //第二个参数是是否放回queue中,requeue,如果只有一个消费者的话,true将导致无限循坏
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                }
            }
        }
    }

    执行:

    如果人造异常,将会走黄色部分,最后因为重复请求而拒绝再次放入队列。

    三、设置集群镜像模式

    我们先来介绍下RabbitMQ三种部署模式:

    1)单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。
    2)普通模式:默认的集群模式,某个节点挂了,该节点上的消息不能用,有影响的业务瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。
    3)镜像模式:把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案

    为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。下面自己画了一张图介绍普通集群丢失消息情况:

     

    如果想解决上面途中问题,保证消息不丢失,需要采用HA 镜像模式队列。

    下面介绍下三种HA策略模式:

    1)同步至所有的
    2)同步最多N个机器
    3)只同步至符合指定名称的nodes

    命令处理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

    1)为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式
    rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
    rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

    2)为每个以“rock.wechat.”开头的队列设置两个节点的镜像,并且设置为自动同步模式
    rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat"
    '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

    3)为每个以“node.”开头的队列分配指定的节点做镜像
    rabbitmqctl set_policy ha-nodes "^nodes."
    '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

    但是:HA 镜像队列有一个很大的缺点就是:   系统的吞吐量会有所下降

    四、消息补偿机制

    为什么还要消息补偿机制呢?难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,

    但是作为有追求的程序员来讲,要绝对保证我的系统的稳定性,有一种危机意识。

    比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?

    产线网络环境太复杂,所以不知数太多,消息补偿机制需要建立在消息要写入DB日志,发送日志,接受日志,两者的状态必须记录。

    然后根据DB日志记录check 消息发送消费是否成功,不成功,进行消息补偿措施,重新发送消息处理。

    参考:

    https://www.cnblogs.com/flyrock/p/8859203.html

    https://www.cnblogs.com/milicool/p/9662447.html

  • 相关阅读:
    EEPROM芯片AT2402驱动
    FPGA 状态机(FSM)的三段式推荐写法
    1602液晶驱动
    Bresenham快速画直线算法
    I2C总线驱动程序
    从数据库中取时间类型显示
    C# 页面关联类似模式窗口
    C# 页面javascript 页面跳转刷新
    网页有趣的时间显示控件
    DataSet
  • 原文地址:https://www.cnblogs.com/xiaozhuanfeng/p/10719874.html
Copyright © 2011-2022 走看看