Resetting the consume offset in RocketMQ falls into two cases: either consumers of the group are online, or no consumer is online.
Command line
mqadmin.cmd resetOffsetByTime -n localhost:9876 -t topic-zhang -g group-zhang -s yyyy-MM-dd#HH:mm:ss:SSS
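The -s argument above ends up as the long timestamp passed to resetOffsetByTimestamp. As a minimal sketch of that conversion (not the tool's own parsing code), the class below turns a string in the yyyy-MM-dd#HH:mm:ss:SSS pattern into epoch milliseconds. The class and method names are hypothetical, and accepting a raw millisecond value as well is an assumption about how the argument may be supplied.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

class ResetTimestampSketch {
    // Pattern copied from the -s argument of the command line above.
    static final String PATTERN = "yyyy-MM-dd#HH:mm:ss:SSS";

    /**
     * Converts the -s argument to epoch milliseconds.
     * Assumption: a purely numeric argument is already epoch milliseconds;
     * anything else must match PATTERN and is read in the JVM's default time zone.
     */
    static long toEpochMillis(String arg) {
        try {
            return Long.parseLong(arg);
        } catch (NumberFormatException notANumber) {
            SimpleDateFormat format = new SimpleDateFormat(PATTERN);
            format.setLenient(false); // reject out-of-range fields such as month 13
            try {
                return format.parse(arg).getTime();
            } catch (ParseException e) {
                throw new IllegalArgumentException("expected " + PATTERN + " but got " + arg, e);
            }
        }
    }
}
```

Calling ResetTimestampSketch.toEpochMillis("2024-01-01#12:30:45:123") returns the millisecond value for that local time; the result depends on the time zone of the machine running it.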
First, the case where consumers are online:
1. The command-line program talks to the nameserver and the brokers
// org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetByTimestamp
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
        boolean isC)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    // Ask the nameserver which brokers host the topic
    TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
    List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
    Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();
    if (brokerDatas != null) {
        for (BrokerData brokerData : brokerDatas) {
            // Prefer the master; if the master is offline, pick a slave at random
            String addr = brokerData.selectBrokerAddr();
            if (addr != null) {
                Map<MessageQueue, Long> offsetTable =
                    this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group,
                        timestamp, isForce, timeoutMillis, isC);
                if (offsetTable != null) {
                    allOffsetTable.putAll(offsetTable);
                }
            }
        }
    }
    return allOffsetTable;
}
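2. For each partition (message queue) of the topic, the broker looks up the message offset at the given time, then notifies the consumers connected to that broker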
org.apache.rocketmq.broker.client.net.Broker2Client#resetOffset
// Finds the message by time with a binary search that compares message store timestamps
org.apache.rocketmq.store.ConsumeQueue#getOffsetInQueueByTime
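The binary search by store timestamp can be illustrated with a simplified sketch. This is not the body of ConsumeQueue#getOffsetInQueueByTime: the real method reads each probed entry's store time from disk and has its own boundary handling. Here the store timestamps are assumed to sit in a sorted in-memory array, and the class and method names are hypothetical.

```java
class OffsetByTimeSketch {
    /**
     * Returns the index (logical queue offset) of the first entry whose store
     * timestamp is >= target, or storeTimestamps.length if every entry is older.
     * Assumption: storeTimestamps is sorted ascending, since messages are
     * appended to a queue in store-time order.
     */
    static int firstOffsetAtOrAfter(long[] storeTimestamps, long target) {
        int low = 0;
        int high = storeTimestamps.length; // search window is [low, high)
        while (low < high) {
            int mid = (low + high) >>> 1;  // unsigned shift avoids int overflow
            if (storeTimestamps[mid] < target) {
                low = mid + 1;             // entry too old, discard the left half
            } else {
                high = mid;                // candidate found, keep looking left
            }
        }
        return low;
    }
}
```

For store timestamps {100, 200, 200, 300}, a target of 200 yields offset 1 (the first of the two equal entries) and a target of 250 yields offset 3.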
3. The consumer resets its offsets according to the offsetTable sent by the broker
// org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset
public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    DefaultMQPushConsumerImpl consumer = null;
    try {
        MQConsumerInner impl = this.consumerTable.get(group);
        if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
            consumer = (DefaultMQPushConsumerImpl) impl;
        } else {
            log.info("[reset-offset] consumer dose not exist. group={}", group);
            return;
        }
        consumer.suspend();

        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
        for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                ProcessQueue pq = entry.getValue();
                pq.setDropped(true);
                pq.clear();
            }
        }

        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
        }

        Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
        while (iterator.hasNext()) {
            MessageQueue mq = iterator.next();
            Long offset = offsetTable.get(mq);
            if (topic.equals(mq.getTopic()) && offset != null) {
                try {
                    consumer.updateConsumeOffset(mq, offset);
                    consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                    iterator.remove();
                } catch (Exception e) {
                    log.warn("reset offset failed. group={}, {}", group, mq, e);
                }
            }
        }
    } finally {
        if (consumer != null) {
            consumer.resume();
        }
    }
}
The two related request codes
INVOKE_BROKER_TO_RESET_OFFSET
RESET_CONSUMER_CLIENT_OFFSET
When no consumer is online
The broker modifies the offset directly and persists it in a JSON file
The related request code
UPDATE_CONSUMER_OFFSET
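To make "modify the offset directly and persist it as JSON" concrete, here is a rough sketch of a broker-side offset table. It is not RocketMQ's implementation: the class and method names are hypothetical, the "topic@group" key layout is an assumption, and the JSON is hand-rolled so the example stays dependency-free.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class OffsetTableSketch {
    // Assumed layout: "topic@group" -> (queueId -> consume offset)
    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>();

    /** Overwrites the stored offset; with no consumer online, nothing else needs notifying. */
    void updateOffset(String topic, String group, int queueId, long offset) {
        offsetTable
            .computeIfAbsent(topic + "@" + group, k -> new ConcurrentHashMap<Integer, Long>())
            .put(queueId, offset);
    }

    /** Returns the stored offset, or -1 if none has been recorded. */
    long queryOffset(String topic, String group, int queueId) {
        Map<Integer, Long> queues = offsetTable.get(topic + "@" + group);
        Long offset = queues == null ? null : queues.get(queueId);
        return offset == null ? -1L : offset;
    }

    /** Serializes the table with sorted keys; keys are assumed to need no JSON escaping. */
    String toJson() {
        Map<String, ConcurrentMap<Integer, Long>> sorted =
            new TreeMap<String, ConcurrentMap<Integer, Long>>(offsetTable);
        StringBuilder sb = new StringBuilder("{");
        boolean firstKey = true;
        for (Map.Entry<String, ConcurrentMap<Integer, Long>> entry : sorted.entrySet()) {
            if (!firstKey) sb.append(',');
            firstKey = false;
            sb.append('"').append(entry.getKey()).append("\":{");
            boolean firstQueue = true;
            for (Map.Entry<Integer, Long> q : new TreeMap<Integer, Long>(entry.getValue()).entrySet()) {
                if (!firstQueue) sb.append(',');
                firstQueue = false;
                sb.append('"').append(q.getKey()).append("\":").append(q.getValue());
            }
            sb.append('}');
        }
        return sb.append('}').toString();
    }
}
```

In this sketch a reset is simply another updateOffset call with the offset found for the requested time; the string returned by toJson is what would then be written to disk.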
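Suppose a topic has 2 partitions, p0 and p1; p0 lives on broker0 and p1 lives on broker1
Consumer group group-zhang has 2 consumers, c0 and c1, subscribed to the topic, so c0 is assigned p0 and c1 is assigned p1
c0 holds connections to both broker0 and broker1, even though it only pulls data from broker0 and only commits offsets to broker0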
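The assignment in this example (c0 gets p0, c1 gets p1) is what an even, contiguous split of the sorted queue list across the sorted consumer list produces. The sketch below assumes the group uses such an averaging strategy, which the text above does not state; the class and method names are hypothetical and queues are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.List;

class AverageAllocationSketch {
    /**
     * Returns the queues owned by currentCid when queues are split into
     * contiguous, near-equal blocks in consumer order. Both lists must be
     * sorted identically on every consumer so all members reach the same result.
     */
    static List<String> allocate(String currentCid, List<String> queues, List<String> cids) {
        List<String> result = new ArrayList<String>();
        int index = cids.indexOf(currentCid);
        if (index < 0 || queues.isEmpty()) {
            return result; // unknown consumer or nothing to assign
        }
        int base = queues.size() / cids.size();   // every consumer gets at least this many
        int extra = queues.size() % cids.size();  // the first 'extra' consumers get one more
        int start = index * base + Math.min(index, extra);
        int count = base + (index < extra ? 1 : 0);
        for (int i = 0; i < count; i++) {
            result.add(queues.get(start + i));
        }
        return result;
    }
}
```

With queues [p0, p1] and consumers [c0, c1], allocate("c0", ...) returns [p0] and allocate("c1", ...) returns [p1], matching the example. With three queues and the same two consumers, c0 would own the first two and c1 the last one.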