zoukankan      html  css  js  c++  java
  • RocketMQ -消费者

    RocketMQ的消费模式分为集群消费和广播消费

    启动机制分两种:pull模式和push模式

    PULL模式

     pull模式相对简单:获取到MessageQueue之后,由用户自行遍历,拉取当前queue的消息并进行处理,最后自行管理消费点位或者上报给broker。

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.rocketmq.example.simple;
    
    import org.apache.rocketmq.client.consumer.MQPullConsumer;
    import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
    import org.apache.rocketmq.client.consumer.PullResult;
    import org.apache.rocketmq.client.consumer.PullTaskCallback;
    import org.apache.rocketmq.client.consumer.PullTaskContext;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    
    /**
     * Minimal example of pull-mode consumption driven by
     * {@link MQPullConsumerScheduleService}.
     * <p>
     * A callback is registered for topic {@code TopicTest1}. Each invocation
     * reads the stored offset of the given queue, pulls up to 32 messages from
     * it, prints the outcome, stores the next begin offset and requests a
     * next-pull delay of 100 ms.
     */
    public class PullScheduleService {

        public static void main(String[] args) throws MQClientException {
            final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");
            scheduleService.setMessageModel(MessageModel.CLUSTERING);

            final PullTaskCallback pullTask = new PullTaskCallback() {

                @Override
                public void doPullTask(MessageQueue mq, PullTaskContext context) {
                    final MQPullConsumer pullConsumer = context.getPullConsumer();
                    try {
                        // A negative stored offset is replaced by 0.
                        final long beginOffset = Math.max(pullConsumer.fetchConsumeOffset(mq, false), 0L);

                        final PullResult pulled = pullConsumer.pull(mq, "*", beginOffset, 32);
                        System.out.printf("%s%n", beginOffset + "	" + mq + "	" + pulled);

                        // The example takes no status-specific action; every status
                        // simply falls through to the offset update below.
                        switch (pulled.getPullStatus()) {
                            case FOUND:
                            case NO_MATCHED_MSG:
                            case NO_NEW_MSG:
                            case OFFSET_ILLEGAL:
                            default:
                                break;
                        }

                        // The application itself records where the next pull should start.
                        pullConsumer.updateConsumeOffset(mq, pulled.getNextBeginOffset());
                        context.setPullNextDelayTimeMillis(100);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };

            scheduleService.registerPullTaskCallback("TopicTest1", pullTask);
            scheduleService.start();
        }
    }

    PUSH模式

    启动流程

     上图中一些rebalance机制,消费进度机制,push机制等将在下文中详细讲解。

    /**
     * Starts this push consumer.
     * <p>
     * Start-up work is done only when the service state is {@code CREATE_JUST}.
     * The state is switched to {@code START_FAILED} before any work begins and
     * to {@code RUNNING} only after the client instance has been started, so a
     * failure part-way through leaves the state at {@code START_FAILED}
     * (except for the duplicate-group case, which resets it to
     * {@code CREATE_JUST}).
     * <p>
     * After the switch, the method refreshes the topic subscribe info, checks
     * the client in the broker, sends a heartbeat to all brokers and requests
     * an immediate rebalance.
     *
     * @throws MQClientException if the consumer group is already registered on
     *         the client instance, or if the state is {@code RUNNING},
     *         {@code START_FAILED} or {@code SHUTDOWN_ALREADY}; the invoked
     *         helpers may presumably throw it as well
     */
    public synchronized void start() throws MQClientException {
            switch (this.serviceState) {
                case CREATE_JUST:
                    log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                        this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                    // Pessimistically mark the start as failed; overwritten with RUNNING on success.
                    this.serviceState = ServiceState.START_FAILED;
    
                    // Validate the configuration.
                    this.checkConfig();
    
                    // Copy the subscriptions (per the original note, this initializes
                    // the subscription data held by rebalanceImpl).
                    this.copySubscription();
    
                    if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                        // Clustering mode: switch the instance name to the process ID.
                        // NOTE(review): the original comment said "consumer group name",
                        // but the call below changes the instance name.
                        this.defaultMQPushConsumer.changeInstanceNameToPID();
                    }
    
                    this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
    
                    // Configure the rebalance implementation.
                    this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                    this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                    this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                    this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
    
                    // Create the wrapper around the pull API.
                    this.pullAPIWrapper = new PullAPIWrapper(
                        mQClientFactory,
                        this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                    // Register the message filter hooks on the pull API wrapper.
                    this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
    
                    // Choose the offset store: a user-supplied store wins,
                    // otherwise pick one according to the message model.
                    if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                        this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                    } else {
                        switch (this.defaultMQPushConsumer.getMessageModel()) {
                            case BROADCASTING:
                                // Broadcasting: consume offsets are stored in a local file.
                                this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            case CLUSTERING:
                                // Clustering: consume offsets are managed by the broker.
                                this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            default:
                                break;
                        }
                    }
                    // Load the stored offsets (message queue -> offset).
                    // NOTE(review): if the default branch above was taken, offsetStore
                    // is not assigned here and may still be null.
                    this.offsetStore.load();
    
                    if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                        // Orderly consumption.
                        this.consumeOrderly = true;
                        this.consumeMessageService =
                            new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                    } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                        // Concurrent consumption.
                        this.consumeOrderly = false;
                        this.consumeMessageService =
                            new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                    }
    
                    // Start the consume service.
                    // NOTE(review): no service is assigned above when the listener is
                    // neither orderly nor concurrent; presumably checkConfig() rules
                    // that out - confirm.
                    this.consumeMessageService.start();
    
                    // Register this consumer locally on the client instance.
                    boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                    if (!registerOK) {
                        // Roll back: restore the initial state and stop the consume service.
                        this.serviceState = ServiceState.CREATE_JUST;
                        this.consumeMessageService.shutdown();
                        throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                    }
    
                    // Start the client instance.
                    mQClientFactory.start();
                    log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
                        + this.serviceState//
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
                default:
                    break;
            }
    
            // Refresh the topic route/subscribe information.
            this.updateTopicSubscribeInfoWhenSubscriptionChanged();
            // Check this client in the broker.
            this.mQClientFactory.checkClientInBroker();
            // Send a heartbeat to every broker.
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            // Trigger a rebalance right away.
            this.mQClientFactory.rebalanceImmediately();
        }
        /**
         * Starts the client factory while holding this object's monitor.
         * <p>
         * Start-up work is done only in the {@code CREATE_JUST} state; the state
         * is set to {@code START_FAILED} first and to {@code RUNNING} once every
         * service has been started. {@code RUNNING} and {@code SHUTDOWN_ALREADY}
         * are no-ops.
         *
         * @throws MQClientException if the state is {@code START_FAILED}, i.e. an
         *         earlier start attempt did not reach {@code RUNNING}
         */
        public void start() throws MQClientException {

            synchronized (this) {
                switch (this.serviceState) {
                    case START_FAILED:
                        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                    case CREATE_JUST: {
                        this.serviceState = ServiceState.START_FAILED;

                        // No name server address configured: fetch it remotely.
                        final boolean namesrvAddrMissing = this.clientConfig.getNamesrvAddr() == null;
                        if (namesrvAddrMissing) {
                            this.mQClientAPIImpl.fetchNameServerAddr();
                        }

                        // Request-response channel (remoting layer).
                        this.mQClientAPIImpl.start();
                        // Scheduled background tasks.
                        this.startScheduledTask();
                        // Message pulling service.
                        this.pullMessageService.start();
                        // Rebalance service.
                        this.rebalanceService.start();
                        // Internal producer ("push service" in the original comment).
                        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

                        log.info("the client factory [{}] start OK", this.clientId);
                        this.serviceState = ServiceState.RUNNING;
                        break;
                    }
                    case RUNNING:
                    case SHUTDOWN_ALREADY:
                    default:
                        // Nothing to do in these states.
                        break;
                }
            }
        }

    下面介绍一下整个流程,先认识一下下面这几个组件

    pullMessageService   采用长轮询机制向broker拉取数据

    rebalanceService 重平衡策略   

    consumeMessageService    数据消费服务

    整体过程:

    1)rebalance服务向pullMessageService  的pullRequestQueue里面存放pullRequest

    2)pullMessageService 拉取pullRequest并请求broker 拉取数据  (在长轮询中详细介绍)

    3)拉取到的数据使用pullback回调,数据包装成ConsumeRequest任务,并交给consumeExecutor线程池来执行消息通知

    4)消费失败的消息 返回broker重试,如果发回broker失败,则延迟放入pullRequestQueue,延迟拉取。

    rebalance机制

     对于push机制使用RebalancePushImpl类,在启动的时候  会调用this.rebalanceService.start();之后再调用this.mQClientFactory.rebalanceImmediately();

    实际上,rebalance服务会每隔20s定时对当前客户端下的所有消费组执行一次doRebalance。

        /**
         * Runs one rebalance for every consumer registered in
         * {@code consumerTable}.
         * <p>
         * {@code null} entries are skipped. A failure in one consumer is logged
         * and does not stop the remaining consumers from being rebalanced.
         */
        public void doRebalance() {
            for (MQConsumerInner consumer : this.consumerTable.values()) {
                if (consumer == null) {
                    continue;
                }
                try {
                    consumer.doRebalance();
                } catch (Throwable e) {
                    log.error("doRebalance exception", e);
                }
            }
        }
        /**
         * Rebalances every topic this consumer subscribes to, then truncates
         * message queues that do not belong to a subscribed topic.
         * <p>
         * Failures are handled per topic: an exception for one topic is logged
         * as a warning (silently ignored for retry topics) and the loop moves on.
         *
         * @param isOrder passed through unchanged to {@code rebalanceByTopic}
         */
        public void doRebalance(final boolean isOrder) {
            final Map<String, SubscriptionData> subscriptions = this.getSubscriptionInner();
            if (subscriptions != null) {
                for (final String topic : subscriptions.keySet()) {
                    try {
                        this.rebalanceByTopic(topic, isOrder);
                    } catch (Throwable e) {
                        final boolean retryTopic = topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
                        if (!retryTopic) {
                            log.warn("rebalanceByTopic Exception", e);
                        }
                    }
                }
            }

            // Always runs, even when there is no subscription table.
            this.truncateMessageQueueNotMyTopic();
        }
    /**
     * Rebalances the message queues of a single topic for this consumer.
     * <p>
     * In {@code BROADCASTING} mode every queue of the topic is handed to
     * {@code updateProcessQueueTableInRebalance}. In {@code CLUSTERING} mode the
     * queues and the consumer ids of the group are sorted, the configured
     * {@code AllocateMessageQueueStrategy} selects this client's share, and
     * only that share is handed over. In both modes
     * {@code messageQueueChanged} is called when the process queue table
     * changed.
     *
     * @param topic   topic to rebalance
     * @param isOrder passed through to {@code updateProcessQueueTableInRebalance}
     */
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
            switch (messageModel) {
                case BROADCASTING: {
                    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                    if (mqSet != null) {
                        boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                        if (changed) {
                            this.messageQueueChanged(topic, mqSet, mqSet);
                            log.info("messageQueueChanged {} {} {} {}", //
                                consumerGroup, //
                                topic, //
                                mqSet, //
                                mqSet);
                        }
                    } else {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                    break;
                }
                case CLUSTERING: {
                    // All message queues known for this topic.
                    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                    // Consumer ids of this group, looked up through the client factory.
                    // Per the original note, the broker learns them from the heartbeats
                    // that consumers send when they start.
                    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                    if (null == mqSet) {
                        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                        }
                    }
    
                    if (null == cidAll) {
                        log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                    }
    
                    /**
                     * The question solved here is how the consumer ids in cidAll are
                     * distributed over the message queues. The original note counted
                     * five strategies but lists six:
                     * AllocateMessageQueueAveragely: average allocation (the default)
                     *
                     * AllocateMessageQueueAveragelyByCircle: round-robin allocation
                     *
                     * AllocateMessageQueueConsistentHash: consistent hashing
                     *
                     * AllocateMessageQueueByConfig: allocation by configuration
                     *
                     * AllocateMessageQueueByMachineRoom: allocation by machine room
                     *
                     * AllocateMachineRoomNearby: nearest-machine-room allocation
                     *
                     */
                    if (mqSet != null && cidAll != null) {
                        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                        mqAll.addAll(mqSet);
                        // Sort both lists so that every node sees the same view and
                        // therefore computes the same rebalance result.
                        Collections.sort(mqAll);
                        Collections.sort(cidAll);
    
                        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
    
                        List<MessageQueue> allocateResult = null;
                        try {
                            allocateResult = strategy.allocate(//
                                this.consumerGroup, //
                                this.mQClientFactory.getClientId(), //
                                mqAll, //
                                cidAll);
                        } catch (Throwable e) {
                            log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                                e);
                            // Allocation failed: leave the current assignment untouched.
                            return;
                        }
    
                        // A null allocation result is treated as an empty set.
                        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                        if (allocateResult != null) {
                            allocateResultSet.addAll(allocateResult);
                        }
    
                        // After re-allocation this consumer may start pulling a new message
                        // queue or stop pulling one, so ProcessQueue entries have to be
                        // added or cleaned up accordingly.
                        boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                        if (changed) {
                            log.info(
                                "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                                strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                                allocateResultSet.size(), allocateResultSet);
                           // The assignment changed. Per the original note, this updates the
                           // topic subscription info and notifies the broker - confirm
                           // against messageQueueChanged.
                            this.messageQueueChanged(topic, mqSet, allocateResultSet);
                        }
                    }
                    break;
                }
                default:
                    break;
            }
        }

    updateProcessQueueTableInRebalance方法里面this.dispatchPullRequest(pullRequestList);接下来会执行

    /**
     * Hands each pull request to the push consumer for immediate execution
     * and logs it.
     *
     * @param pullRequestList pull requests to dispatch
     */
    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
        for (final PullRequest request : pullRequestList) {
            this.defaultMQPushConsumerImpl.executePullRequestImmediately(request);
            log.info("doRebalance, {}, add a new pull request {}", consumerGroup, request);
        }
    }

    实际上在PullMessageService里面的pullRequestQueue里put一个pullRequest,这样就可以对一个新的messagequeue去broker上pull数据了。

        /**
         * Puts the pull request on {@code pullRequestQueue} right away.
         * <p>
         * If the calling thread is interrupted while waiting on the queue, the
         * error is logged, the request is not enqueued, and the thread's
         * interrupt status is restored so callers further up the stack can still
         * observe the interruption.
         *
         * @param pullRequest the request to enqueue
         */
        public void executePullRequestImmediately(final PullRequest pullRequest) {
            try {
                this.pullRequestQueue.put(pullRequest);
            } catch (InterruptedException e) {
                log.error("executePullRequestImmediately pullRequestQueue.put", e);
                // put() cleared the interrupt flag when it threw; set it again
                // rather than silently swallowing the interruption.
                Thread.currentThread().interrupt();
            }
        }

    接下来consumer在启动的最后调用了

     /**
      * Wakes up the rebalance service so that, presumably, it runs a rebalance
      * without waiting for its next scheduled round.
      */
     public void rebalanceImmediately() {
         rebalanceService.wakeup();
     }

    目的是把rebalance服务从20s的等待中唤醒,立即执行一次rebalance;在此之前,当前消费者实例已经向broker发送了心跳并上报了订阅信息。

    至于上面列出的几种负载均衡策略,这里不再具体阐述,读者可以自行查看源码。

    顺序消息和并行消息

    对于顺序消息,无法做到全局有序消费,但是可以做到分区(队列)有序。

    在PullMessageService#pullMessage中,对于收到的broker响应,使用pullCallback处理:当状态为FOUND时,会把消息put到ProcessQueue的TreeMap中,然后用ConsumeRequest包装起来,交给线程池处理。这里主要从上图的第6步开始讲解。

    对于顺序消息的消费,主要的实现在ConsumeMessageOrderlyService:首先对messagequeue加锁,对于没有在broker上锁住的情况,则挂起一段时间后不断地重试第6步,直到可以处理为止。之后在processqueue中取出消息,执行消费者注册的messageListener,并对消费结果进行处理;如果消费失败,则不断地重复执行第6步,这就可能造成后续message无法消费的问题。

    /**
     * Consumes, in order, the messages buffered in this request's process queue.
     * <p>
     * The work is serialized per message queue through the lock object obtained
     * from {@code messageQueueLock}. Consumption proceeds only in broadcasting
     * mode or while the process queue is locked and that lock has not expired;
     * otherwise a later lock-and-reconsume attempt is scheduled. Inside the
     * loop, batches of up to {@code consumeMessageBatchMaxSize} messages are
     * taken from the process queue and handed to the registered listener, and
     * {@code processConsumeResult} decides whether the loop continues. The loop
     * also stops when the queue is dropped, when the lock is missing or expired
     * in clustering mode, or when the task has run longer than
     * {@code MAX_TIME_CONSUME_CONTINUOUSLY}.
     */
    public void run() {
                if (this.processQueue.isDropped()) {
                    log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    return;
                }

                // Monitor object dedicated to this message queue.
                final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
                synchronized (objLock) {
                    // Orderly consumption may proceed in broadcasting mode, or when the
                    // process queue is marked as locked and the lock has not expired
                    // (per the original note, this mirrors the lock this consumer holds
                    // on the message queue at the broker).
                    // Original author's rationale: during a rebalance the previous owner
                    // may not yet have reported its consumed offset to the broker, so the
                    // next owner could consume duplicates; the lock is meant to keep
                    // pulling ordered across a rebalance.
                    if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                        final long beginTime = System.currentTimeMillis();
                        for (boolean continueConsume = true; continueConsume; ) {
                            if (this.processQueue.isDropped()) {
                                log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                                break;
                            }

                            if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                                && !this.processQueue.isLocked()) {
                                log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                                break;
                            }

                            if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                                && this.processQueue.isLockExpired()) {
                                log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                                break;
                            }

                            // Yield after running for too long: resubmit this request
                            // and leave the loop.
                            long interval = System.currentTimeMillis() - beginTime;
                            if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                                ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                                break;
                            }

                            final int consumeBatchSize =
                                ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                            // Take the next batch of messages from the process queue.
                            List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                            if (!msgs.isEmpty()) {
                                final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                                ConsumeOrderlyStatus status = null;

                                ConsumeMessageContext consumeMessageContext = null;
                                if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                    consumeMessageContext = new ConsumeMessageContext();
                                    consumeMessageContext
                                        .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                                    consumeMessageContext.setMq(messageQueue);
                                    consumeMessageContext.setMsgList(msgs);
                                    consumeMessageContext.setSuccess(false);
                                    // init the consume context type
                                    consumeMessageContext.setProps(new HashMap<String, String>());
                                    ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                                }

                                long beginTimestamp = System.currentTimeMillis();
                                ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                                boolean hasException = false;
                                // Acquire the consume lock BEFORE entering try, so that the
                                // finally block only ever unlocks a lock that is actually held.
                                this.processQueue.getLockConsume().lock();
                                try {
                                    if (this.processQueue.isDropped()) {
                                        log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                            this.messageQueue);
                                        break;
                                    }

                                    // Invoke the listener registered by the client.
                                    status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                                } catch (Throwable e) {
                                    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
                                        RemotingHelper.exceptionSimpleDesc(e), //
                                        ConsumeMessageOrderlyService.this.consumerGroup, //
                                        msgs, //
                                        messageQueue);
                                    hasException = true;
                                } finally {
                                    this.processQueue.getLockConsume().unlock();
                                }

                                if (null == status //
                                    || ConsumeOrderlyStatus.ROLLBACK == status//
                                    || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                                    log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", //
                                        ConsumeMessageOrderlyService.this.consumerGroup, //
                                        msgs, //
                                        messageQueue);
                                }

                                // Classify the outcome for the hook context.
                                long consumeRT = System.currentTimeMillis() - beginTimestamp;
                                if (null == status) {
                                    if (hasException) {
                                        returnType = ConsumeReturnType.EXCEPTION;
                                    } else {
                                        returnType = ConsumeReturnType.RETURNNULL;
                                    }
                                } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                                    returnType = ConsumeReturnType.TIME_OUT;
                                } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                                    returnType = ConsumeReturnType.FAILED;
                                } else if (ConsumeOrderlyStatus.SUCCESS == status) {
                                    returnType = ConsumeReturnType.SUCCESS;
                                }

                                if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                    consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                                }

                                // A missing status (null return or exception) is treated as
                                // "suspend the current queue for a moment".
                                if (null == status) {
                                    status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                                }

                                if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                    consumeMessageContext.setStatus(status.toString());
                                    consumeMessageContext
                                        .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                                    ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                                }

                                ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                                    .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                                // Handle the consume result; its return value decides whether
                                // the loop keeps consuming.
                                continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                            } else {
                                // Nothing buffered: stop the loop.
                                continueConsume = false;
                            }
                        }
                    } else {
                        if (this.processQueue.isDropped()) {
                            log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            return;
                        }

                        // Lock not held (or expired): schedule a lock-and-reconsume attempt.
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                    }
                }
            }
    
        }
    View Code

    对于并行消费,如果获取到的消息数量超过了ConsumeMessageBatchMaxSize,或者触发了线程池的拒绝策略,则对这些多出的ConsumeRequest使用延迟消费方式,延迟时间为5s,然后在本地消费。

    在ConsumeMessageConcurrentlyService中处理消息消费结果时,如果消费成功,只需要做一些状态统计;而对于需要延迟重消费的消息,会发送给broker ack,ack失败的消息则在本地延迟5s后重新消费。broker端会根据消息的重试次数把消息保存到延迟队列,如果超过16次,则发送到死信队列。最后consumer都会把消费进度同步到broker。

    /**
     * Post-processes the result of one concurrent consume request.
     * <p>
     * Messages after the ack index count as failed. In broadcasting mode they
     * are only logged. In clustering mode each is sent back via
     * {@code sendMessageBack}; those whose send-back fails get their reconsume
     * count incremented, are removed from the request and resubmitted for a
     * later local retry. The request's remaining messages are then removed
     * from the process queue and, unless the queue has been dropped, the
     * resulting offset is written to the offset store.
     *
     * @param status         status returned by the listener
     * @param context        consume context carrying the ack index
     * @param consumeRequest the request whose messages were consumed
     */
    public void processConsumeResult(//
            final ConsumeConcurrentlyStatus status, //
            final ConsumeConcurrentlyContext context, //
            final ConsumeRequest consumeRequest//
        ) {
            int lastAcked = context.getAckIndex();

            if (consumeRequest.getMsgs().isEmpty()) {
                return;
            }

            switch (status) {
                case RECONSUME_LATER:
                    // Nothing is acknowledged: every message counts as failed.
                    lastAcked = -1;
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                        consumeRequest.getMsgs().size());
                    break;
                case CONSUME_SUCCESS: {
                    // Clamp the ack index to the last message of the batch.
                    lastAcked = Math.min(lastAcked, consumeRequest.getMsgs().size() - 1);
                    final int succeeded = lastAcked + 1;
                    final int unacked = consumeRequest.getMsgs().size() - succeeded;
                    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), succeeded);
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), unacked);
                    break;
                }
                default:
                    break;
            }

            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case CLUSTERING: {
                    // Clustering: send every failed message back to the broker and
                    // collect those whose send-back failed for a delayed local retry.
                    final List<MessageExt> sendBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                    for (int idx = lastAcked + 1; idx < consumeRequest.getMsgs().size(); idx++) {
                        final MessageExt failedMsg = consumeRequest.getMsgs().get(idx);
                        if (this.sendMessageBack(failedMsg, context)) {
                            continue;
                        }
                        // Send-back failed: count the retry locally.
                        failedMsg.setReconsumeTimes(failedMsg.getReconsumeTimes() + 1);
                        sendBackFailed.add(failedMsg);
                    }

                    if (!sendBackFailed.isEmpty()) {
                        // Exclude them from the request so the offset computed below
                        // does not cover them.
                        consumeRequest.getMsgs().removeAll(sendBackFailed);
                        this.submitConsumeRequestLater(sendBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                    }
                    break;
                }
                case BROADCASTING:
                    // Broadcasting: failed messages are logged and dropped.
                    for (int idx = lastAcked + 1; idx < consumeRequest.getMsgs().size(); idx++) {
                        final MessageExt droppedMsg = consumeRequest.getMsgs().get(idx);
                        log.warn("BROADCASTING, the message consume failed, drop it, {}", droppedMsg.toString());
                    }
                    break;
                default:
                    break;
            }

            final long committedOffset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
            final boolean queueStillOwned = !consumeRequest.getProcessQueue().isDropped();
            if (committedOffset >= 0 && queueStillOwned) {
                this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), committedOffset, true);
            }
        }

    消费进度保存机制

     广播模式下,消费点位保存在本地文件,而在集群模式下,会保存在broker,刚开始的时候通过rebalance从broker获取到消费进度保存到内存。之后定时把消费进度保存文件或者发送broker。

    // Initialize the offset store (the component that manages consume progress).
                    if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                        // An offset store already set on the consumer takes precedence over the defaults below.
                        this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                    } else {
                        switch (this.defaultMQPushConsumer.getMessageModel()) {
                            case BROADCASTING:
                                // Broadcasting mode: consume offsets are stored in a local file.
                                this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            case CLUSTERING:
                                // Clustering mode: consume offsets are managed by the broker.
                                this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            default:
                                // NOTE(review): offsetStore is not assigned on this path; the load() call below
                                // relies on it having been set elsewhere - confirm.
                                break;
                        }
                    }
                    // Load the stored offsets (message queue -> offset).
                    this.offsetStore.load();

    读取初始化点位,在rebalance里面有这么一段代码  long nextOffset = this.computePullFromWhere(mq);

        /**
         * Computes the offset from which pulling of the given queue should start.
         *
         * <p>For every strategy a non-negative offset read from the offset store wins. Only when the
         * store answers {@code -1} (first start, no offset recorded) is the configured
         * {@code ConsumeFromWhere} strategy applied. Any other negative answer, or a failed broker
         * query, yields {@code -1}.
         *
         * @param mq the message queue to compute the start offset for
         * @return the offset to pull from, or {@code -1} when it could not be determined
         */
        @Override
        public long computePullFromWhere(MessageQueue mq) {
            final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
            final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();

            switch (consumeFromWhere) {
                // These three strategies share the handling of CONSUME_FROM_LAST_OFFSET.
                case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
                case CONSUME_FROM_MIN_OFFSET:
                case CONSUME_FROM_MAX_OFFSET:
                case CONSUME_FROM_LAST_OFFSET: {
                    final long stored = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                    if (stored >= 0) {
                        return stored;
                    }
                    if (stored != -1) {
                        return -1;
                    }
                    // First start, no offset: retry topics begin at 0, every other topic begins at
                    // the queue's current max offset on the broker.
                    if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        return 0L;
                    }
                    try {
                        return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                    } catch (MQClientException e) {
                        return -1;
                    }
                }

                // Start from the first offset of the queue.
                case CONSUME_FROM_FIRST_OFFSET: {
                    final long stored = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                    if (stored >= 0) {
                        return stored;
                    }
                    return stored == -1 ? 0L : -1;
                }

                // Start from the offset matching the configured consume timestamp.
                case CONSUME_FROM_TIMESTAMP: {
                    final long stored = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                    if (stored >= 0) {
                        return stored;
                    }
                    if (stored != -1) {
                        return -1;
                    }
                    try {
                        if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            // Retry topics ignore the timestamp and begin at the max offset.
                            return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                        }
                        final long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                            UtilAll.YYYYMMDDHHMMSS).getTime();
                        return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
                    } catch (MQClientException e) {
                        return -1;
                    }
                }

                default:
                    return -1;
            }
        }

    而在广播模式下 默认都是第一个位置开始拉取

        /**
         * Returns the offset from which pulling of the given queue should start.
         *
         * <p>This implementation ignores the queue and always answers {@code 0}.
         *
         * @param mq the message queue (unused)
         * @return always {@code 0}
         */
        @Override
        public long computePullFromWhere(MessageQueue mq) {
            return 0;
        }

    消息过滤

    在broker#ClientManageProcessor#heartBeat里面,处理来自client的心跳数据,其内部会向消费过滤器管理器注册订阅信息:  this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);

    /**
     * Registers every subscription of a consumer group, then marks as dead the group's filter
     * data whose topic is no longer part of the given subscriptions.
     *
     * @param consumerGroup the consumer group the subscriptions belong to
     * @param subList       the group's current subscriptions
     */
    public void register(final String consumerGroup, final Collection<SubscriptionData> subList) {
            // Each subscription is registered on its own.
            for (SubscriptionData sub : subList) {
                register(sub.getTopic(), consumerGroup, sub.getSubString(), sub.getExpressionType(), sub.getSubVersion());
            }

            // make illegal topic dead.
            // Filter data of topics the group no longer subscribes to gets a dead time.
            for (ConsumerFilterData filterData : getByGroup(consumerGroup)) {
                boolean stillSubscribed = false;
                for (SubscriptionData sub : subList) {
                    if (sub.getTopic().equals(filterData.getTopic())) {
                        stillSubscribed = true;
                        break;
                    }
                }

                if (stillSubscribed || filterData.isDead()) {
                    continue;
                }

                filterData.setDeadTime(System.currentTimeMillis());
                log.info("Consumer filter changed: {}, make illegal topic dead:{}", consumerGroup, filterData);
            }
        }
    /**
     * Registers the filter data of one (topic, consumer group) subscription.
     *
     * <p>Tag-type subscriptions and null/empty expressions are not registered and yield
     * {@code false}.
     *
     * @param topic         the subscribed topic
     * @param consumerGroup the subscribing consumer group
     * @param expression    the filter expression
     * @param type          the expression type
     * @param clientVersion the subscription version reported by the client
     * @return {@code false} when nothing was registered for the reasons above, otherwise the
     *         result of the per-topic registration
     */
    public boolean register(final String topic, final String consumerGroup, final String expression,
                                final String type, final long clientVersion) {
            final boolean tagBased = ExpressionType.isTagType(type);
            if (tagBased || expression == null || expression.length() == 0) {
                return false;
            }

            FilterDataMapByTopic perTopic = this.filterDataByTopic.get(topic);
            if (perTopic == null) {
                // Create the per-topic map; if putIfAbsent reports an existing instance, use that one.
                final FilterDataMapByTopic fresh = new FilterDataMapByTopic(topic);
                final FilterDataMapByTopic existing = this.filterDataByTopic.putIfAbsent(topic, fresh);
                perTopic = (existing == null) ? fresh : existing;
            }

            // Generate the BloomFilterData for "group#topic" and hand it to the per-topic registration.
            final BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);

            return perTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);
        }

    之后dispatch到consumequeue之前,需要执行CommitLogDispatcherCalcBitMap的dispatch方法,里面对bitset进行设置,需要使用bloomfilter的hashTo方法,并把filterBitMap保存在DispatchRequest中

                // eval true
                    if (ret != null && ret instanceof Boolean && (Boolean) ret) {
                        consumerFilterManager.getBloomFilter().hashTo(
                            filterData.getBloomFilterData(),
                            filterBitMap
                        );
                    }
                }
    
                request.setBitMap(filterBitMap.bytes());

    至此介绍完了filterData的生成以及filterBitMap的初始化。真正的过滤发生在PullMessageProcessor查询消息的时候:先对consumequeue做tag或者bloom过滤,拿到物理点位之后再对commitlog做sql过滤。接下来看具体实现

    // Choose the message filter implementation according to the broker's filterSupportRetry setting.
    MessageFilter messageFilter;
            if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
                messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
                    this.brokerController.getConsumerFilterManager());
            } else {
                // By default, filtering on retry topics is not supported.
                messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
                    this.brokerController.getConsumerFilterManager());
            }
    messageFilter将会在后续从consumequeue和commitlog中读取数据时用于过滤,具体实现在ExpressionMessageFilter中
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.rocketmq.broker.filter;
    
    import org.apache.rocketmq.common.constant.LoggerName;
    import org.apache.rocketmq.common.filter.ExpressionType;
    import org.apache.rocketmq.common.message.MessageDecoder;
    import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
    import org.apache.rocketmq.filter.util.BitsArray;
    import org.apache.rocketmq.filter.util.BloomFilter;
    import org.apache.rocketmq.store.ConsumeQueueExt;
    import org.apache.rocketmq.store.MessageFilter;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.nio.ByteBuffer;
    import java.util.Map;
    
    /**
     * {@link MessageFilter} that matches messages against a subscription, first by consume-queue
     * data (tags code or bloom-filter bit map) and then by evaluating the compiled expression
     * against the message properties.
     */
    public class ExpressionMessageFilter implements MessageFilter {

        protected static final Logger log = LoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);

        /**
         * Subscription to match against. When the pull request carries the subscription (e.g. TAG
         * or SQL92) it is built from the request data; otherwise it is looked up inside the broker
         * by topic, which is typically the case where the bloom filter is used.
         */
        protected final SubscriptionData subscriptionData;
        protected final ConsumerFilterData consumerFilterData;
        protected final ConsumerFilterManager consumerFilterManager;
        /** Whether the bloom filter data of {@link #consumerFilterData} passed validation. */
        protected final boolean bloomDataValid;

        /**
         * @param subscriptionData      the subscription to match against, may be {@code null}
         * @param consumerFilterData    the expression filter data, may be {@code null}
         * @param consumerFilterManager the manager supplying the bloom filter
         */
        public ExpressionMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,
                                       ConsumerFilterManager consumerFilterManager) {
            this.subscriptionData = subscriptionData;
            this.consumerFilterData = consumerFilterData;
            this.consumerFilterManager = consumerFilterManager;

            boolean valid = false;
            if (consumerFilterData != null) {
                final BloomFilter filter = this.consumerFilterManager.getBloomFilter();
                valid = filter != null && filter.isValid(consumerFilterData.getBloomFilterData());
            }
            this.bloomDataValid = valid;
        }

        /**
         * Matches by consume-queue data. Every situation in which no decision can be made (no
         * subscription, class filter mode, missing expression or bloom data, unusable bit map,
         * bloom filter error) counts as a match.
         *
         * @param tagsCode  tagsCode
         * @param cqExtUnit extend unit of consume queue
         * @return {@code true} when the message may match and has to be passed on
         */
        @Override
        public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
            if (subscriptionData == null || subscriptionData.isClassFilterMode()) {
                return true;
            }

            // by tags code.
            if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                final boolean noUsableTagsCode = tagsCode == null || tagsCode < 0L;
                if (noUsableTagsCode || subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
                    return true;
                }
                return subscriptionData.getCodeSet().contains(tagsCode.intValue());
            }

            // no expression or no bloom
            final boolean nothingToEvaluate = consumerFilterData == null
                || consumerFilterData.getExpression() == null
                || consumerFilterData.getCompiledExpression() == null
                || consumerFilterData.getBloomFilterData() == null;
            if (nothingToEvaluate) {
                return true;
            }

            // message is before consumer
            if (cqExtUnit == null || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime())) {
                log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit);
                return true;
            }

            final byte[] bits = cqExtUnit.getFilterBitMap();
            final BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
            // The bit map is only usable when present, validated and of the expected bit length.
            final boolean bitMapUnusable = bits == null
                || !this.bloomDataValid
                || bits.length * Byte.SIZE != consumerFilterData.getBloomFilterData().getBitNum();
            if (bitMapUnusable) {
                return true;
            }

            BitsArray bitsArray = null;
            try {
                bitsArray = BitsArray.create(bits);
                final boolean hit = bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray);
                log.debug("Pull {} by bit map:{}, {}, {}", hit, consumerFilterData, bitsArray, cqExtUnit);
                return hit;
            } catch (Throwable e) {
                log.error("bloom filter error, sub=" + subscriptionData
                    + ", filter=" + consumerFilterData + ", bitMap=" + bitsArray, e);
                // A failing bloom filter must not drop the message.
                return true;
            }
        }

        /**
         * Matches by evaluating the compiled expression against the message properties. Tag-type
         * subscriptions, class filter mode and a missing expression always match.
         *
         * @param msgBuffer  the message buffer, decoded only when {@code properties} is {@code null}
         * @param properties the message properties, may be {@code null}
         * @return the boolean evaluation result; {@code false} when the evaluation failed or did
         *         not produce a {@code Boolean}
         */
        @Override
        public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
            if (subscriptionData == null
                || subscriptionData.isClassFilterMode()
                || ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                return true;
            }

            // no expression
            final ConsumerFilterData filter = this.consumerFilterData;
            if (filter == null || filter.getExpression() == null || filter.getCompiledExpression() == null) {
                return true;
            }

            Map<String, String> props = properties;
            if (props == null && msgBuffer != null) {
                props = MessageDecoder.decodeProperties(msgBuffer);
            }

            Object verdict = null;
            try {
                verdict = filter.getCompiledExpression().evaluate(new MessageEvaluationContext(props));
            } catch (Throwable e) {
                log.error("Message Filter error, " + filter + ", " + props, e);
            }

            log.debug("Pull eval result: {}, {}, {}", verdict, filter, props);

            return verdict instanceof Boolean && (Boolean) verdict;
        }

    }

    长轮询

    在PullMessageService的pullMessage中会向broker发送pullRequest

    // Issue the pull with CommunicationMode.ASYNC, passing pullCallback as the last argument.
    // The trailing numbers are the 1-based argument positions of pullKernelImpl.
    try {
                this.pullAPIWrapper.pullKernelImpl(//
                    pullRequest.getMessageQueue(), // 1
                    subExpression, // 2
                    subscriptionData.getExpressionType(), // 3
                    subscriptionData.getSubVersion(), // 4
                    pullRequest.getNextOffset(), // 5
                    this.defaultMQPushConsumer.getPullBatchSize(), // 6
                    sysFlag, // 7
                    commitOffsetValue, // 8
                    BROKER_SUSPEND_MAX_TIME_MILLIS, // 9
                    CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 10
                    CommunicationMode.ASYNC, // 11
                    pullCallback // 12
                );
            } catch (Exception e) {
                // On any failure, log it and re-submit the same pull request after a delay.
                log.error("pullKernelImpl exception", e);
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }

    接下来看一下broker端是如何处理的。PullMessageProcessor#processRequest 代码片段中,如果在broker的commitlog和consumequeue中没有找到对应的消息,broker会把consumer的拉取消息请求放到PullRequestHoldService的队列中,然后由一个定时任务不断检测请求是否超时,或者是否已经接收到producer的新消息;此外,在消息dispatch的时候也会唤醒被挂起的请求。具体可以看一下PullRequestHoldService#checkHoldRequest

        /**
         * Walks over every key of the pull-request table and triggers a message-arriving
         * notification with the queue's current max offset. Keys that do not split into exactly
         * two parts are ignored.
         */
        private void checkHoldRequest() {
            for (String tableKey : this.pullRequestTable.keySet()) {
                final String[] parts = tableKey.split(TOPIC_QUEUEID_SEPARATOR);
                if (parts.length != 2) {
                    continue;
                }

                final String topic = parts[0];
                final int queueId = Integer.parseInt(parts[1]);
                final long maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                try {
                    // Notify that messages may have arrived.
                    this.notifyMessageArriving(topic, queueId, maxOffset);
                } catch (Throwable e) {
                    log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
                }
            }
        }
    /**
     * Re-examines the pull requests held for one queue. A request is handed back to the pull
     * message processor when a newer offset exists and the message passes the request's filter,
     * or when the request has timed out. Every other request is put back on hold.
     *
     * @param topic        the topic of the queue
     * @param queueId      the id of the queue
     * @param maxOffset    the max offset known to the caller
     * @param tagsCode     the tags code passed to the consume-queue filter
     * @param msgStoreTime the store time passed to the consume-queue filter
     * @param filterBitMap the filter bit map passed to the consume-queue filter
     * @param properties   the message properties; when non-null a consume-queue match is
     *                     evaluated again through the commit-log filter
     */
    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
                                          long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
            final String key = this.buildKey(topic, queueId);
            final ManyPullRequest held = this.pullRequestTable.get(key);
            if (held == null) {
                return;
            }

            // Take a copy of the held requests and clear the holder.
            final List<PullRequest> pending = held.cloneListAndClear();
            if (pending == null) {
                return;
            }

            final List<PullRequest> stillWaiting = new ArrayList<PullRequest>();
            for (PullRequest request : pending) {
                long latestOffset = maxOffset;
                if (latestOffset <= request.getPullFromThisOffset()) {
                    // The supplied offset is not ahead of the request; ask the store for the current one.
                    latestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                }

                boolean wakeUp = false;
                if (latestOffset > request.getPullFromThisOffset()) {
                    // Filter by consume queue.
                    boolean matched = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                        new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                    // match by bit map, need eval again when properties is not null.
                    if (matched && properties != null) {
                        matched = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                    }
                    wakeUp = matched;
                }

                if (!wakeUp) {
                    // No matching message: the request is still answered once it has timed out.
                    wakeUp = System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis());
                }

                if (wakeUp) {
                    try {
                        // Run the request through the pull message processor again.
                        this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                            request.getRequestCommand());
                    } catch (Throwable e) {
                        log.error("execute request when wakeup failed.", e);
                    }
                    continue;
                }

                // Neither matched nor timed out: keep the request on hold.
                stillWaiting.add(request);
            }

            if (!stillWaiting.isEmpty()) {
                held.addPullRequest(stillWaiting);
            }
        }

     在PullMessageProcessor中,有一个优化点,那就是使用Netty的零拷贝技术,实现filechannel到socket的数据零拷贝

                     if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { // 使用堆内存
                            final long beginTimeMills = this.brokerController.getMessageStore().now();
                            final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
                            this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
                                requestHeader.getTopic(), requestHeader.getQueueId(),
                                (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
                            response.setBody(r);
                        } else {
                            try {
                                // netty zero-copy
                                FileRegion fileRegion =
                                    new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
                                channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                                    @Override
                                    public void operationComplete(ChannelFuture future) throws Exception {
                                        getMessageResult.release();
                                        if (!future.isSuccess()) {
                                            log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
                                        }
                                    }
                                });
                            } catch (Throwable e) {
                                log.error("transfer many message by pagecache exception", e);
                                getMessageResult.release();
                            }
    
                            response = null;
                        }
                        break;
    

      

    
    

    参考

    https://cloud.tencent.com/developer/article/1645910

    https://www.cnblogs.com/allenwas3/p/12761383.html

  • 相关阅读:
    如何在mysql下建立数据库?并设置密码
    JVM(Java虚拟机)优化大全和案例实战
    Java性能优化之JVM GC(垃圾回收机制)
    百度地图坐标系和火星坐标系之间的互相转换
    Dealloc 在哪个线程执行
    iOS夯实:ARC时代的内存管理
    iOS夯实:内存管理
    iOS 7 使用导航控制器后有关根视图高度及位置的那些事
    Xcode6之找回之前的闪屏
    iOS 关于tableView cell的分割线的一些设置
  • 原文地址:https://www.cnblogs.com/gaojy/p/15077192.html
Copyright © 2011-2022 走看看