zoukankan      html  css  js  c++  java
  • rocketMQ为什么会重复消费

    messageModel有两种方式:BROADCASTING 和 CLUSTERING,

    消费者收到消息也有两种消费方式:orderly和concurrently,

    1、BROADCASTING模式下,所有注册的消费者都会消费,而这些消费者通常是集群部署的一个个微服务,这样就会多台机器重复消费。

    2、在CLUSTERING模式下,如果一个topic被多个consumerGroup消费,也会重复消费。

    3、即使是在CLUSTERING模式下,同一个consumerGroup下,一个队列只会分配给一个消费者,看起来好像是不会重复消费。但是,有个特殊情况:一个消费者上线后,同组的所有消费者要重新负载均衡(反之一个消费者掉线后,也一样)。一个队列所对应的新的消费者要获取之前消费的offset,此时之前的消费者可能已经消费了一条消息,但是并没有把offset提交给broker,那么新的消费者可能会重新消费一次。虽然orderly是前一个消费者先解锁,后一个消费者加锁再消费的模式,比起concurrently要严格了,但是加锁的线程和提交offset的线程不是同一个,所以还是会出现极端情况下的重复消费。

    4、默认情况下orderly和concurrently模式都是一条一条的消费,但是如果在消费一条消息的时候要进行比如AB两个操作,在没有事务控制的情况下,如果A操作成功而B操作失败,就会重新消费导致A操作会再执行一次,这样虽然不是重复的消费整条消息,但是一部分的操作会重复。如果放宽条件到一次消费多条的情况,orderly模式是一批中有一条消费失败,一批统一重新消费,直到达到最大消费次数的限制,发送到死信队列。而concurrently情况下,在返回成功(CONSUME_SUCCESS)的前提下,有个ackIndex可以分隔成功和失败的消息,失败的、没有消费的消息发送到retry队列,不会造成重复消费,而如果返回的是(RECONSUME_LATER),仍然是和orderly一样的同批次全部重新消费。(其实orderly也可以这样做,但是不知道为什么没有,而是全部重新放入队列消费)。

    5、消费者pullRequest发出去,如果长时间收不到请求,是会被取出来重新再放入队列再请求一次的,所以也是会重复拉取消息的。

    注:orderly和concurrently消费失败的处理(假设在集群模式下):

    ConsumeMessageOrderlyService的内部类ConsumeRequest的run方法中,messageListener.consumeMessage的返回值status的值有两种:SUSPEND_CURRENT_QUEUE_A_MOMENT和 SUCCESS,如果抛出异常返回null,随后仍会被赋值SUSPEND_CURRENT_QUEUE_A_MOMENT,接下来的processConsumeResult方法中,在isAutoCommit为true的判断中,进入SUSPEND_CURRENT_QUEUE_A_MOMENT的判断,假设把msgs的size简化为1(也就是说一次只从processQueue的msgTreeMap中取出offset最小的那条),默认的maxReconsumeTimes是Integer.maxValue,一般是需要自己设置一次的,到达最大失败次数,还是没成功,进入sendMessageBack方法,在调用send方法发送消息,看起来是把消息发送到topic为retry开头的重试队列,但是,由于broker这里SendMessageProcessor.sendMessage方法中,有handleRetryAndDLQ方法判断当前消息的重试次数有没有达到最大重试次数,如果到达,替换topic为DLQ死信队列,将消息存储到死信队列中。刚才说的是每次从processQueue获取一条消息的情况,如果一次获取多条,那么如果status不是成功,那么会针对每一条消息判断是不是到达最大重试次数,如果到达,送到死信队列,如果没有,重试次数加一,再次放到processQueue中。而这次成功消费的offset要保存,因为之前的前提是集群模式,那么要提交给messageId对应的broker,具体代码逻辑是:RemoteBrokerOffsetStore的updateOffset是存到自己的offsetTable属性中,而MQClientInstance.start---startScheduledTask---persistAllConsumerOffset---RemoteBrokerOffsetStore.persistAll---updateConsumeOffsetToBroker,把offset提交。

    而ConsumeMessageConcurrentlyService的submitConsumeRequest方法,把这次取到的消息分批(如果需要的话),组装成内部类ConsumeRequest,所以concurrently也存在一次消费多条消息的情况,这时候会在context参数中保存一个ackIndex,如果成功了,ackIndex不变,如果失败了,ackIndex为-1,如果ackIndex小于这次分批msgs的size,那么之后的消息要发送回重试队列,这里和orderly不同,orderly是直接send方法,而这里的requestCode都不一样了。这里也会判断最大消费次数的问题,发送到重试队列或者死信队列。注意和orderly不同的是,orderly是本地重试,不会发送到重试队列,只会最后发送到死信队列。而concurrently会先发送到重试队列。至于本地消息的删除以及commitoffset的处理,因为并发消费在目前的业务中不太能用得到,等以后用到了再看。

  • 相关阅读:
    设计模式之工厂模式-抽象工厂(02)
    1036 跟奥巴马一起编程 (15 分)
    1034 有理数四则运算 (20 分)
    1033 旧键盘打字 (20 分)
    1031 查验身份证 (15 分)
    大学排名定向爬虫
    1030 完美数列 (25 分)二分
    1029 旧键盘 (20 分)
    1028 人口普查 (20 分)
    1026 程序运行时间 (15 分)四舍五入
  • 原文地址:https://www.cnblogs.com/chuliang/p/12803242.html
Copyright © 2011-2022 走看看