  • Comparing the IO paths of Kafka and RocketMQ

    RocketMQ

    Writing files: mmap
    Message bytes are written through a MappedByteBuffer.

    Reading files: mmap
    Messages are read back from the same MappedByteBuffer.

    Sending messages to consumers: mmap + write
    Option 1: copy the off-heap ByteBuffer into an on-heap byte array and send that via Netty.
    Option 2: let Netty send the off-heap ByteBuffer directly.
    (Both the mmap path and the two send options are sketched below.)
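
    A minimal, self-contained sketch of this mmap write/read pattern (illustrative only, not RocketMQ's actual code; the file name, region size, and length-prefix framing are assumptions):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    public class MmapLogDemo {
        public static void main(String[] args) throws IOException {
            try (FileChannel ch = FileChannel.open(Path.of("demo-commitlog"),
                    StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Map a fixed-size region of the file into memory once.
                MappedByteBuffer mmap = ch.map(FileChannel.MapMode.READ_WRITE, 0, 4096);

                // Write path: length-prefixed message bytes go straight into
                // the mapped region (the OS page cache), no write() syscall.
                byte[] msg = "hello".getBytes(StandardCharsets.UTF_8);
                mmap.putInt(msg.length);
                mmap.put(msg);

                // Read path: a duplicate view over the same mapped memory.
                ByteBuffer view = mmap.duplicate();
                view.flip();                        // limit = bytes written so far
                byte[] out = new byte[view.getInt()];
                view.get(out);
                System.out.println(new String(out, StandardCharsets.UTF_8));
            }
        }
    }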
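
    And a sketch of the two send options. Unpooled.wrappedBuffer is Netty's real API, but the helper class below is hypothetical and the channel setup is omitted; mappedSlice stands for the region of the MappedByteBuffer that holds the message:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.nio.ByteBuffer;

    public class SendPathDemo {
        // Option 1: copy the off-heap region into a heap array first,
        // at the cost of one extra memory copy.
        static ByteBuf viaHeapCopy(ByteBuffer mappedSlice) {
            ByteBuffer src = mappedSlice.duplicate(); // keep caller's position intact
            byte[] heap = new byte[src.remaining()];
            src.get(heap);                            // off-heap -> on-heap copy
            return Unpooled.wrappedBuffer(heap);
        }

        // Option 2: wrap the off-heap slice directly; Netty writes it to the
        // socket with no intermediate heap copy.
        static ByteBuf direct(ByteBuffer mappedSlice) {
            return Unpooled.wrappedBuffer(mappedSlice.duplicate());
        }
    }

    Either ByteBuf would then go to channel.writeAndFlush(...); the trade-off is an extra copy versus holding a reference into the mapped file for the duration of the send.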

    Kafka

    Writing messages to the log file

    kafka.log.LogSegment#append

    Writing the log (FileChannel)

    // org.apache.kafka.common.record.FileRecords#append
    public int append(MemoryRecords records) throws IOException {
        // channel is a FileChannel
        int written = records.writeFullyTo(channel);
        size.getAndAdd(written);
        return written;
    }
    // org.apache.kafka.common.record.MemoryRecords#writeFullyTo
    public int writeFullyTo(GatheringByteChannel channel) throws IOException {
        buffer.mark();
        int written = 0;
        while (written < sizeInBytes())
            // here you can clearly see the ByteBuffer being written into the FileChannel
            written += channel.write(buffer);
        buffer.reset();
        return written;
    }
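
    Two details in writeFullyTo are worth calling out: the while loop exists because a single GatheringByteChannel.write() call may drain only part of the buffer, and the mark()/reset() pair restores the buffer's position afterwards so the same MemoryRecords can be read again later.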

    Writing the offset index (mmap)

    // kafka.log.OffsetIndex#append
      def append(offset: Long, position: Int) {
        inLock(lock) {
          require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
          if (_entries == 0 || offset > _lastOffset) {
            trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
            mmap.putInt((offset - baseOffset).toInt)
            mmap.putInt(position)
            _entries += 1
            _lastOffset = offset
            require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".")
          } else {
            throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
              s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
          }
        }
      }
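
    Each index entry is a fixed 8 bytes written with two putInt calls on the mapped buffer: the offset relative to the segment's baseOffset, then the physical byte position in the log file. Appending is therefore just a memory write into the mmap'd index file.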

    Writing the time index (mmap)

    // kafka.log.TimeIndex#maybeAppend
      def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false) {
        inLock(lock) {
          if (!skipFullCheck)
            require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
          // We do not throw exception when the offset equals to the offset of last entry. That means we are trying
          // to insert the same time index entry as the last entry.
          // If the timestamp index entry to be inserted is the same as the last entry, we simply ignore the insertion
          // because that could happen in the following two scenarios:
          // 1. A log segment is closed.
          // 2. LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled.
          if (_entries != 0 && offset < lastEntry.offset)
            throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot ${_entries} no larger than" +
              s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.")
          if (_entries != 0 && timestamp < lastEntry.timestamp)
            throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger" +
              s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.")
          // We only append to the time index when the timestamp is greater than the last inserted timestamp.
          // If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time
          // index will be empty.
          if (timestamp > lastEntry.timestamp) {
            trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.")
            mmap.putLong(timestamp)
            mmap.putInt((offset - baseOffset).toInt)
            _entries += 1
            _lastEntry = TimestampOffset(timestamp, offset)
            require(_entries * entrySize == mmap.position(), _entries + " entries but file position in index is " + mmap.position() + ".")
          }
        }
      }
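
    The time index is appended the same way through mmap, with 12-byte entries (an 8-byte timestamp followed by a 4-byte relative offset), and only when the new timestamp is strictly greater than the last one recorded.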

    Reading the log file (FileChannel)

    // org.apache.kafka.common.record.FileRecords#read
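
    FileRecords#read itself returns a sliced FileRecords view over the same FileChannel rather than copying bytes eagerly; the actual bytes are then pulled with positional reads on that channel. Below is a minimal sketch of that kind of positional (pread-style) read using only standard NIO (illustrative, not Kafka's code; the class and method names are made up):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    public class PositionalReadDemo {
        // Read up to `size` bytes starting at `position` without moving the
        // channel's own position, so concurrent readers don't interfere.
        static ByteBuffer readAt(FileChannel channel, long position, int size) throws IOException {
            ByteBuffer buffer = ByteBuffer.allocate(size);
            while (buffer.hasRemaining()) {
                int read = channel.read(buffer, position + buffer.position());
                if (read < 0)           // reached end of file early
                    break;
            }
            buffer.flip();              // ready for the caller to consume
            return buffer;
        }
    }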

    Sending messages to consumers: sendfile

    // org.apache.kafka.common.record.FileRecords#writeTo
    @Override
    public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException {
        long newSize = Math.min(channel.size(), end) - start;
        int oldSize = sizeInBytes();
        if (newSize < oldSize)
            throw new KafkaException(String.format(
                    "Size of FileRecords %s has been truncated during write: old size %d, new size %d",
                    file.getAbsolutePath(), oldSize, newSize));
    
        long position = start + offset;
        int count = Math.min(length, oldSize);
        final long bytesTransferred;
        if (destChannel instanceof TransportLayer) {
            TransportLayer tl = (TransportLayer) destChannel;
            bytesTransferred = tl.transferFrom(channel, position, count);
        } else {
            bytesTransferred = channel.transferTo(position, count, destChannel);
        }
        return bytesTransferred;
    }
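
    For the plaintext transport layer, transferFrom delegates to FileChannel#transferTo against the socket channel, which is sendfile(2): bytes flow from the page cache to the socket without entering user space. (With the SSL transport layer the data has to be pulled into user space for encryption, so that zero-copy path does not apply.) This is the core contrast with RocketMQ, which serves consumers via mmap + write rather than sendfile.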
  • Original: https://www.cnblogs.com/allenwas3/p/12696551.html