zoukankan      html  css  js  c++  java
  • rocketMQ为什么会重复消费

    messageModel有两种方式:BROADCASTING 和 CLUSTERING,

    消费者收到消息也有两种消费方式:orderly和concurrently,

    1、BROADCASTING模式下,所有注册的消费者都会消费,而这些消费者通常是集群部署的一个个微服务,这样就会多台机器重复消费。

    2、在CLUSTERING模式下,如果一个topic被多个consumerGroup消费,也会重复消费。

    3、即使是在CLUSTERING模式下,同一个consumerGroup下,一个队列只会分配给一个消费者,看起来好像是不会重复消费。但是,有个特殊情况:一个消费者上线后,同组的所有消费者要重新负载均衡(反之一个消费者掉线后,也一样)。一个队列所对应的新的消费者要获取之前消费的offset,此时之前的消费者可能已经消费了一条消息,但是并没有把offset提交给broker,那么新的消费者可能会重新消费一次。虽然orderly是前一个消费者先解锁,后一个消费者加锁再消费的模式,比起concurrently要严格了,但是加锁的线程和提交offset的线程不是同一个,所以还是会出现极端情况下的重复消费。

    4、默认情况下orderly和concurrently模式都是一条一条的消费,但是如果在消费一条消息的时候要进行比如AB两个操作,在没有事务控制的情况下,如果A操作成功而B操作失败,就会重新消费导致A操作会再执行一次,这样虽然不是重复的消费整条消息,但是一部分的操作会重复。如果放宽条件到一次消费多条的情况,orderly模式是一批中有一条消费失败,一批统一重新消费,直到达到最大消费次数的限制,发送到死信队列。而concurrently情况下,在返回成功(CONSUME_SUCCESS)的前提下,有个ackIndex可以分隔成功和失败的消息,失败的、没有消费的消息发送到retry队列,不会造成重复消费,而如果返回的是(RECONSUME_LATER),仍然是和orderly一样的同批次全部重新消费。(其实orderly也可以这样做,但是不知道为什么没有,而是全部重新放入队列消费)。

    5、消费者pullRequest发出去,如果长时间收不到请求,是会被取出来重新再放入队列再请求一次的,所以也是会重复拉取消息的。

    注:orderly和concurrently消费失败的处理(假设在集群模式下):

    ConsumeMessageOrderlyService的内部类ConsumeRequest的run方法中,messageListener.consumeMessage的返回值status的值有两种:SUSPEND_CURRENT_QUEUE_A_MOMENT和 SUCCESS,如果抛出异常返回null,随后仍会被赋值SUSPEND_CURRENT_QUEUE_A_MOMENT,接下来的processConsumeResult方法中,在isAutoCommit为true的判断中,进入SUSPEND_CURRENT_QUEUE_A_MOMENT的判断,假设把msgs的size简化为1(也就是说一次只从processQueue的msgTreeMap中取出offset最小的那条),默认的maxReconsumeTimes是Integer.maxValue,一般是需要自己设置一次的,到达最大失败次数,还是没成功,进入sendMessageBack方法,在调用send方法发送消息,看起来是把消息发送到topic为retry开头的重试队列,但是,由于broker这里SendMessageProcessor.sendMessage方法中,有handleRetryAndDLQ方法判断当前消息的重试次数有没有达到最大重试次数,如果到达,替换topic为DLQ死信队列,将消息存储到死信队列中。刚才说的是每次从processQueue获取一条消息的情况,如果一次获取多条,那么如果status不是成功,那么会针对每一条消息判断是不是到达最大重试次数,如果到达,送到死信队列,如果没有,重试次数加一,再次放到processQueue中。而这次成功消费的offset要保存,因为之前的前提是集群模式,那么要提交给messageId对应的broker,具体代码逻辑是:RemoteBrokerOffsetStore的updateOffset是存到自己的offsetTable属性中,而MQClientInstance.start---startScheduledTask---persistAllConsumerOffset---RemoteBrokerOffsetStore.persistAll---updateConsumeOffsetToBroker,把offset提交。

    而ConsumeMessageConcurrentlyService的submitConsumeRequest方法,把这次取到的消息分批(如果需要的话),组装成内部类ConsumeRequest,所以concurrently也存在一次消费多条消息的情况,这时候会在context参数中保存一个ackIndex,如果成功了,ackIndex不变,如果失败了,ackIndex为-1,如果ackIndex小于这次分批msgs的size,那么之后的消息要发送回重试队列,这里和orderly不同,orderly是直接send方法,而这里的requestCode都不一样了。这里也会判断最大消费次数的问题,发送到重试队列或者死信队列。注意和orderly不同的是,orderly是本地重试,不会发送到重试队列,只会最后发送到死信队列。而concurrently会先发送到重试队列。至于本地消息的删除以及commitoffset的处理,因为并发消费在目前的业务中不太能用得到,等以后用到了再看。

  • 相关阅读:
    硬件IC汇总
    stm8s103调试注意点
    感悟短句
    USB接口
    液晶屏驱动注意
    四数之和
    所有奇数长度子数组的和
    秋叶收藏集
    删除中间节点
    组合总和
  • 原文地址:https://www.cnblogs.com/chuliang/p/12803242.html
Copyright © 2011-2022 走看看