zoukankan      html  css  js  c++  java
  • RocketMQ 拉取消息-通信模块

    首先看server端:class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer

    下面这个实现了com.alibaba.rocketmq.remoting.RemotingService中的start方法。启动通信服务端

       @Override
        public void start() {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
                    nettyServerConfig.getServerWorkerThreads(), //
                    new ThreadFactory() {
    
                        private AtomicInteger threadIndex = new AtomicInteger(0);
    
    
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                        }
                    });
    
            ServerBootstrap childHandler = //
                    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
                            //
                            .option(ChannelOption.SO_BACKLOG, 1024)
                            //
                            .option(ChannelOption.SO_REUSEADDR, true)
                            //
                            .option(ChannelOption.SO_KEEPALIVE, false)
                            //
                            .childOption(ChannelOption.TCP_NODELAY, true)
                            //
                            .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                            //
                            .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                            //
                            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel ch) throws Exception {
                                    ch.pipeline().addLast(
                                            //
                                            defaultEventExecutorGroup, //
                                            new NettyEncoder(), //
                                            new NettyDecoder(), //
                                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), //
                                            new NettyConnetManageHandler(), //
                                            new NettyServerHandler());
                                }
                            });
    
            if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
                childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            }
    
            try {
                ChannelFuture sync = this.serverBootstrap.bind().sync();
                InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
                this.port = addr.getPort();
            } catch (InterruptedException e1) {
                throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
            }
    
            if (this.channelEventListener != null) {
                this.nettyEventExecuter.start();
            }
    
            this.timer.scheduleAtFixedRate(new TimerTask() {
    
                @Override
                public void run() {
                    try {
                        NettyRemotingServer.this.scanResponseTable();
                    } catch (Exception e) {
                        log.error("scanResponseTable exception", e);
                    }
                }
            }, 1000 * 3, 1000);
        }

    这里重点就是那两个hander了。第一个NettyConnetManageHandler就不必讲了,是关于连接方面的功能。

       class NettyConnetManageHandler extends ChannelDuplexHandler {
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
                super.channelRegistered(ctx);
            }
    
    
            @Override
            public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
                final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
                super.channelUnregistered(ctx);
            }
    
    
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
                super.channelActive(ctx);
    
                if (NettyRemotingServer.this.channelEventListener != null) {
                    NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress.toString(), ctx.channel()));
                }
            }
    
    
            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
                super.channelInactive(ctx);
    
                if (NettyRemotingServer.this.channelEventListener != null) {
                    NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel()));
                }
            }
    
    
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                if (evt instanceof IdleStateEvent) {
                    IdleStateEvent evnet = (IdleStateEvent) evt;
                    if (evnet.state().equals(IdleState.ALL_IDLE)) {
                        final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                        log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
                        RemotingUtil.closeChannel(ctx.channel());
                        if (NettyRemotingServer.this.channelEventListener != null) {
                            NettyRemotingServer.this
                                    .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel()));
                        }
                    }
                }
    
                ctx.fireUserEventTriggered(evt);
            }
    
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
                log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
    
                if (NettyRemotingServer.this.channelEventListener != null) {
                    NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress.toString(), ctx.channel()));
                }
    
                RemotingUtil.closeChannel(ctx.channel());
            }

    重点看第二个handler:这个是实现处理消息的业务:NettyServerHandler

        class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
    
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
                processMessageReceived(ctx, msg);
            }
        }
    processMessageReceived(ctx, msg)这个方法是RemotingService继承自NettyRemotingAbstract中的方法。
    我们再来看下NettyRemotingAbstract中的这个方法:
        public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            final RemotingCommand cmd = msg;
            if (cmd != null) {
                switch (cmd.getType()) {
                    case REQUEST_COMMAND:
                        processRequestCommand(ctx, cmd);
                        break;
                    case RESPONSE_COMMAND:
                        processResponseCommand(ctx, cmd);
                        break;
                    default:
                        break;
                }
            }
        }
        public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
            final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
            final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
            final int opaque = cmd.getOpaque();
    
            if (pair != null) {
                Runnable run = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                            if (rpcHook != null) {
                                rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                            }
    
                            final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                            if (rpcHook != null) {
                                rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                            }
    
                            if (!cmd.isOnewayRPC()) {
                                if (response != null) {
                                    response.setOpaque(opaque);
                                    response.markResponseType();
                                    try {
                                        ctx.writeAndFlush(response);
                                    } catch (Throwable e) {
                                        plog.error("process request over, but response failed", e);
                                        plog.error(cmd.toString());
                                        plog.error(response.toString());
                                    }
                                } else {
    
                                }
                            }
                        } catch (Throwable e) {
                            if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
                                    .equals(e.getClass().getCanonicalName())) {
                                plog.error("process request exception", e);
                                plog.error(cmd.toString());
                            }
    
                            if (!cmd.isOnewayRPC()) {
                                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
                                        RemotingHelper.exceptionSimpleDesc(e));
                                response.setOpaque(opaque);
                                ctx.writeAndFlush(response);
                            }
                        }
                    }
                };
    
                if (pair.getObject1().rejectRequest()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                            "[REJECTREQUEST]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                    return;
                }
    
                try {
                    final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                    pair.getObject2().submit(requestTask);
                } catch (RejectedExecutionException e) {
                    if ((System.currentTimeMillis() % 10000) == 0) {
                        plog.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
                                + ", too many requests and system thread pool busy, RejectedExecutionException " //
                                + pair.getObject2().toString() //
                                + " request code: " + cmd.getCode());
                    }
    
                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                                "[OVERLOAD]system busy, start flow control for a while");
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            } else {
                String error = " request type " + cmd.getCode() + " not supported";
                final RemotingCommand response =
                        RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                plog.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
            }
        }

    这里就通过final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode())来选择一processor来调用他的

    processRequest(ctx, cmd)方法。

     我们来看一个拉去消息的processor:PullMessageProcessor implements NettyRequestProcessor

    @Override
        public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
            return this.processRequest(ctx.channel(), request, true);
        }
    
    
    
        private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
                throws RemotingCommandException {
            RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
            final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
            final PullMessageRequestHeader requestHeader =
                    (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
    
            // 由于有直接返回的逻辑,所以必须要设置
            response.setOpaque(request.getOpaque());
    
            if (log.isDebugEnabled()) {
                log.debug("receive PullMessage request command, " + request);
            }
    
    
            if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] pulling message is forbidden");
                return response;
            }
    
    
            SubscriptionGroupConfig subscriptionGroupConfig =
                    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
            if (null == subscriptionGroupConfig) {
                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
                response.setRemark("subscription group not exist, " + requestHeader.getConsumerGroup() + " "
                        + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
                return response;
            }
    
    
            if (!subscriptionGroupConfig.isConsumeEnable()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
                return response;
            }
    
            final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
            final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
            final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
    
            final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
    
    
            TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
            if (null == topicConfig) {
                log.error("the topic " + requestHeader.getTopic() + " not exist, consumer: " + RemotingHelper.parseChannelRemoteAddr(channel));
                response.setCode(ResponseCode.TOPIC_NOT_EXIST);
                response.setRemark(
                        "topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
                return response;
            }
    
    
            if (!PermName.isReadable(topicConfig.getPerm())) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
                return response;
            }
    
    
            if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
                String errorInfo = "queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :" + requestHeader.getTopic()
                        + " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
                log.warn(errorInfo);
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(errorInfo);
                return response;
            }
    
    
            SubscriptionData subscriptionData = null;
            if (hasSubscriptionFlag) {
                try {
                    subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                            requestHeader.getSubscription());
                } catch (Exception e) {
                    log.warn("parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
                            requestHeader.getConsumerGroup());
                    response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
                    response.setRemark("parse the consumer's subscription failed");
                    return response;
                }
            } else {
                ConsumerGroupInfo consumerGroupInfo =
                        this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
                if (null == consumerGroupInfo) {
                    log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
                    response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
                    response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
                    return response;
                }
    
                if (!subscriptionGroupConfig.isConsumeBroadcastEnable() //
                        && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
                    response.setCode(ResponseCode.NO_PERMISSION);
                    response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
                    return response;
                }
    
                subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
                if (null == subscriptionData) {
                    log.warn("the consumer's subscription not exist, group: {}", requestHeader.getConsumerGroup());
                    response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
                    response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
                    return response;
                }
    
    
                if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
                    log.warn("the broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
                            subscriptionData.getSubString());
                    response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
                    response.setRemark("the consumer's subscription not latest");
                    return response;
                }
            }
    
            final GetMessageResult getMessageResult =
                    this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                            requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
            if (getMessageResult != null) {
                response.setRemark(getMessageResult.getStatus().name());
                responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
                responseHeader.setMinOffset(getMessageResult.getMinOffset());
                responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
    
    
                if (getMessageResult.isSuggestPullingFromSlave()) {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                } else {
                    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                }
    
                switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
                    case ASYNC_MASTER:
                    case SYNC_MASTER:
                        break;
                    case SLAVE:
                        if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                        }
                        break;
                }
    
                if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                    // consume too slow ,redirect to another machine
                    if (getMessageResult.isSuggestPullingFromSlave()) {
                        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                    }
                    // consume ok
                    else {
                        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                    }
                } else {
                    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                }
    
                switch (getMessageResult.getStatus()) {
                    case FOUND:
                        response.setCode(ResponseCode.SUCCESS);
                        break;
                    case MESSAGE_WAS_REMOVING:
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        break;
                    case NO_MATCHED_LOGIC_QUEUE:
                    case NO_MESSAGE_IN_QUEUE:
                        if (0 != requestHeader.getQueueOffset()) {
                            response.setCode(ResponseCode.PULL_OFFSET_MOVED);
    
                            // XXX: warn and notify me
                            log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
                                    requestHeader.getQueueOffset(), //
                                    getMessageResult.getNextBeginOffset(), //
                                    requestHeader.getTopic(), //
                                    requestHeader.getQueueId(), //
                                    requestHeader.getConsumerGroup()//
                            );
                        } else {
                            response.setCode(ResponseCode.PULL_NOT_FOUND);
                        }
                        break;
                    case NO_MATCHED_MESSAGE:
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        break;
                    case OFFSET_FOUND_NULL:
                        response.setCode(ResponseCode.PULL_NOT_FOUND);
                        break;
                    case OFFSET_OVERFLOW_BADLY:
                        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                        // XXX: warn and notify me
                        log.info("the request offset: " + requestHeader.getQueueOffset() + " over flow badly, broker max offset: "
                                + getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress());
                        break;
                    case OFFSET_OVERFLOW_ONE:
                        response.setCode(ResponseCode.PULL_NOT_FOUND);
                        break;
                    case OFFSET_TOO_SMALL:
                        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                        log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
                                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
                                getMessageResult.getMinOffset(), channel.remoteAddress());
                        break;
                    default:
                        assert false;
                        break;
                }
    
                if (this.hasConsumeMessageHook()) {
                    ConsumeMessageContext context = new ConsumeMessageContext();
                    context.setConsumerGroup(requestHeader.getConsumerGroup());
                    context.setTopic(requestHeader.getTopic());
                    context.setQueueId(requestHeader.getQueueId());
    
                    String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
    
                    switch (response.getCode()) {
                        case ResponseCode.SUCCESS:
    
                            context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
                            context.setCommercialRcvTimes(getMessageResult.getMsgCount4Commercial());
                            context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
                            context.setCommercialOwner(owner);
    
                            break;
                        case ResponseCode.PULL_NOT_FOUND:
                            if (!brokerAllowSuspend) {
    
    
                                context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                                context.setCommercialRcvTimes(1);
                                context.setCommercialOwner(owner);
    
                            }
                            break;
                        case ResponseCode.PULL_RETRY_IMMEDIATELY:
                        case ResponseCode.PULL_OFFSET_MOVED:
                            context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                            context.setCommercialRcvTimes(1);
                            context.setCommercialOwner(owner);
                            break;
                        default:
                            assert false;
                            break;
                    }
    
                    this.executeConsumeMessageHookBefore(context);
                }
    
                switch (response.getCode()) {
                    case ResponseCode.SUCCESS:
    
                        this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                                getMessageResult.getMessageCount());
    
                        this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                                getMessageResult.getBufferTotalSize());
    
                        this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
    
                        if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                            final long beginTimeMills = this.brokerController.getMessageStore().now();
                            final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
                            this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(), //
                                    requestHeader.getTopic(), requestHeader.getQueueId(),//
                                    (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
                            response.setBody(r);
                        } else {
                            try {
                                FileRegion fileRegion =
                                        new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
                                channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                                    @Override
                                    public void operationComplete(ChannelFuture future) throws Exception {
                                        getMessageResult.release();
                                        if (!future.isSuccess()) {
                                            log.error("transfer many message by pagecache failed, " + channel.remoteAddress(), future.cause());
                                        }
                                    }
                                });
                            } catch (Throwable e) {
                                log.error("transfer many message by pagecache exception", e);
                                getMessageResult.release();
                            }
    
                            response = null;
                        }
                        break;
                    case ResponseCode.PULL_NOT_FOUND:
    
                        if (brokerAllowSuspend && hasSuspendFlag) {
                            long pollingTimeMills = suspendTimeoutMillisLong;
                            if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                                pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                            }
    
                            String topic = requestHeader.getTopic();
                            long offset = requestHeader.getQueueOffset();
                            int queueId = requestHeader.getQueueId();
                            PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                                    this.brokerController.getMessageStore().now(), offset, subscriptionData);
                            this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                            response = null;
                            break;
                        }
    
    
                    case ResponseCode.PULL_RETRY_IMMEDIATELY:
                        break;
                    case ResponseCode.PULL_OFFSET_MOVED:
                        if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
                                || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
                            MessageQueue mq = new MessageQueue();
                            mq.setTopic(requestHeader.getTopic());
                            mq.setQueueId(requestHeader.getQueueId());
                            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
    
                            OffsetMovedEvent event = new OffsetMovedEvent();
                            event.setConsumerGroup(requestHeader.getConsumerGroup());
                            event.setMessageQueue(mq);
                            event.setOffsetRequest(requestHeader.getQueueOffset());
                            event.setOffsetNew(getMessageResult.getNextBeginOffset());
                            this.generateOffsetMovedEvent(event);
                            log.warn(
                                    "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
                                    requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
                                    responseHeader.getSuggestWhichBrokerId());
                        } else {
                            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                            log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
                                    requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
                                    responseHeader.getSuggestWhichBrokerId());
                        }
    
                        break;
                    default:
                        assert false;
                }
            } else {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("store getMessage return null");
            }
    
    
            boolean storeOffsetEnable = brokerAllowSuspend;
            storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
            storeOffsetEnable = storeOffsetEnable
                    && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
            if (storeOffsetEnable) {
                this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
                        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
            }
            return response;
        }
  • 相关阅读:
    php环境配置中各个模块在网站建设中的功能
    PHP+Apache+MySQL+phpMyAdmin在win7系统下的环境配置
    August 17th 2017 Week 33rd Thursday
    August 16th 2017 Week 33rd Wednesday
    August 15th 2017 Week 33rd Tuesday
    August 14th 2017 Week 33rd Monday
    August 13th 2017 Week 33rd Sunday
    August 12th 2017 Week 32nd Saturday
    August 11th 2017 Week 32nd Friday
    August 10th 2017 Week 32nd Thursday
  • 原文地址:https://www.cnblogs.com/guazi/p/6835221.html
Copyright © 2011-2022 走看看