首先解释下consumeQueue,由于commit-log是根据消息先后存储的,而我们消费的时候是根据topic来筛选的,所以需要一个队列根据topic来划分,所以consumeQueue就是干这个事情的。而indexfile顾名思义就是索引文件,用来做单纯查询的。
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
consumeQueue就是一个topic下面一个queueID的一个具体信息,用ConsumeQueue描述,其中ConsumeQueue跟commit-log一样,也就是用mappedFileQueue描述的,说明存储数据比较多。
indexFile是直接用mappedFile描述的。
这两类对象都是需要持久化的,他们都是通过public void doDispatch(DispatchRequest req)进行异步处理的,之所以异步是因为都是可以通过commit-log得到,所以不急着像cmmit-log一样急着落盘。dispatch方法只有两个地方用到:有新消息产生和故障恢复。故障恢复这里不提,后面提,这里只说新消息产生。
ReputMessageService线程:
每当有信息提交到mappedfile以后,那么这个线程通过判断:this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
就知道有新消息产生,那么一直遍历这个mappedfile的buff,不停的取消息,然后一个消息就对应一个DispatchRequest,然后交给dispatch处理:
private void doReput() { for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { break; } SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); if (result != null) { try { this.reputFromOffset = result.getStartOffset(); for (int readSize = 0; readSize < result.getSize() && doNext; ) { DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); int size = dispatchRequest.getMsgSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { DefaultMessageStore.this.doDispatch(dispatchRequest); if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } this.reputFromOffset += size; readSize += size;
有三种dispatch:
public void doDispatch(DispatchRequest req) { for (CommitLogDispatcher dispatcher : this.dispatcherList) { dispatcher.dispatch(req); } }
分别是consumeQueue、indexFile、bitmap,第三个默认用不到。
在这里可以构造一个新的consumeQueue:
public ConsumeQueue findConsumeQueue(String topic, int queueId) { ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); if (null == map) { ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128); ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap); if (oldMap != null) { map = oldMap; } else { map = newMap; } } ConsumeQueue logic = map.get(queueId); if (null == logic) { ConsumeQueue newLogic = new ConsumeQueue( topic, queueId, StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), this); ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); if (oldLogic != null) { logic = oldLogic; } else { logic = newLogic; } } return logic; }
然后用这个新的或者老的consumeQueue来处理这条消息:
在private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) 方法中用于处理这个request
this.byteBufferIndex.flip(); this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); this.byteBufferIndex.putLong(offset); this.byteBufferIndex.putInt(size); this.byteBufferIndex.putLong(tagsCode); final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
这里的offset、size、tagscode、expectLogicOffset分别对应原始消息的全局物理offset、消息大小、tags、消费offset,这个消费offset就是commit-log的putMessage的:
keyBuilder.setLength(0); keyBuilder.append(msgInner.getTopic()); keyBuilder.append('-'); keyBuilder.append(msgInner.getQueueId()); String key = keyBuilder.toString(); Long queueOffset = CommitLog.this.topicQueueTable.get(key); if (null == queueOffset) { queueOffset = 0L; CommitLog.this.topicQueueTable.put(key, queueOffset); }
CQ_STORE_UNIT_SIZE就是20,也就是consumeQueue里面只存对应的topic-queueid对应的消息,并且只存物理offset、tag、总体大小,这三个指标正好占了20比特。存贮位置可以直接通过消费offset乘以20得到。由于消息很多,所以用mappedFIledQueue来存储,具体用哪个Queue也可以通过消费offset拿到指定的mappedFile。
这里的topicQueueTable其实跟consumeQueue是可以转换得到的,后面说故障恢复的时候再说。
接下来看看处理indexfile的dispatch。
为了写入indexfile,我们首先需要拿到一个indexfile,在getAndCreateLastIndexFile方法中,在这里看到只有前一个indexfile文件完全写完以后,我们才能生成新的,并且用新线程进行刷盘操作。
看下写入的操作:
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) { if (this.indexHeader.getIndexCount() < this.indexNum) { int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try { // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize, // false); int slotValue = this.mappedByteBuffer.getInt(absSlotPos); if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) { slotValue = invalidIndex; } long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp(); timeDiff = timeDiff / 1000; if (this.indexHeader.getBeginTimestamp() <= 0) { timeDiff = 0; } else if (timeDiff > Integer.MAX_VALUE) { timeDiff = Integer.MAX_VALUE; } else if (timeDiff < 0) { timeDiff = 0; } int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize; this.mappedByteBuffer.putInt(absIndexPos, keyHash); this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); if (this.indexHeader.getIndexCount() <= 1) { this.indexHeader.setBeginPhyOffset(phyOffset); this.indexHeader.setBeginTimestamp(storeTimestamp); } this.indexHeader.incHashSlotCount(); this.indexHeader.incIndexCount(); this.indexHeader.setEndPhyOffset(phyOffset); this.indexHeader.setEndTimestamp(storeTimestamp); return true; } catch (Exception e) { log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock", e); } } } } else { log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount() + "; index max num = " + this.indexNum); } return false; }
每个index文件只能存入指定数量的内容,如果满了返回false,创建新的indexfile。
indexfile文件分布是:首先是文件头、然后是slot、最后是index内容。每个index内容大小是20字节,总共4个int、一个long。
这里的key就是topic+uniqkey,然后做哈希,再对槽数取模得到slot的位置absslotpos,每个slot占用4字节,存储的就是当前this.indexHeader.getIndexCount()的序号(从1开始涨),这个数字this.indexHeader.getIndexCount()不能超过hashSlotNum,这个保证了indexFile是满的,不会超过也不会过小。
index的绝对位置通过:int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize;然后在这个位置插入:
this.mappedByteBuffer.putInt(absIndexPos, keyHash); this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
这里的slotValue其实就是上一个冲突slot位置对应的index-count,通过这个链表办法解决哈希冲突。在看下index存储的其他内容:哈希值、物理绝对位置、时间。
在看看如何使用index文件的:
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock) { if (this.mappedFile.hold()) { int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try { if (lock) { // fileLock = this.fileChannel.lock(absSlotPos, // hashSlotSize, true); } int slotValue = this.mappedByteBuffer.getInt(absSlotPos); // if (fileLock != null) { // fileLock.release(); // fileLock = null; // } if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() || this.indexHeader.getIndexCount() <= 1) { } else { for (int nextIndexToRead = slotValue; ; ) { if (phyOffsets.size() >= maxNum) { break; } int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + nextIndexToRead * indexSize; int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4); long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4); if (timeDiff < 0) { break; } timeDiff *= 1000L; long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; boolean timeMatched = (timeRead >= begin) && (timeRead <= end); if (keyHash == keyHashRead && timeMatched) { phyOffsets.add(phyOffsetRead); } if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount() || prevIndexRead == nextIndexToRead || timeRead < begin) { break; } nextIndexToRead = prevIndexRead; } } } catch (Exception e) { log.error("selectPhyOffset exception ", e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock", e); } } this.mappedFile.release(); } } }
通过int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);就可以拿到槽冲突对应的index-count,继续找下去。
为了防止冲突找到错误的信息,还加上了过滤条件:
if (keyHash == keyHashRead && timeMatched) {phyOffsets.add(phyOffsetRead);}
这里面还有一个checkpoint机制,对于indexfile,在load方法里面:
if (!lastExitOK) { if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint() .getIndexMsgTimestamp()) { f.destroy(0); continue; } }
endtimestamp是最后一次写入数据时间,右边是刷盘时间。如果右边小,说明不是一个完整文件。一个没有及时刷盘的文件肯定是一个空文件,因为index只有两个可能:空文件或者满文件,所以直接舍弃这个文件。