zoukankan      html  css  js  c++  java
  • RocketMq总结(六) -- 顺序消息

    一 消息拉取

    顺序消费和非顺序消费的第一个区别是,拉取消息的时候和非顺序消息有区别

              if (processQueue.isLocked()) {
                    if (!pullRequest.isLockedFirst()) {
                        final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                        boolean brokerBusy = offset < pullRequest.getNextOffset();
                        log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                            pullRequest, offset, brokerBusy);
                        if (brokerBusy) {
                            log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                                pullRequest, offset);
                        }
    
                        pullRequest.setLockedFirst(true);
                        pullRequest.setNextOffset(offset);
                    }
                } else {
                    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                    log.info("pull message later because not locked in broker, {}", pullRequest);
                    return;
                }

    如果processQueue没有被锁住,则延时3s再执行拉取任务。如果锁定成功还要判断是不是第一次执行,如果是第一次执行还要去请求broker计算offset

    二 加锁

      在 ConsumeMessageOrderlyService 启动的时候,就以20s的频率去加锁

    public void start() {
            if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        ConsumeMessageOrderlyService.this.lockMQPeriodically();
                    }
                }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
            }
        }

    三 提交消费任务

      ConsumeRequest 是 ConsumeMessageOrderlyService 的内部类。它和 ConsumeMessageConcurrentlyService 中的 ConsumeRequest实现逻辑是不同的

    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
                synchronized (objLock) {//本地锁
                    if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                            ....
                            
                            long interval = System.currentTimeMillis() - beginTime;
                            if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                                ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                                break;
                            }//超过60s结束本次消费
                            ......
                            final int consumeBatchSize =
                                ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    
                            List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);//消息不一定是最新的,而是从prcessqueue里取得
                            ......
                             try {
                                    this.processQueue.getLockConsume().lock();
                                    if (this.processQueue.isDropped()) {
                                        log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                            this.messageQueue);
                                        break;
                                    }
    
                                    status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                                } catch (Throwable e) {
                                    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                        RemotingHelper.exceptionSimpleDesc(e),
                                        ConsumeMessageOrderlyService.this.consumerGroup,
                                        msgs,
                                        messageQueue);
                                    hasException = true;
                                } finally {
                                    this.processQueue.getLockConsume().unlock();
                                }

    四 服务端的锁实现

      RebalanceLockManager是服务端的锁实现,主要的方法就是 tryLockBatch
    public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
            final String clientId) {
            Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
            Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
         //以消费者组为单位,也就是说顺序消费一个消费组有多个消费者的话,其实是没意义的
            for (MessageQueue mq : mqs) {
                if (this.isLocked(group, mq, clientId)) {
                    lockedMqs.add(mq);
                } else {
                    notLockedMqs.add(mq);
                }
            }
    
            if (!notLockedMqs.isEmpty()) {//一个消费者的所有messsagequeue必须全部获取锁成功才能返回
                try {
                    this.lock.lockInterruptibly();//本地锁,说明即使不是同一个group彼此也是互斥的
                    try {
                        ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
                        if (null == groupValue) {
                            groupValue = new ConcurrentHashMap<>(32);
                            this.mqLockTable.put(group, groupValue);
                        }
                
                        for (MessageQueue mq : notLockedMqs) {
                            LockEntry lockEntry = groupValue.get(mq);
                            if (null == lockEntry) {
                                lockEntry = new LockEntry();
                                lockEntry.setClientId(clientId);
                                groupValue.put(mq, lockEntry);
                                log.info(
                                    "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}",
                                    group,
                                    clientId,
                                    mq);
                            }
    
                            if (lockEntry.isLocked(clientId)) {//如果被同一个客户端锁定,直接返回
                                lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                                lockedMqs.add(mq);
                                continue;
                            }
    
                            String oldClientId = lockEntry.getClientId();
                  //如果不是同一个客户端,判断锁过期了也给获取
                            if (lockEntry.isExpired()) {
                                lockEntry.setClientId(clientId);
                                lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                                log.warn(
                                    "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
                                    group,
                                    oldClientId,
                                    clientId,
                                    mq);
                                lockedMqs.add(mq);
                                continue;
                            }
    
                            log.warn(
                                "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
                                group,
                                oldClientId,
                                clientId,
                                mq);
                  //最后返回的时候有可能某一个messagequeue没有锁上的 } }
    finally { this.lock.unlock(); } } catch (InterruptedException e) { log.error("putMessage exception", e); } } return lockedMqs; }
  • 相关阅读:
    未能加载文件或程序集“file:///C:Program Files (x86)SAP BusinessObjectsCrystal Reports for .NET Framework 4.0
    iwms后台出现从客户端(ctl00$cphMain$logo="<img src="pic/logo.g...")中检测到有潜在危险的 Request.Form 值。错误解决方法
    Socket层实现系列 — accept()的实现(二)
    Socket层实现系列 — accept()的实现(一)
    Socket层实现系列 — getsockname()和getpeername()的实现
    洛谷1279 字串距离
    poj 2411
    poj 2411
    noip提高组 2010 关押罪犯 (洛谷1525)
    二分图匹配 (匈牙利算法) 洛谷3386
  • 原文地址:https://www.cnblogs.com/juniorMa/p/15130110.html
Copyright © 2011-2022 走看看