  RocketMQ (5): The server-side processing framework, and how messages are found for consumption

      RocketMQ is a high-performance message middleware, and if we stay purely at the usage level, something always feels missing. The official design documents are actually fairly detailed, but knowledge gained on paper always feels shallow! So today let's dig into RocketMQ's implementation details ourselves: the server-side processing framework, and how message consumption is carried out.

      Put plainly: first a quick once-over of the whole thing, then a close look at how one detail, consuming messages, is actually handled.

    1. RocketMQ's communication model

      Let's start with an overall deployment architecture diagram, to build a general picture.

      In the diagram above, the producer and consumer are clients. Important as they are, they take no part in the server-side deployment, which leaves the broker and the nameserver. Their division of labor is clear: the nameserver manages metadata, i.e. which topic/msgqueue lives on which broker, broker discovery, and so on, while the broker is the core message-handling service, receiving clients' read and write requests, guaranteeing that data is persisted, and synchronizing data from master to slave. And that is the most essential function of a message middleware: data storage.

      So in a sense, a message middleware like RocketMQ can be understood as a small database.

      OK, back to the topic: what does RocketMQ's communication model look like? The overall links can be read from the architecture diagram above, but let's go one level deeper: how does RocketMQ implement communication between these clustered service components? Netty is a highly efficient, easy-to-use networking framework, so building the communication layer on Netty is the most natural choice. In day-to-day use, we mostly just need to design the data format (the protocol) for encoding and decoding, and then write the corresponding handlers for the business logic. (See the earlier Netty article: https://www.cnblogs.com/yougewe/p/13415440.html)
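
      To make that concrete, here is a minimal, hand-rolled sketch of the codec-plus-handler pattern. The class name, the 4-byte length framing, and the dispatch logic are all assumptions of this sketch, not RocketMQ's real protocol:

        // Sketch only: a frame decoder strips a length prefix, then a handler
        // dispatches on a business code read from the frame, in the spirit of
        // RocketMQ's RequestCode mechanism shown later.
        import io.netty.buffer.ByteBuf;
        import io.netty.channel.ChannelHandlerContext;
        import io.netty.channel.SimpleChannelInboundHandler;
        import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

        public class DemoServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf frame) {
                int code = frame.readInt();  // the business code decides how this request is processed
                // ... dispatch on code, run business logic, build a response ...
                ctx.writeAndFlush(ctx.alloc().buffer().writeInt(code));
            }
        }

        // Pipeline setup: strip the 4-byte length prefix, then hand whole frames onward.
        // ch.pipeline()
        //     .addLast(new LengthFieldBasedFrameDecoder(16 * 1024 * 1024, 0, 4, 0, 4))
        //     .addLast(new DemoServerHandler());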

      RocketMQ's logical storage layout is roughly: 1 MQ cluster -> n topics -> m queues. The concrete message storage distribution can be pictured simply as the diagram here shows; the sketch just below spells out the addressing in code form.
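
      MessageQueue is RocketMQ's real identifier class; the round-robin selector is an assumption of this sketch, roughly mirroring how a producer spreads messages across a topic's m queues by default:

        import java.util.List;
        import java.util.concurrent.atomic.AtomicInteger;
        import org.apache.rocketmq.common.message.MessageQueue;

        // Each queue is identified by (topic, brokerName, queueId); a producer
        // spreads messages across a topic's m queues, e.g. round-robin.
        class RoundRobinQueueSelector {
            private final AtomicInteger counter = new AtomicInteger(0);

            MessageQueue select(List<MessageQueue> queuesOfTopic) {
                int i = Math.abs(counter.getAndIncrement() % queuesOfTopic.size());
                return queuesOfTopic.get(i);
            }
        }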

       As for how the communication is established, here is an overview; the Netty server handlers are set up roughly as follows:

        // org.apache.rocketmq.remoting.netty.NettyRemotingServer#start
        @Override
        public void start() {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                nettyServerConfig.getServerWorkerThreads(),
                new ThreadFactory() {
    
                    private AtomicInteger threadIndex = new AtomicInteger(0);
    
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                    }
                });
    
            prepareSharableHandlers();
    
            ServerBootstrap childHandler =
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                    .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_KEEPALIVE, false)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                    .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                    .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                .addLast(defaultEventExecutorGroup,
                                    // codec: encoder and decoder
                                    encoder,
                                    new NettyDecoder(),
                                    new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                    connectionManageHandler,
                                    // entry point into business processing
                                    serverHandler
                                );
                        }
                    });
    
            if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
                childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            }
    
            try {
                ChannelFuture sync = this.serverBootstrap.bind().sync();
                InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
                this.port = addr.getPort();
            } catch (InterruptedException e1) {
                throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
            }
    
            if (this.channelEventListener != null) {
                this.nettyEventExecutor.start();
            }
    
            this.timer.scheduleAtFixedRate(new TimerTask() {
    
                @Override
                public void run() {
                    try {
                        NettyRemotingServer.this.scanResponseTable();
                    } catch (Throwable e) {
                        log.error("scanResponseTable exception", e);
                    }
                }
            }, 1000 * 3, 1000);
        }

      Nothing here will surprise anyone who has used Netty, so we won't dwell on it.

    2. RocketMQ's business processing scheme

      An ordinary business system usually exposes RESTful (or other protocol) interfaces for downstream systems to call. RocketMQ likewise has calls between its components to handle, but naturally it does not implement them as RESTful interfaces the way something like Spring MVC does. For efficient processing, RocketMQ uses a numeric code to express how the server should handle a request once it arrives. All of these codes are collected in a single class: org.apache.rocketmq.common.protocol.RequestCode.
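
      Before reading the full list, a quick sketch of how such a code travels with a request may help. RemotingCommand.createRequestCommand and PullMessageRequestHeader are real RocketMQ classes; the group/topic/offset values are made-up examples:

        import org.apache.rocketmq.common.protocol.RequestCode;
        import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
        import org.apache.rocketmq.remoting.protocol.RemotingCommand;

        public class PullRequestSketch {
            public static RemotingCommand buildPullRequest() {
                PullMessageRequestHeader header = new PullMessageRequestHeader();
                header.setConsumerGroup("demo_group");  // illustrative values
                header.setTopic("TopicTest");
                header.setQueueId(0);
                header.setQueueOffset(0L);
                header.setMaxMsgNums(32);
                // the code is the only routing information the server needs
                return RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, header);
            }
        }

      The RequestCode class itself looks like this: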

    public class RequestCode {
    
        public static final int SEND_MESSAGE = 10;
    
        public static final int PULL_MESSAGE = 11;
    
        public static final int QUERY_MESSAGE = 12;
        public static final int QUERY_BROKER_OFFSET = 13;
        public static final int QUERY_CONSUMER_OFFSET = 14;
        public static final int UPDATE_CONSUMER_OFFSET = 15;
        public static final int UPDATE_AND_CREATE_TOPIC = 17;
        public static final int GET_ALL_TOPIC_CONFIG = 21;
        public static final int GET_TOPIC_CONFIG_LIST = 22;
    
        public static final int GET_TOPIC_NAME_LIST = 23;
    
        public static final int UPDATE_BROKER_CONFIG = 25;
    
        public static final int GET_BROKER_CONFIG = 26;
    
        public static final int TRIGGER_DELETE_FILES = 27;
    
        public static final int GET_BROKER_RUNTIME_INFO = 28;
        public static final int SEARCH_OFFSET_BY_TIMESTAMP = 29;
        public static final int GET_MAX_OFFSET = 30;
        public static final int GET_MIN_OFFSET = 31;
    
        public static final int GET_EARLIEST_MSG_STORETIME = 32;
    
        public static final int VIEW_MESSAGE_BY_ID = 33;
    
        public static final int HEART_BEAT = 34;
    
        public static final int UNREGISTER_CLIENT = 35;
    
        public static final int CONSUMER_SEND_MSG_BACK = 36;
    
        public static final int END_TRANSACTION = 37;
        public static final int GET_CONSUMER_LIST_BY_GROUP = 38;
    
        public static final int CHECK_TRANSACTION_STATE = 39;
    
        public static final int NOTIFY_CONSUMER_IDS_CHANGED = 40;
    
        public static final int LOCK_BATCH_MQ = 41;
    
        public static final int UNLOCK_BATCH_MQ = 42;
        public static final int GET_ALL_CONSUMER_OFFSET = 43;
    
        public static final int GET_ALL_DELAY_OFFSET = 45;
    
        public static final int CHECK_CLIENT_CONFIG = 46;
    
        public static final int UPDATE_AND_CREATE_ACL_CONFIG = 50;
    
        public static final int DELETE_ACL_CONFIG = 51;
    
        public static final int GET_BROKER_CLUSTER_ACL_INFO = 52;
    
        public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG = 53;
    
        public static final int GET_BROKER_CLUSTER_ACL_CONFIG = 54;
    
        public static final int PUT_KV_CONFIG = 100;
    
        public static final int GET_KV_CONFIG = 101;
    
        public static final int DELETE_KV_CONFIG = 102;
    
        public static final int REGISTER_BROKER = 103;
    
        public static final int UNREGISTER_BROKER = 104;
        public static final int GET_ROUTEINFO_BY_TOPIC = 105;
    
        public static final int GET_BROKER_CLUSTER_INFO = 106;
        public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200;
        public static final int GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201;
        public static final int GET_TOPIC_STATS_INFO = 202;
        public static final int GET_CONSUMER_CONNECTION_LIST = 203;
        public static final int GET_PRODUCER_CONNECTION_LIST = 204;
        public static final int WIPE_WRITE_PERM_OF_BROKER = 205;
    
        public static final int GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206;
    
        public static final int DELETE_SUBSCRIPTIONGROUP = 207;
        public static final int GET_CONSUME_STATS = 208;
    
        public static final int SUSPEND_CONSUMER = 209;
    
        public static final int RESUME_CONSUMER = 210;
        public static final int RESET_CONSUMER_OFFSET_IN_CONSUMER = 211;
        public static final int RESET_CONSUMER_OFFSET_IN_BROKER = 212;
    
        public static final int ADJUST_CONSUMER_THREAD_POOL = 213;
    
        public static final int WHO_CONSUME_THE_MESSAGE = 214;
    
        public static final int DELETE_TOPIC_IN_BROKER = 215;
    
        public static final int DELETE_TOPIC_IN_NAMESRV = 216;
        public static final int GET_KVLIST_BY_NAMESPACE = 219;
    
        public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;
    
        public static final int GET_CONSUMER_STATUS_FROM_CLIENT = 221;
    
        public static final int INVOKE_BROKER_TO_RESET_OFFSET = 222;
    
        public static final int INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223;
    
        public static final int QUERY_TOPIC_CONSUME_BY_WHO = 300;
    
        public static final int GET_TOPICS_BY_CLUSTER = 224;
    
        public static final int REGISTER_FILTER_SERVER = 301;
        public static final int REGISTER_MESSAGE_FILTER_CLASS = 302;
    
        public static final int QUERY_CONSUME_TIME_SPAN = 303;
    
        public static final int GET_SYSTEM_TOPIC_LIST_FROM_NS = 304;
        public static final int GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305;
    
        public static final int CLEAN_EXPIRED_CONSUMEQUEUE = 306;
    
        public static final int GET_CONSUMER_RUNNING_INFO = 307;
    
        public static final int QUERY_CORRECTION_OFFSET = 308;
        public static final int CONSUME_MESSAGE_DIRECTLY = 309;
    
        public static final int SEND_MESSAGE_V2 = 310;
    
        public static final int GET_UNIT_TOPIC_LIST = 311;
    
        public static final int GET_HAS_UNIT_SUB_TOPIC_LIST = 312;
    
        public static final int GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313;
    
        public static final int CLONE_GROUP_OFFSET = 314;
    
        public static final int VIEW_BROKER_STATS_DATA = 315;
    
        public static final int CLEAN_UNUSED_TOPIC = 316;
    
        public static final int GET_BROKER_CONSUME_STATS = 317;
    
        /**
         * update the config of name server
         */
        public static final int UPDATE_NAMESRV_CONFIG = 318;
    
        /**
         * get config from name server
         */
        public static final int GET_NAMESRV_CONFIG = 319;
    
        public static final int SEND_BATCH_MESSAGE = 320;
    
        public static final int QUERY_CONSUME_QUEUE = 321;
    
        public static final int QUERY_DATA_VERSION = 322;
    
        /**
         * resume logic of checking half messages that have been put in TRANS_CHECK_MAXTIME_TOPIC before
         */
        public static final int RESUME_CHECK_HALF_MESSAGE = 323;
    
        public static final int SEND_REPLY_MESSAGE = 324;
    
        public static final int SEND_REPLY_MESSAGE_V2 = 325;
    
        public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
    }

      Honestly, even though most of these codes can be understood from their names alone, such sketchy commenting leaves something to be desired. In one sentence: every code that needs handling can be found here. These codes are also registered somewhere at broker startup, so that when a request arrives, the matching business logic can be found by its code.

        // org.apache.rocketmq.broker.BrokerController#registerProcessor
        public void registerProcessor() {
            /**
             * SendMessageProcessor
             */
            SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
            sendProcessor.registerSendMessageHook(sendMessageHookList);
            sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    
            this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
            /**
             * PullMessageProcessor
             */
            this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
            this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
    
            /**
             * ReplyMessageProcessor
             */
            ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
            replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
    
            this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
    
            /**
             * QueryMessageProcessor
             */
            NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
            this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    
            this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    
            /**
             * ClientManageProcessor
             */
            ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
            this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
            this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
            this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    
            this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    
            /**
             * ConsumerManageProcessor
             */
            ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
            this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
            this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
            this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    
            this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    
            /**
             * EndTransactionProcessor
             */
            this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
            this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
    
            /**
             * Default
             */
            AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
            this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
            this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
        }
        // org.apache.rocketmq.remoting.netty.NettyRemotingServer#registerProcessor
        @Override
        public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
            ExecutorService executorThis = executor;
            if (null == executor) {
                executorThis = this.publicExecutor;
            }
    
            Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
            // HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable
            // register the code into the hash table, ready for dispatch later
            this.processorTable.put(requestCode, pair);
        }

      When an actual request arrives, the processor can be fetched from the processor table directly by code, and the command is simply handed to it for processing.

        // org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
        /**
         * Process incoming request command issued by remote peer.
         *
         * @param ctx channel handler context.
         * @param cmd request command.
         */
        public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
            // look up the processor in processorTable by request code
            final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
            final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
            final int opaque = cmd.getOpaque();
    
            if (pair != null) {
                Runnable run = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                            final RemotingResponseCallback callback = new RemotingResponseCallback() {
                                @Override
                                public void callback(RemotingCommand response) {
                                    doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                                    if (!cmd.isOnewayRPC()) {
                                        if (response != null) {
                                            response.setOpaque(opaque);
                                            response.markResponseType();
                                            try {
                                                ctx.writeAndFlush(response);
                                            } catch (Throwable e) {
                                                log.error("process request over, but response failed", e);
                                                log.error(cmd.toString());
                                                log.error(response.toString());
                                            }
                                        } else {
                                        }
                                    }
                                }
                            };
                            if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                                AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
                                processor.asyncProcessRequest(ctx, cmd, callback);
                            } else {
                                NettyRequestProcessor processor = pair.getObject1();
                                RemotingCommand response = processor.processRequest(ctx, cmd);
                                callback.callback(response);
                            }
                        } catch (Throwable e) {
                            log.error("process request exception", e);
                            log.error(cmd.toString());
    
                            if (!cmd.isOnewayRPC()) {
                                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                    RemotingHelper.exceptionSimpleDesc(e));
                                response.setOpaque(opaque);
                                ctx.writeAndFlush(response);
                            }
                        }
                    }
                };
    
                if (pair.getObject1().rejectRequest()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[REJECTREQUEST]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                    return;
                }
    
                try {
                    final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                    pair.getObject2().submit(requestTask);
                } catch (RejectedExecutionException e) {
                    if ((System.currentTimeMillis() % 10000) == 0) {
                        log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                            + ", too many requests and system thread pool busy, RejectedExecutionException "
                            + pair.getObject2().toString()
                            + " request code: " + cmd.getCode());
                    }
    
                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                            "[OVERLOAD]system busy, start flow control for a while");
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            } else {
                String error = " request type " + cmd.getCode() + " not supported";
                final RemotingCommand response =
                    RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
            }
        }

      To sum up: we now have a complete picture of how RocketMQ's codes are defined, registered, and used. And knowing these codes, we also have a rough idea of how much RocketMQ can do.

      The processRequestCommand() above also showed us the general framework flow RocketMQ uses for handling requests. With that, we have the overall concept.

      If you have doubts about any particular feature, you only need to look at the corresponding processor's implementation; at least, that's how it looks on the surface. Feels like everything suddenly clicked, doesn't it? Easy now; keep some reverence. As a warm-up, the sketch below shows the shape that every processor shares.
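
      NettyRequestProcessor is the real interface every processor implements; EchoProcessor and the request code 9999 are hypothetical, existing only to show the contract:

        import io.netty.channel.ChannelHandlerContext;
        import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
        import org.apache.rocketmq.remoting.protocol.RemotingCommand;
        import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;

        // Hypothetical processor: echoes the request body back to the caller.
        public class EchoProcessor implements NettyRequestProcessor {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
                RemotingCommand response = RemotingCommand.createResponseCommand(null);
                response.setCode(RemotingSysResponseCode.SUCCESS);
                response.setBody(request.getBody());
                return response;  // processRequestCommand() writes this back via its callback
            }

            @Override
            public boolean rejectRequest() {
                return false;  // true would trigger the SYSTEM_BUSY flow-control branch seen above
            }
        }

        // Registration mirrors BrokerController#registerProcessor (9999 is a made-up code):
        // remotingServer.registerProcessor(9999, new EchoProcessor(), someExecutor);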

    3. How message pulling is implemented

      After section 2 we have the general framework in mind. Now I want to know exactly how the server serves up messages. Let's get the answer.

      Tracing back through the registration process, we find that PULL_MESSAGE is registered with the pullMessageProcessor, as follows:

            /**
             * PullMessageProcessor
             */
            this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
            this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

      Going back a little further, the pullMessageProcessor is initialized as follows:

        // org.apache.rocketmq.broker.BrokerController#BrokerController
        public BrokerController(
            final BrokerConfig brokerConfig,
            final NettyServerConfig nettyServerConfig,
            final NettyClientConfig nettyClientConfig,
            final MessageStoreConfig messageStoreConfig
        ) {
            this.brokerConfig = brokerConfig;
            this.nettyServerConfig = nettyServerConfig;
            this.nettyClientConfig = nettyClientConfig;
            this.messageStoreConfig = messageStoreConfig;
            this.consumerOffsetManager = new ConsumerOffsetManager(this);
            this.topicConfigManager = new TopicConfigManager(this);
            // instantiate pullMessageProcessor
            this.pullMessageProcessor = new PullMessageProcessor(this);
            ...
        }

      So, to see how message pulling is implemented, we only need to read PullMessageProcessor.processRequest().

        // org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest
        @Override
        public RemotingCommand processRequest(final ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
            return this.processRequest(ctx.channel(), request, true);
        }
        
        private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
            throws RemotingCommandException {
            RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
            final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
            final PullMessageRequestHeader requestHeader =
                (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
            // the response reuses the request's opaque id, so the caller can match it back to its request context
            response.setOpaque(request.getOpaque());
    
            log.debug("receive PullMessage request command, {}", request);
    
            if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
                return response;
            }
            // validate the subscription (consumer) group
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
            if (null == subscriptionGroupConfig) {
                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
                response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
                return response;
            }
    
            if (!subscriptionGroupConfig.isConsumeEnable()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
                return response;
            }
    
            final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
            final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
            final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
    
            final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
    
            TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
            if (null == topicConfig) {
                log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
                response.setCode(ResponseCode.TOPIC_NOT_EXIST);
                response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
                return response;
            }
    
            if (!PermName.isReadable(topicConfig.getPerm())) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
                return response;
            }
            // queueId bounds check
            if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
                String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
                    requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
                log.warn(errorInfo);
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(errorInfo);
                return response;
            }
    
            SubscriptionData subscriptionData = null;
            ConsumerFilterData consumerFilterData = null;
            if (hasSubscriptionFlag) {
                try {
                    subscriptionData = FilterAPI.build(
                        requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
                    );
                    if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                        consumerFilterData = ConsumerFilterManager.build(
                            requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
                            requestHeader.getExpressionType(), requestHeader.getSubVersion()
                        );
                        assert consumerFilterData != null;
                    }
                } catch (Exception e) {
                    log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
                        requestHeader.getConsumerGroup());
                    response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
                    response.setRemark("parse the consumer's subscription failed");
                    return response;
                }
            } else {
                ConsumerGroupInfo consumerGroupInfo =
                    this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
                if (null == consumerGroupInfo) {
                    log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
                    response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
                    response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
                    return response;
                }
    
                if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
                    && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
                    response.setCode(ResponseCode.NO_PERMISSION);
                    response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
                    return response;
                }
    
                subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
                if (null == subscriptionData) {
                    log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
                    response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
                    response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
                    return response;
                }
    
                if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
                    log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
                        subscriptionData.getSubString());
                    response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
                    response.setRemark("the consumer's subscription not latest");
                    return response;
                }
                if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                    consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
                        requestHeader.getConsumerGroup());
                    if (consumerFilterData == null) {
                        response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
                        response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
                        return response;
                    }
                    if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
                        log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
                            requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
                        response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
                        response.setRemark("the consumer's consumer filter data not latest");
                        return response;
                    }
                }
            }
    
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
                && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
                return response;
            }
    
            MessageFilter messageFilter;
            if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
                messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
                    this.brokerController.getConsumerFilterManager());
            } else {
                messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
                    this.brokerController.getConsumerFilterManager());
            }
            // the actual message fetch: the core method
            final GetMessageResult getMessageResult =
                this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                    requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
            if (getMessageResult != null) {
                response.setRemark(getMessageResult.getStatus().name());
                responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
                responseHeader.setMinOffset(getMessageResult.getMinOffset());
                responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
    
                if (getMessageResult.isSuggestPullingFromSlave()) {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                } else {
                    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                }
    
                switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
                    case ASYNC_MASTER:
                    case SYNC_MASTER:
                        break;
                    case SLAVE:
                        if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                        }
                        break;
                }
    
                if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                    // consume too slow ,redirect to another machine
                    if (getMessageResult.isSuggestPullingFromSlave()) {
                        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                    }
                    // consume ok
                    else {
                        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                    }
                } else {
                    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                }
                // set the response code based on the fetch status
                switch (getMessageResult.getStatus()) {
                    case FOUND:
                        response.setCode(ResponseCode.SUCCESS);
                        break;
                    case MESSAGE_WAS_REMOVING:
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        break;
                    case NO_MATCHED_LOGIC_QUEUE:
                    case NO_MESSAGE_IN_QUEUE:
                        if (0 != requestHeader.getQueueOffset()) {
                            response.setCode(ResponseCode.PULL_OFFSET_MOVED);
    
                            // XXX: warn and notify me
                            log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
                                requestHeader.getQueueOffset(),
                                getMessageResult.getNextBeginOffset(),
                                requestHeader.getTopic(),
                                requestHeader.getQueueId(),
                                requestHeader.getConsumerGroup()
                            );
                        } else {
                            response.setCode(ResponseCode.PULL_NOT_FOUND);
                        }
                        break;
                    case NO_MATCHED_MESSAGE:
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        break;
                    case OFFSET_FOUND_NULL:
                        response.setCode(ResponseCode.PULL_NOT_FOUND);
                        break;
                    case OFFSET_OVERFLOW_BADLY:
                        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                        // XXX: warn and notify me
                        log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
                            requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
                        break;
                    case OFFSET_OVERFLOW_ONE:
                        response.setCode(ResponseCode.PULL_NOT_FOUND);
                        break;
                    case OFFSET_TOO_SMALL:
                        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                        log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
                            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
                            getMessageResult.getMinOffset(), channel.remoteAddress());
                        break;
                    default:
                        assert false;
                        break;
                }
    
                if (this.hasConsumeMessageHook()) {
                    ConsumeMessageContext context = new ConsumeMessageContext();
                    context.setConsumerGroup(requestHeader.getConsumerGroup());
                    context.setTopic(requestHeader.getTopic());
                    context.setQueueId(requestHeader.getQueueId());
    
                    String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
    
                    switch (response.getCode()) {
                        case ResponseCode.SUCCESS:
                            int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
                            int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
    
                            context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
                            context.setCommercialRcvTimes(incValue);
                            context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
                            context.setCommercialOwner(owner);
    
                            break;
                        case ResponseCode.PULL_NOT_FOUND:
                            if (!brokerAllowSuspend) {
    
                                context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                                context.setCommercialRcvTimes(1);
                                context.setCommercialOwner(owner);
    
                            }
                            break;
                        case ResponseCode.PULL_RETRY_IMMEDIATELY:
                        case ResponseCode.PULL_OFFSET_MOVED:
                            context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                            context.setCommercialRcvTimes(1);
                            context.setCommercialOwner(owner);
                            break;
                        default:
                            assert false;
                            break;
                    }
    
                    this.executeConsumeMessageHookBefore(context);
                }
    
                switch (response.getCode()) {
                    case ResponseCode.SUCCESS:
    
                        this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                            getMessageResult.getMessageCount());
    
                        this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                            getMessageResult.getBufferTotalSize());
    
                        this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
                        if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                            final long beginTimeMills = this.brokerController.getMessageStore().now();
                            final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
                            this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
                                requestHeader.getTopic(), requestHeader.getQueueId(),
                                (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
                            response.setBody(r);
                        } else {
                            try {
                                FileRegion fileRegion =
                                    new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
                                channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                                    @Override
                                    public void operationComplete(ChannelFuture future) throws Exception {
                                        getMessageResult.release();
                                        if (!future.isSuccess()) {
                                            log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
                                        }
                                    }
                                });
                            } catch (Throwable e) {
                                log.error("transfer many message by pagecache exception", e);
                                getMessageResult.release();
                            }
    
                            response = null;
                        }
                        break;
                    case ResponseCode.PULL_NOT_FOUND:
    
                        if (brokerAllowSuspend && hasSuspendFlag) {
                            long pollingTimeMills = suspendTimeoutMillisLong;
                            if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                                pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                            }
    
                            String topic = requestHeader.getTopic();
                            long offset = requestHeader.getQueueOffset();
                            int queueId = requestHeader.getQueueId();
                            PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                                this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                            this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                            response = null;
                            break;
                        }
    
                    case ResponseCode.PULL_RETRY_IMMEDIATELY:
                        break;
                    case ResponseCode.PULL_OFFSET_MOVED:
                        if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
                            || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
                            MessageQueue mq = new MessageQueue();
                            mq.setTopic(requestHeader.getTopic());
                            mq.setQueueId(requestHeader.getQueueId());
                            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
    
                            OffsetMovedEvent event = new OffsetMovedEvent();
                            event.setConsumerGroup(requestHeader.getConsumerGroup());
                            event.setMessageQueue(mq);
                            event.setOffsetRequest(requestHeader.getQueueOffset());
                            event.setOffsetNew(getMessageResult.getNextBeginOffset());
                            this.generateOffsetMovedEvent(event);
                            log.warn(
                                "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
                                requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
                                responseHeader.getSuggestWhichBrokerId());
                        } else {
                            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                            log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
                                requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
                                responseHeader.getSuggestWhichBrokerId());
                        }
    
                        break;
                    default:
                        assert false;
                }
            } else {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("store getMessage return null");
            }
    
            boolean storeOffsetEnable = brokerAllowSuspend;
            storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
            storeOffsetEnable = storeOffsetEnable
                && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
            if (storeOffsetEnable) {
                this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
            }
            return response;
        }

      Pretty complex, isn't it? Actually it's not so bad; the main logic is all laid out right here. It is mostly the judging of various abnormal cases, plus setting the status code after the messages have been fetched. The actual pull is just a single line, so let's look at it closely:

        // org.apache.rocketmq.store.DefaultMessageStore#getMessage
        public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
            final int maxMsgNums,
            final MessageFilter messageFilter) {
            if (this.shutdown) {
                log.warn("message store has shutdown, so getMessage is forbidden");
                return null;
            }
    
            if (!this.runningFlags.isReadable()) {
                log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
                return null;
            }
    
            long beginTime = this.getSystemClock().now();
    
            GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
            long nextBeginOffset = offset;
            long minOffset = 0;
            long maxOffset = 0;
    
            GetMessageResult getResult = new GetMessageResult();
    
            final long maxOffsetPy = this.commitLog.getMaxOffset();
            // get the ConsumeQueue for this topic and queueId;
            // essentially a lookup in the consumeQueueTable hash map, creating one if absent
            ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
            if (consumeQueue != null) {
                minOffset = consumeQueue.getMinOffsetInQueue();
                maxOffset = consumeQueue.getMaxOffsetInQueue();
    
                if (maxOffset == 0) {
                    status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                    nextBeginOffset = nextOffsetCorrection(offset, 0);
                } else if (offset < minOffset) {
                    status = GetMessageStatus.OFFSET_TOO_SMALL;
                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                } else if (offset == maxOffset) {
                    status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                    nextBeginOffset = nextOffsetCorrection(offset, offset);
                } else if (offset > maxOffset) {
                    status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                    if (0 == minOffset) {
                        nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                    } else {
                        nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
                    }
                } else {
                // the consume-queue index entry can be located from the offset
                    SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                    if (bufferConsumeQueue != null) {
                        try {
                            status = GetMessageStatus.NO_MATCHED_MESSAGE;
    
                            long nextPhyFileStartOffset = Long.MIN_VALUE;
                            long maxPhyOffsetPulling = 0;
    
                            int i = 0;
                            final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
                            final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                            // walk the queue entries one by one, resolving each message's index info
                            for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                                // read the commitlog offset, size and tags code from the consumeQueue entry
                                long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                                int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                                long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
    
                                maxPhyOffsetPulling = offsetPy;
                                // fast skip: entries whose physical offset falls before the next file's start have already been rolled past
                                if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                    if (offsetPy < nextPhyFileStartOffset)
                                        continue;
                                }
    
                                boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
    
                                if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
                                    isInDisk)) {
                                    break;
                                }
    
                                boolean extRet = false, isTagsCodeLegal = true;
                                if (consumeQueue.isExtAddr(tagsCode)) {
                                    extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
                                    if (extRet) {
                                        tagsCode = cqExtUnit.getTagsCode();
                                    } else {
                                        // can't find ext content.Client will filter messages by tag also.
                                        log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
                                            tagsCode, offsetPy, sizePy, topic, group);
                                        isTagsCodeLegal = false;
                                    }
                                }
                                // apply the message filter (consume-queue level)
                                if (messageFilter != null
                                    && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                                    if (getResult.getBufferTotalSize() == 0) {
                                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                    }
    
                                    continue;
                                }
                                // fetch the actual message data from the commitlog, by physical offset and size
                                SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                                if (null == selectResult) {
                                    if (getResult.getBufferTotalSize() == 0) {
                                        status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                    }
    
                                    nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                    continue;
                                }
    
                                if (messageFilter != null
                                    && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                                    if (getResult.getBufferTotalSize() == 0) {
                                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                    }
                                    // release...
                                    selectResult.release();
                                    continue;
                                }
    
                                this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                                getResult.addMessage(selectResult);
                                status = GetMessageStatus.FOUND;
                                nextPhyFileStartOffset = Long.MIN_VALUE;
                            }
    
                            if (diskFallRecorded) {
                                long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
                                brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
                            }
    
                            nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
    
                            long diff = maxOffsetPy - maxPhyOffsetPulling;
                            long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
                                * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                            getResult.setSuggestPullingFromSlave(diff > memory);
                        } finally {
    
                            bufferConsumeQueue.release();
                        }
                    } else {
                        status = GetMessageStatus.OFFSET_FOUND_NULL;
                        nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
                        log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
                            + maxOffset + ", but access logic queue failed.");
                    }
                }
            } else {
                status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
                nextBeginOffset = nextOffsetCorrection(offset, 0);
            }
    
            if (GetMessageStatus.FOUND == status) {
                this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
            } else {
                this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
            }
            long elapsedTime = this.getSystemClock().now() - beginTime;
            this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
    
            getResult.setStatus(status);
            getResult.setNextBeginOffset(nextBeginOffset);
            getResult.setMaxOffset(maxOffset);
            getResult.setMinOffset(minOffset);
            return getResult;
        }
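
  Two details at the tail of getMessage are worth spelling out. First, the next pull offset advances by the number of index entries traversed: i counts bytes, and each consume queue entry is a fixed ConsumeQueue.CQ_STORE_UNIT_SIZE = 20 bytes, so i / CQ_STORE_UNIT_SIZE is the entry count. Second, the broker suggests pulling from a slave once the consumer lags further behind than the slice of the commitlog expected to still sit in page cache. A minimal sketch of both computations (the memory size and ratio below are illustrative assumptions, not values read from a live broker):

    public class PullCursorMath {
        // fixed consume queue entry size: 8B commitlog offset + 4B size + 8B tags code
        static final int CQ_STORE_UNIT_SIZE = 20;

        public static void main(String[] args) {
            long offset = 1000;                  // logical offset this pull started at
            int i = 12 * CQ_STORE_UNIT_SIZE;     // the loop above scanned 12 entries (i counts bytes)
            long nextBeginOffset = offset + (i / CQ_STORE_UNIT_SIZE);
            System.out.println(nextBeginOffset); // 1012: the next pull resumes right after the scanned entries

            // slave-suggestion heuristic, mirroring the diff/memory comparison above (values are assumptions)
            long maxOffsetPy = 10L * 1024 * 1024 * 1024;         // commitlog max physical offset: 10 GB
            long maxPhyOffsetPulling = 2L * 1024 * 1024 * 1024;  // max physical offset this pull touched: 2 GB
            long totalPhysicalMemory = 16L * 1024 * 1024 * 1024; // assumed machine memory: 16 GB
            int accessMessageInMemoryMaxRatio = 40;              // assumed in-memory ratio (percent)

            long diff = maxOffsetPy - maxPhyOffsetPulling;
            long memory = (long) (totalPhysicalMemory * (accessMessageInMemoryMaxRatio / 100.0));
            System.out.println(diff > memory);   // true: 8 GB behind > 6.4 GB "hot" window -> pull from slave
        }
    }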
    
        public ConsumeQueue findConsumeQueue(String topic, int queueId) {
            ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
            if (null == map) {
                ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
                ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
                if (oldMap != null) {
                    map = oldMap;
                } else {
                    map = newMap;
                }
            }
    
            ConsumeQueue logic = map.get(queueId);
            if (null == logic) {
                ConsumeQueue newLogic = new ConsumeQueue(
                    topic,
                    queueId,
                    StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                    this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                    this);
                ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
                if (oldLogic != null) {
                    logic = oldLogic;
                } else {
                    logic = newLogic;
                }
            }
    
            return logic;
        }
        // One important point in the lookup above: how is the index entry located from a given offset?
        // The lookup of the file holding that index is implemented as follows:
        // org.apache.rocketmq.store.ConsumeQueue#getIndexBuffer
        public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
            int mappedFileSize = this.mappedFileSize;
            long offset = startIndex * CQ_STORE_UNIT_SIZE;
            if (offset >= this.getMinLogicOffset()) {
                // delegate the lookup to mappedFileQueue
                MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
                if (mappedFile != null) {
                    SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
                    return result;
                }
            }
            return null;
        }
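
  The multiplication startIndex * CQ_STORE_UNIT_SIZE works because every consume queue entry has the same fixed 20-byte layout: an 8-byte commitlog physical offset, a 4-byte message size, and an 8-byte tags hash code. These are exactly the offsetPy / sizePy / tagsCode fields read in the getMessage loop earlier. A hedged decoding sketch (the class and field names are mine; the real code reads the fields inline):

    import java.nio.ByteBuffer;

    // Decodes one fixed-size consume queue entry, mirroring the three reads
    // that the getMessage loop performs on bufferConsumeQueue.
    final class ConsumeQueueEntry {
        static final int CQ_STORE_UNIT_SIZE = 20;

        final long offsetPy;  // bytes 0-7:   physical offset of the message in the commitlog
        final int sizePy;     // bytes 8-11:  serialized size of the message
        final long tagsCode;  // bytes 12-19: tags hash code (or an ext-file address, see isExtAddr above)

        ConsumeQueueEntry(ByteBuffer buffer) {
            this.offsetPy = buffer.getLong();
            this.sizePy = buffer.getInt();
            this.tagsCode = buffer.getLong();
        }
    }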
        // org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset
        public MappedFile findMappedFileByOffset(final long offset) {
            return findMappedFileByOffset(offset, false);
        }
        /**
         * Finds a mapped file by offset.
         *
         * @param offset Offset.
         * @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
         * @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
         */
        public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
            try {
                MappedFile firstMappedFile = this.getFirstMappedFile();
                MappedFile lastMappedFile = this.getLastMappedFile();
                if (firstMappedFile != null && lastMappedFile != null) {
                    // out-of-range check
                    if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                        LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                            offset,
                            firstMappedFile.getFileFromOffset(),
                            lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                            this.mappedFileSize,
                            this.mappedFiles.size());
                    } else {
                        // compute, directly from the offset and the fixed per-file size, which file the offset lives in (or should live in)
                        int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                        MappedFile targetFile = null;
                        try {
                            targetFile = this.mappedFiles.get(index);
                        } catch (Exception ignored) {
                        }
    
                        if (targetFile != null && offset >= targetFile.getFileFromOffset()
                            && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                            return targetFile;
                        }
                        // computed index wrong? fall back to a sequential scan over all mapped files
                        for (MappedFile tmpMappedFile : this.mappedFiles) {
                            if (offset >= tmpMappedFile.getFileFromOffset()
                                && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                                return tmpMappedFile;
                            }
                        }
                    }
    
                    if (returnFirstOnNotFound) {
                        return firstMappedFile;
                    }
                }
            } catch (Exception e) {
                log.error("findMappedFileByOffset Exception", e);
            }
    
            return null;
        }
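
  The index computation deserves a concrete example. File names encode their starting offset and all files in a queue share one size, so the target file is found in O(1); the sequential scan is only a fallback for when the computed slot does not match (e.g. files in the middle were deleted). A worked example, assuming the default consume queue file size of 300,000 entries * 20 bytes = 6,000,000 bytes (treat that default as an assumption):

    public class MappedFileIndexMath {
        public static void main(String[] args) {
            long mappedFileSize = 6_000_000L;       // assumed consume queue file size (300,000 entries * 20 bytes)
            long firstFileFromOffset = 12_000_000L; // oldest surviving file: the first two files were already cleaned up
            long offset = 25_000_000L;              // byte offset we are looking for

            // same formula as findMappedFileByOffset: shift by the number of deleted leading files
            int index = (int) ((offset / mappedFileSize) - (firstFileFromOffset / mappedFileSize));
            System.out.println(index);              // 4 - 2 = 2: third file in the current mappedFiles list

            // position inside that file, as passed to selectMappedBuffer in getIndexBuffer
            int posInFile = (int) (offset % mappedFileSize);
            System.out.println(posInFile);          // 1,000,000
        }
    }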

  With that, the whole lookup process is clear: first locate the buffer holding the consume queue, then use the offset to find the specific consume queue index entry within that buffer and read out the physical offset and size of the message, and finally use that index information to fetch the message data from the commitlog. If a filter is configured, only messages that pass the filter count as valid results.

  The sequence diagram of the whole lookup flow is roughly: getMessage -> findConsumeQueue -> getIndexBuffer -> findMappedFileByOffset -> commitLog.getMessage (the original diagram image is not reproduced here).

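  In lieu of the diagram, here is the read path condensed into a runnable toy: a consume-queue-style index of fixed 20-byte entries pointing into a commitlog-style buffer, looked up in the same two steps. Everything here is made-up demo data, not broker code:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Toy simulation of the two-step lookup: index entry -> (offset, size) -> message bytes.
    public class TwoStepLookupDemo {
        static final int CQ_STORE_UNIT_SIZE = 20;

        public static void main(String[] args) {
            // "commitlog": two messages laid out back to back
            byte[] m0 = "hello".getBytes(StandardCharsets.UTF_8);
            byte[] m1 = "rocketmq".getBytes(StandardCharsets.UTF_8);
            ByteBuffer commitLog = ByteBuffer.allocate(m0.length + m1.length);
            commitLog.put(m0).put(m1);

            // "consume queue": one 20-byte entry per message (offset, size, tags code)
            ByteBuffer consumeQueue = ByteBuffer.allocate(2 * CQ_STORE_UNIT_SIZE);
            consumeQueue.putLong(0L).putInt(m0.length).putLong(42L);
            consumeQueue.putLong(m0.length).putInt(m1.length).putLong(43L);

            // pull the message at logical offset 1: index lookup first, then commitlog read
            int logicalOffset = 1;
            consumeQueue.position(logicalOffset * CQ_STORE_UNIT_SIZE);
            long offsetPy = consumeQueue.getLong();
            int sizePy = consumeQueue.getInt();
            long tagsCode = consumeQueue.getLong(); // a real broker would run the tag filter here

            byte[] out = new byte[sizePy];
            commitLog.position((int) offsetPy);
            commitLog.get(out);
            System.out.println(new String(out, StandardCharsets.UTF_8) + ", tags=" + tagsCode); // rocketmq, tags=43
        }
    }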
  Finally, a question to ponder: given the logic above, how many messages does a single pull actually return?
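
  A hedged hint rather than a spoiler: the loop is bounded by the isTheBatchFull check seen at the top of this section, which caps both the message count and the total bytes transferred, and applies much tighter caps once the data has fallen out of page cache and must come from disk. The shape of that check, with caps that are my recollection of the MessageStoreConfig defaults (treat the numbers as assumptions):

    // Sketch of the batch-limit logic; the four caps mirror what I recall of the
    // maxTransferBytes/CountOnMessageInMemory/InDisk defaults in MessageStoreConfig.
    public class BatchLimitSketch {
        static boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) {
            if (bufferTotal == 0 || messageTotal == 0) {
                return false;                       // always let at least one message through
            }
            if (maxMsgNums <= messageTotal) {
                return true;                        // consumer-side cap (pullBatchSize, commonly 32)
            }
            if (isInDisk) {                         // cold read: protect the disk with tighter caps
                return (bufferTotal + sizePy) > 64 * 1024 || messageTotal > 8 - 1;
            }
            return (bufferTotal + sizePy) > 256 * 1024 || messageTotal > 32 - 1; // hot (page cache) read caps
        }

        public static void main(String[] args) {
            System.out.println(isTheBatchFull(1024, 64, 31 * 1024, 31, false)); // false: under both in-memory caps
            System.out.println(isTheBatchFull(1024, 64, 31 * 1024, 32, false)); // true: count cap hit
        }
    }

  So, under these assumed defaults, a single in-memory pull returns at most min(pullBatchSize, 32) messages or roughly 256 KB of message data, whichever cap is hit first; disk reads are cut off around 8 messages or 64 KB.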
