  • Message Queues (5): RocketMQ Message Storage, Part 1

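    Questions

    • At deployment time, how does a process know whether it is a broker or a NameServer?
    • Where is topic subscription information stored?
    • What exactly does the broker do?
    • How are records persisted?
    • How are messages stored when one message is sent to many consumers?
    • Does the send method use multiple threads? Briefly describe what one send call does.
    • What exactly is transientStorePool for?
    • Why are messages written to off-heap memory first, rather than to heap memory?
      These questions came up while studying RocketMQ; we will look for the answers step by step as we go.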

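    Overview

    This article walks through the main flow of file storage; later articles analyze each of the important classes. The outline below comes from the reference material: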

    SendMessageProcessor.processRequest
      -->buildMsgContext(ctx, requestHeader);
      -->this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
      -->sendMessage
        (validate the message)
        -->super.msgCheck(ctx, requestHeader, response);
        (look up the topic's configuration)
        -->this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        (handle retry messages)
        -->handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)
        -->this.brokerController.getMessageStore().putMessage(msgInner);
            -->this.commitLog.putMessage(msg);
                -->this.mappedFileQueue.getLastMappedFile();
                -->mappedFile.appendMessage(msg, this.appendMessageCallback);
                (flush to disk: with synchronous flushing, this call flushes right away)
                -->handleDiskFlush(result, putMessageResult, msg);
                -->with asynchronous flushing, it wakes up the FlushRealTimeService thread to do the flush
                (handle master-slave replication)
                -->handleHA(result, putMessageResult, msg);
    
    

    Source code analysis

    The key classes for message persistence:

    • DefaultMessageStore
    • CommitLog
    • MappedFile

    DefaultMessageStore holds a CommitLog, CommitLog holds a MappedFileQueue, and MappedFileQueue manages multiple MappedFile instances.
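
    The relationship between the queue and its files can be sketched in a few lines. This is a simplified model, not RocketMQ code: SegmentQueueSketch and SegmentSketch are hypothetical stand-ins for MappedFileQueue and MappedFile, and the rule that a new file starts where the previous one ends is an assumption made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for MappedFileQueue: an ordered list of fixed-size segments. */
final class SegmentQueueSketch {
    private final List<SegmentSketch> segments = new ArrayList<>();
    private final int fileSize;

    SegmentQueueSketch(int fileSize) {
        this.fileSize = fileSize;
    }

    /** Returns the last segment, or null when the queue is still empty. */
    SegmentSketch getLast() {
        return segments.isEmpty() ? null : segments.get(segments.size() - 1);
    }

    /** Returns a writable last segment, creating a new one when there is none or the last is full. */
    SegmentSketch getLastOrCreate() {
        SegmentSketch last = getLast();
        if (last == null || last.isFull()) {
            // Assumption: the new segment starts right after the previous one ends.
            long startOffset = (last == null) ? 0L : last.fileFromOffset + fileSize;
            last = new SegmentSketch(startOffset, fileSize);
            segments.add(last);
        }
        return last;
    }

    int size() {
        return segments.size();
    }

    public static void main(String[] args) {
        SegmentQueueSketch queue = new SegmentQueueSketch(16);
        SegmentSketch first = queue.getLastOrCreate();
        first.wrotePosition = 16; // pretend the first segment has filled up
        SegmentSketch second = queue.getLastOrCreate();
        System.out.println("segments=" + queue.size() + ", second starts at " + second.fileFromOffset);
    }
}

/** Hypothetical stand-in for MappedFile: one fixed-size piece of the log. */
final class SegmentSketch {
    final long fileFromOffset; // global offset of the first byte of this segment
    final int fileSize;
    int wrotePosition;         // next write position inside this segment

    SegmentSketch(long fileFromOffset, int fileSize) {
        this.fileFromOffset = fileFromOffset;
        this.fileSize = fileSize;
    }

    boolean isFull() {
        return wrotePosition == fileSize;
    }
}
```

    The writer only ever touches the last segment, which is why the listings that follow keep asking the queue for its last MappedFile.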

    We start with the putMessage method of DefaultMessageStore:

        /**
         * 1. Check whether the store is able to accept the message
         * 2. Call this.commitLog.putMessage(msg)
         *
         * @param msg Message instance to store
         * @return the result of the put operation
         */
        public PutMessageResult putMessage(MessageExtBrokerInner msg) {
            if (this.shutdown) {
                log.warn("message store has shutdown, so putMessage is forbidden");
                return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
            }
    
            // This broker is running in slave mode, so it does not accept writes
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                long value = this.printTimes.getAndIncrement();
                if ((value % 50000) == 0) {
                    log.warn("message store is slave mode, so putMessage is forbidden ");
                }
    
                return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
            }
    
            if (!this.runningFlags.isWriteable()) {
                long value = this.printTimes.getAndIncrement();
                if ((value % 50000) == 0) {
                    log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
                }
    
                return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
            } else {
                this.printTimes.set(0);
            }
    
            if (msg.getTopic().length() > Byte.MAX_VALUE) {
                log.warn("putMessage message topic length too long " + msg.getTopic().length());
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
            }
    
            if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
                log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
                return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
            }
    
            if (this.isOSPageCacheBusy()) {
                return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
            }
    
    
            long beginTime = this.getSystemClock().now();
            // The actual write
            PutMessageResult result = this.commitLog.putMessage(msg);
    
            long eclipseTime = this.getSystemClock().now() - beginTime;
            if (eclipseTime > 500) {
                log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
            }
            this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
    
            if (null == result || !result.isOk()) {
                this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
            }
    
            return result;
        }
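
    The two length checks in this method line up with the on-disk format shown later in doAppend, where the topic length is written as a single byte and the properties length as a short. A minimal sketch of just those two checks follows; PutMessagePrecheckSketch and its Status enum are hypothetical names, and every other check from the listing is left out.

```java
import java.util.Arrays;

final class PutMessagePrecheckSketch {
    enum Status { OK, MESSAGE_ILLEGAL, PROPERTIES_SIZE_EXCEEDED }

    /** Mirrors only the topic and properties length checks from the listing above. */
    static Status check(String topic, String properties) {
        if (topic.length() > Byte.MAX_VALUE) { // 127: the length must fit in one byte
            return Status.MESSAGE_ILLEGAL;
        }
        if (properties != null && properties.length() > Short.MAX_VALUE) { // 32767: must fit in a short
            return Status.PROPERTIES_SIZE_EXCEEDED;
        }
        return Status.OK;
    }

    /** Helper for the demo: builds a string of n copies of c. */
    static String repeat(char c, int n) {
        char[] chars = new char[n];
        Arrays.fill(chars, c);
        return new String(chars);
    }

    public static void main(String[] args) {
        System.out.println(check("TopicTest", null));
        System.out.println(check(repeat('t', 128), null));
        System.out.println(check("TopicTest", repeat('p', 32768)));
    }
}
```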
    
    

    The putMessage method of CommitLog:

        /**
         * @param msg
         * @return
         */
        public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
            // Set the storage time
            msg.setStoreTimestamp(System.currentTimeMillis());
            // Set the message body BODY CRC (consider the most appropriate setting
            // on the client)
            msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
            // Back to Results
            AppendMessageResult result = null;
    
            StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    
            String topic = msg.getTopic();
            int queueId = msg.getQueueId();
    
            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
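                // Delay delivery: replace the topic with SCHEDULE_TOPIC and the queueId with the queue for the delay level.
                // This is a key step for retrying concurrently consumed messages; the next chapter covers message retry and scheduled messages in detail.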
                if (msg.getDelayTimeLevel() > 0) {
                    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                    }
    
                    topic = ScheduleMessageService.SCHEDULE_TOPIC;
                    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    
                    // Backup real topic, queueId
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    
                    msg.setTopic(topic);
                    msg.setQueueId(queueId);
                }
            }
    
            long eclipseTimeInLock = 0;
            MappedFile unlockMappedFile = null;
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    
            putMessageLock.lock(); // spin lock or ReentrantLock, depending on the store config
            try {
                long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
                this.beginTimeInLock = beginLockTimestamp;
    
                // Here settings are stored timestamp, in order to ensure an orderly
                // global
                msg.setStoreTimestamp(beginLockTimestamp);
    
                if (null == mappedFile || mappedFile.isFull()) {
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
                }
                if (null == mappedFile) {
                    log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
                }
                // Core step: mappedFile is what actually appends the message to the file
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                switch (result.getStatus()) {
                    case PUT_OK:
                        break;
                    case END_OF_FILE:
                        unlockMappedFile = mappedFile;
                        // Create a new file, re-write the message
                        mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                        if (null == mappedFile) {
                            // XXX: warn and notify me
                            log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                            beginTimeInLock = 0;
                            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                        }
                        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                        break;
                    case MESSAGE_SIZE_EXCEEDED:
                    case PROPERTIES_SIZE_EXCEEDED:
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                    case UNKNOWN_ERROR:
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                    default:
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                }
    
                eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
                beginTimeInLock = 0;
            } finally {
                putMessageLock.unlock();
            }
    
            if (eclipseTimeInLock > 500) {
                log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
            }
    
            if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
            }
    
            PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    
            // Statistics
            storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
            storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
    
            handleDiskFlush(result, putMessageResult, msg);
            handleHA(result, putMessageResult, msg);
    
            return putMessageResult;
        }
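
    The delay-delivery branch at the top of this method is easier to follow in isolation. The sketch below reproduces only that rewrite on a simplified message type. The constant values and the level-to-queue mapping (delayLevel - 1) are assumptions made for the illustration; the real ones live in ScheduleMessageService and MessageConst.

```java
import java.util.HashMap;
import java.util.Map;

final class DelayRewriteSketch {
    // Assumed values, for illustration only.
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
    static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";

    /** Hypothetical, heavily simplified message. */
    static final class Msg {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    /** Redirects a delayed message to the schedule topic and backs up its real destination. */
    static void rewriteIfDelayed(Msg msg, int maxDelayLevel) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message, nothing to do
        }
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel; // clamp to the highest configured level
        }
        // Back up the real topic and queue so the message can be restored when it becomes due.
        msg.properties.put(PROPERTY_REAL_TOPIC, msg.topic);
        msg.properties.put(PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = msg.delayLevel - 1; // assumed mapping from delay level to queue id
    }

    public static void main(String[] args) {
        Msg msg = new Msg("TopicTest", 3, 20);
        rewriteIfDelayed(msg, 18);
        System.out.println(msg.topic + " queue=" + msg.queueId + " props=" + msg.properties);
    }
}
```

    Because the original topic and queue travel with the message as properties, the message can later be written back to its real destination without any side table.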
    
    

    MappedFile performs the write into memory:

        public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
            return appendMessagesInner(msg, cb);
        }
    
    
        public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
            assert messageExt != null;
            assert cb != null;
    
            int currentPos = this.wrotePosition.get();
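            // Check the write pointer: if currentPos is less than the file size, slice() creates a buffer that
            // shares memory with the MappedFile, and its position is then set to the current write pointer.
            if (currentPos < this.fileSize) {
                // Use writeBuffer when it is not null, otherwise use mappedByteBuffer.
                // writeBuffer is assigned in init(); when the MappedFile is built without a TransientStorePool, writeBuffer is null.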
                ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
                byteBuffer.position(currentPos);
                AppendMessageResult result = null;
                if (messageExt instanceof MessageExtBrokerInner) {
                    result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
                } else if (messageExt instanceof MessageExtBatch) {
                    result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
                } else {
                    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
                }
                this.wrotePosition.addAndGet(result.getWroteBytes());
                this.storeTimestamp = result.getStoreTimestamp();
                return result;
            }
            log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
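
    The slice-then-position pattern can be tried out with plain NIO. The following is a minimal sketch, not the RocketMQ implementation: MappedAppendSketch is a hypothetical class that maps a temporary file, appends at an AtomicInteger write position, and assumes a single writer (in the listing above that guarantee comes from putMessageLock).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicInteger;

final class MappedAppendSketch {
    private final int fileSize;
    private final MappedByteBuffer mappedByteBuffer;
    private final AtomicInteger wrotePosition = new AtomicInteger(0);

    private MappedAppendSketch(MappedByteBuffer mappedByteBuffer, int fileSize) {
        this.mappedByteBuffer = mappedByteBuffer;
        this.fileSize = fileSize;
    }

    /** Maps a fresh temporary file of the given size for reading and writing. */
    static MappedAppendSketch createTemp(int fileSize) {
        try {
            Path file = Files.createTempFile("mapped-append-sketch", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // The mapping stays valid after the channel is closed.
                return new MappedAppendSketch(
                        channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize), fileSize);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Appends data at the write position; returns its start position, or -1 if it does not fit. */
    int append(byte[] data) {
        int currentPos = wrotePosition.get();
        if (currentPos + data.length > fileSize) {
            return -1;
        }
        ByteBuffer view = mappedByteBuffer.slice(); // shares memory, but has its own position
        view.position(currentPos);
        view.put(data);
        wrotePosition.addAndGet(data.length);
        return currentPos;
    }

    /** Reads len bytes starting at pos through another independent view. */
    byte[] read(int pos, int len) {
        ByteBuffer view = mappedByteBuffer.slice();
        view.position(pos);
        byte[] out = new byte[len];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        MappedAppendSketch file = createTemp(1024);
        int pos = file.append("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(pos + " " + new String(file.read(pos, 5), StandardCharsets.UTF_8));
    }
}
```

    Each slice() call yields a view with its own position, so moving the view never disturbs the position of the underlying mapped buffer.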
    
    

    The doAppend method of the non-static inner class CommitLog$DefaultAppendMessageCallback:

            /**
             * 1. Generate the message ID
             * 2. Look up the queue offset
             * 3. Serialize the message
             * 4. Put it into the buffer; it is flushed to the file later
             * @param fileFromOffset
             * @param byteBuffer
             * @param maxBlank
             * @param msgInner
             * @return
             */
            public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
                                                final MessageExtBrokerInner msgInner) {
                // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
    
                // Create the globally unique message ID
                // PHY OFFSET
                long wroteOffset = fileFromOffset + byteBuffer.position();
    
                this.resetByteBuffer(hostHolder, 8);
                String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
    
                // Record ConsumeQueue information
                keyBuilder.setLength(0);
                keyBuilder.append(msgInner.getTopic());
                keyBuilder.append('-');
                keyBuilder.append(msgInner.getQueueId());
                String key = keyBuilder.toString();
    
                // Get this message's offset within its message queue. CommitLog keeps, for every
                // message queue, the offset at which the next message will be written.
                Long queueOffset = CommitLog.this.topicQueueTable.get(key);
                if (null == queueOffset) {
                    queueOffset = 0L;
                    CommitLog.this.topicQueueTable.put(key, queueOffset);
                }
    
                // Transaction messages that require special handling
                // For TRANSACTION_PREPARED_TYPE and TRANSACTION_ROLLBACK_TYPE messages
                // the queue offset is set to 0L.
                final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
                switch (tranType) {
                    // Prepared and Rollback message is not consumed, will not enter the
                    // consume queue
                    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                        queueOffset = 0L;
                        break;
                    case MessageSysFlag.TRANSACTION_NOT_TYPE:
                    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    default:
                        break;
                }
    
                /**
                 * Serialize message
                 */
                final byte[] propertiesData =
                        msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
    
                final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
    
                if (propertiesLength > Short.MAX_VALUE) {
                    log.warn("putMessage message properties length too long. length={}", propertiesData.length);
                    return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
                }
    
                final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
                final int topicLength = topicData.length;
    
                final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
    
                final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
    
                // Exceeds the maximum message
                if (msgLen > this.maxMessageSize) {
                    CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                            + ", maxMessageSize: " + this.maxMessageSize);
                    return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
                }
    
                // Determines whether there is sufficient free space
                if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                    this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
                    // 1 TOTALSIZE
                    this.msgStoreItemMemory.putInt(maxBlank);
                    // 2 MAGICCODE
                    this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
                    // 3 The remaining space may be any value
                    // Here the length of the specially set maxBlank
                    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                    byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                    return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                            queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
                }
    
                // Initialization of storage space
                this.resetByteBuffer(msgStoreItemMemory, msgLen);
                // 1 TOTALSIZE
                this.msgStoreItemMemory.putInt(msgLen);
                // 2 MAGICCODE
                this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
                // 3 BODYCRC
                this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
                // 4 QUEUEID
                this.msgStoreItemMemory.putInt(msgInner.getQueueId());
                // 5 FLAG
                this.msgStoreItemMemory.putInt(msgInner.getFlag());
                // 6 QUEUEOFFSET
                this.msgStoreItemMemory.putLong(queueOffset);
                // 7 PHYSICALOFFSET
                this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
                // 8 SYSFLAG
                this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
                // 9 BORNTIMESTAMP
                this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
                // 10 BORNHOST
                this.resetByteBuffer(hostHolder, 8);
                this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
                // 11 STORETIMESTAMP
                this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
                // 12 STOREHOSTADDRESS
                this.resetByteBuffer(hostHolder, 8);
                this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
                //this.msgBatchMemory.put(msgInner.getStoreHostBytes());
                // 13 RECONSUMETIMES
                this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
                // 14 Prepared Transaction Offset
                this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
                // 15 BODY
                this.msgStoreItemMemory.putInt(bodyLength);
                if (bodyLength > 0)
                    this.msgStoreItemMemory.put(msgInner.getBody());
                // 16 TOPIC
                this.msgStoreItemMemory.put((byte) topicLength);
                this.msgStoreItemMemory.put(topicData);
                // 17 PROPERTIES
                this.msgStoreItemMemory.putShort((short) propertiesLength);
                if (propertiesLength > 0)
                    this.msgStoreItemMemory.put(propertiesData);
    
                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                // Write messages to the queue buffer
                // msgStoreItemMemory is a HeapByteBuffer and byteBuffer is a DirectByteBuffer: this copies the message from heap memory to off-heap memory.
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
    
                AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
                        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    
                switch (tranType) {
                    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                        break;
                    case MessageSysFlag.TRANSACTION_NOT_TYPE:
                    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                        // The next update ConsumeQueue information
                        CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                        break;
                    default:
                        break;
                }
                return result;
            }
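
    The numbered fields in doAppend determine how many bytes one stored message occupies. The sketch below adds up the field widths exactly as they are written in the listing (putInt is 4 bytes, putLong is 8, each host address is 8, the topic length is 1 byte, the properties length is a short). MsgLengthSketch is a hypothetical class, and the value of END_FILE_MIN_BLANK_LENGTH (two ints, TOTALSIZE plus MAGICCODE) is an assumption based on what the end-of-file branch writes.

```java
final class MsgLengthSketch {
    // Assumption: the minimum tail of a file holds TOTALSIZE + MAGICCODE.
    static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;

    /** Sums the field widths in the order doAppend writes them. */
    static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
        return 4  // 1 TOTALSIZE
             + 4  // 2 MAGICCODE
             + 4  // 3 BODYCRC
             + 4  // 4 QUEUEID
             + 4  // 5 FLAG
             + 8  // 6 QUEUEOFFSET
             + 8  // 7 PHYSICALOFFSET
             + 4  // 8 SYSFLAG
             + 8  // 9 BORNTIMESTAMP
             + 8  // 10 BORNHOST
             + 8  // 11 STORETIMESTAMP
             + 8  // 12 STOREHOSTADDRESS
             + 4  // 13 RECONSUMETIMES
             + 8  // 14 Prepared Transaction Offset
             + 4 + bodyLength        // 15 BODY (length prefix + bytes)
             + 1 + topicLength       // 16 TOPIC (one-byte length + bytes)
             + 2 + propertiesLength; // 17 PROPERTIES (short length + bytes)
    }

    /** True when the message fits in the space left in the current file. */
    static boolean fits(int msgLen, int maxBlank) {
        return msgLen + END_FILE_MIN_BLANK_LENGTH <= maxBlank;
    }

    public static void main(String[] args) {
        int msgLen = calMsgLength(5, 9, 0); // 5-byte body, 9-byte topic, no properties
        System.out.println(msgLen + " fits in 112 bytes: " + fits(msgLen, 112));
    }
}
```

    When fits returns false, the listing above fills the rest of the file with a blank record and returns END_OF_FILE, and CommitLog.putMessage retries the append on a new file.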
    
    

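    Supplement

    Heap memory (HeapByteBuffer) is the memory managed by the garbage collector, while off-heap memory (DirectByteBuffer) is native memory that the process allocates outside the Java heap. What are the advantages of using off-heap memory?

    1. Less garbage collection work
      Off-heap memory is not part of the heap, so the collector has less to do; garbage collection pauses other work while it runs.
    2. One fewer copy
      When a heap buffer is flushed to a remote peer, its contents are first copied into direct (non-heap) memory and then sent; an off-heap buffer skips that copy. (This is the idea behind zero copy.)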
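
    The two buffer kinds are both created through java.nio.ByteBuffer. The sketch below encodes into a heap buffer and then copies the bytes into a direct buffer, which is the same shape as the final byteBuffer.put(...) call in doAppend. HeapVsDirectSketch and stageThenCopy are hypothetical names used only for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class HeapVsDirectSketch {
    /** Stages the payload in a heap buffer, then copies it into an off-heap buffer. */
    static ByteBuffer stageThenCopy(byte[] payload) {
        ByteBuffer heap = ByteBuffer.allocate(payload.length);         // backed by a byte[] on the Java heap
        heap.put(payload);

        ByteBuffer direct = ByteBuffer.allocateDirect(payload.length); // native memory outside the heap
        direct.put(heap.array(), 0, payload.length);                   // heap to off-heap copy
        direct.flip();                                                 // make the written bytes readable
        return direct;
    }

    public static void main(String[] args) {
        ByteBuffer direct = stageThenCopy("abc".getBytes(StandardCharsets.UTF_8));
        System.out.println("direct=" + direct.isDirect() + " remaining=" + direct.remaining());
    }
}
```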

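    Notes:

    When a java.nio.DirectByteBuffer is created, it allocates memory through the Unsafe interface, which calls os::malloc directly, and stores the start address and size of that memory in the java.nio.DirectByteBuffer object so that the memory can be accessed directly (that is, the DirectByteBuffer object on the heap holds the address of the off-heap memory, and operations on it act on that off-heap memory). This memory can only be reclaimed after the DirectByteBuffer object itself has been collected. If most of these objects have been promoted to the old generation and no CMS GC or Full GC is triggered for a long time, physical memory can be exhausted. To avoid this, use

    -XX:MaxDirectMemorySize
    

    to cap the size of off-heap memory. When usage reaches the threshold, System.gc is called to run a full GC and reclaim off-heap memory that is no longer in use. (Excerpted from the reference material.)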
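
    To watch how much direct memory a JVM is currently holding, the standard management API exposes buffer pools. The following is a small sketch that assumes a HotSpot-style JVM, where one pool is named "direct"; DirectPoolUsageSketch is a hypothetical class, and on a JVM without such a pool the method returns -1.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

final class DirectPoolUsageSketch {
    /** Returns the bytes used by the "direct" buffer pool, or -1 if the JVM reports no such pool. */
    static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) { // pool name is an assumption about the JVM
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directMemoryUsed();
        ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 20); // 1 MiB of off-heap memory
        long after = directMemoryUsed();
        System.out.println("before=" + before + " after=" + after + " capacity=" + buffer.capacity());
    }
}
```

    Running it with different -XX:MaxDirectMemorySize values shows the cap in action: an allocation that would exceed the limit fails with an OutOfMemoryError once the triggered GC cannot free enough direct memory.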

    Summary

    As we can see, RocketMQ relies mainly on memory-mapped files to make reads and writes efficient.

    References

  • Original article: https://www.cnblogs.com/Benjious/p/11727898.html