  • RocketMQ Message Pull: Reading from Files

    The previous post, 《RocketMQ 拉取消息-通信模块》 (RocketMQ Message Pull: The Communication Module), followed the pull request into PullMessageProcessor. From there,

    PullMessageProcessor.processRequest(final ChannelHandlerContext ctx, RemotingCommand request) calls:

     final GetMessageResult getMessageResult =
                    this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                            requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);

    to fetch the message bodies from disk.

    Next, let's look at how DefaultMessageStore fetches messages from the consume queue:

        /**
         * Fetches messages and returns the result.
         * All parameters come from the request header, i.e. they are sent by the consumer client.
         * @param group
         * @param topic
         * @param queueId
         * @param offset
         * @param maxMsgNums
         * @param subscriptionData
         * @return
         */
        public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
                                           final SubscriptionData subscriptionData) {
            if (this.shutdown) {
                log.warn("message store has shutdown, so getMessage is forbidden");
                return null;
            }
    
            if (!this.runningFlags.isReadable()) {
                log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
                return null;
            }
    
            long beginTime = this.getSystemClock().now();
    
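            // Enum value holding the status of this fetch
            GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
            // Offset at which the next pull should start (returned even when messages are filtered out)
            long nextBeginOffset = offset;
            // Minimum offset in the logical queue
            long minOffset = 0;
            // Maximum offset in the logical queue
            long maxOffset = 0;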
    
            GetMessageResult getResult = new GetMessageResult();
    
    
            final long maxOffsetPy = this.commitLog.getMaxOffset();
    
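            // Look up the logical queue by topic and queueId. It works like a dictionary's index,
            // pointing to where each message sits in the physical commitlog file.
            ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
            if (consumeQueue != null) {
                minOffset = consumeQueue.getMinOffsetInQuque(); // minimum offset in the consume queue
                maxOffset = consumeQueue.getMaxOffsetInQuque(); // maximum offset in the consume queue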
    
                // Validate the requested queue position (offset)
                if (maxOffset == 0) {   // the consume queue has no messages
                    status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                    nextBeginOffset = nextOffsetCorrection(offset, 0);
                } else if (offset < minOffset) {    // requested offset is too small; this comparison shows that offset is a logical position in the consume queue
                    status = GetMessageStatus.OFFSET_TOO_SMALL;
                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                } else if (offset == maxOffset) {   // requested offset is exactly one position past the end of the consume queue
                    status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                    nextBeginOffset = nextOffsetCorrection(offset, offset);
                } else if (offset > maxOffset) {    // requested offset is more than one position past the end of the consume queue
                    status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                    if (0 == minOffset) {
                        nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                    } else {
                        nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
                    }
                } else {
                    // Get the mapped buffer (backed by a MappedFile) that holds the index entries from this offset
                    SelectMapedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                    if (bufferConsumeQueue != null) {
                        try {
                            status = GetMessageStatus.NO_MATCHED_MESSAGE;
    
                            long nextPhyFileStartOffset = Long.MIN_VALUE;   // start offset of the next commitLog file (MappedFile)
                            long maxPhyOffsetPulling = 0;   // largest physical offset pulled so far
    
                            int i = 0;
                            final int MaxFilterMessageCount = 16000;
                            final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                            // Loop over the index entries to get each message's location
                            for (; i < bufferConsumeQueue.getSize() && i < MaxFilterMessageCount; i += ConsumeQueue.CQStoreUnitSize) {
                                long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();   // physical offset of the message
                                int sizePy = bufferConsumeQueue.getByteBuffer().getInt();   // message size
                                long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();   // message tagsCode
                                // Track the largest physical offset pulled so far
                                maxPhyOffsetPulling = offsetPy;
    
                                // If offsetPy is less than nextPhyFileStartOffset, the message has already been removed, so skip ahead until a readable message is reached.
                                if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                    if (offsetPy < nextPhyFileStartOffset)
                                        continue;
                                }
    
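                                // Check whether this message has to be read from disk, since the commitLog cannot be held entirely in memory
                                boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
                                // Stop once the batch is full, i.e. enough messages have been collected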
                                if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
                                        isInDisk)) {
                                    break;
                                }
    
                                // Message filtering: check whether the message matches the subscription
                                if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
                                    // Read the message's ByteBuffer from the commitLog
                                    SelectMapedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
    
                                    // The data is ultimately read by position from the corresponding MappedFile; below it is added to the GetMessageResult.
                                    if (selectResult != null) {
                                        this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                                        getResult.addMessage(selectResult);
                                        status = GetMessageStatus.FOUND;
                                        nextPhyFileStartOffset = Long.MIN_VALUE;
                                    } else {
                                        // The message could not be read from the commitLog, which means its file (MappedFile) has been deleted; compute the start offset of the next MappedFile
                                        if (getResult.getBufferTotalSize() == 0) {
                                            status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                        }
    
    
                                        nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                    }
                                } else {
                                    if (getResult.getBufferTotalSize() == 0) {
                                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                    }
    
                                    if (log.isDebugEnabled()) {
                                        log.debug("message type not matched, client: " + subscriptionData + " server: " + tagsCode);
                                    }
                                }
                            }
    
                            // Record how many bytes remain to be pulled
                            if (diskFallRecorded) {
                                long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
                                brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
                            }
    
                            // Compute the consume queue offset at which the next pull should start
                            nextBeginOffset = offset + (i / ConsumeQueue.CQStoreUnitSize);
    
                            // Compare the remaining bytes with the memory budget to decide whether to suggest pulling from a slave
                            long diff = maxOffsetPy - maxPhyOffsetPulling;
                            long memory = (long) (StoreUtil.TotalPhysicalMemorySize
                                    * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                            getResult.setSuggestPullingFromSlave(diff > memory);
                        } finally {
                            // The buffer must always be released
                            bufferConsumeQueue.release();
                        }
                    } else {
                        status = GetMessageStatus.OFFSET_FOUND_NULL;
                        nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
                        log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
                                + maxOffset + ", but access logic queue failed.");
                    }
                }
            }
            // The requested queue id does not exist
            else {
                status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
                nextBeginOffset = nextOffsetCorrection(offset, 0);
            }
            // Statistics
            if (GetMessageStatus.FOUND == status) {
                this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
            } else {
                this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
            }
            long eclipseTime = this.getSystemClock().now() - beginTime;
            this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);
            // Populate the result
            getResult.setStatus(status);
            getResult.setNextBeginOffset(nextBeginOffset);
            getResult.setMaxOffset(maxOffset);
            getResult.setMinOffset(minOffset);
            return getResult;
        }
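
    The loop above reads each consume queue entry as a long, an int and a long, then derives nextBeginOffset from the number of bytes scanned. The following is a minimal, self-contained sketch of that decoding and arithmetic. The class ConsumeQueueEntrySketch, its Entry type and the constant CQ_STORE_UNIT_SIZE are hypothetical names for illustration; the 20-byte unit is an assumption taken from the 8 + 4 + 8 bytes read per iteration above.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: mirrors the getLong/getInt/getLong reads in the loop above. */
final class ConsumeQueueEntrySketch {
    // Assumed entry size: 8-byte physical offset + 4-byte size + 8-byte tagsCode.
    static final int CQ_STORE_UNIT_SIZE = 8 + 4 + 8;

    /** One decoded index entry (hypothetical helper type). */
    static final class Entry {
        final long offsetPy;  // physical offset in the commit log
        final int sizePy;     // message length in bytes
        final long tagsCode;  // value used for tag filtering

        Entry(long offsetPy, int sizePy, long tagsCode) {
            this.offsetPy = offsetPy;
            this.sizePy = sizePy;
            this.tagsCode = tagsCode;
        }
    }

    /** Decodes every complete entry remaining in the buffer. */
    static List<Entry> decode(ByteBuffer buf) {
        List<Entry> entries = new ArrayList<>();
        while (buf.remaining() >= CQ_STORE_UNIT_SIZE) {
            // Java evaluates arguments left to right, so the read order is long, int, long.
            entries.add(new Entry(buf.getLong(), buf.getInt(), buf.getLong()));
        }
        return entries;
    }

    /** Same arithmetic as nextBeginOffset = offset + (i / CQStoreUnitSize). */
    static long nextBeginOffset(long offset, int bytesScanned) {
        return offset + (bytesScanned / CQ_STORE_UNIT_SIZE);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(2 * CQ_STORE_UNIT_SIZE);
        buf.putLong(0L).putInt(100).putLong(11L);    // first entry
        buf.putLong(100L).putInt(250).putLong(22L);  // second entry
        buf.flip();

        List<Entry> entries = decode(buf);
        for (Entry e : entries) {
            System.out.println("offsetPy=" + e.offsetPy + " sizePy=" + e.sizePy + " tagsCode=" + e.tagsCode);
        }
        // Two entries scanned starting at queue offset 5, so the next pull starts at 7.
        System.out.println(nextBeginOffset(5L, entries.size() * CQ_STORE_UNIT_SIZE));
    }
}
```

    Because every entry has the same fixed size, a queue offset converts to a byte position by multiplication, which is why the consume queue can serve as an index into the commitlog.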

    First, the method that looks up the consume queue:

        public ConsumeQueue findConsumeQueue(String topic, int queueId) {
            ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
            if (null == map) {
                ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
                ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
                if (oldMap != null) {
                    map = oldMap;
                } else {
                    map = newMap;
                }
            }
    
            ConsumeQueue logic = map.get(queueId);
            if (null == logic) {
                ConsumeQueue newLogic = new ConsumeQueue(//
                        topic, //
                        queueId, //
                        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
                        this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
                        this);
                ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
                if (oldLogic != null) {
                    logic = oldLogic;
                } else {
                    logic = newLogic;
                }
            }
    
            return logic;
        }
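
    findConsumeQueue creates both the per-topic map and the queue lazily, and uses putIfAbsent so that concurrent callers end up sharing one instance. Below is a stripped-down sketch of the same idiom. LazyQueueTableSketch and its Queue type are hypothetical stand-ins for DefaultMessageStore and ConsumeQueue, not the real classes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Illustrative only: the get / putIfAbsent idiom used by findConsumeQueue. */
final class LazyQueueTableSketch {
    /** Hypothetical stand-in for ConsumeQueue. */
    static final class Queue {
        final String topic;
        final int queueId;

        Queue(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    private final ConcurrentMap<String, ConcurrentMap<Integer, Queue>> table = new ConcurrentHashMap<>();

    Queue findQueue(String topic, int queueId) {
        ConcurrentMap<Integer, Queue> map = table.get(topic);
        if (map == null) {
            ConcurrentMap<Integer, Queue> newMap = new ConcurrentHashMap<>();
            // putIfAbsent returns the existing value if another thread registered one first.
            ConcurrentMap<Integer, Queue> oldMap = table.putIfAbsent(topic, newMap);
            map = (oldMap != null) ? oldMap : newMap;
        }

        Queue queue = map.get(queueId);
        if (queue == null) {
            Queue newQueue = new Queue(topic, queueId);
            Queue oldQueue = map.putIfAbsent(queueId, newQueue);
            // If we lost the race, newQueue is simply discarded.
            queue = (oldQueue != null) ? oldQueue : newQueue;
        }
        return queue;
    }

    public static void main(String[] args) {
        LazyQueueTableSketch store = new LazyQueueTableSketch();
        Queue first = store.findQueue("TopicA", 0);
        Queue second = store.findQueue("TopicA", 0);
        System.out.println(first == second); // prints "true": both lookups share one instance
    }
}
```

    One consequence of this idiom is that a thread losing the race still constructs an object that is then thrown away. On Java 8 and later, computeIfAbsent is a common alternative that avoids the extra construction.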

    Next, the second method of interest, commitLog.getMessage:

        public SelectMapedBufferResult getMessage(final long offset, final int size) {
            int mapedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
            MapedFile mapedFile = this.mapedFileQueue.findMapedFileByOffset(offset, (0 == offset ? true : false));
            if (mapedFile != null) {
                int pos = (int) (offset % mapedFileSize);
                SelectMapedBufferResult result = mapedFile.selectMapedBuffer(pos, size);
                return result;
            }
    
            return null;
        }

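    The findMapedFileByOffset method called above is covered in the post 《再说rocketmq消息存储》 (More on RocketMQ Message Storage); search that page for the method name to find it.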

        public MapedFile findMapedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
            try {
                this.readWriteLock.readLock().lock();
                MapedFile mapedFile = this.getFirstMapedFile();
    
                if (mapedFile != null) {
                    int index =
                            (int) ((offset / this.mapedFileSize) - (mapedFile.getFileFromOffset() / this.mapedFileSize));
                    if (index < 0 || index >= this.mapedFiles.size()) {
                        logError
                                .warn(
                                        "findMapedFileByOffset offset not matched, request Offset: {}, index: {}, mapedFileSize: {}, mapedFiles count: {}, StackTrace: {}",//
                                        offset,//
                                        index,//
                                        this.mapedFileSize,//
                                        this.mapedFiles.size(),//
                                        UtilAll.currentStackTrace());
                    }
    
                    try {
                        return this.mapedFiles.get(index);
                    } catch (Exception e) {
                        if (returnFirstOnNotFound) {
                            return mapedFile;
                        }
                    }
                }
            } catch (Exception e) {
                log.error("findMapedFileByOffset Exception", e);
            } finally {
                this.readWriteLock.readLock().unlock();
            }
    
            return null;
        }
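
    Taken together, getMessage and findMapedFileByOffset turn a global physical offset into a file index and a position inside that file. The sketch below reproduces only that arithmetic with a small illustrative file size. MappedFileLocatorSketch and its method names are hypothetical, and it returns -1 on an out-of-range index, whereas the code above relies on the exception thrown by mapedFiles.get(index).

```java
/** Illustrative only: offset arithmetic from getMessage and findMapedFileByOffset. */
final class MappedFileLocatorSketch {
    /**
     * Index of the file holding the given offset, relative to the first file still on disk.
     * Returns -1 when the offset falls outside the files currently held (a choice of this
     * sketch; the original logs a warning and catches the exception from the list lookup).
     */
    static int fileIndex(long offset, long firstFileFromOffset, int mapedFileSize, int fileCount) {
        int index = (int) ((offset / mapedFileSize) - (firstFileFromOffset / mapedFileSize));
        return (index < 0 || index >= fileCount) ? -1 : index;
    }

    /** Position of the offset inside its file, as in pos = offset % mapedFileSize. */
    static int positionInFile(long offset, int mapedFileSize) {
        return (int) (offset % mapedFileSize);
    }

    public static void main(String[] args) {
        int mapedFileSize = 1024;        // tiny size for illustration only
        long firstFileFromOffset = 2048; // assume the two oldest files were already deleted
        int fileCount = 3;               // files cover offsets [2048, 5120)

        System.out.println(fileIndex(3500L, firstFileFromOffset, mapedFileSize, fileCount)); // prints 1
        System.out.println(positionInFile(3500L, mapedFileSize));                            // prints 428
        System.out.println(fileIndex(1000L, firstFileFromOffset, mapedFileSize, fileCount)); // prints -1
    }
}
```

    Subtracting the first file's index is what keeps the lookup correct after old files have been deleted from the head of the queue.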

    The mapped file (MapedFile) then returns the data at the requested position:

        public SelectMapedBufferResult selectMapedBuffer(int pos, int size) {
    
            if ((pos + size) <= this.wrotePostion.get()) {
    
                if (this.hold()) {
                    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
                    byteBuffer.position(pos);
                    ByteBuffer byteBufferNew = byteBuffer.slice();
                    byteBufferNew.limit(size);
                    return new SelectMapedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
                } else {
                    log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
                            + this.fileFromOffset);
                }
            }
    
            else {
                log.warn("selectMapedBuffer request pos invalid, request pos: " + pos + ", size: " + size
                        + ", fileFromOffset: " + this.fileFromOffset);
            }
    
    
            return null;
        }
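
    selectMapedBuffer hands back a window onto the mapped buffer without copying any bytes, using slice, position and limit. The sketch below shows the same sequence of calls on an ordinary heap ByteBuffer. BufferSliceSketch and select are hypothetical names, and the check against whole.limit() only stands in for the wrotePostion check above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative only: the slice / position / slice / limit sequence from selectMapedBuffer. */
final class BufferSliceSketch {
    /** Returns a view of size bytes starting at pos, or null if the range is not available. */
    static ByteBuffer select(ByteBuffer whole, int pos, int size) {
        if (pos + size > whole.limit()) {
            return null; // stand-in for the (pos + size) <= wrotePostion check
        }
        ByteBuffer view = whole.slice();  // own position and limit, shared content
        view.position(pos);
        ByteBuffer result = view.slice(); // index 0 of result corresponds to pos in whole
        result.limit(size);
        return result;
    }

    public static void main(String[] args) {
        ByteBuffer whole = ByteBuffer.wrap("hello,rocketmq".getBytes(StandardCharsets.US_ASCII));
        ByteBuffer message = select(whole, 6, 8);
        System.out.println(StandardCharsets.US_ASCII.decode(message)); // prints "rocketmq"
        System.out.println(whole.position());                          // prints 0: the source buffer is untouched
    }
}
```

    Because the view shares its content with the source buffer, the real code calls hold() before slicing and the caller must release the result afterwards, as the finally block in getMessage does for bufferConsumeQueue.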

    Once the message data has been read from the file, control returns to the method in DefaultMessageStore, which adds it to the result:

    getResult.addMessage(selectResult)
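
    The getMessage method shown earlier consults getResult.getBufferTotalSize() and getResult.getMessageCount() when deciding whether the batch is full, so the result object has to keep a running total as messages are added. The sketch below shows one way such an accumulator could look. PullResultSketch is a hypothetical stand-in written for this post and is not the actual GetMessageResult implementation.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: a result holder that tracks message count and total bytes. */
final class PullResultSketch {
    private final List<ByteBuffer> messageBuffers = new ArrayList<>();
    private int bufferTotalSize = 0;

    /** Adds one message view and updates the running byte total. */
    void addMessage(ByteBuffer messageBytes) {
        messageBuffers.add(messageBytes);
        bufferTotalSize += messageBytes.remaining();
    }

    int getMessageCount() {
        return messageBuffers.size();
    }

    int getBufferTotalSize() {
        return bufferTotalSize;
    }

    public static void main(String[] args) {
        PullResultSketch result = new PullResultSketch();
        result.addMessage(ByteBuffer.allocate(100));
        result.addMessage(ByteBuffer.allocate(250));
        System.out.println(result.getMessageCount());    // prints 2
        System.out.println(result.getBufferTotalSize()); // prints 350
    }
}
```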
  • Original article: https://www.cnblogs.com/guazi/p/6836112.html