zoukankan      html  css  js  c++  java
  • 【RocketMQ源码学习】- 5. 消息存储机制

    前言

    面试官:你了解RocketMQ是如何存储消息的吗?
    我:额,,,你等下,我看下这篇文字, (逃

    由于这部分内容优点多,所以请哥哥姐姐们自备茶水,欢迎留言!

    问题: 

    1. RocketMQ存储的文件是什么样子

    2. RocketMQ为什么存储的性能高?

    3. 事务的prepareMsg、rollback消息如何对消费者不可见

    4. ConsumeQueue、Index文件是什么时候生成的,如何生成的

    RocketMQ存储设计是高可用和高性能的保证, 利用磁盘存储来满足海量堆积能力。Kafka单机在topic数量在100+的时候,性能会下降很多,而RocketMQ能够在多个topic存在时,依然保持高性能

    下面主要从存储结构、存储流程、存储优化的技术来形成文字

    基于的版本是RocketMQ4.5.2

     

    存储架构图

    1. 要发送的消息,会按顺序写入commitlog中,这里所有topic和queue共享一个文件
    2. 存入commitlog后,由于消息会按照topic纬度来消费,会异步构建consumeQueue(逻辑队列)和index(索引文件),consumeQueue存储消息的commitlogOffset/messageSize/tagHashCode, 方便定位commitlog中的消息实体。每个 Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。索引文件(Index)提供消息检索的能力,主要在问题排查和数据统计等场景应用
    3. 消费者会从consumeQueue取到msgOffset,方便快速取出消息 

    好处

    1. CommitLog 顺序写 ,可以大大提高写人效率,提高堆积能力
    2. 虽然是随机读,但是利用操作系统的pagecache机制,可以批量地从磁盘读取,作为cache存到内存中,加速后续的读取速度
    3. 在实际情况中,大部分的 ConsumeQueue能够被全部读人内存,所以这个中间结构的操作速度很快, 可以认为是内存读取的速度

    消息文件存储的结构设计

    存储的文件主要分为:

    • commitlog: 存储消息实体
    • consumequeue: 按Topic和队列存储消息的offset
    • index: index按key、tag、时间等存储

    commitlog(物理队列)

    文件地址:${user.home} store${commitlog}${fileName}

    commitlog特点:

    • 存放该broke所有topic的消息
    • 默认1G大小
    • 以偏移量为文件名,当一个文件写满时则创建新文件,这样的设计主要是方便根据消息的物理偏移量,快速定位到消息所在的物理文件
    • 一个消息存储单元是不定长的
    • 顺序写但是随机读

    消息单元的存储结构

    下面的表格说明了,每个消息体不是定长的,会存储消息的哪些内容,包括物理偏移量、consumeQueue的偏移量、消息体等信息

    顺序字段名说明
    1 totalSize(4Byte) 消息大小
    2 magicCode(4) 设置为daa320a7 (这个不太明白)
    3 bodyCRC(4) 当broker重启recover时会校验
    4 queueId(4) 消息对应的consumeQueueId
    5 flag(4) rocketmq不做处理,只存储后透传
    6 queueOffset(8) 消息在consumeQueue中的偏移量
    7 physicalOffset(8) 消息在commitlog中的偏移量
    8 sysFlg(4) 事务标示,NOT_TYPE/PREPARED_TYPE/COMMIT_TYPE/ROLLBACK_TYPE
    9 bronTimestamp(8) 消息产生端(producer)的时间戳
    10 bronHost(8) 消息产生端(producer)地址(address:port)
    11 storeTimestamp(8) 消息在broker存储时间
    12 storeHostAddress(8) 消息存储到broker的地址(address:port)
    13 reconsumeTimes(4) 消息重试次数
    14 preparedTransactionOffset(8) 事务消息的物理偏移量
    15 bodyLength(4) 消息长度,最长不超过4MB
    16 body(body length Bytes) 消息体内容
    17 topicLength(1) 主题长度,最长不超过255Byte
    18 topic(topic length Bytes) 主题内容
    19 propertiesLength(2) 消息属性长度,最长不超过65535Bytes
    20 properties(properties length Bytes) 消息属性内容

     

    consumequeue文件(逻辑队列)

    文件地址:${user.home}storeconsumeQueue${topic}${queueId}${fileName}

    consumequeue文件特点:

    • 按topic和queueId纬度分别存储消息commitLogOffset、size、tagHashCode
    • 以偏移量为文件名
    • 一个存储单元是20个字节的定长的
    • 顺序读顺序写
    • 每个ConsumeQueue文件大小约5.72M

    每个Topic下的每个MessageQueue都有一个对应的ConsumeQueue文件
    该结构对应于消费者逻辑队列,为什么要将一个topic抽象出很多的queue呢?这样的话,对集群模式更有好处,可以使多个消费者共同消费,而不用上锁;

    消息单元的存储结构

    顺序字段名说明
    1 offset(8) commitlog的偏移量
    2 size(4) commitlog消息大小
    3 tagHashCode tag的哈希值

     

    index索引文件

    文件地址:${user.home}storeindex${fileName}

    index文件特点:

    • 以时间作为文件名
    • 一个存储单元是20个字节定长的

    索引文件(Index)提供消息检索的能力,主要在问题排查和数据统计等场景应用

    存储单元的结构

    顺序字段名说明
    1 keyHash(4) key的结构是
    2 phyOffset(8) commitLog真实的物理位移
    3 timeOffset(4) 时间偏移量
    4 slotValue(4) 下一个记录的slot值

     

    消息存储流程

    RocketMQ文件存储模型层次结构

    层次从上到下依次为:

    1. 业务层
      • QueueMessageProcessor类
      • PullMessageProcessor类
      • SendMessageProcessor类
      • DefaultMessageStore类
    2. 存储逻辑层
      • IndexService类
      • ConsumeQueue类
      • CommitLog类
      • IndexFile类
      • MappedFileQueue类
    3. 磁盘交互IO层
      • MappedFile类
      • MappedByteBuffer类
    业务层 QueueMessageProcessor PullMessageProcessor
    SendMessageProcessor
    DefaultMessageStore
    存储逻辑层 IndexService ConsumeQueue CommitLog
    IndexFile MappedFileQueue
    磁盘交互IO层 MappedFile
    MappedByteBuffer
    Disk

     

    写commoitlog流程

    1. DefaultMessageStore,入口方法是putMessage方法

    RocketMQ 的存储核心类为 DefaultMessageStore,入口方法是putMessage方法

     1 // DefaultMessageStore#putMessage
     2 public PutMessageResult putMessage(MessageExtBrokerInner msg) {
     3     // 判断该服务是否shutdown,不可用直接返回【代码省略】
     4     // 判断broke的角色,如果是从节点直接返回【代码省略】
     5     // 判断runningFlags是否是可写状态,不可写直接返回,可写把printTimes设为0【代码省略】
     6     // 判断topic名字是否大于byte字节127, 大于则直接返回【代码省略】
     7     // 判断msg中properties属性长度是否大于short最大长度32767,大于则直接返回【代码省略】
     8 
     9     if (this.isOSPageCacheBusy()) { // 判断操作系统页写入是否繁忙
    10         return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    11     }
    12 
    13     long beginTime = this.getSystemClock().now();
    14     PutMessageResult result = this.commitLog.putMessage(msg);   // $2 查看下方代码,写msg核心
    15 
    16     long elapsedTime = this.getSystemClock().now() - beginTime;
    17     if (elapsedTime > 500) {
    18         log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
    19     }
    20     // 记录写commitlog时间,大于最大时间则设置为这个最新的时间
    21     this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
    22 
    23     if (null == result || !result.isOk()) {
    24         // 记录写commitlog 失败次数
    25         this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    26     }
    27 
    28     return result;
    29 }

    $2 CommitLog#putMessage 将日志写入CommitLog 文件

    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;
    
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    
        String topic = msg.getTopic();
        int queueId = msg.getQueueId();
    
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());  // $1
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // $2
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
    
                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    
                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }
    
        long elapsedTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();   // $3
    
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config // $4
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;
    
            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);
    
            if (null == mappedFile || mappedFile.isFull()) {    // $5 
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }
    
            result = mappedFile.appendMessage(msg, this.appendMessageCallback); // $6
            switch (result.getStatus()) {   // $7
                case PUT_OK:
                    break;
                case END_OF_FILE:   
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {   
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }
    
            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }
    
        if (elapsedTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
        }
    
        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }
    
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    
        // Statistics 
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
    
        handleDiskFlush(result, putMessageResult, msg); // $8
        handleHA(result, putMessageResult, msg);        // $9
    
        return putMessageResult;
    }
    1. $1 获取消息的事务类型
    2. $2 对于事务消息中UNKNOW、COMMIT消息,处理topic和queueId, 同时备份real_topic,real_queueId
    3. $3 获取最新的mappedFile文件,有可能为空
    4. $4 给写mappedFile加锁(默认自旋锁)
    5. $5 mappedFile为空时创建mappedFile文件, 创建的mappedFile文件offset为0
    6. $6 在mappedFile中append消息,下面具体说明
    7. $7 根据mappedFile写消息的结果
      • ok, 直接break
      • 文件剩下的空间不够写了,重新创建一个mappedFile文件, 重新写消息
      • msg大小,properties大小,未知错误,返回错误类型
    8. $8 执行刷盘
    9. $9 执行主从同步

    3. $6 在mappedFile中append消息

    mappedFile.appendMessage方法会调用this.appendMessagesInner方法

    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;
    
        int currentPos = this.wrotePosition.get();  // $1
    
        if (currentPos < this.fileSize) {
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); // $2
            byteBuffer.position(currentPos);
            AppendMessageResult result;
            if (messageExt instanceof MessageExtBrokerInner) {  // $3
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); // $4
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            this.wrotePosition.addAndGet(result.getWroteBytes());   // $5
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
    1. $1 获取当前写入位置
    2. $2 创建写缓存,放入文件的写入位置
    3. $3 判断是单条消息还是批量消息
    4. $4 同步写消息, fileSize-currentPos即为该文件还剩下的空白大小
    5. $5 写完消息,累加文件当前位置

    4. $4 同步写消息

    代码在CommitLog内部类 DefaultAppendMessageCallback中 

    // CommitLog$DefaultAppendMessageCallback#doAppend
    public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
        final MessageExtBrokerInner msgInner) {
        // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
    
        long wroteOffset = fileFromOffset + byteBuffer.position();  // $1
        this.resetByteBuffer(hostHolder, 8);    // $2
        String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
    
        // Record ConsumeQueue information
        keyBuilder.setLength(0);
        keyBuilder.append(msgInner.getTopic());
        keyBuilder.append('-');
        keyBuilder.append(msgInner.getQueueId());
        String key = keyBuilder.toString();
        Long queueOffset = CommitLog.this.topicQueueTable.get(key); // $3
        if (null == queueOffset) {
            queueOffset = 0L;
            CommitLog.this.topicQueueTable.put(key, queueOffset);
        }
    
        // Transaction messages that require special handling
        final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
        switch (tranType) {
            // Prepared and Rollback message is not consumed, will not enter the
            // consumer queuec
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:  // $4
                queueOffset = 0L;
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            default:
                break;
        }
    
        // Serialize message        // $5
        final byte[] propertiesData =
            msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
        final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
        if (propertiesLength > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long. length={}", propertiesData.length);
            return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
        }
    
        final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
        final int topicLength = topicData.length;
        final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
        final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
    
        // Exceeds the maximum message
        if (msgLen > this.maxMessageSize) {
            CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                + ", maxMessageSize: " + this.maxMessageSize);
            return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
        }
    
        // Determines whether there is sufficient free space
        if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {  // $6
            this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
            
            this.msgStoreItemMemory.putInt(maxBlank);   // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 2 MAGICCODE
            // 3 The remaining space may be any value
            // Here the length of the specially set maxBlank
            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
            return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
        }
    
        // $7 【代码省略】
        
        if (propertiesLength > 0) this.msgStoreItemMemory.put(propertiesData);
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        // Write messages to the queue buffer
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);     // $8
    
        AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,    // $9
            msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // The next update ConsumeQueue information
                CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                break;
            default:
                break;
        }
        return result;
    }
    1. $1 计算消息的物理偏移量=文件初始偏移量+byteBuffer开始的偏移量,文件初始偏移量跟commitlog文件名相同
    2. $2 在读buffer之前,调用flip方法翻转buffer(设置position为0,limit设置为8)
    3. $3 在topicQueueTable中缓存msg对应的offset
    4. $4 针对事务消息的prepare、rollback消息,由于这个消息不需要对消费这可见,所以queueOffset=0,不记到consumerQueue
    5. $5 序列化properties,topic,计算消息最大值
    6. $6 如果消息长度+8大于MapperFile剩余文件空间,则返回END_OF_FILE, 抛给上层,由CommitLog#putMessage这层重新创建文件,重新写消息
    7. $7 根据commitlog的数据结构,构建commitlog数据,如TOTALSIZE,MAGICCODE 。。等等
    8. $8 把构建的this.msgStoreItemMemory写到byteBuffer中(内存中)
    9. $9 生成返回值
    10. $10 针对提交事务消息,重新放入topicQueueTable ??? 

    异步构建ConsumeQueue和Index文件流程

    1. ConsumeQueue和IndexFile什么时候建立的呢?
      – 在Broker启动的时候,会启动一个ReputMessageService线程服务, 会去设置consumeQueueTable内存中最大的偏移量
    long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
            }
        }
    }
    if (maxPhysicalPosInLogicQueue < 0) {
        maxPhysicalPosInLogicQueue = 0;
    }
    if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
        maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
        log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
    }
    this.reputMessageService.start();
    1. ReputMessageService线程每隔1ms执行doReput操作->根据CommitLog最新追加到的消息不断生成:
    • 消息的offset到CommitQueue
    • 消息索引到IndexFile
    1. 下面查看下doReput方法具体执行
    private void doReput() {
        if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) { // $1
            log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
            this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
        }
        for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { // $2
            if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {   
                break;
            }
            SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);  // $3
            if (result != null) {
                try {
                    this.reputFromOffset = result.getStartOffset(); // $4
    
                    for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                        DispatchRequest dispatchRequest =
                            DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); // $5 构建dispatchRequest
                        int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
    
                        if (dispatchRequest.isSuccess()) {
                            if (size > 0) {
                                DefaultMessageStore.this.doDispatch(dispatchRequest);   // $6 
    
                                if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() // 如果该broker是主broker,可以推送消息到达conusmerQueue的消息,这里用户也客户自定定推送的监听
                                    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                }
    
                                this.reputFromOffset += size;   // $7
                                readSize += size;
                                if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                        .addAndGet(dispatchRequest.getMsgSize());
                                }
                            } else if (size == 0) {
                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                readSize = result.getSize();
                            }
                        } else if (!dispatchRequest.isSuccess()) {
    
                            if (size > 0) { // &8
                                log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                this.reputFromOffset += size;
                            } else {
                                doNext = false;
                                // If user open the dledger pattern or the broker is master node,
                                // it will not ignore the exception and fix the reputFromOffset variable
                                if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                    DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                    log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                        this.reputFromOffset);
                                    this.reputFromOffset += result.getSize() - readSize;
                                }
                            }
                        }
                    }
                } finally {
                    result.release();
                }
            } else {
                doNext = false;
            }
        }
    }
    • doReput流程:
      1. $1 如果reputFromOffset小于文件起始偏移量,则把reputFromOffset设置为文件起始偏移量,出现的可能原因:磁盘损坏,认为人为了文件等
      2. $2 因为reputFromOffset是consumeQueue中的偏移量,所以只要reputFromOffset小于commitlog最大偏移量,就会不断的循环
      3. $3 根据offset获取byteBuffer
      4. $4 更新reputFromOffset成byteBuffer中的offset
      5. $5 构建dispatchRequest
      6. $6 分别调用CommitLogDispatcherBuildConsumeQueue(构建消息消费队列)和CommitLogDispatcherBuildIndex(构建索引文件)
      7. $7 读完这条消息,更新reputFromOffset+=size,更新readSize+=size
      8. $8 不成功,如果这个消息的size不为0,尝试下一条
    1. 根据消息更新ConsumeQueue
      在doReput方法中$6中会更新consumeQueue, 消息消费队列转发的任务实现类为:CommitLogDispatcherBuildConsumeQueue,内部实际调用的是putMessagePositionInfo方法

      Step1: 根据topicId和queueId获取ConsumeQueue
      Step2: 将消息偏移量、消息size、tagHashCode(查看ConsumeQueue的数据结构)),把消息追加到ConsumeQueue的内存映射文件(mappedFile)中(不刷盘),consumeQueue默认异步刷盘

    1 return mappedFile.appendMessage(this.byteBufferIndex.array());
    1. 根据消息更新Index索引文件
      Hash索引文件转发任务实现类:CommitLogDispatcherBuildIndex

        如果messageIndexEnable设置为true, 则转发此任务,否则不转发
        step1: 获取indexFile, 如果indexFileList的内存中没有indexFile,则根据路径重新构建indexFile
        step2: 如果消息的唯一键不存在,则条件到放到indexFile中

    说说存储的类与文件

     DefaultMessageStore类核心属性

    上面说到DefaultMessageStore是存储的业务层,putMessage是入口方法

    • messageStoreConfig
      • 存储相关的配置,例如存储路径、commitLog文件大小,刷盘频次等等。
    • CommitLog commitLog
      • comitLog 的核心处理类,消息存储在 commitlog 文件中。
    • ConcurrentMap<String/* topic /, ConcurrentMap<Integer/ queueId */, ConsumeQueue>> consumeQueueTable
      • topic 的队列信息。
    • FlushConsumeQueueService flushConsumeQueueService
      • ConsumeQueue 刷盘服务线程。
    • CleanCommitLogService cleanCommitLogService
      commitLog 过期文件删除线程。
    • CleanConsumeQueueService cleanConsumeQueueService
      • consumeQueue 过期文件删除线程。、
    • IndexService indexService
      • 索引服务。
    • AllocateMappedFileService allocateMappedFileService
      • MappedFile 分配线程,RocketMQ 使用内存映射处理 commitlog、consumeQueue文件。
    • ReputMessageService reputMessageService
      • reput 转发线程(负责 Commitlog 转发到 Consumequeue、Index文件)。
    • HAService haService
      • 主从同步实现服务。
    • ScheduleMessageService scheduleMessageService
      • 定时任务调度器,执行定时任务。
    • StoreStatsService storeStatsService
      • 存储统计服务。
    • TransientStorePool transientStorePool
      • ByteBuffer 池
    • RunningFlags runningFlags
      • 存储服务状态。
    • BrokerStatsManager brokerStatsManager
      • Broker 统计服务。
    • MessageArrivingListener messageArrivingListener
      • 消息达到监听器。
    • StoreCheckpoint storeCheckpoint
      • 刷盘检测点。
    • LinkedList dispatcherList
      • 转发 comitlog 日志,主要是从 commitlog 转发到 consumeQueue、index 文件。

    从上面的属性可以观察到有几类属性:

    • 服务类:如刷盘服务线程、删除文件线程、索引服务、mappedFile分配线程、reput转发线程、主从同步线程、定时任务服务、broker统计服务
    • 配置类:存储设置类
    • 存储信息类:commitLog、consumeQueueTable topic队列信息、transientStorePool ByteBuffer池、刷盘检测点、dispatcherList
    • 监听器:消息达到监听器

    刷盘

    这里会另起一篇文字来说明

     

    执行主从同步

    这里会另起一篇文字来说明

    PageCache(页缓存)与Mmap内存映射 

    pageCache定义

    Page cache 也叫页缓冲或文件缓冲,是由好几个磁盘块构成,大小通常为4k,在64位系统上为8k,构成的几个磁盘块在物理磁盘上不一定连续,文件的组织单位为一页, 也就是一个page cache大小,文件读取是由外存上不连续的几个磁盘块,到buffer cache,然后组成page cache,然后供给应用程序。

    pageCache加载

    操作系统操作I/O时,会先在pageCache中查找,如果未命中,则启动磁盘I/O,并把磁盘文件中的数据加载到pageCache的一个空闲快中,然后在copy到用户缓冲区 

    pageCache预读

    对于每个文件的第一个读请求操作,系统在读入所请求页面的同时会顺序读入后面少数几个页面 

    pageCache与RocketMQ的关联

    MQ读取消息依赖系统PageCache,PageCache命中率越高,读性能越高

    ConsumeQueue逻辑消费队列是顺序读取,在pageCache机制的预读取作用下,ConsumeQueue的读性能会比较高近乎内存,即使在有消息堆积情况下也不会影响性能。

    Mmap内存映射技术—MappedByteBuffer

    另外,RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型直接将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率 

    使用mmap内存映射的限制

    • 每次只能映射1.5左右的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因
    • MMAP 使用的是虚拟内存,和 PageCache 一样是由操作系统来控制刷盘的,虽然可以通过 force() 来手动控制,但这个时间把握不好,在小内存场景下会很令人头疼。
    • 会存在内存占用率较高和文件关闭不确定性的问题

    结语

    参考:

    欢迎关注我的公众号


  • 相关阅读:
    接口的经典使用方法
    多态的程序例子
    log4j常用配置过程
    log4j.properties对于web app摆放的位置
    MySQL优化实例
    No sql 相关
    yii直接执行sql
    android NDK JNI设置自己的log输出函数
    android web 网址收集
    WebKit加载网页的流程
  • 原文地址:https://www.cnblogs.com/milicool/p/11963024.html
Copyright © 2011-2022 走看看