zoukankan      html  css  js  c++  java
  • RocketMQ-broker存储机制-Index

    RocketMQ-broker存储机制-Index

    数据结构

     

     

     在indexFileHeader中存放着开始和结束的时间戳,开始和结束的物理点位,以及索引的个数。

    索引文件加载

        public boolean load(final boolean lastExitOK) {
            // Scan the index-file directory and rebuild the in-memory index file list.
            // Returns false only when reading an index file fails with an IOException.
            File dir = new File(this.storePath);
            File[] candidates = dir.listFiles();
            if (candidates == null) {
                return true;
            }

            // ascending order
            Arrays.sort(candidates);

            for (File candidate : candidates) {
                try {
                    IndexFile indexFile =
                        new IndexFile(candidate.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
                    // Read the persisted header (begin/end timestamps, offsets, counts).
                    indexFile.load();

                    // After an abnormal shutdown, drop any index file written after the
                    // last checkpoint; it will be rebuilt from the commit log.
                    if (!lastExitOK
                        && indexFile.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
                            .getIndexMsgTimestamp()) {
                        indexFile.destroy(0);
                        continue;
                    }

                    log.info("load index file OK, " + indexFile.getFileName());
                    this.indexFileList.add(indexFile);
                } catch (IOException e) {
                    log.error("load file {} error", candidate, e);
                    return false;
                } catch (NumberFormatException e) {
                    log.error("load file {} error", candidate, e);
                }
            }

            return true;
        }

    dispatch

     那么 index 文件是如何写入的呢?还是通过 ReputMessageService 中 CommitLogDispatcherBuildIndex 的 dispatch 实现。

     class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

            /**
             * Forwards each dispatch request to the index service, but only when
             * message indexing is enabled in the store configuration.
             */
            @Override
            public void dispatch(DispatchRequest request) {
                if (!DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
                    return;
                }
                DefaultMessageStore.this.indexService.buildIndex(request);
            }
        }
    public void buildIndex(DispatchRequest req) {
        // Reuse the current index file if it still has room; if the previous file is
        // full, a new one is created and the old one is flushed.
        IndexFile indexFile = retryGetAndCreateIndexFile();
        if (indexFile == null) {
            log.error("build index error, stop building index");
            return;
        }

        // Skip messages whose commit-log offset precedes what is already indexed
        // (they were put into the index before).
        if (req.getCommitLogOffset() < indexFile.getEndPhyOffset()) {
            return;
        }

        // Rolled-back transactional messages are never indexed; all other
        // transaction types fall through to indexing.
        final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_ROLLBACK_TYPE) {
            return;
        }

        final String topic = req.getTopic();

        // Index by the message's unique key (key format: topic + "#" + uniqKey).
        if (req.getUniqKey() != null) {
            indexFile = putKey(indexFile, req, buildKey(topic, req.getUniqKey()));
            if (indexFile == null) {
                log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                return;
            }
        }

        // Index by every user-supplied key (key format: topic + "#" + key).
        final String keys = req.getKeys();
        if (keys != null && keys.length() > 0) {
            for (String key : keys.split(MessageConst.KEY_SEPARATOR)) {
                if (key.length() > 0) {
                    indexFile = putKey(indexFile, req, buildKey(topic, key));
                    if (indexFile == null) {
                        log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                        return;
                    }
                }
            }
        }
    }
     private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
        // Keep retrying against a freshly rolled index file until the key is stored;
        // return null only when no new index file can be created.
        boolean stored = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
        while (!stored) {
            log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");

            indexFile = retryGetAndCreateIndexFile();
            if (indexFile == null) {
                return null;
            }

            stored = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
        }

        return indexFile;
    }

    实现 putKey 的真正逻辑

     public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        // Appends one index entry for {key -> phyOffset} and links it into the
        // key's hash-slot chain. Returns false when the file already holds
        // indexNum entries (caller rolls over to a new IndexFile) or on failure.
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            int keyHash = indexKeyHashMethod(key);
            // Hash slot this key maps to; absSlotPos is its byte position after the header.
            int slotPos = keyHash % this.hashSlotNum;
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;

            try {

                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                // false);
                // The slot stores the logical index number of the newest entry in this
                // hash chain; values out of range mean the chain is empty.
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }

                // Store time as a second-granularity delta from the file's begin
                // timestamp so it fits in a 4-byte field (clamped to [0, MAX_VALUE]).
                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

                timeDiff = timeDiff / 1000;

                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }

                // Byte position of the next free index entry: entries are appended
                // after the header and the hash-slot table.
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;

                // Entry layout: keyHash(4) | phyOffset(8) | timeDiff(4) | prevIndex(4).
                // slotValue links this entry back to the previous head of the chain.
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

                // Point the slot at the new entry (its index number before the count
                // is incremented): lookups start at the newest entry and walk the
                // chain backwards via the stored prevIndex link.
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

                // First real entry initializes the file's begin offset/timestamp.
                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }
                
                // Update the index header bookkeeping.
                // NOTE(review): incHashSlotCount() runs even when the slot was already
                // occupied, so hashSlotCount over-counts used slots — appears to match
                // upstream RocketMQ behavior; confirm before relying on this counter.

                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }

        return false;
    }
  • 相关阅读:
    堆栈(线性表)
    链表 -- 循环链表(线性表)
    链表 -- 单向链表(线性表)
    排序算法--归并算法(强分治)
    sqlhelper
    sqlite与sqlserver区别
    常用sql集锦
    外出实施问题总结
    JS深入理解系列(一):编写高质量代码
    通用分页(Jquery版)
  • 原文地址:https://www.cnblogs.com/gaojy/p/15087872.html
Copyright © 2011-2022 走看看