rocketmq
写文件 mmap
MappedByteBuffer 写入消息的 byte
读文件 mmap
读 MappedByteBuffer
发送消息给消费者 mmap + write
一种:堆外 ByteBuffer 转堆内 byte 数组,通过 netty 发送
另一种: netty 直接发送堆外 ByteBuffer
kafka
消息写入文件
kafka.log.LogSegment#append
写日志 (FileChannel)
// org.apache.kafka.common.record.FileRecords#append public int append(MemoryRecords records) throws IOException { // channel 是 FileChannel 类型 int written = records.writeFullyTo(channel); size.getAndAdd(written); return written; } // org.apache.kafka.common.record.MemoryRecords#writeFullyTo public int writeFullyTo(GatheringByteChannel channel) throws IOException { buffer.mark(); int written = 0; while (written < sizeInBytes()) // 这里清楚地看到,把 ByteBuffer 写入 FileChannel written += channel.write(buffer); buffer.reset(); return written; }
写索引(mmap)
// kafka.log.OffsetIndex#append
// Appends an (offset, file position) entry to the offset index by writing two
// ints directly into the memory-mapped buffer (mmap) — no explicit file write.
// The offset is stored relative to baseOffset so it fits in 4 bytes.
def append(offset: Long, position: Int) {
  inLock(lock) {
    require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
    // Entries must be appended in strictly increasing offset order.
    if (_entries == 0 || offset > _lastOffset) {
      trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
      mmap.putInt((offset - baseOffset).toInt)  // 4-byte relative offset
      mmap.putInt(position)                     // 4-byte physical position in the log segment
      _entries += 1
      _lastOffset = offset
      // Sanity check: mmap position must equal entries * entrySize after the write.
      require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".")
    } else {
      throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
        s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
    }
  }
}
写 time index (mmap)
// kafka.log.TimeIndex#maybeAppend
// Conditionally appends a (timestamp, offset) entry to the time index via the
// memory-mapped buffer (mmap). Each entry is 12 bytes: an 8-byte timestamp
// followed by a 4-byte offset relative to baseOffset.
def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false) {
  inLock(lock) {
    if (!skipFullCheck)
      require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
    // We do not throw exception when the offset equals to the offset of last entry. That means we are trying
    // to insert the same time index entry as the last entry.
    // If the timestamp index entry to be inserted is the same as the last entry, we simply ignore the insertion
    // because that could happen in the following two scenarios:
    // 1. A log segment is closed.
    // 2. LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled.
    if (_entries != 0 && offset < lastEntry.offset)
      throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot ${_entries} no larger than" +
        s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.")
    if (_entries != 0 && timestamp < lastEntry.timestamp)
      throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger" +
        s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.")
    // We only append to the time index when the timestamp is greater than the last inserted timestamp.
    // If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time
    // index will be empty.
    if (timestamp > lastEntry.timestamp) {
      trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.")
      mmap.putLong(timestamp)                   // 8-byte timestamp
      mmap.putInt((offset - baseOffset).toInt)  // 4-byte relative offset
      _entries += 1
      _lastEntry = TimestampOffset(timestamp, offset)
      // Sanity check: mmap position must equal entries * entrySize after the write.
      require(_entries * entrySize == mmap.position(), _entries + " entries but file position in index is " + mmap.position() + ".")
    }
  }
}
读 log 文件 file channel
// org.apache.kafka.common.record.FileRecords#read
发送消息给消费者 sendfile(零拷贝,底层是 FileChannel.transferTo)
// org.apache.kafka.common.record.FileRecords#writeTo @Override public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException { long newSize = Math.min(channel.size(), end) - start; int oldSize = sizeInBytes(); if (newSize < oldSize) throw new KafkaException(String.format( "Size of FileRecords %s has been truncated during write: old size %d, new size %d", file.getAbsolutePath(), oldSize, newSize)); long position = start + offset; int count = Math.min(length, oldSize); final long bytesTransferred; if (destChannel instanceof TransportLayer) { TransportLayer tl = (TransportLayer) destChannel; bytesTransferred = tl.transferFrom(channel, position, count); } else { bytesTransferred = channel.transferTo(position, count, destChannel); } return bytesTransferred; }