zoukankan      html  css  js  c++  java
  • 如何保证mq不丢消息

    1.消息的发送流程

    一条消息从生产到被消费,将会经历3个阶段
    image

    • 生产阶段,Producer 新建消息,然后通过网络将消息投递给MQ Broker
    • 存储阶段,消息将会存储在Broker端磁盘中
    • 消费阶段,Consumer将会从Broker拉取消息
      以上3个阶段,都有可能会丢失消息,只要找到这3个阶段丢失消息的原因,采取合理的办法进行避免,就可以彻底解决丢失消息问题。

    2.生产阶段

    Prodducer 通过网络发送消息给Broker,当Broker收到之后,将会返回确认响应信息给Producer,所以生产者只有接收到返回的确认响应,就代表消息在生产阶段未丢失

    同步发送伪代码

    DefaultMQProducer mqProducer=new DefaultMQProducer("test");
    // 设置 nameSpace 地址
    mqProducer.setNamesrvAddr("namesrvAddr");
    mqProducer.start();
    Message msg = new Message("test_topic" /* Topic */,
            "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
    );
    // 发送消息到一个Broker
    try {
        SendResult sendResult = mqProducer.send(msg);
    } catch (RemotingException e) {
        e.printStackTrace();
    } catch (MQBrokerException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    

    send 方法是同步操作,只要这个方法不抛出异常,就代表消息已经发送成功
    消息发送成功仅代表消息已经到了Broker端,Broker在不同配置下,可能返回不同的状态

    • SendStatus.SEND_OK
      消息发送成功,消息发送成功,不意味着它是可靠的,要保证不会丢失任何信息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH
    • SendStatus.FLUSH_DISK_TIMEOUT
      消息发送成功,但是服务器刷盘超时.消息进入到了broker的内存里,只有服务器戎机,消息才会丢.消息存储配置参数中可以配置刷盘方式和同步刷盘的时间长度,如果是同步刷盘,FlushDiskType=SYNC_FLUSH(默认是异步刷盘),当Broker服务器未在同步刷盘时间内(默认5秒)完成刷盘,则将返回该状态-刷盘超时
    • SendStatus.FLUSH_SLAVE_TIMEOUT
      消息发送成功,但是服务器同步到Slave超时,消息进入到了broker的内存里,只有broker戎机,消息才会丢。如果Broker角色是同步Master,及SYNC_MASTER(默认是异步Master及ASYNC_MASTER),并且从Broker未在同步刷盘时间内(5秒)内完成与从服务器的同步,就会返回数据同步Slave超时
    • SendStatus.SLAVE_NOT_AVAILABLE
      消息发送成功,Slave不可用,同步master,单没配置slave broker,会返回此状态

    异步发送伪代码

    DefaultMQProducer mqProducer = new DefaultMQProducer("test");
    // 设置 nameSpace 地址
    mqProducer.setNamesrvAddr("127.0.0.1:9876");
    mqProducer.setRetryTimesWhenSendFailed(5);
    mqProducer.start();
    Message msg = new Message("test_topic" /* Topic */,
            "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
    );
    
    try {
        // 异步发送消息到,主线程不会被阻塞,立刻会返回
        mqProducer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // 消息发送成功,
            }
    
            @Override
            public void onException(Throwable e) {
                // 消息发送失败,可以持久化这条数据,后续进行补偿处理
            }
        });
    } catch (RemotingException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    

    异步发送,要重写回调方法,在回调方法中检查发送结果
    不管同步还是异步,都会碰到网络问题导致发送失败的请求,针对这种情况,我们可以设置合理的重试次数,当出现网络问题,可以自动重试,设置方式如下

    // 同步发送消息重试次数,默认为 2
    mqProducer.setRetryTimesWhenSendFailed(3);
    // 异步发送消息重试次数,默认为 2
    mqProducer.setRetryTimesWhenSendAsyncFailed(3);
    

    3.存储阶段

    消息到了Broker端,将会优先保存到内存里,然后立刻返回确认响应ack给生产者。随后Broker 定期批量的将一组消息从内存中异步刷到磁盘中
    定期异步刷数据到盘的操作,减少了IO次数,可以有更好的性能,但是如果发生几起掉电,戎机的情况,消息还未及时刷到磁盘,就会出现丢失消息的情况。
    如果要保证Broker端不丢消息,需要将消息的保存机制改为同步刷盘方式,来一个消息,刷一下到磁盘中,再返回响应。
    master配置修改

    flushDiskType = SYNC_FLUSH
    

    当Broker未在同步时间内(默认5秒)完成刷盘,将会返回SendStatus.FLUSH_DISK_TIMEOUT状态给生产者

    高可靠
    broker通常采用集群部署,一主多从架构,为了保证消息不丢,消息还会复制到slave节点
    默认情况下,消息写入master成功,就可以返回确认响应ack给生产者,接着消息异步复制到slave节点

    此时若Master突然戎机不可恢复,那么还未恢复到slave的消息将会丢失
    为了进一步提高消息可靠性,可以采用同步的复制方式,master节点将会同步等待slave节点复制完成,才会返回确认响应。
    异步复制与同步复制的区别如下:
    image

    Broker master节点同步复制配置如下

    brokerRole = SYNC_MASTER
    

    如果slave 节点未在指定时间内同步返回响应,生产者将会受到SendStatus.FLUSH_SLAVE_TIMEOUT返回状态


    如果 要严格保证消息不丢,broker需要如下配置:

    ## master 节点配置
    flushDiskType = SYNC_FLUSH
    brokerRole = SYNC_MASTER
    
    ## slave 节点配置
    brokerRole = slave
    flushDiskType = SYNC_FLUSH
    

    生产者要配合,判断返回状态是否SendStatus.SEND_OK,若是其他状态,需要考虑补偿重试。上述配置会提高消息的可靠性,但是会降低性能,生产实践中需要综合选择。不是完全固化的配置

    4.消费阶段

    消费者从broker拉取消息,然后执行相应的业务逻辑,一旦执行成功,将会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态给Broker
    如果Broker 未收到消息确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试。
    此种方式有效避免了消费者消费过程中发生异常,消息在网络传输中丢失的情况

    消费伪代码

    // 实例化消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");
    
    // 设置NameServer的地址
    consumer.setNamesrvAddr("namesrvAddr");
    
    // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
    consumer.subscribe("test_topic", "*");
    // 注册回调实现类来处理从broker拉取回来的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // 执行业务逻辑
            // 标记该消息已经被成功消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 启动消费者实例
    consumer.start();
    

    以上消费消息过程的,我们需要注意返回消息状态。只有当业务逻辑真正执行成功,我们才能返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS。否则我们需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,稍后再重试。

  • 相关阅读:
    [arm]虚拟机,2440开发板,主机三者互通
    Linux下的lds链接脚本简介(四)
    Linux下的lds链接脚本简介(三)
    Linux下的lds链接脚本简介(二)
    Linux下的lds链接脚本简介(一)
    程序员面试资源大收集
    Source Insight 3.50.0065使用详解
    DNW烧写FL2440 NAND Flash分区
    php isset()与empty()的使用
    JSON.parse()和JSON.stringify()的区别
  • 原文地址:https://www.cnblogs.com/PythonOrg/p/14837074.html
Copyright © 2011-2022 走看看