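  • RocketMQ message storage, revisited

    Two excellent articles:

    RocketMQ源码 — 三、 Producer消息发送过程 (RocketMQ source, part 3: the producer send path)

    RocketMQ源码解析:Message存储 (RocketMQ source analysis: message storage)

    After RocketMQ receives a message request through Netty, it calls the matching processor directly, for example SendMessageProcessor.

    This processor class is mainly responsible for handling send-message requests from clients.

    It implements the com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor interface, which declares two methods: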

    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
    throws Exception;
    boolean rejectRequest();

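    Of the two, processRequest (shown in bold in the original post) is the method we look at next.

    The class also extends com.alibaba.rocketmq.broker.processor.AbstractSendMessageProcessor.

    Let's first look at the call chain for this process:

    This blog post covers it particularly well: http://www.cnblogs.com/sunshine-2015/p/6291116.html

    I will add a few details on top of it.

    The first method entered in SendMessageProcessor is:

    When is it called? Necessarily after the Netty communication module receives a message; specifically here:

    The method above is called at the spot marked by the arrow.

    Here is the complete path from the Netty communication module receiving a message to this point:

    Now we arrive at the familiar Handler.

    The chain is easier to follow when read from bottom to top.

    Next, we continue toward message storage.

    We step into this method: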

        private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
                                            final RemotingCommand request, //
                                            final SendMessageContext sendMessageContext, //
                                            final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    
            final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
            final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
    
    
            response.setOpaque(request.getOpaque());
    
            response.addExtField(MessageConst.PROPERTY_MSG_REGION,this.brokerController.getBrokerConfig().getRegionId());
    
            if (log.isDebugEnabled()) {
                log.debug("receive SendMessage request command, " + request);
            }
    
            final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
            if (this.brokerController.getMessageStore().now() < startTimstamp) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
                return response;
            }
    
            response.setCode(-1);
            super.msgCheck(ctx, requestHeader, response);
            if (response.getCode() != -1) {
                return response;
            }
    
            final byte[] body = request.getBody();
    
            int queueIdInt = requestHeader.getQueueId();
            TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    
            if (queueIdInt < 0) {
                queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
            }
    
            int sysFlag = requestHeader.getSysFlag();
    
            if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
                sysFlag |= MessageSysFlag.MultiTagsFlag;
            }
    
            String newTopic = requestHeader.getTopic();
            if ((null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) {
    
                String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
    
                SubscriptionGroupConfig subscriptionGroupConfig =
                        this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
                if (null == subscriptionGroupConfig) {
                    response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
                    response.setRemark(
                            "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
                    return response;
                }
    
    
                int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
                if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
                    maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
                }
                int reconsumeTimes = requestHeader.getReconsumeTimes();
                if (reconsumeTimes >= maxReconsumeTimes) {
                    newTopic = MixAll.getDLQTopic(groupName);
                    queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
                    topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
                            DLQ_NUMS_PER_GROUP, //
                            PermName.PERM_WRITE, 0
                    );
                    if (null == topicConfig) {
                        response.setCode(ResponseCode.SYSTEM_ERROR);
                        response.setRemark("topic[" + newTopic + "] not exist");
                        return response;
                    }
                }
            }
            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
            msgInner.setTopic(newTopic);
            msgInner.setBody(body);
            msgInner.setFlag(requestHeader.getFlag());
            MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
            msgInner.setPropertiesString(requestHeader.getProperties());
            msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
    
            msgInner.setQueueId(queueIdInt);
            msgInner.setSysFlag(sysFlag);
            msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
            msgInner.setBornHost(ctx.channel().remoteAddress());
            msgInner.setStoreHost(this.getStoreHost());
            msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (traFlag != null) {
                    response.setCode(ResponseCode.NO_PERMISSION);
                    response.setRemark(
                            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
                    return response;
                }
            }
            // The code above performs system checks and prepares data; message storage starts below.
            PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

            // Anchor: putMessage
            if (putMessageResult != null) {
                boolean sendOK = false;

                switch (putMessageResult.getPutMessageStatus()) {
                    // Success
                    case PUT_OK:
                        sendOK = true;
                        response.setCode(ResponseCode.SUCCESS);
                        break;
                    case FLUSH_DISK_TIMEOUT:
                        response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
                        sendOK = true;
                        break;
                    case FLUSH_SLAVE_TIMEOUT:
                        response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
                        sendOK = true;
                        break;
                    case SLAVE_NOT_AVAILABLE:
                        response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
                        sendOK = true;
                        break;

                    // Failed
                    case CREATE_MAPEDFILE_FAILED:
                        response.setCode(ResponseCode.SYSTEM_ERROR);
                        response.setRemark("create maped file failed, please make sure OS and JDK both 64bit.");
                        break;
                    case MESSAGE_ILLEGAL:
                    case PROPERTIES_SIZE_EXCEEDED:
                        response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                        response.setRemark(
                                "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
                        break;
                    case SERVICE_NOT_AVAILABLE:
                        response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
                        response.setRemark(
                                "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
                        break;
                    case OS_PAGECACHE_BUSY:
                        response.setCode(ResponseCode.SYSTEM_ERROR);
                        response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
                        break;
                    case UNKNOWN_ERROR:
                        response.setCode(ResponseCode.SYSTEM_ERROR);
                        response.setRemark("UNKNOWN_ERROR");
                        break;
                    default:
                        response.setCode(ResponseCode.SYSTEM_ERROR);
                        response.setRemark("UNKNOWN_ERROR DEFAULT");
                        break;
                }

                String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
                if (sendOK) {
                    this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
                    this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
                            putMessageResult.getAppendMessageResult().getWroteBytes());
                    this.brokerController.getBrokerStatsManager().incBrokerPutNums();

                    response.setRemark(null);

                    responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
                    responseHeader.setQueueId(queueIdInt);
                    responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());

                    doResponse(ctx, request, response);

                    if (hasSendMessageHook()) {
                        sendMessageContext.setMsgId(responseHeader.getMsgId());
                        sendMessageContext.setQueueId(responseHeader.getQueueId());
                        sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());

                        int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
                        int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);

                        sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
                        sendMessageContext.setCommercialSendTimes(incValue);
                        sendMessageContext.setCommercialSendSize(wroteSize);
                        sendMessageContext.setCommercialOwner(owner);
                    }
                    return null;
                } else {
                    if (hasSendMessageHook()) {
                        int wroteSize = request.getBody().length;
                        int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);

                        sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
                        sendMessageContext.setCommercialSendTimes(incValue);
                        sendMessageContext.setCommercialSendSize(wroteSize);
                        sendMessageContext.setCommercialOwner(owner);
                    }
                }
            } else {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("store putMessage return null");
            }

            return response;
        }
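    One detail in sendMessage that is easy to miss: when the producer sends no queue id (queueId < 0), the broker picks a write queue at random. The sketch below isolates that expression so it can be run on its own. The class name QueuePick and the method pickQueue are hypothetical; writeQueueNums stands in for topicConfig.getWriteQueueNums().

```java
import java.util.Random;

public class QueuePick {
    // Mirrors the queue selection in sendMessage: keep an explicit queue id,
    // otherwise pick a random queue in [0, writeQueueNums).
    static int pickQueue(Random random, int requestedQueueId, int writeQueueNums) {
        if (requestedQueueId >= 0) {
            return requestedQueueId;
        }
        // nextInt() % 99999999 lies strictly between -99999999 and 99999999,
        // so Math.abs cannot overflow here.
        return Math.abs(random.nextInt() % 99999999) % writeQueueNums;
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 5; i++) {
            System.out.println("picked queue " + pickQueue(random, -1, 4));
        }
    }
}
```

    The same expression is reused with DLQ_NUMS_PER_GROUP when a retry message has exhausted its reconsume attempts and is redirected to the dead-letter topic.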

    We step into the MessageStore.putMessage(msgInner) call in

    PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

    The MessageStore interface has three implementing classes:

    Only DefaultMessageStore implements the method above, so we look at DefaultMessageStore.putMessage(MessageExtBrokerInner msg).

        public PutMessageResult putMessage(MessageExtBrokerInner msg) {
            if (this.shutdown) {
                log.warn("message store has shutdown, so putMessage is forbidden");
                return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
            }
    
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                long value = this.printTimes.getAndIncrement();
                if ((value % 50000) == 0) {
                    log.warn("message store is slave mode, so putMessage is forbidden ");
                }
    
                return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
            }
    
            if (!this.runningFlags.isWriteable()) {
                long value = this.printTimes.getAndIncrement();
                if ((value % 50000) == 0) {
                    log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
                }
    
                return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
            } else {
                this.printTimes.set(0);
            }
    
            // Validate the message topic length
            if (msg.getTopic().length() > Byte.MAX_VALUE) {
                log.warn("putMessage message topic length too long " + msg.getTopic().length());
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
            }
    
            // Validate the message properties length
            if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
                log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
                return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
            }
    
    
            if (this.isOSPageCacheBusy()) {
                return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
            }
    
            long beginTime = this.getSystemClock().now();
            // The code above performs system checks; the message is stored below.
            PutMessageResult result = this.commitLog.putMessage(msg);
            // Performance statistics
            long eclipseTime = this.getSystemClock().now() - beginTime;
            if (eclipseTime > 1000) {
                log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
            }
            this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

            if (null == result || !result.isOk()) {
                this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
            }

            return result;
        }

    commitLog is initialized in the constructor: this.commitLog = new CommitLog(this);

    Next we step into commitLog.putMessage(final MessageExtBrokerInner msg).

    Once you understand this method, you understand the core of RocketMQ's message storage path.

        public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
            // Set the storage time
            msg.setStoreTimestamp(System.currentTimeMillis());
            // Set the message body BODY CRC (consider the most appropriate setting
            // on the client)
            msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
            // Back to Results
            AppendMessageResult result = null;
    
            StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    
            String topic = msg.getTopic();
            int queueId = msg.getQueueId();
    
            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            if (tranType == MessageSysFlag.TransactionNotType//
                    || tranType == MessageSysFlag.TransactionCommitType) {
                // Delay Delivery
                if (msg.getDelayTimeLevel() > 0) {
                    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                    }
    
                    topic = ScheduleMessageService.SCHEDULE_TOPIC;
                    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    
                    // Backup real topic, queueId
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    
                    msg.setTopic(topic);
                    msg.setQueueId(queueId);
                }
            }
    
            long eclipseTimeInLock = 0;
            MapedFile unlockMapedFile = null;
            MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();
            synchronized (this) {
                long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
                this.beginTimeInLock = beginLockTimestamp;
    
                // Here settings are stored timestamp, in order to ensure an orderly
                // global
                msg.setStoreTimestamp(beginLockTimestamp);
    
                if (null == mapedFile || mapedFile.isFull()) {
                    mapedFile = this.mapedFileQueue.getLastMapedFile();
                }
                if (null == mapedFile) {
                    log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
                }
                result = mapedFile.appendMessage(msg, this.appendMessageCallback);
                switch (result.getStatus()) {
                    case PUT_OK:
                        break;
                    case END_OF_FILE:
                        unlockMapedFile = mapedFile;
                        // Create a new file, re-write the message
                        mapedFile = this.mapedFileQueue.getLastMapedFile();
                        if (null == mapedFile) {
                            // XXX: warn and notify me
                            log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                            beginTimeInLock = 0;
                            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                        }
                        result = mapedFile.appendMessage(msg, this.appendMessageCallback);
                        break;
                    case MESSAGE_SIZE_EXCEEDED:
                    case PROPERTIES_SIZE_EXCEEDED:
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                    case UNKNOWN_ERROR:
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                    default:
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                }
    
                eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
                beginTimeInLock = 0;
            } // end of synchronized
    
            if (eclipseTimeInLock > 1000) {
                log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
            }
            if (null != unlockMapedFile) {
                this.defaultMessageStore.unlockMapedFile(unlockMapedFile);
            }
    
            PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    
            // Statistics
            storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
            storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
    
            GroupCommitRequest request = null;
    
            // Synchronization flush
            if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
                GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
                if (msg.isWaitStoreMsgOK()) {
                    request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    service.putRequest(request);
                    boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
                        log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
                                + " client address: " + msg.getBornHostString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                    }
                } else {
                    service.wakeup();
                }
            }
            // Asynchronous flush
            else {
                this.flushCommitLogService.wakeup();
            }
    
            // Synchronous write double
            if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
                HAService service = this.defaultMessageStore.getHaService();
                if (msg.isWaitStoreMsgOK()) {
                    // Determine whether to wait
                    if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                        if (null == request) {
                            request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                        }
                        service.putRequest(request);
    
                        service.getWaitNotifyObject().wakeupAll();
    
                        boolean flushOK =
                                // TODO
                                request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                        if (!flushOK) {
                            log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
                                    + msg.getTags() + " client address: " + msg.getBornHostString());
                            putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                        }
                    }
                    // Slave problem
                    else {
                        // Tell the producer, slave not available
                        putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                    }
                }
            }
    
            return putMessageResult;
        }

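     We step into the method highlighted in the original post, mapedFile.appendMessage(msg, this.appendMessageCallback):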

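        /**
         * Appends a message to the MappedFile and returns the append result.
         * A MappedFile is what its name says: a file mapped into memory, which serves as the file's buffer.
         * Writes go into this buffer first and reach disk only when flushed, as is usual with NIO.
         * @param msg the message to append
         * @param cb the callback that serializes the message into the buffer
         * @return the append result
         */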
        public AppendMessageResult appendMessage(final Object msg, final AppendMessageCallback cb) {
            assert msg != null;
            assert cb != null;
    
            int currentPos = this.wrotePostion.get();
    
    
            if (currentPos < this.fileSize) {
                // Get the byte buffer to write into. This version has no writeBuffer != null check; for why versions with that check use different buffers, see FlushCommitLogService.
                ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
                byteBuffer.position(currentPos);
                AppendMessageResult result =
                        cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
                this.wrotePostion.addAndGet(result.getWroteBytes());
                this.storeTimestamp = result.getStoreTimestamp();
                return result;
            }
    
    
            log.error("MapedFile.appendMessage return null, wrotePostion: " + currentPos + " fileSize: "
                    + this.fileSize);
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
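    The slice-then-position pattern in appendMessage can be tried in isolation. The following is a minimal sketch, not the RocketMQ implementation: AppendSketch is a hypothetical class, a heap ByteBuffer stands in for mappedByteBuffer, and the callback is replaced by a plain byte-array copy. Like the original, it assumes the caller serializes access (CommitLog.putMessage calls appendMessage inside a synchronized block).

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

public class AppendSketch {
    private final ByteBuffer backing;   // stands in for mappedByteBuffer
    private final int fileSize;
    private final AtomicInteger wrotePosition = new AtomicInteger(0);

    AppendSketch(int fileSize) {
        this.fileSize = fileSize;
        this.backing = ByteBuffer.allocate(fileSize);
    }

    // Appends data at the current write position.
    // Returns the number of bytes written, or -1 when the data does not fit.
    int append(byte[] data) {
        int currentPos = wrotePosition.get();
        if (currentPos + data.length > fileSize) {
            return -1;
        }
        // slice() shares the content but has its own position,
        // so the backing buffer's position is never disturbed.
        ByteBuffer slice = backing.slice();
        slice.position(currentPos);
        slice.put(data);
        wrotePosition.addAndGet(data.length);
        return data.length;
    }

    int getWrotePosition() {
        return wrotePosition.get();
    }

    byte byteAt(int index) {
        return backing.get(index); // absolute read, does not move any position
    }

    public static void main(String[] args) {
        AppendSketch file = new AppendSketch(8);
        System.out.println(file.append(new byte[] {'a', 'b', 'c'}));
        System.out.println(file.append(new byte[] {'d', 'e'}));
        System.out.println(file.getWrotePosition());
    }
}
```

    Keeping the write position in a separate counter is what lets each append start from a fresh slice without relying on the shared buffer's own position.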

    Next, the cb.doAppend method called above (highlighted in the original post):

           public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final Object msg) {
                // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
                MessageExtBrokerInner msgInner = (MessageExtBrokerInner) msg;
                // PHY OFFSET
                long wroteOffset = fileFromOffset + byteBuffer.position();
    
    
                String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(), wroteOffset);
    
                // Record ConsumeQueue information
                String key = msgInner.getTopic() + "-" + msgInner.getQueueId();
                Long queueOffset = CommitLog.this.topicQueueTable.get(key);
                if (null == queueOffset) {
                    queueOffset = 0L;
                    CommitLog.this.topicQueueTable.put(key, queueOffset);
                }
    
                // Transaction messages that require special handling
                final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
                switch (tranType) {
                    // Prepared and Rollback message is not consumed, will not enter the
                    // consumer queuec
                    case MessageSysFlag.TransactionPreparedType:
                    case MessageSysFlag.TransactionRollbackType:
                        queueOffset = 0L;
                        break;
                    case MessageSysFlag.TransactionNotType:
                    case MessageSysFlag.TransactionCommitType:
                    default:
                        break;
                }
    
                /**
                 * Serialize message
                 */
                final byte[] propertiesData =
                        msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
                if (propertiesData.length > Short.MAX_VALUE) {
                    log.warn("putMessage message properties length too long. length={}", propertiesData.length);
                    return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
                }
    
                final short propertiesLength = propertiesData == null ? 0 : (short) propertiesData.length;
    
                final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
                final int topicLength = topicData == null ? 0 : topicData.length;
    
                final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
    
                final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
    
                // Exceeds the maximum message
                if (msgLen > this.maxMessageSize) {
                    CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                            + ", maxMessageSize: " + this.maxMessageSize);
                    return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
                }
    
                // Determines whether there is sufficient free space
                if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                    this.resetMsgStoreItemMemory(maxBlank);
                    // 1 TOTALSIZE
                    this.msgStoreItemMemory.putInt(maxBlank);
                    // 2 MAGICCODE
                    this.msgStoreItemMemory.putInt(CommitLog.BlankMagicCode);
                    // 3 The remaining space may be any value
                    //
    
                    // Here the length of the specially set maxBlank
                    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                    byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                    return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                            queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
                }
    
                // Initialization of storage space
                this.resetMsgStoreItemMemory(msgLen);
                // 1 TOTALSIZE
                this.msgStoreItemMemory.putInt(msgLen);
                // 2 MAGICCODE
                this.msgStoreItemMemory.putInt(CommitLog.MessageMagicCode);
                // 3 BODYCRC
                this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
                // 4 QUEUEID
                this.msgStoreItemMemory.putInt(msgInner.getQueueId());
                // 5 FLAG
                this.msgStoreItemMemory.putInt(msgInner.getFlag());
                // 6 QUEUEOFFSET
                this.msgStoreItemMemory.putLong(queueOffset);
                // 7 PHYSICALOFFSET
                this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
                // 8 SYSFLAG
                this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
                // 9 BORNTIMESTAMP
                this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
                // 10 BORNHOST
                this.msgStoreItemMemory.put(msgInner.getBornHostBytes());
                // 11 STORETIMESTAMP
                this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
                // 12 STOREHOSTADDRESS
                this.msgStoreItemMemory.put(msgInner.getStoreHostBytes());
                // 13 RECONSUMETIMES
                this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
                // 14 Prepared Transaction Offset
                this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
                // 15 BODY
                this.msgStoreItemMemory.putInt(bodyLength);
                if (bodyLength > 0)
                    this.msgStoreItemMemory.put(msgInner.getBody());
                // 16 TOPIC
                this.msgStoreItemMemory.put((byte) topicLength);
                this.msgStoreItemMemory.put(topicData);
                // 17 PROPERTIES
                this.msgStoreItemMemory.putShort(propertiesLength);
                if (propertiesLength > 0)
                    this.msgStoreItemMemory.put(propertiesData);
    
                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                // message:>>>>>>>:Hello RocketMQ 0
                System.out.println("message:>>>>>>>:" + new String(msgInner.getBody()));
                // Write messages to the queue buffer
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
    
                AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
                        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    
                switch (tranType) {
                    case MessageSysFlag.TransactionPreparedType:
                    case MessageSysFlag.TransactionRollbackType:
                        break;
                    case MessageSysFlag.TransactionNotType:
                    case MessageSysFlag.TransactionCommitType:
                        // The next update ConsumeQueue information
                        CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                        break;
                    default:
                        break;
                }
    
                return result;
            }
    
    
            private void resetMsgStoreItemMemory(final int length) {
                this.msgStoreItemMemory.flip();
                this.msgStoreItemMemory.limit(length);
            }
        }
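    The post does not show calMsgLength, but the 17 fields written in doAppend determine the record size. Below is a sketch that adds them up. It assumes each host address (BORNHOST, STOREHOSTADDRESS) is serialized as 8 bytes (a 4-byte IPv4 address plus a 4-byte port); that width is an assumption, not something the listing states. MsgLength and needsNewFile are hypothetical names, and the minimum end-of-file blank is passed in rather than hard-coded.

```java
public class MsgLength {
    // Assumption: 4-byte IPv4 address + 4-byte port per host field.
    static final int HOST_BYTES = 8;

    static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
        return 4            // 1 TOTALSIZE
             + 4            // 2 MAGICCODE
             + 4            // 3 BODYCRC
             + 4            // 4 QUEUEID
             + 4            // 5 FLAG
             + 8            // 6 QUEUEOFFSET
             + 8            // 7 PHYSICALOFFSET
             + 4            // 8 SYSFLAG
             + 8            // 9 BORNTIMESTAMP
             + HOST_BYTES   // 10 BORNHOST
             + 8            // 11 STORETIMESTAMP
             + HOST_BYTES   // 12 STOREHOSTADDRESS
             + 4            // 13 RECONSUMETIMES
             + 8            // 14 Prepared Transaction Offset
             + 4 + bodyLength         // 15 BODY (int length prefix)
             + 1 + topicLength        // 16 TOPIC (byte length prefix)
             + 2 + propertiesLength;  // 17 PROPERTIES (short length prefix)
    }

    // Mirrors the END_OF_FILE check: the record plus a minimum trailing blank
    // must fit into what is left of the current file.
    static boolean needsNewFile(int msgLen, int maxBlank, int endFileMinBlankLength) {
        return (msgLen + endFileMinBlankLength) > maxBlank;
    }

    public static void main(String[] args) {
        int len = calMsgLength(16, 9, 20);
        System.out.println("record length = " + len);
        System.out.println("needs new file = " + needsNewFile(len, 140, 8));
    }
}
```

    The one-byte topic length prefix and two-byte properties length prefix are why DefaultMessageStore.putMessage rejects topics longer than Byte.MAX_VALUE and properties longer than Short.MAX_VALUE before the message ever reaches the commit log.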

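    Note where the message ends up. It is first serialized into msgStoreItemMemory, a staging ByteBuffer (the code calls array() on it, so it is array-backed), and then copied by byteBuffer.put(...) into byteBuffer, which is a slice of the MapedFile's memory-mapped MappedByteBuffer.

    For background, see http://www.cnblogs.com/guazi/p/6829487.html

    The mapped buffer is declared in MapedFile, a class whose name, as noted above, matches what it does.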

    // The mapped memory object: the file mapped into memory
    private final MappedByteBuffer mappedByteBuffer;
    // The FileChannel behind the mapping (NIO, blocking)
    private FileChannel fileChannel;

    Both are initialized in the constructor:

    
    
    this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
    this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
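    To see these two calls work outside RocketMQ, here is a small self-contained sketch using only the JDK. MappedFileDemo and writeAndReadBack are hypothetical names; the temporary file and its size are chosen for illustration. It maps a file, writes into the mapping, forces the change to disk, and reads the bytes back through ordinary file I/O.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;

public class MappedFileDemo {
    // Writes payload at the start of a freshly mapped temp file of fileSize bytes,
    // then returns the first payload.length bytes as read back from the file.
    static byte[] writeAndReadBack(byte[] payload, int fileSize) {
        try {
            File file = File.createTempFile("mapped-demo", ".bin");
            file.deleteOnExit();
            try (RandomAccessFile raf = new RandomAccessFile(file, "rw");
                 FileChannel channel = raf.getChannel()) {
                // Mapping READ_WRITE beyond the current length grows the file to fileSize.
                MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
                mapped.put(payload); // lands in the mapping, not necessarily on disk yet
                mapped.force();      // ask the OS to write the mapped region to the device
            }
            byte[] onDisk = Files.readAllBytes(file.toPath());
            return Arrays.copyOf(onDisk, payload.length);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = "Hello RocketMQ 0".getBytes(StandardCharsets.UTF_8);
        byte[] back = writeAndReadBack(payload, 1024);
        System.out.println(new String(back, StandardCharsets.UTF_8));
    }
}
```

    The put and the force are separate steps here for the same reason they are separate in the broker: appending is cheap and happens under the lock, while forcing to disk is the expensive part that the flush services handle.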

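    With the message saved there, we will eventually return to the putMessage anchor in the original sendMessage method.

    Before that, still inside CommitLog.putMessage, the flush is triggered once the message is in the mapped buffer. The flush step is: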

    ...
        ...
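            // Flush the message, i.e. persist it to the file. The append above has not actually reached disk. What happens here depends on the flush policy; for details see FlushCommitLogService.
            // Synchronous or asynchronous flush/commit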
            GroupCommitRequest request = null;
    
            // Synchronization flush
            if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
                GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
                if (msg.isWaitStoreMsgOK()) {
                    request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    service.putRequest(request);
                    boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
                        log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
                                + " client address: " + msg.getBornHostString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                    }
                } else {
                    service.wakeup();
                }
            }
            // Asynchronous flush
            else {
                this.flushCommitLogService.wakeup();// important: wake up the commit log flush thread to perform the flush; see the FlushCommitLogService implementations for details
            }
        ...
    ...

    There are two flush modes here: synchronous (FlushDiskType.SYNC_FLUSH) and asynchronous.
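
    The synchronous branch boils down to a handoff: the sending thread enqueues a request and blocks with a timeout until the flush thread signals completion. The sketch below imitates that handoff with a CountDownLatch. All names in it (GroupCommitSketch, FlushRequest, append) are hypothetical, the "flush" is simulated with counters, and unlike the real code it enqueues a request even when the caller does not wait.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the wait/wake-up handoff, not RocketMQ code.
class GroupCommitSketch {
    /** Stand-in for GroupCommitRequest: one latch per waiting sender. */
    static final class FlushRequest {
        final long nextOffset;
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile boolean flushOK;

        FlushRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }

        void wakeup(boolean ok) {
            this.flushOK = ok;
            this.done.countDown();
        }

        boolean waitForFlush(long timeoutMillis) {
            try {
                // false on timeout, which the caller would report as a flush timeout
                return this.done.await(timeoutMillis, TimeUnit.MILLISECONDS) && this.flushOK;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    private final LinkedBlockingQueue<FlushRequest> requests = new LinkedBlockingQueue<>();
    private final AtomicLong wroteWhere = new AtomicLong();   // bytes appended so far
    private final AtomicLong flushedWhere = new AtomicLong(); // bytes considered durable

    void start() {
        Thread flusher = new Thread(this::run, "flusher-sketch");
        flusher.setDaemon(true);
        flusher.start();
    }

    private void run() {
        try {
            while (true) {
                FlushRequest req = this.requests.take();
                // Simulated flush: everything written so far becomes durable.
                this.flushedWhere.set(this.wroteWhere.get());
                req.wakeup(this.flushedWhere.get() >= req.nextOffset);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Appends bytes; in synchronous mode blocks until they are flushed or the timeout expires. */
    boolean append(int bytes, boolean waitStoreOK, long timeoutMillis) {
        long next = this.wroteWhere.addAndGet(bytes);
        FlushRequest req = new FlushRequest(next);
        this.requests.add(req);
        return !waitStoreOK || req.waitForFlush(timeoutMillis);
    }

    long flushedWhere() {
        return this.flushedWhere.get();
    }

    public static void main(String[] args) {
        GroupCommitSketch sketch = new GroupCommitSketch();
        sketch.start();
        System.out.println(sketch.append(100, true, 1000));
    }
}
```

    If the flush thread never answers, waitForFlush returns false after the timeout, which corresponds to the FLUSH_DISK_TIMEOUT status set in the code above.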

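    Where the synchronous or asynchronous flush mode is configured can be seen here:

    In synchronous mode, the flush logic is handled by this thread: GroupCommitService service = (GroupCommitService) this.flushCommitLogService;

    Let's go straight to its flush code: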

            private void doCommit() {
                if (!this.requestsRead.isEmpty()) {
                    for (GroupCommitRequest req : this.requestsRead) {
                        // There may be a message in the next file, so a maximum of
                        // two times the flush
                        boolean flushOK = false;
                        for (int i = 0; (i < 2) && !flushOK; i++) {
                            flushOK = (CommitLog.this.mapedFileQueue.getCommittedWhere() >= req.getNextOffset());
    
                            if (!flushOK) {
                                CommitLog.this.mapedFileQueue.commit(0);
                            }
                        }
    
                        req.wakeupCustomer(flushOK);
                    }
    
                    long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }
    
                    this.requestsRead.clear();
                } else {
                    // Because of individual messages is set to not sync flush, it
                    // will come to this process
                    CommitLog.this.mapedFileQueue.commit(0);
                }
            }

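    In the end it calls CommitLog.this.mapedFileQueue.commit(0); to persist the data to disk.

    // The queue of message files, containing every file stored on disk. Each MapedFile corresponds to exactly one file, so the two classes are tightly coupled.
    com.alibaba.rocketmq.store.MapedFileQueue

    This queue is also initialized in the CommitLog constructor.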

    MapedFileQueue mapedFileQueue;
    this.mapedFileQueue = new MapedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
    defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMapedFileService());
      public MapedFileQueue(final String storePath, int mapedFileSize,
                              AllocateMapedFileService allocateMapedFileService) {
            this.storePath = storePath;
            this.mapedFileSize = mapedFileSize;
            this.allocateMapedFileService = allocateMapedFileService;
        }

    Now look at the commit(final int flushLeastPages) method:

       public boolean commit(final int flushLeastPages) {
            boolean result = true;
            MapedFile mapedFile = this.findMapedFileByOffset(this.committedWhere, true);
            if (mapedFile != null) {
                long tmpTimeStamp = mapedFile.getStoreTimestamp();
                int offset = mapedFile.commit(flushLeastPages);
                long where = mapedFile.getFileFromOffset() + offset;
                result = (where == this.committedWhere);
                this.committedWhere = where;
                if (0 == flushLeastPages) {
                    this.storeTimestamp = tmpTimeStamp;
                }
            }
    
            return result;
        }
        public MapedFile findMapedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
            try {
                this.readWriteLock.readLock().lock();
                MapedFile mapedFile = this.getFirstMapedFile();
    
                if (mapedFile != null) {
                    int index =
                            (int) ((offset / this.mapedFileSize) - (mapedFile.getFileFromOffset() / this.mapedFileSize));
                    if (index < 0 || index >= this.mapedFiles.size()) {
                        logError
                                .warn(
                                        "findMapedFileByOffset offset not matched, request Offset: {}, index: {}, mapedFileSize: {}, mapedFiles count: {}, StackTrace: {}",//
                                        offset,//
                                        index,//
                                        this.mapedFileSize,//
                                        this.mapedFiles.size(),//
                                        UtilAll.currentStackTrace());
                    }
    
                    try {
                        return this.mapedFiles.get(index);
                    } catch (Exception e) {
                        if (returnFirstOnNotFound) {
                            return mapedFile;
                        }
                    }
                }
            } catch (Exception e) {
                log.error("findMapedFileByOffset Exception", e);
            } finally {
                this.readWriteLock.readLock().unlock();
            }
    
            return null;
        }
        private MapedFile getFirstMapedFile() {
            if (this.mapedFiles.isEmpty()) {
                return null;
            }
    
            return this.mapedFiles.get(0);
        }
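
    The index arithmetic in findMapedFileByOffset is easier to follow with numbers. The sketch below repeats the same calculation in isolation; the class name OffsetIndexSketch, the tiny 1024-byte file size, and the -1 return value for a miss are illustrative assumptions (the original instead logs a warning and can fall back to the first file).

```java
// Hypothetical demo class, not RocketMQ code.
class OffsetIndexSketch {
    /** Index of the file that contains the global offset, or -1 if no loaded file covers it. */
    static int fileIndex(long offset, long firstFileFromOffset, int mapedFileSize, int fileCount) {
        // Same arithmetic as findMapedFileByOffset: the file number of the offset
        // minus the file number of the first file still present in the queue.
        int index = (int) ((offset / mapedFileSize) - (firstFileFromOffset / mapedFileSize));
        return (index < 0 || index >= fileCount) ? -1 : index;
    }

    /** Position of the global offset inside its file. */
    static int positionInFile(long offset, int mapedFileSize) {
        return (int) (offset % mapedFileSize);
    }

    public static void main(String[] args) {
        int size = 1024;
        // Three files remain, starting at global offsets 2048, 3072 and 4096
        // (files 0 and 1024 are assumed to have been deleted already).
        System.out.println(fileIndex(3500, 2048, size, 3)); // 3500 / 1024 = 3, 2048 / 1024 = 2, so index 1
        System.out.println(positionInFile(3500, size));     // 3500 - 3072 = 428
    }
}
```

    Subtracting the first file's number is what keeps the lookup correct after old files at the front of the queue have been removed.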

    Finally, we need to find the method that populates mapedFiles:

    Here it is:

       com.alibaba.rocketmq.store.MapedFileQueue

        public boolean load() {
            File dir = new File(this.storePath);
            File[] files = dir.listFiles();
            if (files != null) {
                // ascending order
                Arrays.sort(files);
                for (File file : files) {
                    if (file.length() != this.mapedFileSize) {
                        log.warn(file + " " + file.length()
                                + " length not matched message store config value, ignore it");
                        return true;
                    }

                    try {
                        MapedFile mapedFile = new MapedFile(file.getPath(), mapedFileSize);
                        mapedFile.setWrotePostion(this.mapedFileSize);
                        mapedFile.setCommittedPosition(this.mapedFileSize);
                        this.mapedFiles.add(mapedFile);
                        log.info("load " + file.getPath() + " OK");
                    } catch (IOException e) {
                        log.error("load file " + file + " error", e);
                        return false;
                    }
                }
            }

            return true;
        }

    Now let's look at the MapedFile constructor.

    One internal field deserves attention:

    // starting offset of the file: its global offset, which is parsed from the file name
    private final long fileFromOffset;

        public MapedFile(final String fileName, final int fileSize) throws IOException {
            this.fileName = fileName;
            this.fileSize = fileSize;
            this.file = new File(fileName);
            this.fileFromOffset = Long.parseLong(this.file.getName());
            boolean ok = false;
    
            ensureDirOK(this.file.getParent());
    
            try {
                this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
                this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
                TotalMapedVitualMemory.addAndGet(fileSize);
                TotalMapedFiles.incrementAndGet();
                ok = true;
            } catch (FileNotFoundException e) {
                log.error("create file channel " + this.fileName + " Failed. ", e);
                throw e;
            } catch (IOException e) {
                log.error("map file " + this.fileName + " Failed. ", e);
                throw e;
            } finally {
                if (!ok && this.fileChannel != null) {
                    this.fileChannel.close();
                }
            }
        }
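
    Because fileFromOffset is obtained with Long.parseLong(this.file.getName()), the file name itself has to be a number. The sketch below shows one way such names can be produced and parsed back. The 20-digit zero-padded format is an assumption based on how commit log files are commonly seen on disk and is not shown in this article; the class and method names are hypothetical.

```java
import java.util.Locale;

// Hypothetical demo class, not RocketMQ code.
class FileNameOffsetSketch {
    /** Formats a global offset as a fixed-width, zero-padded file name (assumed width: 20 digits). */
    static String offsetToFileName(long offset) {
        return String.format(Locale.ROOT, "%020d", offset);
    }

    /** Recovers the starting offset from the file name, as the constructor above does. */
    static long fileNameToOffset(String fileName) {
        return Long.parseLong(fileName);
    }

    public static void main(String[] args) {
        long fileSize = 1024L * 1024 * 1024; // illustrative 1 GiB file size
        for (int i = 0; i < 3; i++) {
            String name = offsetToFileName(i * fileSize);
            System.out.println(name + " -> " + fileNameToOffset(name));
        }
    }
}
```

    Fixed-width names also explain why load() can simply call Arrays.sort(files): with zero padding, lexicographic order and numeric order coincide.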

    Now, back to

     int offset = mapedFile.commit(flushLeastPages)

    and let's continue:

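        /**
         * Flush messages to disk.
         *
         * @param flushLeastPages
         *            minimum number of pages to flush
         *
         * @return the committed position after the flush attempt
         */
        public int commit(final int flushLeastPages) {
            // check whether a flush is currently allowed
            if (this.isAbleToFlush(flushLeastPages)) {
                // take a reference (reference counting, similar to a smart pointer) so the mapped file is not released mid-flush
                if (this.hold()) {
                    int value = this.wrotePostion.get();
                    // flush: force the mapped memory to disk
                    this.mappedByteBuffer.force();
                    this.committedPosition.set(value);
                    // release the reference
                    this.release();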
                } else {
                    log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
                    this.committedPosition.set(this.wrotePostion.get());
                }
            }
    
            return this.getCommittedPosition();
        }
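
    The article does not show isAbleToFlush, so the following is only a sketch of what such a check might look like, under stated assumptions: a 4 KiB page size, a full file always being flushable, and flushLeastPages == 0 meaning "flush as soon as anything is unflushed" (which is how commit(0) is used above). The class name and the parameter list are hypothetical.

```java
// Hypothetical sketch of a flush-threshold check, not the actual RocketMQ method.
class FlushThresholdSketch {
    static final int OS_PAGE_SIZE = 4 * 1024; // assumed page size

    /**
     * @param committed       bytes already flushed
     * @param wrote           bytes written into the mapped buffer
     * @param fileSize        total size of the mapped file
     * @param flushLeastPages minimum number of dirty pages required, or 0 for "any"
     */
    static boolean isAbleToFlush(int committed, int wrote, int fileSize, int flushLeastPages) {
        if (wrote == fileSize) {
            return true; // a full file is flushed regardless of the threshold
        }
        if (flushLeastPages > 0) {
            // Flush only when enough whole pages have accumulated since the last flush.
            return (wrote / OS_PAGE_SIZE) - (committed / OS_PAGE_SIZE) >= flushLeastPages;
        }
        return wrote > committed; // threshold 0: flush whenever something is pending
    }

    public static void main(String[] args) {
        int fileSize = 1 << 20;
        System.out.println(isAbleToFlush(0, 100, fileSize, 0));              // pending data, no threshold
        System.out.println(isAbleToFlush(0, 100, fileSize, 4));              // fewer than 4 dirty pages
        System.out.println(isAbleToFlush(0, 4 * OS_PAGE_SIZE, fileSize, 4)); // exactly 4 dirty pages
    }
}
```

    A threshold like this lets an asynchronous flusher batch small writes, while the synchronous path passes 0 so that a waiting sender is never held back by the page count.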

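    That completes the flush to disk. Admittedly, a lot was skipped along the way, which could not be helped; I will try to find time to split this article into smaller pieces.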

  • Original article: https://www.cnblogs.com/guazi/p/6822939.html