zoukankan      html  css  js  c++  java
  • rocketMQ 消息回溯

    rocketMQ 重置消息 offset,有两种情形:一种是有消费者在线,另一种则是无消费者在线。

    命令行

    mqadmin.cmd resetOffsetByTime -n localhost:9876 -t topic-zhang -g group-zhang -s yyyy-MM-dd#HH:mm:ss:SSS

    首先描述存在在线消费者的情况:

    1. 命令行程序和 nameserver 及 broker 交互

    // org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetByTimestamp
    public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
        boolean isC)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        // 从 nameserver 获知 topic 分布在哪些 broker 上
        TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
        List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
        Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();
        if (brokerDatas != null) {
            for (BrokerData brokerData : brokerDatas) {
                // 默认选择主节点,主节点下线,则随机选一个从
                String addr = brokerData.selectBrokerAddr();
                if (addr != null) {
                    Map<MessageQueue, Long> offsetTable =
                        this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce,
                            timeoutMillis, isC);
                    if (offsetTable != null) {
                        allOffsetTable.putAll(offsetTable);
                    }
                }
            }
        }
        return allOffsetTable;
    }

    2. broker 查询 topic 分区,指定时间的消息 offset,并通知连接该 broker 的 consumer

    org.apache.rocketmq.broker.client.net.Broker2Client#resetOffset
    // 根据时间查找消息,是二分查找,比对消息的存储时间
    org.apache.rocketmq.store.ConsumeQueue#getOffsetInQueueByTime

    3. consumer 根据 broker 返回的 offsetTable 重置位点

    // org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset
    public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
        DefaultMQPushConsumerImpl consumer = null;
        try {
            MQConsumerInner impl = this.consumerTable.get(group);
            if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
                consumer = (DefaultMQPushConsumerImpl) impl;
            } else {
                log.info("[reset-offset] consumer dose not exist. group={}", group);
                return;
            }
            consumer.suspend();
    
            ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
            for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
                MessageQueue mq = entry.getKey();
                if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                    ProcessQueue pq = entry.getValue();
                    pq.setDropped(true);
                    pq.clear();
                }
            }
    
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
            }
    
            Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
            while (iterator.hasNext()) {
                MessageQueue mq = iterator.next();
                Long offset = offsetTable.get(mq);
                if (topic.equals(mq.getTopic()) && offset != null) {
                    try {
                        consumer.updateConsumeOffset(mq, offset);
                        consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                        iterator.remove();
                    } catch (Exception e) {
                        log.warn("reset offset failed. group={}, {}", group, mq, e);
                    }
                }
            }
        } finally {
            if (consumer != null) {
                consumer.resume();
            }
        }
    }

    相关的两个请求码
    INVOKE_BROKER_TO_RESET_OFFSET
    RESET_CONSUMER_CLIENT_OFFSET

    当没有消费者在线时

    broker 直接修改 offset 并保存在 json 文件中

    相关的请求码

    UPDATE_CONSUMER_OFFSET 

    一个 topic 2 个分区,p0 和 p1,p0分布在 broker0 上,p1 分布在 broker1 上

    消费组 group-zhang 有 2 个消费者 c0 和 c1 订阅了该 topic,则 c0 分配到 p0,c1 分配到 p1

    c0 和 broker0 及 broker1 都有连接,虽然 c0 只从 broker0 拉取数据,只提交 offset 到 broker0

  • 相关阅读:
    【UWP】仅在TextBlock文本溢出时显示Tooltip
    Vue CSS引用背景图片问题
    使用C#与CefSharp相互调用所踩过的坑
    使用JS在网页上查看显示PDF文件
    VS 代码提示默认不选中 解决办法
    Windows Server 2012 R2更新(KB2919355)
    在ASP.NET Core 3.1中使用Swagger
    SQL SERVER 数据库授权指定用户
    第一篇博客
    观察者模式
  • 原文地址:https://www.cnblogs.com/allenwas3/p/12192092.html
Copyright © 2011-2022 走看看