zoukankan      html  css  js  c++  java
  • rocketmq的broker接收消息的时候,如何更新consumeQueue和indexfile的

    首先解释下consumeQueue,由于commit-log是根据消息先后存储的,而我们消费的时候是根据topic来筛选的,所以需要一个队列根据topic来划分,所以consumeQueue就是干这个事情的。而indexfile顾名思义就是索引文件,用来做单纯查询的。

    private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;

    consumeQueue就是一个topic下面一个queueID的一个具体信息,用ConsumeQueue描述,其中ConsumeQueue跟commit-log一样,也就是用mappedFileQueue描述的,说明存储数据比较多。

    indexFile是直接用mappedFile描述的。

    这两类对象都是需要持久化的,他们都是通过public void doDispatch(DispatchRequest req)进行异步处理的,之所以异步是因为都是可以通过commit-log得到,所以不急着像cmmit-log一样急着落盘。dispatch方法只有两个地方用到:有新消息产生和故障恢复。故障恢复这里不提,后面提,这里只说新消息产生。

    ReputMessageService线程:

    每当有信息提交到mappedfile以后,那么这个线程通过判断:this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();

    就知道有新消息产生,那么一直遍历这个mappedfile的buff,不停的取消息,然后一个消息就对应一个DispatchRequest,然后交给dispatch处理:

    private void doReput() {
                for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
    
                    if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                        && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                        break;
                    }
    
                    SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                    if (result != null) {
                        try {
                            this.reputFromOffset = result.getStartOffset();
    
                            for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                                DispatchRequest dispatchRequest =
                                    DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                                int size = dispatchRequest.getMsgSize();
    
                                if (dispatchRequest.isSuccess()) {
                                    if (size > 0) {
                                        DefaultMessageStore.this.doDispatch(dispatchRequest);
    
                                        if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                            && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                                dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                                dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                                dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                        }
    
                                        this.reputFromOffset += size;
                                        readSize += size;
    

      

    有三种dispatch:

        public void doDispatch(DispatchRequest req) {
            for (CommitLogDispatcher dispatcher : this.dispatcherList) {
                dispatcher.dispatch(req);
            }
        }

    分别是consumeQueue、indexFile、bitmap,第三个默认用不到。

    在这里可以构造一个新的consumeQueue:

    public ConsumeQueue findConsumeQueue(String topic, int queueId) {
            ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
            if (null == map) {
                ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
                ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
                if (oldMap != null) {
                    map = oldMap;
                } else {
                    map = newMap;
                }
            }
    
            ConsumeQueue logic = map.get(queueId);
            if (null == logic) {
                ConsumeQueue newLogic = new ConsumeQueue(
                    topic,
                    queueId,
                    StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                    this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
                    this);
                ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
                if (oldLogic != null) {
                    logic = oldLogic;
                } else {
                    logic = newLogic;
                }
            }
    
            return logic;
        }
    

      

    然后用这个新的或者老的consumeQueue来处理这条消息:

    在private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) 方法中用于处理这个request

            this.byteBufferIndex.flip();
            this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
            this.byteBufferIndex.putLong(offset);
            this.byteBufferIndex.putInt(size);
            this.byteBufferIndex.putLong(tagsCode);
    
            final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
    
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    

      这里的offset、size、tagscode、expectLogicOffset分别对应原始消息的全局物理offset、消息大小、tags、消费offset,这个消费offset就是commit-log的putMessage的:

      

                keyBuilder.setLength(0);
                keyBuilder.append(msgInner.getTopic());
                keyBuilder.append('-');
                keyBuilder.append(msgInner.getQueueId());
                String key = keyBuilder.toString();
                Long queueOffset = CommitLog.this.topicQueueTable.get(key);
                if (null == queueOffset) {
                    queueOffset = 0L;
                    CommitLog.this.topicQueueTable.put(key, queueOffset);
                }
    

      CQ_STORE_UNIT_SIZE就是20,也就是consumeQueue里面只存对应的topic-queueid对应的消息,并且只存物理offset、tag、总体大小,这三个指标正好占了20比特。存贮位置可以直接通过消费offset乘以20得到。由于消息很多,所以用mappedFIledQueue来存储,具体用哪个Queue也可以通过消费offset拿到指定的mappedFile。

    这里的topicQueueTable其实跟consumeQueue是可以转换得到的,后面说故障恢复的时候再说。

    接下来看看处理indexfile的dispatch。

    为了写入indexfile,我们首先需要拿到一个indexfile,在getAndCreateLastIndexFile方法中,在这里看到只有前一个indexfile文件完全写完以后,我们才能生成新的,并且用新线程进行刷盘操作。

    看下写入的操作:

        public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
            if (this.indexHeader.getIndexCount() < this.indexNum) {
                int keyHash = indexKeyHashMethod(key);
                int slotPos = keyHash % this.hashSlotNum;
                int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    
                FileLock fileLock = null;
    
                try {
    
                    // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                    // false);
                    int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                    if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                        slotValue = invalidIndex;
                    }
    
                    long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
    
                    timeDiff = timeDiff / 1000;
    
                    if (this.indexHeader.getBeginTimestamp() <= 0) {
                        timeDiff = 0;
                    } else if (timeDiff > Integer.MAX_VALUE) {
                        timeDiff = Integer.MAX_VALUE;
                    } else if (timeDiff < 0) {
                        timeDiff = 0;
                    }
    
                    int absIndexPos =
                        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                            + this.indexHeader.getIndexCount() * indexSize;
    
                    this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                    this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
    
                    this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
    
                    if (this.indexHeader.getIndexCount() <= 1) {
                        this.indexHeader.setBeginPhyOffset(phyOffset);
                        this.indexHeader.setBeginTimestamp(storeTimestamp);
                    }
    
                    this.indexHeader.incHashSlotCount();
                    this.indexHeader.incIndexCount();
                    this.indexHeader.setEndPhyOffset(phyOffset);
                    this.indexHeader.setEndTimestamp(storeTimestamp);
    
                    return true;
                } catch (Exception e) {
                    log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
                } finally {
                    if (fileLock != null) {
                        try {
                            fileLock.release();
                        } catch (IOException e) {
                            log.error("Failed to release the lock", e);
                        }
                    }
                }
            } else {
                log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                    + "; index max num = " + this.indexNum);
            }
    
            return false;
        }
    

      每个index文件只能存入指定数量的内容,如果满了返回false,创建新的indexfile。

      indexfile文件分布是:首先是文件头、然后是slot、最后是index内容。每个index内容大小是20字节,总共4个int、一个long。

      这里的key就是topic+uniqkey,然后做哈希,再对槽数取模得到slot的位置absslotpos,每个slot占用4字节,存储的就是当前this.indexHeader.getIndexCount()的序号(从1开始涨),这个数字this.indexHeader.getIndexCount()不能超过hashSlotNum,这个保证了indexFile是满的,不会超过也不会过小。

      

      index的绝对位置通过:int absIndexPos =    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize;然后在这个位置插入:

                    this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                    this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
    

      这里的slotValue其实就是上一个冲突slot位置对应的index-count,通过这个链表办法解决哈希冲突。在看下index存储的其他内容:哈希值、物理绝对位置、时间。

    在看看如何使用index文件的:

        public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
            final long begin, final long end, boolean lock) {
            if (this.mappedFile.hold()) {
                int keyHash = indexKeyHashMethod(key);
                int slotPos = keyHash % this.hashSlotNum;
                int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    
                FileLock fileLock = null;
                try {
                    if (lock) {
                        // fileLock = this.fileChannel.lock(absSlotPos,
                        // hashSlotSize, true);
                    }
    
                    int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                    // if (fileLock != null) {
                    // fileLock.release();
                    // fileLock = null;
                    // }
    
                    if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                        || this.indexHeader.getIndexCount() <= 1) {
                    } else {
                        for (int nextIndexToRead = slotValue; ; ) {
                            if (phyOffsets.size() >= maxNum) {
                                break;
                            }
    
                            int absIndexPos =
                                IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                                    + nextIndexToRead * indexSize;
    
                            int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                            long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
    
                            long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                            int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
    
                            if (timeDiff < 0) {
                                break;
                            }
    
                            timeDiff *= 1000L;
    
                            long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                            boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
    
                            if (keyHash == keyHashRead && timeMatched) {
                                phyOffsets.add(phyOffsetRead);
                            }
    
                            if (prevIndexRead <= invalidIndex
                                || prevIndexRead > this.indexHeader.getIndexCount()
                                || prevIndexRead == nextIndexToRead || timeRead < begin) {
                                break;
                            }
    
                            nextIndexToRead = prevIndexRead;
                        }
                    }
                } catch (Exception e) {
                    log.error("selectPhyOffset exception ", e);
                } finally {
                    if (fileLock != null) {
                        try {
                            fileLock.release();
                        } catch (IOException e) {
                            log.error("Failed to release the lock", e);
                        }
                    }
    
                    this.mappedFile.release();
                }
            }
        }
    

      通过int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);就可以拿到槽冲突对应的index-count,继续找下去。

      为了防止冲突找到错误的信息,还加上了过滤条件:

    if (keyHash == keyHashRead && timeMatched) {phyOffsets.add(phyOffsetRead);}

    这里面还有一个checkpoint机制,对于indexfile,在load方法里面:

                if (!lastExitOK) {
                            if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
                                .getIndexMsgTimestamp()) {
                                f.destroy(0);
                                continue;
                            }
                        }
    

      

     endtimestamp是最后一次写入数据时间,右边是刷盘时间。如果右边小,说明不是一个完整文件。一个没有及时刷盘的文件肯定是一个空文件,因为index只有两个可能:空文件或者满文件,所以直接舍弃这个文件。

  • 相关阅读:
    【JAVA】java 堆溢出分析
    【数据结构】链表的基本学习 实现
    【其他】博客园自定义样式指南
    【JAVA】一些好用的开源java类库
    【Shell】Shell 常用命令
    【Spring】Spring Boot 要点整理
    【数据库】Mysql 连接相关
    【Linux】Shell 参数解析
    Erlang 中类型转换
    erlang 中 maps 练习
  • 原文地址:https://www.cnblogs.com/notlate/p/12008333.html
Copyright © 2011-2022 走看看