zoukankan      html  css  js  c++  java
  • consumeQueue 和 indexFile 文件

    broker 把消息写入 commitLog 后,还需要把消息的索引写入 consumeQueue 文件和 indexFile 文件。这一步不在写消息的线程里完成,而是由 ReputMessageService 线程循环读取 commitLog、解析出每条消息并分发给各个 dispatcher 来完成

    // org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService
    /**
     * Main loop of the reput service thread.
     *
     * <p>Logs a start message, then repeatedly sleeps for one millisecond and
     * calls {@link #doReput()} until {@code isStopped()} returns true, and
     * finally logs an end message. Any exception raised by the sleep or by
     * {@code doReput()} is logged as a warning and the loop keeps going.
     */
    @Override
    public void run() {
        DefaultMessageStore.log.info(this.getServiceName() + " service started");

        for (;;) {
            if (this.isStopped()) {
                break;
            }

            try {
                // Short pause between passes, then catch up on the commit log.
                Thread.sleep(1);
                this.doReput();
            } catch (Exception ex) {
                // One failed pass must not terminate the service thread.
                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", ex);
            }
        }

        DefaultMessageStore.log.info(this.getServiceName() + " service end");
    }
    
    /**
     * Reads commit log data starting at {@code reputFromOffset}, turns every
     * message found into a {@link DispatchRequest} and hands it to
     * {@code DefaultMessageStore.doDispatch}, advancing {@code reputFromOffset}
     * as it goes.
     *
     * <p>The outer loop keeps fetching data while {@code isCommitLogAvailable()}
     * is true and {@code doNext} has not been cleared. The inner loop walks the
     * messages contained in one fetched result. The fetched result is always
     * released in the {@code finally} block.
     */
    private void doReput() {
        for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
            // With duplication enabled, do not go past the store's confirm offset.
            if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                break;
            }
    
            // Read commit log data starting at reputFromOffset
            SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
            if (result != null) {
                try {
                    // Re-align to the offset at which the returned data actually starts.
                    this.reputFromOffset = result.getStartOffset();
    
                    // readSize counts the bytes of this result that have been consumed so far.
                    for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                        // Parse the next message and build a DispatchRequest for it
                        DispatchRequest dispatchRequest =
                            DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                        int size = dispatchRequest.getMsgSize();
    
                        if (dispatchRequest.isSuccess()) {
                            if (size > 0) {
                                // Hand the DispatchRequest to the dispatchers
                                DefaultMessageStore.this.doDispatch(dispatchRequest);
    
                                // On a non-slave broker with long polling enabled, notify the
                                // arriving listener about the new message.
                                if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                }
    
                                // Move past the message that was just dispatched.
                                this.reputFromOffset += size;
                                readSize += size;
                                // On a slave, update the per-topic put counters (count and bytes) here.
                                if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                        .addAndGet(dispatchRequest.getMsgSize());
                                }
                            } else if (size == 0) {
                                // Successful parse with size 0 (presumably the end-of-file marker;
                                // confirm against checkMessageAndReturnSize): jump to the next
                                // commit log file and end the inner loop by marking the whole
                                // result as read.
                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                readSize = result.getSize();
                            }
                        } else if (!dispatchRequest.isSuccess()) {
    
                            if (size > 0) {
                                // Parse failed but a size was reported: log and skip that many bytes.
                                // Note that readSize is not advanced in this branch.
                                log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                this.reputFromOffset += size;
                            } else {
                                // Parse failed with no usable size: stop both loops.
                                doNext = false;
                                if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                    log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                        this.reputFromOffset);
    
                                    // On the master, skip the unread remainder of this result.
                                    this.reputFromOffset += result.getSize() - readSize;
                                }
                            }
                        }
                    }
                } finally {
                    // Always release the fetched buffer, even if dispatching threw.
                    result.release();
                }
            } else {
                // No data available at this offset: leave the outer loop.
                doNext = false;
            }
        }
    }

    consumeQueue 文件中每个 entry 固定 20 字节(8 + 4 + 8):8 字节 commitLog offset、4 字节消息 size、8 字节 tag hashcode。当消息为延时消息时,tag hashcode 字段存放的不是 tag 的 hashcode,而是该消息的投递时间戳(由 computeDeliverTimestamp 根据延时级别和存储时间计算得到)

    // org.apache.rocketmq.store.CommitLog#checkMessageAndReturnSize(java.nio.ByteBuffer, boolean, boolean)
    // Timing message processing
    {
        String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
            int delayLevel = Integer.parseInt(t);
    
            if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
            }
    
            if (delayLevel > 0) {
                tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                    storeTimestamp);
            }
        }
    }

    处理 DispatchRequest 的逻辑

    /**
     * Dispatcher that forwards a request to
     * {@code DefaultMessageStore.putMessagePositionInfo} depending on the
     * transaction type encoded in the request's system flag.
     */
    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

        /**
         * Forwards non-transactional and committed messages; prepared and
         * rolled-back messages (and any other transaction value) are ignored.
         *
         * @param request the dispatch request built from a commit log message
         */
        @Override
        public void dispatch(DispatchRequest request) {
            final int transactionType = MessageSysFlag.getTransactionValue(request.getSysFlag());

            final boolean visibleToConsumers =
                transactionType == MessageSysFlag.TRANSACTION_NOT_TYPE
                    || transactionType == MessageSysFlag.TRANSACTION_COMMIT_TYPE;

            if (visibleToConsumers) {
                DefaultMessageStore.this.putMessagePositionInfo(request);
            }
        }
    }
    
    /**
     * Dispatcher that passes a request on to the index service, but only when
     * message indexing is enabled in the store configuration.
     */
    class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

        /**
         * Calls {@code indexService.buildIndex(request)} if
         * {@code messageStoreConfig.isMessageIndexEnable()} is true; otherwise
         * does nothing.
         *
         * @param request the dispatch request built from a commit log message
         */
        @Override
        public void dispatch(DispatchRequest request) {
            final boolean indexingEnabled = DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable();
            if (!indexingEnabled) {
                return;
            }

            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }

    broker 为消息的 UNIQ_KEY 和 keys 字段建立索引(keys 按 MessageConst.KEY_SEPARATOR 拆分后,每个非空 key 各建一条索引),索引保存在 indexFile 文件中;回滚类型(TRANSACTION_ROLLBACK_TYPE)的消息不建索引

    /**
     * Adds index entries for one dispatched message: one for its uniq key (if
     * present) and one for every non-empty key obtained by splitting
     * {@code req.getKeys()} on {@code MessageConst.KEY_SEPARATOR}.
     *
     * <p>Nothing is written when no index file can be obtained, when the
     * message's commit log offset is below the index file's end physical
     * offset, or when the message is of transaction rollback type. Processing
     * stops at the first {@code putKey} call that returns {@code null}; the
     * failure is logged together with the key that could not be stored.
     *
     * @param req the dispatch request describing the message to index
     */
    public void buildIndex(DispatchRequest req) {
        IndexFile indexFile = retryGetAndCreateIndexFile();
        if (indexFile == null) {
            log.error("build index error, stop building index");
            return;
        }

        long endPhyOffset = indexFile.getEndPhyOffset();
        String topic = req.getTopic();
        String keys = req.getKeys();

        // Offsets below the index file's end offset are skipped.
        if (req.getCommitLogOffset() < endPhyOffset) {
            return;
        }

        // Rolled-back transactional messages are not indexed; every other
        // transaction value falls through to indexing.
        final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_ROLLBACK_TYPE) {
            return;
        }

        if (req.getUniqKey() != null) {
            // putKey returns the index file to keep using, or null on failure.
            indexFile = putKey(indexFile, req, buildKey(topic, req.getUniqKey()));
            if (indexFile == null) {
                log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                return;
            }
        }

        if (keys != null && keys.length() > 0) {
            for (String key : keys.split(MessageConst.KEY_SEPARATOR)) {
                if (key.length() > 0) {
                    indexFile = putKey(indexFile, req, buildKey(topic, key));
                    if (indexFile == null) {
                        // Report the key that actually failed, not only the uniq key.
                        log.error("putKey error commitlog {} uniqkey {} key {}",
                            req.getCommitLogOffset(), req.getUniqKey(), key);
                        return;
                    }
                }
            }
        }
    }
  • 相关阅读:
    MySQL错误 1030-Got error 28 from storage engine
    电脑开机无反应 不显示BIOS 硬件没问题
    python错误 import: unable to open X server
    Python 错误 invalid command 'bdist_wheel' & outside environment /usr
    Cento 7安装 Failed to execute /init
    笔记《鸟哥的Linux私房菜》5 首次登入与在线求助 man
    Scrapy XPath语法
    Linux 用户操作
    Mysql 表修改
    Ubuntu 配置 Python环境 IPython
  • 原文地址:https://www.cnblogs.com/allenwas3/p/11993971.html
Copyright © 2011-2022 走看看