zoukankan      html  css  js  c++  java
  • 消息队列(五)--- RocketMQ-消息存储4

    问题

    • index 文件有什么作用,结构又是如何

    概述

    index 文件主要是为了 message key 服务的,rocketmq 发送消息的时候可以带上 key , messge key 是为了标识某个消息的一个标志。

    思考

    我们思考一下,message key 是由用户生成的,我们需要尽可能地保证散列保存,这样当我们就可以快速地拿出来了。那么通常的作法就是利用哈希散列,当然最重要的是如何解决冲突。我们下面看一下rocketmq 是如何实现的。

    总体思路

    下面两张图片来自参考文章。侵删(作者的文章写得真的好)

    1297993-20191117120603837-1606171207.png

    我们从这里可以看到index文件分为三部分,头,散列值,索引文件。其中散列值会一一对应索引文件中的一个值,该值就是储存该message信息的。

    1297993-20191117120627309-1906493649.png

    可以看到假如有冲突(即找到散列值那个位置的时候已经有一个对应的索引位了),那么索引位就存放在新的索引位的“上一个索引位”的属性里,这里就形成一条单链表。

    源码分析

    index文件和其他consumerQueue文件的思路是一样的,同样是利用持久化在文件中,然后通过mappedFile 文件加载到内存中,开启一个服务,当发消息对消息进行持久化的时候,将消息的key持久化在index文件。

    写入

    DefaultMessageStore$$CommitLogDispatcherBuildIndex内部类

        class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
    
            @Override
            public void dispatch(DispatchRequest request) {
                if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
                    DefaultMessageStore.this.indexService.buildIndex(request);
                }
            }
        }
    
    

    该内部类就是当接受到消息对index进行记录的分发器,可以看到最主要的还是利用了indexService,我们来看一下indexService到底执行了什么操作。

    1297993-20191117103842173-880387985.png

    从方法名我们对index文件的加载,会刷,获取,写入等。我们看一下 buildIndex 方法

    IndexFile&putKey方法

        public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
            if (this.indexHeader.getIndexCount() < this.indexNum) {
                //获取hashCode 后取模
                int keyHash = indexKeyHashMethod(key);
                int slotPos = keyHash % this.hashSlotNum;
                //计算绝对位置
                int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    
                FileLock fileLock = null;
    
                try {
    
                    // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                    // false);
                    int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                    if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                        slotValue = invalidIndex;
                    }
    
                    long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
    
                    timeDiff = timeDiff / 1000;
    
                    if (this.indexHeader.getBeginTimestamp() <= 0) {
                        timeDiff = 0;
                    } else if (timeDiff > Integer.MAX_VALUE) {
                        timeDiff = Integer.MAX_VALUE;
                    } else if (timeDiff < 0) {
                        timeDiff = 0;
                    }
                    //为什么要把 hashSlot 那部分的总长也加进来呢?
                    //因为这个pos是相对于 mapfile的位移量进行获取的
                    int absIndexPos =
                        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                            + this.indexHeader.getIndexCount() * indexSize;
                    this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                    this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                    //先保存上一个槽位的值
                    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
                    //然后覆盖掉旧的
                    this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
    
                    if (this.indexHeader.getIndexCount() <= 1) {
                        this.indexHeader.setBeginPhyOffset(phyOffset);
                        this.indexHeader.setBeginTimestamp(storeTimestamp);
                    }
                    //更新index属性 
                    this.indexHeader.incHashSlotCount();
                    this.indexHeader.incIndexCount();
                    this.indexHeader.setEndPhyOffset(phyOffset);
                    this.indexHeader.setEndTimestamp(storeTimestamp);
    
                    return true;
                } catch (Exception e) {
                    log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
                } finally {
                    if (fileLock != null) {
                        try {
                            fileLock.release();
                        } catch (IOException e) {
                            log.error("Failed to release the lock", e);
                        }
                    }
                }
            } else {
                log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                    + "; index max num = " + this.indexNum);
            }
    
            return false;
        }
    
    
        //直接取 hashCode,并返回正数 
        public int indexKeyHashMethod(final String key) {
            int keyHash = key.hashCode();
            int keyHashPositive = Math.abs(keyHash);
            if (keyHashPositive < 0)
                keyHashPositive = 0;
            return keyHashPositive;
        }
    
    

    读取的方法

    读取方法的调用链如下 : DefaultMessageStore&queryMessage -> indexService&queryOffset -> IndexFile&selectPhyOffset

    DefaultMessageStore&queryMessage 方法

      @Override
        public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
            QueryMessageResult queryMessageResult = new QueryMessageResult();
    
            long lastQueryMsgTime = end;
    
            //重试
            for (int i = 0; i < 3; i++) {
                //查找到返回结果
                QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
                if (queryOffsetResult.getPhyOffsets().isEmpty()) {
                    break;
                }
                //封装返回结果
                Collections.sort(queryOffsetResult.getPhyOffsets());
    
                queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
                queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());
    
                for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
                    long offset = queryOffsetResult.getPhyOffsets().get(m);
    
                    try {
    
                        boolean match = true;
                        MessageExt msg = this.lookMessageByOffset(offset);
                        if (0 == m) {
                            lastQueryMsgTime = msg.getStoreTimestamp();
                        }
    
    //                    String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR);
    //                    if (topic.equals(msg.getTopic())) {
    //                        for (String k : keyArray) {
    //                            if (k.equals(key)) {
    //                                match = true;
    //                                break;
    //                            }
    //                        }
    //                    }
    
                        if (match) {
                            SelectMappedBufferResult result = this.commitLog.getData(offset, false);
                            if (result != null) {
                                int size = result.getByteBuffer().getInt(0);
                                result.getByteBuffer().limit(size);
                                result.setSize(size);
                                queryMessageResult.addMessage(result);
                            }
                        } else {
                            log.warn("queryMessage hash duplicate, {} {}", topic, key);
                        }
                    } catch (Exception e) {
                        log.error("queryMessage exception", e);
                    }
                }
    
                if (queryMessageResult.getBufferTotalSize() > 0) {
                    break;
                }
    
                if (lastQueryMsgTime < begin) {
                    break;
                }
            }
    
            return queryMessageResult;
        }
    

    indexService&queryOffset 逻辑很好懂,定位文件,解决冲突的链表进行查找

        public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
            List<Long> phyOffsets = new ArrayList<Long>(maxNum);
    
            long indexLastUpdateTimestamp = 0;
            long indexLastUpdatePhyoffset = 0;
            maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
            try {
                this.readWriteLock.readLock().lock();
                if (!this.indexFileList.isEmpty()) {
                    for (int i = this.indexFileList.size(); i > 0; i--) {
                        IndexFile f = this.indexFileList.get(i - 1);
                        boolean lastFile = i == this.indexFileList.size();
                        if (lastFile) {
                            indexLastUpdateTimestamp = f.getEndTimestamp();
                            indexLastUpdatePhyoffset = f.getEndPhyOffset();
                        }
    
                        if (f.isTimeMatched(begin, end)) {
    
                            f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);
                        }
    
                        if (f.getBeginTimestamp() < begin) {
                            break;
                        }
    
                        if (phyOffsets.size() >= maxNum) {
                            break;
                        }
                    }
                }
            } catch (Exception e) {
                log.error("queryMsg exception", e);
            } finally {
                this.readWriteLock.readLock().unlock();
            }
    
            return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
        }
    
    

    总结

    参考资料

    • https://www.kunzhao.org/blog/2018/04/08/rocketmq-message-index-flow/
  • 相关阅读:
    PHP 开发 APP 接口 学习笔记与总结
    Java实现 LeetCode 43 字符串相乘
    Java实现 LeetCode 43 字符串相乘
    Java实现 LeetCode 43 字符串相乘
    Java实现 LeetCode 42 接雨水
    Java实现 LeetCode 42 接雨水
    Java实现 LeetCode 42 接雨水
    Java实现 LeetCode 41 缺失的第一个正数
    Java实现 LeetCode 41 缺失的第一个正数
    Java实现 LeetCode 41 缺失的第一个正数
  • 原文地址:https://www.cnblogs.com/Benjious/p/11893279.html
Copyright © 2011-2022 走看看