一 消息拉取
顺序消费和非顺序消费的第一个区别是,拉取消息的时候和非顺序消息有区别
if (processQueue.isLocked()) { if (!pullRequest.isLockedFirst()) { final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); boolean brokerBusy = offset < pullRequest.getNextOffset(); log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", pullRequest, offset, brokerBusy); if (brokerBusy) { log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", pullRequest, offset); } pullRequest.setLockedFirst(true); pullRequest.setNextOffset(offset); } } else { this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.info("pull message later because not locked in broker, {}", pullRequest); return; }
如果processQueue没有被锁住,则延时3s再执行拉取任务。如果锁定成功还要判断是不是第一次执行,如果是第一次执行还要去请求broker计算offset
二 加锁
在 ConsumeMessageOrderlyService 启动的时候,就以20s的频率去加锁
public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } }
三 提交消费任务
ConsumeRequest 是 ConsumeMessageOrderlyService 的内部类。它和 ConsumeMessageConcurrentlyService 中的 ConsumeRequest实现逻辑是不同的
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) {//本地锁 if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { .... long interval = System.currentTimeMillis() - beginTime; if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); break; }//超过60s结束本次消费 ...... final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);//消息不一定是最新的,而是从prcessqueue里取得 ...... try { this.processQueue.getLockConsume().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; } status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs, messageQueue); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); }
四 服务端的锁实现
RebalanceLockManager是服务端的锁实现,主要的方法就是 tryLockBatch
public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs, final String clientId) { Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size()); Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size()); //以消费者组为单位,也就是说顺序消费一个消费组有多个消费者的话,其实是没意义的 for (MessageQueue mq : mqs) { if (this.isLocked(group, mq, clientId)) { lockedMqs.add(mq); } else { notLockedMqs.add(mq); } } if (!notLockedMqs.isEmpty()) {//一个消费者的所有messsagequeue必须全部获取锁成功才能返回 try { this.lock.lockInterruptibly();//本地锁,说明即使不是同一个group彼此也是互斥的 try { ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); if (null == groupValue) { groupValue = new ConcurrentHashMap<>(32); this.mqLockTable.put(group, groupValue); } for (MessageQueue mq : notLockedMqs) { LockEntry lockEntry = groupValue.get(mq); if (null == lockEntry) { lockEntry = new LockEntry(); lockEntry.setClientId(clientId); groupValue.put(mq, lockEntry); log.info( "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", group, clientId, mq); } if (lockEntry.isLocked(clientId)) {//如果被同一个客户端锁定,直接返回 lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); lockedMqs.add(mq); continue; } String oldClientId = lockEntry.getClientId(); //如果不是同一个客户端,判断锁过期了也给获取 if (lockEntry.isExpired()) { lockEntry.setClientId(clientId); lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); log.warn( "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", group, oldClientId, clientId, mq); lockedMqs.add(mq); continue; } log.warn( "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", group, oldClientId, clientId, mq);
//最后返回的时候有可能某一个messagequeue没有锁上的 } } finally { this.lock.unlock(); } } catch (InterruptedException e) { log.error("putMessage exception", e); } } return lockedMqs; }