  • RocketMQ Summary (4) -- How Consumers Pull Messages

    I. ProcessQueue

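      ProcessQueue is the consumer-side replica (snapshot) of a MessageQueue. By default, PullMessageService pulls 32 messages per request from the broker and stores them in the ProcessQueue ordered by queue offset. The messages are then submitted to the consumer's thread pool, and once they have been consumed successfully they are removed from the ProcessQueue.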

    private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
      Message container. The key is the message's offset in the ConsumeQueue; the value (MessageExt) is the message itself.
    private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
      Temporary message container. The key is the message's offset in the ConsumeQueue and the value (MessageExt) is the message itself. This structure is used for ordered messages: when a consume thread takes messages out of the ProcessQueue's msgTreeMap, it first parks them in consumingMsgOrderlyTreeMap.
    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
      Read-write lock that guards concurrent modification of msgTreeMap and consumingMsgOrderlyTreeMap.
    private final AtomicLong msgCount = new AtomicLong();
      Total number of messages in the ProcessQueue.
    private volatile long queueOffsetMax = 0L;
      Largest queue offset seen so far.
    private volatile boolean dropped = false; // whether this queue has been dropped
    private volatile long lastPullTimestamp = System.currentTimeMillis(); // timestamp of the last pull
    private volatile long lastConsumeTimestamp = System.currentTimeMillis(); // timestamp of the last consumption
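
      To make the role of these fields concrete, here is a minimal sketch of a ProcessQueue-like cache. It is not the RocketMQ implementation: messages are reduced to (offset, body) pairs, and the class name ProcessQueueSketch and its methods are assumptions made for illustration. It shows why a sorted map is convenient: after some messages are consumed out of order, the smallest offset still cached is the one that is safe to report as progress.

```java
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Simplified, hypothetical stand-in for ProcessQueue; a message is just (offset, body). */
class ProcessQueueSketch {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
    private final AtomicLong msgCount = new AtomicLong();
    private volatile long queueOffsetMax = 0L;

    /** Caches one message; returns false if that offset is already cached. */
    boolean putMessage(long queueOffset, String body) {
        lockTreeMap.writeLock().lock();
        try {
            if (msgTreeMap.putIfAbsent(queueOffset, body) != null) {
                return false; // duplicate offset, nothing changes
            }
            msgCount.incrementAndGet();
            queueOffsetMax = Math.max(queueOffsetMax, queueOffset);
            return true;
        } finally {
            lockTreeMap.writeLock().unlock();
        }
    }

    /**
     * Removes consumed offsets and returns the offset that is safe to commit:
     * the smallest offset still cached, or queueOffsetMax + 1 if nothing is left.
     */
    long removeMessage(List<Long> consumedOffsets) {
        lockTreeMap.writeLock().lock();
        try {
            for (long offset : consumedOffsets) {
                if (msgTreeMap.remove(offset) != null) {
                    msgCount.decrementAndGet();
                }
            }
            return msgTreeMap.isEmpty() ? queueOffsetMax + 1 : msgTreeMap.firstKey();
        } finally {
            lockTreeMap.writeLock().unlock();
        }
    }

    long getMsgCount() {
        return msgCount.get();
    }
}
```

      In this sketch, consuming offset 11 while offset 10 is still cached leaves the committable offset at 10, so an unconsumed message is never skipped.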

    II. DefaultMQPushConsumerImpl#pullMessage

      The core of the pull logic lives in the network communication layer.

    try {
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } // catch block omitted in this excerpt

      PullAPIWrapper#pullKernelImpl

    FindBrokerResult findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);

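      The broker address is looked up in MQClientInstance by brokerName and brokerId. In a RocketMQ deployment, brokers that share the same name form a master-slave group, and the members of the group differ in their brokerId. After each pull, the broker returns a suggestion on whether the next pull should go to the master or to a slave.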
    public class FindBrokerResult {
        private final String brokerAddr;
        private final boolean slave;
        private final int brokerVersion;
        // constructor and getters omitted in this excerpt
    }
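
      The "suggestion" mechanism described above can be pictured as a small per-queue table. The following is a sketch under stated assumptions, not RocketMQ's code: the class PullNodeSuggestionSketch, the string key format, and the method updateSuggestion are hypothetical; only the idea (remember the broker's last suggestion, default to the master) comes from the text.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical per-queue table of "which brokerId to pull from next". */
class PullNodeSuggestionSketch {
    /** By RocketMQ convention the master of a broker group has brokerId 0. */
    static final long MASTER_ID = 0L;

    // key: an illustrative "brokerName@queueId" string, value: suggested brokerId
    private final ConcurrentMap<String, AtomicLong> suggestions = new ConcurrentHashMap<>();

    /** Records the suggestion carried by the latest pull response for this queue. */
    void updateSuggestion(String queueKey, long suggestedBrokerId) {
        suggestions.computeIfAbsent(queueKey, k -> new AtomicLong(MASTER_ID))
                   .set(suggestedBrokerId);
    }

    /** Returns the brokerId to pull from; the master when no suggestion is known. */
    long recalculatePullFromWhichNode(String queueKey) {
        AtomicLong suggested = suggestions.get(queueKey);
        return suggested != null ? suggested.get() : MASTER_ID;
    }
}
```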

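      The request is ultimately written to a Netty Channel. The synchronous variant is shown here; the pull above is issued with CommunicationMode.ASYNC, which goes through the asynchronous counterpart but writes to the Channel in the same way.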

    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
            final long timeoutMillis)
            throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
            final int opaque = request.getOpaque();
    
            try {
                final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
                this.responseTable.put(opaque, responseFuture);
                final SocketAddress addr = channel.remoteAddress();
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            responseFuture.setSendRequestOK(true);
                            return;
                        } else {
                            responseFuture.setSendRequestOK(false);
                        }
    
                        responseTable.remove(opaque);
                        responseFuture.setCause(f.cause());
                        responseFuture.putResponse(null);
                        log.warn("send a request command to channel <" + addr + "> failed.");
                    }
                });
    
                RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
                if (null == responseCommand) {
                    if (responseFuture.isSendRequestOK()) {
                        throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                            responseFuture.getCause());
                    } else {
                        throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                    }
                }
    
                return responseCommand;
            } finally {
                this.responseTable.remove(opaque);
            }
        }
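
      The key idea in the method above is correlating a response with its request through the opaque id and a table of pending futures. Below is a minimal sketch of that pattern without Netty. The class OpaqueCorrelationSketch, the Transport interface, and the use of CompletableFuture in place of ResponseFuture are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical request/response correlation by opaque id, without any real networking. */
class OpaqueCorrelationSketch {
    /** Hypothetical transport: carries (opaque, payload) to the peer in some way. */
    interface Transport {
        void send(int opaque, String payload);
    }

    private final AtomicInteger opaqueGenerator = new AtomicInteger();
    private final ConcurrentMap<Integer, CompletableFuture<String>> responseTable =
        new ConcurrentHashMap<>();

    /** Sends a request and blocks until the matching response arrives or the timeout expires. */
    String invokeSync(Transport transport, String payload, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        int opaque = opaqueGenerator.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        responseTable.put(opaque, future);       // register before sending
        try {
            transport.send(opaque, payload);
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            responseTable.remove(opaque);        // always clean up, as in the finally block above
        }
    }

    /** Called by the receiving side (the inbound handler) when a response arrives. */
    void onResponse(int opaque, String response) {
        CompletableFuture<String> future = responseTable.get(opaque);
        if (future != null) {
            future.complete(response);           // wakes up the thread blocked in invokeSync
        }
    }

    int pendingRequests() {
        return responseTable.size();
    }
}
```

      A response whose opaque id is no longer in the table (for example because the caller already timed out) is simply ignored in this sketch.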

      2 Handling the returned messages

      Once you see it, this is simple: it is the usual way of writing a Netty client.

    class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
    
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
                processMessageReceived(ctx, msg);
            }
        }

      

    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
                .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
                .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        if (nettyClientConfig.isUseTLS()) {
                            if (null != sslContext) {
                                pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                                log.info("Prepend SSL handler");
                            } else {
                                log.warn("Connections are insecure as SSLContext is null!");
                            }
                        }
                        pipeline.addLast(
                            defaultEventExecutorGroup,
                            new NettyEncoder(),
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                            new NettyConnectManageHandler(),
                            new NettyClientHandler());
                    }
                });

      Receiving a message is essentially a decoding step, which is not covered in detail here.

      The logic that handles the pulled messages lives in the PullCallback that is created before the request is sent. The main code is:

    boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); // first cache the messages in the ProcessQueue
    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
        pullResult.getMsgFoundList(),
        processQueue,
        pullRequest.getMessageQueue(),
        dispatchToConsume); // wrap the messages in a ConsumeRequest and submit it to the consume thread pool

    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
    } else {
        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); // put the pull request back on the pull queue
    }

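    In short, the callback does three things:

    1 Cache the messages in the ProcessQueue

    2 Wrap the messages, together with the ProcessQueue, in a ConsumeRequest and submit it to the thread pool for asynchronous execution

    3 Put the next pull request on the pull queue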
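
      The three steps can be simulated in a few lines. This is a sketch under stated assumptions rather than RocketMQ's code: the broker is replaced by an in-memory list, and PullLoopSketch, its nested PullRequest, and all method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical simulation of the cache / submit / re-queue cycle. */
class PullLoopSketch {
    /** Hypothetical pull request that only tracks the next offset to pull. */
    static final class PullRequest {
        long nextOffset;
        PullRequest(long nextOffset) { this.nextOffset = nextOffset; }
    }

    private final List<String> brokerQueue;   // stand-in for the queue on the broker
    private final int pullBatchSize;
    private final ConcurrentSkipListMap<Long, String> processQueue = new ConcurrentSkipListMap<>();
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
    private final List<String> consumed = Collections.synchronizedList(new ArrayList<>());
    private final ExecutorService consumeExecutor = Executors.newFixedThreadPool(2);

    PullLoopSketch(List<String> brokerQueue, int pullBatchSize) {
        this.brokerQueue = brokerQueue;
        this.pullBatchSize = pullBatchSize;
    }

    void start(long fromOffset) {
        pullRequestQueue.offer(new PullRequest(fromOffset));
    }

    /** Performs one pull and returns how many messages were found. */
    int pullOnce() {
        PullRequest request = pullRequestQueue.poll();
        if (request == null) {
            return 0;
        }
        int from = (int) request.nextOffset;
        int to = Math.min(from + pullBatchSize, brokerQueue.size());
        List<Long> offsets = new ArrayList<>();
        for (int i = from; i < to; i++) {
            processQueue.put((long) i, brokerQueue.get(i));   // step 1: cache the messages
            offsets.add((long) i);
        }
        if (!offsets.isEmpty()) {
            consumeExecutor.submit(() -> {                    // step 2: consume asynchronously
                for (Long offset : offsets) {
                    consumed.add(processQueue.remove(offset));
                }
            });
        }
        request.nextOffset = to;
        pullRequestQueue.offer(request);                      // step 3: queue the next pull
        return offsets.size();
    }

    /** Stops the consume pool and waits for submitted work to finish. */
    boolean shutdownAndAwait(long timeoutMillis) {
        consumeExecutor.shutdown();
        try {
            return consumeExecutor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int consumedCount() { return consumed.size(); }
    int cachedCount() { return processQueue.size(); }
}
```

      Note that the pulling side never waits for consumption to finish: it re-queues the request as soon as the batch has been handed to the pool, which is what keeps pulling and consuming decoupled.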

    III. When a pull finds no messages

    =======Updated 2021-08-30========

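      RocketMQ enables long polling by default. If a pull finds no messages, the broker holds the request and checks every 5 s whether new messages have arrived. A request is held for at most 15 s; when that limit is reached, an empty result is returned to the client.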

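      Because a 5 s interval is fairly long, RocketMQ has a second mechanism that keeps long polling responsive: whenever a message written to the CommitLog is dispatched, PullRequestHoldService is asked whether any suspended pull request is waiting on that queue. If there is one, the request is rebuilt and the pull is executed.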

      PullRequestHoldService # run()

    public void run() {
            log.info("{} service started", this.getServiceName());
            while (!this.isStopped()) {
                try {
                    if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                        this.waitForRunning(5 * 1000); // long polling enabled: wait 5 s
                    } else {
                        this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); // long polling disabled: 1 s by default
                    }
    
                    long beginLockTimestamp = this.systemClock.now();
                    this.checkHoldRequest();
                    long costTime = this.systemClock.now() - beginLockTimestamp;
                    if (costTime > 5 * 1000) {
                        log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                    }
                } catch (Throwable e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }
    
            log.info("{} service end", this.getServiceName());
        }
    private void checkHoldRequest() {
            for (String key : this.pullRequestTable.keySet()) { // pullRequestTable holds the suspended requests
                String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
                if (2 == kArray.length) {
                    String topic = kArray[0];
                    int queueId = Integer.parseInt(kArray[1]);
                    final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                    try {
                        this.notifyMessageArriving(topic, queueId, offset);
                    } catch (Throwable e) {
                        log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
                    }
                }
            }
        }
    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
            long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
            String key = this.buildKey(topic, queueId);
            ManyPullRequest mpr = this.pullRequestTable.get(key);
            if (mpr != null) {
                List<PullRequest> requestList = mpr.cloneListAndClear();
                if (requestList != null) {
                    List<PullRequest> replayList = new ArrayList<PullRequest>();
    
                    for (PullRequest request : requestList) {
                        long newestOffset = maxOffset;
                        if (newestOffset <= request.getPullFromThisOffset()) {
                            newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                        }
    
                        if (newestOffset > request.getPullFromThisOffset()) {
                            boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                                new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                            // match by bit map, need eval again when properties is not null.
                            if (match && properties != null) {
                                match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                            }
    
                            if (match) { // the newest offset is beyond the requested offset and the filter matches, so execute the pull right away
                                try {
                                    this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                        request.getRequestCommand());
                                } catch (Throwable e) {
                                    log.error("execute request when wakeup failed.", e);
                                }
                                continue;
                            }
                        }

      Next, look at the logic in DefaultMessageStore (run() and doReput() below belong to its inner ReputMessageService thread)

     public void run() {
                DefaultMessageStore.log.info(this.getServiceName() + " service started");
    
                while (!this.isStopped()) {
                    try {
                        Thread.sleep(1);
                        this.doReput();
                    } catch (Exception e) {
                        DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                    }
                }
    
                DefaultMessageStore.log.info(this.getServiceName() + " service end");
            }
    private void doReput() {
                if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                    log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
                    this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
                }
                for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
    
                    if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                        && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                        break;
                    }
    
                    SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                    if (result != null) {
                        try {
                            this.reputFromOffset = result.getStartOffset();
    
                            for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                                DispatchRequest dispatchRequest =
                                    DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                                int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
    
                                if (dispatchRequest.isSuccess()) {
                                    if (size > 0) {
                                        DefaultMessageStore.this.doDispatch(dispatchRequest);
    
                                        if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                            && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                            DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                                dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                                dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                                dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                        }

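      If the dispatched message has a size greater than 0, and the broker is not a slave and has long polling enabled, messageArrivingListener is called to send the notification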

      NotifyMessageArrivingListener # arriving

    public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
            long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
            this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
                msgStoreTime, filterBitMap, properties);
        }

      As you can see, this again ends up calling pullRequestHoldService.notifyMessageArriving
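
      Both wake-up paths (the periodic check and the notification on message arrival) funnel into the same decision. The sketch below models only that decision and is not the broker's implementation: PullHoldSketch, HeldRequest, the "@" key separator, the explicit nowMillis parameter, and the FOUND/TIMEOUT outcome strings are assumptions chosen to keep the example small and deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of holding pull requests until data arrives or they time out. */
class PullHoldSketch {
    /** Hypothetical suspended request. */
    static final class HeldRequest {
        final long pullFromOffset;   // the offset the consumer wants to read next
        final long deadlineMillis;   // when the broker must answer even without data
        volatile String result;      // null while the request is still suspended

        HeldRequest(long pullFromOffset, long deadlineMillis) {
            this.pullFromOffset = pullFromOffset;
            this.deadlineMillis = deadlineMillis;
        }
    }

    private final Map<String, List<HeldRequest>> pullRequestTable = new HashMap<>();

    private static String buildKey(String topic, int queueId) {
        return topic + "@" + queueId;   // separator chosen for illustration
    }

    synchronized void suspend(String topic, int queueId, HeldRequest request) {
        pullRequestTable.computeIfAbsent(buildKey(topic, queueId), k -> new ArrayList<>())
                        .add(request);
    }

    /**
     * Called both by the periodic check and when a new message is dispatched.
     * Requests whose offset is now available are answered, expired ones get an
     * empty answer, and the rest stay suspended.
     */
    synchronized void notifyMessageArriving(String topic, int queueId, long maxOffset, long nowMillis) {
        String key = buildKey(topic, queueId);
        List<HeldRequest> held = pullRequestTable.remove(key);
        if (held == null) {
            return;
        }
        List<HeldRequest> stillWaiting = new ArrayList<>();
        for (HeldRequest request : held) {
            if (maxOffset > request.pullFromOffset) {
                request.result = "FOUND";        // new data: execute the pull now
            } else if (nowMillis >= request.deadlineMillis) {
                request.result = "TIMEOUT";      // held long enough: return an empty result
            } else {
                stillWaiting.add(request);       // keep holding
            }
        }
        if (!stillWaiting.isEmpty()) {
            pullRequestTable.put(key, stillWaiting);
        }
    }
}
```

      In this model the periodic check bounds how late a timeout can be detected, while the arrival notification is what makes a held request return almost as soon as a message is available.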

  • Original article: https://www.cnblogs.com/juniorMa/p/15125718.html