Broker CommitLog 索引机制
绝大部分存储组件都有索引机制,RocketMQ 也一样,有巨量堆积能力的同时,通过索引可以加快读取和查询。
一、索引的数据结构:
索引,是为增加查询速度而设计的一种数据结构。在 RocketMQ 中也是以文件形式保存在 Broker 中的。
Broker中有2种索引:
-
- Consumer Queue
- Index File
第一种,Consumer Queue:消费队列,主要用于消费拉取消息、更新消费位点等所用的索引。文件内保存了消息的物理位点、消息体大小、消息Tag的Hash值。
物理位点:消息在 CommitLog 中的位点值。
消息体大小:包含消息 Topic 值大小、CRC 值大小、消息体大小等全部数据的总大小,单位是字节。
Tag 的 Hash 值:由 D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreMessageExtBrokerInner.java 方法计算得来。如果消息有 Tag 值,那么该值可以通过 String 的 Hashcode 获得。
1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.rocketmq.store; 18 19 import org.apache.rocketmq.common.TopicFilterType; 20 import org.apache.rocketmq.common.message.MessageExt; 21 22 public class MessageExtBrokerInner extends MessageExt { 23 private static final long serialVersionUID = 7256001576878700634L; 24 private String propertiesString; 25 private long tagsCode; 26 27 public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) { 28 if (null == tags || tags.length() == 0) { return 0; } 29 30 return tags.hashCode(); 31 } 32 33 public static long tagsString2tagsCode(final String tags) { 34 return tagsString2tagsCode(null, tags); 35 } 36 37 public String getPropertiesString() { 38 return propertiesString; 39 } 40 41 public void setPropertiesString(String propertiesString) { 42 this.propertiesString = propertiesString; 43 } 44 45 public long getTagsCode() { 46 return tagsCode; 47 } 48 49 public void setTagsCode(long tagsCode) { 50 this.tagsCode = tagsCode; 51 } 52 }
第二种,Index File:是一个 RocketMQ 实现的 Hash 索引,主要在用户用消息 key 查询时使用,该索引是通过 D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreindexIndexFile.java 中 public class IndexFile{} 类实现的。
在 RocketMQ 中同时存在多个 Index File 文件,这些文件按照消息生产的时间顺序排序,每个Index File 文件包含文件头、Hash槽位、索引数据。每个文件的 Hash 槽位个数、索引数据个数都是固定的。Hash 槽位可以通过 Broker 启动参数 maxHashSlotNum 进行配置,默认值为 500 万。索引数据可以通过 Broker 启动参数 maxIndexNum 进行配置,默认值为 500万*4=2000万,一个Index File 约为400MB。
Index File 的索引设计在一定程度上参考了 Java的 HashMap 设计,只是当 Index file 遇到 Hash 碰撞时只会用链表,而 Java 8 中在一定情况下链表会转化为 红黑树。
具体 Index File 的 Hash 槽和索引数据之间是如何处理 Hash 碰撞的呢?
在 Hash 碰撞时,Hash 槽位中保存的总是最新消息的指针,这是因为在消息队列中,用户最关心的总是最新的数据。
二、索引的构建过程:
2.1 创建 Consume Queue 和 Index File
Consume Queue 和 Index File 两个索引都是由 D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreDefaultMessageStore.java ,代码如下:
1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.rocketmq.store; 18 19 import java.io.File; 20 import java.io.IOException; 21 import java.io.RandomAccessFile; 22 import java.net.Inet6Address; 23 import java.net.InetSocketAddress; 24 import java.net.SocketAddress; 25 import java.nio.ByteBuffer; 26 import java.nio.channels.FileLock; 27 import java.util.Collections; 28 import java.util.HashMap; 29 import java.util.Iterator; 30 import java.util.LinkedList; 31 import java.util.Map; 32 import java.util.Map.Entry; 33 import java.util.Set; 34 import java.util.concurrent.CompletableFuture; 35 import java.util.concurrent.ConcurrentHashMap; 36 import java.util.concurrent.ConcurrentMap; 37 import java.util.concurrent.Executors; 38 import java.util.concurrent.ScheduledExecutorService; 39 import java.util.concurrent.TimeUnit; 40 import java.util.concurrent.atomic.AtomicLong; 41 import org.apache.rocketmq.common.BrokerConfig; 42 import org.apache.rocketmq.common.MixAll; 43 import org.apache.rocketmq.common.ServiceThread; 44 import org.apache.rocketmq.common.SystemClock; 45 import org.apache.rocketmq.common.ThreadFactoryImpl; 46 import org.apache.rocketmq.common.UtilAll; 47 import org.apache.rocketmq.common.constant.LoggerName; 48 import org.apache.rocketmq.common.message.MessageDecoder; 49 import org.apache.rocketmq.common.message.MessageExt; 50 import org.apache.rocketmq.common.message.MessageExtBatch; 51 import org.apache.rocketmq.common.running.RunningStats; 52 import org.apache.rocketmq.common.sysflag.MessageSysFlag; 53 import org.apache.rocketmq.common.topic.TopicValidator; 54 import org.apache.rocketmq.logging.InternalLogger; 55 import org.apache.rocketmq.logging.InternalLoggerFactory; 56 import org.apache.rocketmq.store.config.BrokerRole; 57 import org.apache.rocketmq.store.config.MessageStoreConfig; 58 import org.apache.rocketmq.store.config.StorePathConfigHelper; 59 import org.apache.rocketmq.store.dledger.DLedgerCommitLog; 60 import org.apache.rocketmq.store.ha.HAService; 61 import org.apache.rocketmq.store.index.IndexService; 62 import org.apache.rocketmq.store.index.QueryOffsetResult; 63 import org.apache.rocketmq.store.schedule.ScheduleMessageService; 64 import org.apache.rocketmq.store.stats.BrokerStatsManager; 65 66 public class DefaultMessageStore implements MessageStore { 67 private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); 68 69 private final MessageStoreConfig messageStoreConfig; 70 // CommitLog 71 private final CommitLog commitLog; 72 73 private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable; 74 75 private final FlushConsumeQueueService flushConsumeQueueService; 76 77 private final CleanCommitLogService cleanCommitLogService; 78 79 private final CleanConsumeQueueService cleanConsumeQueueService; 80 81 private final IndexService indexService; 82 83 private final AllocateMappedFileService allocateMappedFileService; 84 85 private final ReputMessageService reputMessageService; 86 87 private final HAService haService; 88 89 private final ScheduleMessageService scheduleMessageService; 90 91 private final StoreStatsService storeStatsService; 92 93 private final TransientStorePool transientStorePool; 94 95 private final RunningFlags runningFlags = new RunningFlags(); 96 private final SystemClock systemClock = new SystemClock(); 97 98 private final ScheduledExecutorService scheduledExecutorService = 99 Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread")); 100 private final BrokerStatsManager brokerStatsManager; 101 private final MessageArrivingListener messageArrivingListener; 102 private final BrokerConfig brokerConfig; 103 104 private volatile boolean shutdown = true; 105 106 private StoreCheckpoint storeCheckpoint; 107 108 private AtomicLong printTimes = new AtomicLong(0); 109 110 private final LinkedList<CommitLogDispatcher> dispatcherList; 111 112 private RandomAccessFile lockFile; 113 114 private FileLock lock; 115 116 boolean shutDownNormal = false; 117 118 private final ScheduledExecutorService diskCheckScheduledExecutorService = 119 Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DiskCheckScheduledThread")); 120 121 public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, 122 final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { 123 this.messageArrivingListener = messageArrivingListener; 124 this.brokerConfig = brokerConfig; 125 this.messageStoreConfig = messageStoreConfig; 126 this.brokerStatsManager = brokerStatsManager; 127 this.allocateMappedFileService = new AllocateMappedFileService(this); 128 if (messageStoreConfig.isEnableDLegerCommitLog()) { 129 this.commitLog = new DLedgerCommitLog(this); 130 } else { 131 this.commitLog = new CommitLog(this); 132 } 133 this.consumeQueueTable = new ConcurrentHashMap<>(32); 134 135 this.flushConsumeQueueService = new FlushConsumeQueueService(); 136 this.cleanCommitLogService = new CleanCommitLogService(); 137 this.cleanConsumeQueueService = new CleanConsumeQueueService(); 138 this.storeStatsService = new StoreStatsService(); 139 this.indexService = new IndexService(this); 140 if (!messageStoreConfig.isEnableDLegerCommitLog()) { 141 this.haService = new HAService(this); 142 } else { 143 this.haService = null; 144 } 145 this.reputMessageService = new ReputMessageService(); 146 147 this.scheduleMessageService = new ScheduleMessageService(this); 148 149 this.transientStorePool = new TransientStorePool(messageStoreConfig); 150 151 if (messageStoreConfig.isTransientStorePoolEnable()) { 152 this.transientStorePool.init(); 153 } 154 155 this.allocateMappedFileService.start(); 156 157 this.indexService.start(); 158 159 this.dispatcherList = new LinkedList<>(); 160 this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); 161 this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex()); 162 163 File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir())); 164 MappedFile.ensureDirOK(file.getParent()); 165 lockFile = new RandomAccessFile(file, "rw"); 166 } 167 168 public void truncateDirtyLogicFiles(long phyOffset) { 169 ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; 170 171 for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) { 172 for (ConsumeQueue logic : maps.values()) { 173 logic.truncateDirtyLogicFiles(phyOffset); 174 } 175 } 176 } 177 178 /** 179 * @throws IOException 180 */ 181 public boolean load() { 182 boolean result = true; 183 184 try { 185 boolean lastExitOK = !this.isTempFileExist(); 186 log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally"); 187 188 if (null != scheduleMessageService) { 189 result = result && this.scheduleMessageService.load(); 190 } 191 192 // load Commit Log 193 result = result && this.commitLog.load(); 194 195 // load Consume Queue 196 result = result && this.loadConsumeQueue(); 197 198 if (result) { 199 this.storeCheckpoint = 200 new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); 201 202 this.indexService.load(lastExitOK); 203 204 this.recover(lastExitOK); 205 206 log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset()); 207 } 208 } catch (Exception e) { 209 log.error("load exception", e); 210 result = false; 211 } 212 213 if (!result) { 214 this.allocateMappedFileService.shutdown(); 215 } 216 217 return result; 218 } 219 220 /** 221 * @throws Exception 222 */ 223 public void start() throws Exception { 224 225 lock = lockFile.getChannel().tryLock(0, 1, false); 226 if (lock == null || lock.isShared() || !lock.isValid()) { 227 throw new RuntimeException("Lock failed,MQ already started"); 228 } 229 230 lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes())); 231 lockFile.getChannel().force(true); 232 { 233 /** 234 * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog; 235 * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go; 236 * 3. Calculate the reput offset according to the consume queue; 237 * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed. 238 */ 239 long maxPhysicalPosInLogicQueue = commitLog.getMinOffset(); 240 for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { 241 for (ConsumeQueue logic : maps.values()) { 242 if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) { 243 maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset(); 244 } 245 } 246 } 247 if (maxPhysicalPosInLogicQueue < 0) { 248 maxPhysicalPosInLogicQueue = 0; 249 } 250 if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) { 251 maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset(); 252 /** 253 * This happens in following conditions: 254 * 1. If someone removes all the consumequeue files or the disk get damaged. 255 * 2. Launch a new broker, and copy the commitlog from other brokers. 256 * 257 * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0. 258 * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong. 259 */ 260 log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset()); 261 } 262 log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}", 263 maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset()); 264 this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue); 265 this.reputMessageService.start(); 266 267 /** 268 * 1. Finish dispatching the messages fall behind, then to start other services. 269 * 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0 270 */ 271 while (true) { 272 if (dispatchBehindBytes() <= 0) { 273 break; 274 } 275 Thread.sleep(1000); 276 log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes()); 277 } 278 this.recoverTopicQueueTable(); 279 } 280 281 if (!messageStoreConfig.isEnableDLegerCommitLog()) { 282 this.haService.start(); 283 this.handleScheduleMessageService(messageStoreConfig.getBrokerRole()); 284 } 285 286 this.flushConsumeQueueService.start(); 287 this.commitLog.start(); 288 this.storeStatsService.start(); 289 290 this.createTempFile(); 291 this.addScheduleTask(); 292 this.shutdown = false; 293 } 294 295 public void shutdown() { 296 if (!this.shutdown) { 297 this.shutdown = true; 298 299 this.scheduledExecutorService.shutdown(); 300 this.diskCheckScheduledExecutorService.shutdown(); 301 try { 302 303 Thread.sleep(1000); 304 } catch (InterruptedException e) { 305 log.error("shutdown Exception, ", e); 306 } 307 308 if (this.scheduleMessageService != null) { 309 this.scheduleMessageService.shutdown(); 310 } 311 if (this.haService != null) { 312 this.haService.shutdown(); 313 } 314 315 this.storeStatsService.shutdown(); 316 this.indexService.shutdown(); 317 this.commitLog.shutdown(); 318 this.reputMessageService.shutdown(); 319 this.flushConsumeQueueService.shutdown(); 320 this.allocateMappedFileService.shutdown(); 321 this.storeCheckpoint.flush(); 322 this.storeCheckpoint.shutdown(); 323 324 if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) { 325 this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir())); 326 shutDownNormal = true; 327 } else { 328 log.warn("the store may be wrong, so shutdown abnormally, and keep abort file."); 329 } 330 } 331 332 this.transientStorePool.destroy(); 333 334 if (lockFile != null && lock != null) { 335 try { 336 lock.release(); 337 lockFile.close(); 338 } catch (IOException e) { 339 } 340 } 341 } 342 343 public void destroy() { 344 this.destroyLogics(); 345 this.commitLog.destroy(); 346 this.indexService.destroy(); 347 this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir())); 348 this.deleteFile(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); 349 } 350 351 public void destroyLogics() { 352 for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { 353 for (ConsumeQueue logic : maps.values()) { 354 logic.destroy(); 355 } 356 } 357 } 358 359 private PutMessageStatus checkMessage(MessageExtBrokerInner msg) { 360 if (msg.getTopic().length() > Byte.MAX_VALUE) { 361 log.warn("putMessage message topic length too long " + msg.getTopic().length()); 362 return PutMessageStatus.MESSAGE_ILLEGAL; 363 } 364 365 if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { 366 log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); 367 return PutMessageStatus.MESSAGE_ILLEGAL; 368 } 369 return PutMessageStatus.PUT_OK; 370 } 371 372 private PutMessageStatus checkMessages(MessageExtBatch messageExtBatch) { 373 if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) { 374 log.warn("putMessage message topic length too long " + messageExtBatch.getTopic().length()); 375 return PutMessageStatus.MESSAGE_ILLEGAL; 376 } 377 378 if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) { 379 log.warn("PutMessages body length too long " + messageExtBatch.getBody().length); 380 return PutMessageStatus.MESSAGE_ILLEGAL; 381 } 382 383 return PutMessageStatus.PUT_OK; 384 } 385 386 private PutMessageStatus checkStoreStatus() { 387 if (this.shutdown) { 388 log.warn("message store has shutdown, so putMessage is forbidden"); 389 return PutMessageStatus.SERVICE_NOT_AVAILABLE; 390 } 391 392 if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { 393 long value = this.printTimes.getAndIncrement(); 394 if ((value % 50000) == 0) { 395 log.warn("broke role is slave, so putMessage is forbidden"); 396 } 397 return PutMessageStatus.SERVICE_NOT_AVAILABLE; 398 } 399 400 if (!this.runningFlags.isWriteable()) { 401 long value = this.printTimes.getAndIncrement(); 402 if ((value % 50000) == 0) { 403 log.warn("the message store is not writable. It may be caused by one of the following reasons: " + 404 "the broker's disk is full, write to logic queue error, write to index file error, etc"); 405 } 406 return PutMessageStatus.SERVICE_NOT_AVAILABLE; 407 } else { 408 this.printTimes.set(0); 409 } 410 411 if (this.isOSPageCacheBusy()) { 412 return PutMessageStatus.OS_PAGECACHE_BUSY; 413 } 414 return PutMessageStatus.PUT_OK; 415 } 416 417 @Override 418 public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) { 419 PutMessageStatus checkStoreStatus = this.checkStoreStatus(); 420 if (checkStoreStatus != PutMessageStatus.PUT_OK) { 421 return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null)); 422 } 423 424 PutMessageStatus msgCheckStatus = this.checkMessage(msg); 425 if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { 426 return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null)); 427 } 428 429 long beginTime = this.getSystemClock().now(); 430 CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg); 431 432 putResultFuture.thenAccept((result) -> { 433 long elapsedTime = this.getSystemClock().now() - beginTime; 434 if (elapsedTime > 500) { 435 log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); 436 } 437 this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); 438 439 if (null == result || !result.isOk()) { 440 this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); 441 } 442 }); 443 444 return putResultFuture; 445 } 446 447 public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) { 448 PutMessageStatus checkStoreStatus = this.checkStoreStatus(); 449 if (checkStoreStatus != PutMessageStatus.PUT_OK) { 450 return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null)); 451 } 452 453 PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch); 454 if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { 455 return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null)); 456 } 457 458 long beginTime = this.getSystemClock().now(); 459 CompletableFuture<PutMessageResult> resultFuture = this.commitLog.asyncPutMessages(messageExtBatch); 460 461 resultFuture.thenAccept((result) -> { 462 long elapsedTime = this.getSystemClock().now() - beginTime; 463 if (elapsedTime > 500) { 464 log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length); 465 } 466 467 this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); 468 469 if (null == result || !result.isOk()) { 470 this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); 471 } 472 }); 473 474 return resultFuture; 475 } 476 477 @Override 478 public PutMessageResult putMessage(MessageExtBrokerInner msg) { 479 PutMessageStatus checkStoreStatus = this.checkStoreStatus(); 480 if (checkStoreStatus != PutMessageStatus.PUT_OK) { 481 return new PutMessageResult(checkStoreStatus, null); 482 } 483 484 PutMessageStatus msgCheckStatus = this.checkMessage(msg); 485 if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { 486 return new PutMessageResult(msgCheckStatus, null); 487 } 488 489 long beginTime = this.getSystemClock().now(); 490 PutMessageResult result = this.commitLog.putMessage(msg); 491 long elapsedTime = this.getSystemClock().now() - beginTime; 492 if (elapsedTime > 500) { 493 log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); 494 } 495 496 this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); 497 498 if (null == result || !result.isOk()) { 499 this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); 500 } 501 502 return result; 503 } 504 505 @Override 506 public PutMessageResult putMessages(MessageExtBatch messageExtBatch) { 507 PutMessageStatus checkStoreStatus = this.checkStoreStatus(); 508 if (checkStoreStatus != PutMessageStatus.PUT_OK) { 509 return new PutMessageResult(checkStoreStatus, null); 510 } 511 512 PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch); 513 if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { 514 return new PutMessageResult(msgCheckStatus, null); 515 } 516 517 long beginTime = this.getSystemClock().now(); 518 PutMessageResult result = this.commitLog.putMessages(messageExtBatch); 519 long elapsedTime = this.getSystemClock().now() - beginTime; 520 if (elapsedTime > 500) { 521 log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length); 522 } 523 524 this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); 525 526 if (null == result || !result.isOk()) { 527 this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); 528 } 529 530 return result; 531 } 532 533 @Override 534 public boolean isOSPageCacheBusy() { 535 long begin = this.getCommitLog().getBeginTimeInLock(); 536 long diff = this.systemClock.now() - begin; 537 538 return diff < 10000000 539 && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills(); 540 } 541 542 @Override 543 public long lockTimeMills() { 544 return this.commitLog.lockTimeMills(); 545 } 546 547 public SystemClock getSystemClock() { 548 return systemClock; 549 } 550 551 public CommitLog getCommitLog() { 552 return commitLog; 553 } 554 555 public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, 556 final int maxMsgNums, 557 final MessageFilter messageFilter) { 558 if (this.shutdown) { 559 log.warn("message store has shutdown, so getMessage is forbidden"); 560 return null; 561 } 562 563 if (!this.runningFlags.isReadable()) { 564 log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits()); 565 return null; 566 } 567 568 long beginTime = this.getSystemClock().now(); 569 570 GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; 571 long nextBeginOffset = offset; 572 long minOffset = 0; 573 long maxOffset = 0; 574 575 GetMessageResult getResult = new GetMessageResult(); 576 577 final long maxOffsetPy = this.commitLog.getMaxOffset(); 578 579 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); 580 if (consumeQueue != null) { 581 minOffset = consumeQueue.getMinOffsetInQueue(); 582 maxOffset = consumeQueue.getMaxOffsetInQueue(); 583 584 if (maxOffset == 0) { 585 status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; 586 nextBeginOffset = nextOffsetCorrection(offset, 0); 587 } else if (offset < minOffset) { 588 status = GetMessageStatus.OFFSET_TOO_SMALL; 589 nextBeginOffset = nextOffsetCorrection(offset, minOffset); 590 } else if (offset == maxOffset) { 591 status = GetMessageStatus.OFFSET_OVERFLOW_ONE; 592 nextBeginOffset = nextOffsetCorrection(offset, offset); 593 } else if (offset > maxOffset) { 594 status = GetMessageStatus.OFFSET_OVERFLOW_BADLY; 595 if (0 == minOffset) { 596 nextBeginOffset = nextOffsetCorrection(offset, minOffset); 597 } else { 598 nextBeginOffset = nextOffsetCorrection(offset, maxOffset); 599 } 600 } else { 601 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); 602 if (bufferConsumeQueue != null) { 603 try { 604 status = GetMessageStatus.NO_MATCHED_MESSAGE; 605 606 long nextPhyFileStartOffset = Long.MIN_VALUE; 607 long maxPhyOffsetPulling = 0; 608 609 int i = 0; 610 final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); 611 final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded(); 612 ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); 613 for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { 614 long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); 615 int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); 616 long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); 617 618 maxPhyOffsetPulling = offsetPy; 619 620 if (nextPhyFileStartOffset != Long.MIN_VALUE) { 621 if (offsetPy < nextPhyFileStartOffset) 622 continue; 623 } 624 625 boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy); 626 627 if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), 628 isInDisk)) { 629 break; 630 } 631 632 boolean extRet = false, isTagsCodeLegal = true; 633 if (consumeQueue.isExtAddr(tagsCode)) { 634 extRet = consumeQueue.getExt(tagsCode, cqExtUnit); 635 if (extRet) { 636 tagsCode = cqExtUnit.getTagsCode(); 637 } else { 638 // can't find ext content.Client will filter messages by tag also. 639 log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}", 640 tagsCode, offsetPy, sizePy, topic, group); 641 isTagsCodeLegal = false; 642 } 643 } 644 645 if (messageFilter != null 646 && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { 647 if (getResult.getBufferTotalSize() == 0) { 648 status = GetMessageStatus.NO_MATCHED_MESSAGE; 649 } 650 651 continue; 652 } 653 654 SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); 655 if (null == selectResult) { 656 if (getResult.getBufferTotalSize() == 0) { 657 status = GetMessageStatus.MESSAGE_WAS_REMOVING; 658 } 659 660 nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); 661 continue; 662 } 663 664 if (messageFilter != null 665 && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) { 666 if (getResult.getBufferTotalSize() == 0) { 667 status = GetMessageStatus.NO_MATCHED_MESSAGE; 668 } 669 // release... 670 selectResult.release(); 671 continue; 672 } 673 674 this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet(); 675 getResult.addMessage(selectResult); 676 status = GetMessageStatus.FOUND; 677 nextPhyFileStartOffset = Long.MIN_VALUE; 678 } 679 680 if (diskFallRecorded) { 681 long fallBehind = maxOffsetPy - maxPhyOffsetPulling; 682 brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind); 683 } 684 685 nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); 686 687 long diff = maxOffsetPy - maxPhyOffsetPulling; 688 long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE 689 * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); 690 getResult.setSuggestPullingFromSlave(diff > memory); 691 } finally { 692 693 bufferConsumeQueue.release(); 694 } 695 } else { 696 status = GetMessageStatus.OFFSET_FOUND_NULL; 697 nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset)); 698 log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: " 699 + maxOffset + ", but access logic queue failed."); 700 } 701 } 702 } else { 703 status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE; 704 nextBeginOffset = nextOffsetCorrection(offset, 0); 705 } 706 707 if (GetMessageStatus.FOUND == status) { 708 this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet(); 709 } else { 710 this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet(); 711 } 712 long elapsedTime = this.getSystemClock().now() - beginTime; 713 this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime); 714 715 getResult.setStatus(status); 716 getResult.setNextBeginOffset(nextBeginOffset); 717 getResult.setMaxOffset(maxOffset); 718 getResult.setMinOffset(minOffset); 719 return getResult; 720 } 721 722 public long getMaxOffsetInQueue(String topic, int queueId) { 723 ConsumeQueue logic = this.findConsumeQueue(topic, queueId); 724 if (logic != null) { 725 long offset = logic.getMaxOffsetInQueue(); 726 return offset; 727 } 728 729 return 0; 730 } 731 732 public long getMinOffsetInQueue(String topic, int queueId) { 733 ConsumeQueue logic = this.findConsumeQueue(topic, queueId); 734 if (logic != null) { 735 return logic.getMinOffsetInQueue(); 736 } 737 738 return -1; 739 } 740 741 @Override 742 public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) { 743 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); 744 if (consumeQueue != null) { 745 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(consumeQueueOffset); 746 if (bufferConsumeQueue != null) { 747 try { 748 long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); 749 return offsetPy; 750 } finally { 751 bufferConsumeQueue.release(); 752 } 753 } 754 } 755 756 return 0; 757 } 758 759 public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) { 760 ConsumeQueue logic = this.findConsumeQueue(topic, queueId); 761 if (logic != null) { 762 return logic.getOffsetInQueueByTime(timestamp); 763 } 764 765 return 0; 766 } 767 768 public MessageExt lookMessageByOffset(long commitLogOffset) { 769 SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4); 770 if (null != sbr) { 771 try { 772 // 1 TOTALSIZE 773 int size = sbr.getByteBuffer().getInt(); 774 return lookMessageByOffset(commitLogOffset, size); 775 } finally { 776 sbr.release(); 777 } 778 } 779 780 return null; 781 } 782 783 @Override 784 public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) { 785 SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4); 786 if (null != sbr) { 787 try { 788 // 1 TOTALSIZE 789 int size = sbr.getByteBuffer().getInt(); 790 return this.commitLog.getMessage(commitLogOffset, size); 791 } finally { 792 sbr.release(); 793 } 794 } 795 796 return null; 797 } 798 799 @Override 800 public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) { 801 return this.commitLog.getMessage(commitLogOffset, msgSize); 802 } 803 804 public String getRunningDataInfo() { 805 return this.storeStatsService.toString(); 806 } 807 808 private String getStorePathPhysic() { 809 String storePathPhysic = ""; 810 if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog()) { 811 storePathPhysic = ((DLedgerCommitLog)DefaultMessageStore.this.getCommitLog()).getdLedgerServer().getdLedgerConfig().getDataStorePath(); 812 } else { 813 storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); 814 } 815 return storePathPhysic; 816 } 817 818 @Override 819 public HashMap<String, String> getRuntimeInfo() { 820 HashMap<String, String> result = this.storeStatsService.getRuntimeInfo(); 821 822 { 823 double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); 824 result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio)); 825 826 } 827 828 { 829 830 String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()); 831 double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics); 832 result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio)); 833 } 834 835 { 836 if (this.scheduleMessageService != null) { 837 this.scheduleMessageService.buildRunningStats(result); 838 } 839 } 840 841 result.put(RunningStats.commitLogMinOffset.name(), String.valueOf(DefaultMessageStore.this.getMinPhyOffset())); 842 result.put(RunningStats.commitLogMaxOffset.name(), String.valueOf(DefaultMessageStore.this.getMaxPhyOffset())); 843 844 return result; 845 } 846 847 @Override 848 public long getMaxPhyOffset() { 849 return this.commitLog.getMaxOffset(); 850 } 851 852 @Override 853 public long getMinPhyOffset() { 854 return this.commitLog.getMinOffset(); 855 } 856 857 @Override 858 public long getEarliestMessageTime(String topic, int queueId) { 859 ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId); 860 if (logicQueue != null) { 861 long minLogicOffset = logicQueue.getMinLogicOffset(); 862 863 SelectMappedBufferResult result = logicQueue.getIndexBuffer(minLogicOffset / ConsumeQueue.CQ_STORE_UNIT_SIZE); 864 return getStoreTime(result); 865 } 866 867 return -1; 868 } 869 870 private long getStoreTime(SelectMappedBufferResult result) { 871 if (result != null) { 872 try { 873 final long phyOffset = result.getByteBuffer().getLong(); 874 final int size = result.getByteBuffer().getInt(); 875 long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size); 876 return storeTime; 877 } catch (Exception e) { 878 } finally { 879 result.release(); 880 } 881 } 882 return -1; 883 } 884 885 @Override 886 public long getEarliestMessageTime() { 887 final long minPhyOffset = this.getMinPhyOffset(); 888 final int size = this.messageStoreConfig.getMaxMessageSize() * 2; 889 return this.getCommitLog().pickupStoreTimestamp(minPhyOffset, size); 890 } 891 892 @Override 893 public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) { 894 ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId); 895 if (logicQueue != null) { 896 SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset); 897 return getStoreTime(result); 898 } 899 900 return -1; 901 } 902 903 @Override 904 public long getMessageTotalInQueue(String topic, int queueId) { 905 ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId); 906 if (logicQueue != null) { 907 return logicQueue.getMessageTotalInQueue(); 908 } 909 910 return -1; 911 } 912 913 @Override 914 public SelectMappedBufferResult getCommitLogData(final long offset) { 915 if (this.shutdown) { 916 log.warn("message store has shutdown, so getPhyQueueData is forbidden"); 917 return null; 918 } 919 920 return this.commitLog.getData(offset); 921 } 922 923 @Override 924 public boolean appendToCommitLog(long startOffset, byte[] data) { 925 if (this.shutdown) { 926 log.warn("message store has shutdown, so appendToPhyQueue is forbidden"); 927 return false; 928 } 929 930 boolean result = this.commitLog.appendData(startOffset, data); 931 if (result) { 932 this.reputMessageService.wakeup(); 933 } else { 934 log.error("appendToPhyQueue failed " + startOffset + " " + data.length); 935 } 936 937 return result; 938 } 939 940 @Override 941 public void executeDeleteFilesManually() { 942 this.cleanCommitLogService.excuteDeleteFilesManualy(); 943 } 944 945 @Override 946 public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) { 947 QueryMessageResult queryMessageResult = new QueryMessageResult(); 948 949 long lastQueryMsgTime = end; 950 951 for (int i = 0; i < 3; i++) { 952 QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime); 953 if (queryOffsetResult.getPhyOffsets().isEmpty()) { 954 break; 955 } 956 957 Collections.sort(queryOffsetResult.getPhyOffsets()); 958 959 queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset()); 960 queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp()); 961 962 for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) { 963 long offset = queryOffsetResult.getPhyOffsets().get(m); 964 965 try { 966 967 boolean match = true; 968 MessageExt msg = this.lookMessageByOffset(offset); 969 if (0 == m) { 970 lastQueryMsgTime = msg.getStoreTimestamp(); 971 } 972 973 // String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR); 974 // if (topic.equals(msg.getTopic())) { 975 // for (String k : keyArray) { 976 // if (k.equals(key)) { 977 // match = true; 978 // break; 979 // } 980 // } 981 // } 982 983 if (match) { 984 SelectMappedBufferResult result = this.commitLog.getData(offset, false); 985 if (result != null) { 986 int size = result.getByteBuffer().getInt(0); 987 result.getByteBuffer().limit(size); 988 result.setSize(size); 989 queryMessageResult.addMessage(result); 990 } 991 } else { 992 log.warn("queryMessage hash duplicate, {} {}", topic, key); 993 } 994 } catch (Exception e) { 995 log.error("queryMessage exception", e); 996 } 997 } 998 999 if (queryMessageResult.getBufferTotalSize() > 0) { 1000 break; 1001 } 1002 1003 if (lastQueryMsgTime < begin) { 1004 break; 1005 } 1006 } 1007 1008 return queryMessageResult; 1009 } 1010 1011 @Override 1012 public void updateHaMasterAddress(String newAddr) { 1013 this.haService.updateMasterAddress(newAddr); 1014 } 1015 1016 @Override 1017 public long slaveFallBehindMuch() { 1018 return this.commitLog.getMaxOffset() - this.haService.getPush2SlaveMaxOffset().get(); 1019 } 1020 1021 @Override 1022 public long now() { 1023 return this.systemClock.now(); 1024 } 1025 1026 @Override 1027 public int cleanUnusedTopic(Set<String> topics) { 1028 Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); 1029 while (it.hasNext()) { 1030 Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next(); 1031 String topic = next.getKey(); 1032 1033 if (!topics.contains(topic) && !topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) { 1034 ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue(); 1035 for (ConsumeQueue cq : queueTable.values()) { 1036 cq.destroy(); 1037 log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", 1038 cq.getTopic(), 1039 cq.getQueueId() 1040 ); 1041 1042 this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId()); 1043 } 1044 it.remove(); 1045 1046 if (this.brokerConfig.isAutoDeleteUnusedStats()) { 1047 this.brokerStatsManager.onTopicDeleted(topic); 1048 } 1049 1050 log.info("cleanUnusedTopic: {},topic destroyed", topic); 1051 } 1052 } 1053 1054 return 0; 1055 } 1056 1057 public void cleanExpiredConsumerQueue() { 1058 long minCommitLogOffset = this.commitLog.getMinOffset(); 1059 1060 Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); 1061 while (it.hasNext()) { 1062 Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next(); 1063 String topic = next.getKey(); 1064 if (!topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) { 1065 ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue(); 1066 Iterator<Entry<Integer, ConsumeQueue>> itQT = queueTable.entrySet().iterator(); 1067 while (itQT.hasNext()) { 1068 Entry<Integer, ConsumeQueue> nextQT = itQT.next(); 1069 long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset(); 1070 1071 if (maxCLOffsetInConsumeQueue == -1) { 1072 log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", 1073 nextQT.getValue().getTopic(), 1074 nextQT.getValue().getQueueId(), 1075 nextQT.getValue().getMaxPhysicOffset(), 1076 nextQT.getValue().getMinLogicOffset()); 1077 } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) { 1078 log.info( 1079 "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", 1080 topic, 1081 nextQT.getKey(), 1082 minCommitLogOffset, 1083 maxCLOffsetInConsumeQueue); 1084 1085 DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(), 1086 nextQT.getValue().getQueueId()); 1087 1088 nextQT.getValue().destroy(); 1089 itQT.remove(); 1090 } 1091 } 1092 1093 if (queueTable.isEmpty()) { 1094 log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic); 1095 it.remove(); 1096 } 1097 } 1098 } 1099 } 1100 1101 public Map<String, Long> getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset, 1102 SocketAddress storeHost) { 1103 Map<String, Long> messageIds = new HashMap<String, Long>(); 1104 if (this.shutdown) { 1105 return messageIds; 1106 } 1107 1108 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); 1109 if (consumeQueue != null) { 1110 minOffset = Math.max(minOffset, consumeQueue.getMinOffsetInQueue()); 1111 maxOffset = Math.min(maxOffset, consumeQueue.getMaxOffsetInQueue()); 1112 1113 if (maxOffset == 0) { 1114 return messageIds; 1115 } 1116 1117 long nextOffset = minOffset; 1118 while (nextOffset < maxOffset) { 1119 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(nextOffset); 1120 if (bufferConsumeQueue != null) { 1121 try { 1122 int i = 0; 1123 for (; i < bufferConsumeQueue.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { 1124 long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); 1125 InetSocketAddress inetSocketAddress = (InetSocketAddress) storeHost; 1126 int msgIdLength = (inetSocketAddress.getAddress() instanceof Inet6Address) ? 16 + 4 + 8 : 4 + 4 + 8; 1127 final ByteBuffer msgIdMemory = ByteBuffer.allocate(msgIdLength); 1128 String msgId = 1129 MessageDecoder.createMessageId(msgIdMemory, MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy); 1130 messageIds.put(msgId, nextOffset++); 1131 if (nextOffset > maxOffset) { 1132 return messageIds; 1133 } 1134 } 1135 } finally { 1136 1137 bufferConsumeQueue.release(); 1138 } 1139 } else { 1140 return messageIds; 1141 } 1142 } 1143 } 1144 return messageIds; 1145 } 1146 1147 @Override 1148 public boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset) { 1149 1150 final long maxOffsetPy = this.commitLog.getMaxOffset(); 1151 1152 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); 1153 if (consumeQueue != null) { 1154 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(consumeOffset); 1155 if (bufferConsumeQueue != null) { 1156 try { 1157 for (int i = 0; i < bufferConsumeQueue.getSize(); ) { 1158 i += ConsumeQueue.CQ_STORE_UNIT_SIZE; 1159 long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); 1160 return checkInDiskByCommitOffset(offsetPy, maxOffsetPy); 1161 } 1162 } finally { 1163 1164 bufferConsumeQueue.release(); 1165 } 1166 } else { 1167 return false; 1168 } 1169 } 1170 return false; 1171 } 1172 1173 @Override 1174 public long dispatchBehindBytes() { 1175 return this.reputMessageService.behind(); 1176 } 1177 1178 @Override 1179 public long flush() { 1180 return this.commitLog.flush(); 1181 } 1182 1183 @Override 1184 public boolean resetWriteOffset(long phyOffset) { 1185 return this.commitLog.resetOffset(phyOffset); 1186 } 1187 1188 @Override 1189 public long getConfirmOffset() { 1190 return this.commitLog.getConfirmOffset(); 1191 } 1192 1193 @Override 1194 public void setConfirmOffset(long phyOffset) { 1195 this.commitLog.setConfirmOffset(phyOffset); 1196 } 1197 1198 public MessageExt lookMessageByOffset(long commitLogOffset, int size) { 1199 SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, size); 1200 if (null != sbr) { 1201 try { 1202 return MessageDecoder.decode(sbr.getByteBuffer(), true, false); 1203 } finally { 1204 sbr.release(); 1205 } 1206 } 1207 1208 return null; 1209 } 1210 1211 public ConsumeQueue findConsumeQueue(String topic, int queueId) { 1212 ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); 1213 if (null == map) { 1214 ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128); 1215 ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap); 1216 if (oldMap != null) { 1217 map = oldMap; 1218 } else { 1219 map = newMap; 1220 } 1221 } 1222 1223 ConsumeQueue logic = map.get(queueId); 1224 if (null == logic) { 1225 ConsumeQueue newLogic = new ConsumeQueue( 1226 topic, 1227 queueId, 1228 StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), 1229 this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(), 1230 this); 1231 ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); 1232 if (oldLogic != null) { 1233 logic = oldLogic; 1234 } else { 1235 logic = newLogic; 1236 } 1237 } 1238 1239 return logic; 1240 } 1241 1242 private long nextOffsetCorrection(long oldOffset, long newOffset) { 1243 long nextOffset = oldOffset; 1244 if (this.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE || this.getMessageStoreConfig().isOffsetCheckInSlave()) { 1245 nextOffset = newOffset; 1246 } 1247 return nextOffset; 1248 } 1249 1250 private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) { 1251 long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); 1252 return (maxOffsetPy - offsetPy) > memory; 1253 } 1254 1255 private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) { 1256 1257 if (0 == bufferTotal || 0 == messageTotal) { 1258 return false; 1259 } 1260 1261 if (maxMsgNums <= messageTotal) { 1262 return true; 1263 } 1264 1265 if (isInDisk) { 1266 if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) { 1267 return true; 1268 } 1269 1270 if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1) { 1271 return true; 1272 } 1273 } else { 1274 if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) { 1275 return true; 1276 } 1277 1278 if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1) { 1279 return true; 1280 } 1281 } 1282 1283 return false; 1284 } 1285 1286 private void deleteFile(final String fileName) { 1287 File file = new File(fileName); 1288 boolean result = file.delete(); 1289 log.info(fileName + (result ? " delete OK" : " delete Failed")); 1290 } 1291 1292 /** 1293 * @throws IOException 1294 */ 1295 private void createTempFile() throws IOException { 1296 String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()); 1297 File file = new File(fileName); 1298 MappedFile.ensureDirOK(file.getParent()); 1299 boolean result = file.createNewFile(); 1300 log.info(fileName + (result ? " create OK" : " already exists")); 1301 } 1302 1303 private void addScheduleTask() { 1304 1305 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 1306 @Override 1307 public void run() { 1308 DefaultMessageStore.this.cleanFilesPeriodically(); 1309 } 1310 }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS); 1311 1312 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 1313 @Override 1314 public void run() { 1315 DefaultMessageStore.this.checkSelf(); 1316 } 1317 }, 1, 10, TimeUnit.MINUTES); 1318 1319 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 1320 @Override 1321 public void run() { 1322 if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) { 1323 try { 1324 if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) { 1325 long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock(); 1326 if (lockTime > 1000 && lockTime < 10000000) { 1327 1328 String stack = UtilAll.jstack(); 1329 final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-" 1330 + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime; 1331 MixAll.string2FileNotSafe(stack, fileName); 1332 } 1333 } 1334 } catch (Exception e) { 1335 } 1336 } 1337 } 1338 }, 1, 1, TimeUnit.SECONDS); 1339 1340 // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 1341 // @Override 1342 // public void run() { 1343 // DefaultMessageStore.this.cleanExpiredConsumerQueue(); 1344 // } 1345 // }, 1, 1, TimeUnit.HOURS); 1346 this.diskCheckScheduledExecutorService.scheduleAtFixedRate(new Runnable() { 1347 public void run() { 1348 DefaultMessageStore.this.cleanCommitLogService.isSpaceFull(); 1349 } 1350 }, 1000L, 10000L, TimeUnit.MILLISECONDS); 1351 } 1352 1353 private void cleanFilesPeriodically() { 1354 this.cleanCommitLogService.run(); 1355 this.cleanConsumeQueueService.run(); 1356 } 1357 1358 private void checkSelf() { 1359 this.commitLog.checkSelf(); 1360 1361 Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); 1362 while (it.hasNext()) { 1363 Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next(); 1364 Iterator<Entry<Integer, ConsumeQueue>> itNext = next.getValue().entrySet().iterator(); 1365 while (itNext.hasNext()) { 1366 Entry<Integer, ConsumeQueue> cq = itNext.next(); 1367 cq.getValue().checkSelf(); 1368 } 1369 } 1370 } 1371 1372 private boolean isTempFileExist() { 1373 String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()); 1374 File file = new File(fileName); 1375 return file.exists(); 1376 } 1377 1378 private boolean loadConsumeQueue() { 1379 File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir())); 1380 File[] fileTopicList = dirLogic.listFiles(); 1381 if (fileTopicList != null) { 1382 1383 for (File fileTopic : fileTopicList) { 1384 String topic = fileTopic.getName(); 1385 1386 File[] fileQueueIdList = fileTopic.listFiles(); 1387 if (fileQueueIdList != null) { 1388 for (File fileQueueId : fileQueueIdList) { 1389 int queueId; 1390 try { 1391 queueId = Integer.parseInt(fileQueueId.getName()); 1392 } catch (NumberFormatException e) { 1393 continue; 1394 } 1395 ConsumeQueue logic = new ConsumeQueue( 1396 topic, 1397 queueId, 1398 StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), 1399 this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(), 1400 this); 1401 this.putConsumeQueue(topic, queueId, logic); 1402 if (!logic.load()) { 1403 return false; 1404 } 1405 } 1406 } 1407 } 1408 } 1409 1410 log.info("load logics queue all over, OK"); 1411 1412 return true; 1413 } 1414 1415 private void recover(final boolean lastExitOK) { 1416 long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue(); 1417 1418 if (lastExitOK) { 1419 this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue); 1420 } else { 1421 this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue); 1422 } 1423 1424 this.recoverTopicQueueTable(); 1425 } 1426 1427 public MessageStoreConfig getMessageStoreConfig() { 1428 return messageStoreConfig; 1429 } 1430 1431 public TransientStorePool getTransientStorePool() { 1432 return transientStorePool; 1433 } 1434 1435 private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) { 1436 ConcurrentMap<Integer/* queueId */, ConsumeQueue> map = this.consumeQueueTable.get(topic); 1437 if (null == map) { 1438 map = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>(); 1439 map.put(queueId, consumeQueue); 1440 this.consumeQueueTable.put(topic, map); 1441 } else { 1442 map.put(queueId, consumeQueue); 1443 } 1444 } 1445 1446 private long recoverConsumeQueue() { 1447 long maxPhysicOffset = -1; 1448 for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { 1449 for (ConsumeQueue logic : maps.values()) { 1450 logic.recover(); 1451 if (logic.getMaxPhysicOffset() > maxPhysicOffset) { 1452 maxPhysicOffset = logic.getMaxPhysicOffset(); 1453 } 1454 } 1455 } 1456 1457 return maxPhysicOffset; 1458 } 1459 1460 public void recoverTopicQueueTable() { 1461 HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024); 1462 long minPhyOffset = this.commitLog.getMinOffset(); 1463 for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { 1464 for (ConsumeQueue logic : maps.values()) { 1465 String key = logic.getTopic() + "-" + logic.getQueueId(); 1466 table.put(key, logic.getMaxOffsetInQueue()); 1467 logic.correctMinOffset(minPhyOffset); 1468 } 1469 } 1470 1471 this.commitLog.setTopicQueueTable(table); 1472 } 1473 1474 public AllocateMappedFileService getAllocateMappedFileService() { 1475 return allocateMappedFileService; 1476 } 1477 1478 public StoreStatsService getStoreStatsService() { 1479 return storeStatsService; 1480 } 1481 1482 public RunningFlags getAccessRights() { 1483 return runningFlags; 1484 } 1485 1486 public ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> getConsumeQueueTable() { 1487 return consumeQueueTable; 1488 } 1489 1490 public StoreCheckpoint getStoreCheckpoint() { 1491 return storeCheckpoint; 1492 } 1493 1494 public HAService getHaService() { 1495 return haService; 1496 } 1497 1498 public ScheduleMessageService getScheduleMessageService() { 1499 return scheduleMessageService; 1500 } 1501 1502 public RunningFlags getRunningFlags() { 1503 return runningFlags; 1504 } 1505 1506 public void doDispatch(DispatchRequest req) { 1507 for (CommitLogDispatcher dispatcher : this.dispatcherList) { 1508 dispatcher.dispatch(req); 1509 } 1510 } 1511 1512 public void putMessagePositionInfo(DispatchRequest dispatchRequest) { 1513 ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId()); 1514 cq.putMessagePositionInfoWrapper(dispatchRequest); 1515 } 1516 1517 @Override 1518 public BrokerStatsManager getBrokerStatsManager() { 1519 return brokerStatsManager; 1520 } 1521 1522 @Override 1523 public void handleScheduleMessageService(final BrokerRole brokerRole) { 1524 if (this.scheduleMessageService != null) { 1525 if (brokerRole == BrokerRole.SLAVE) { 1526 this.scheduleMessageService.shutdown(); 1527 } else { 1528 this.scheduleMessageService.start(); 1529 } 1530 } 1531 1532 } 1533 1534 public int remainTransientStoreBufferNumbs() { 1535 return this.transientStorePool.availableBufferNums(); 1536 } 1537 1538 @Override 1539 public boolean isTransientStorePoolDeficient() { 1540 return remainTransientStoreBufferNumbs() == 0; 1541 } 1542 1543 @Override 1544 public LinkedList<CommitLogDispatcher> getDispatcherList() { 1545 return this.dispatcherList; 1546 } 1547 1548 @Override 1549 public ConsumeQueue getConsumeQueue(String topic, int queueId) { 1550 ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); 1551 if (map == null) { 1552 return null; 1553 } 1554 return map.get(queueId); 1555 } 1556 1557 public void unlockMappedFile(final MappedFile mappedFile) { 1558 this.scheduledExecutorService.schedule(new Runnable() { 1559 @Override 1560 public void run() { 1561 mappedFile.munlock(); 1562 } 1563 }, 6, TimeUnit.SECONDS); 1564 } 1565 1566 class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { 1567 1568 @Override 1569 public void dispatch(DispatchRequest request) { 1570 final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); 1571 switch (tranType) { 1572 case MessageSysFlag.TRANSACTION_NOT_TYPE: 1573 case MessageSysFlag.TRANSACTION_COMMIT_TYPE: 1574 DefaultMessageStore.this.putMessagePositionInfo(request); 1575 break; 1576 case MessageSysFlag.TRANSACTION_PREPARED_TYPE: 1577 case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: 1578 break; 1579 } 1580 } 1581 } 1582 1583 class CommitLogDispatcherBuildIndex implements CommitLogDispatcher { 1584 1585 @Override 1586 public void dispatch(DispatchRequest request) { 1587 if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) { 1588 DefaultMessageStore.this.indexService.buildIndex(request); 1589 } 1590 } 1591 } 1592 1593 class CleanCommitLogService { 1594 1595 private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20; 1596 private final double diskSpaceWarningLevelRatio = 1597 Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90")); 1598 1599 private final double diskSpaceCleanForciblyRatio = 1600 Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85")); 1601 private long lastRedeleteTimestamp = 0; 1602 1603 private volatile int manualDeleteFileSeveralTimes = 0; 1604 1605 private volatile boolean cleanImmediately = false; 1606 1607 public void excuteDeleteFilesManualy() { 1608 this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES; 1609 DefaultMessageStore.log.info("executeDeleteFilesManually was invoked"); 1610 } 1611 1612 public void run() { 1613 try { 1614 this.deleteExpiredFiles(); 1615 1616 this.redeleteHangedFile(); 1617 } catch (Throwable e) { 1618 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); 1619 } 1620 } 1621 1622 private void deleteExpiredFiles() { 1623 int deleteCount = 0; 1624 long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime(); 1625 int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval(); 1626 int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); 1627 1628 boolean timeup = this.isTimeToDelete(); 1629 boolean spacefull = this.isSpaceToDelete(); 1630 boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; 1631 1632 if (timeup || spacefull || manualDelete) { 1633 1634 if (manualDelete) 1635 this.manualDeleteFileSeveralTimes--; 1636 1637 boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately; 1638 1639 log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", 1640 fileReservedTime, 1641 timeup, 1642 spacefull, 1643 manualDeleteFileSeveralTimes, 1644 cleanAtOnce); 1645 1646 fileReservedTime *= 60 * 60 * 1000; 1647 1648 deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, 1649 destroyMapedFileIntervalForcibly, cleanAtOnce); 1650 if (deleteCount > 0) { 1651 } else if (spacefull) { 1652 log.warn("disk space will be full soon, but delete file failed."); 1653 } 1654 } 1655 } 1656 1657 private void redeleteHangedFile() { 1658 int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval(); 1659 long currentTimestamp = System.currentTimeMillis(); 1660 if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) { 1661 this.lastRedeleteTimestamp = currentTimestamp; 1662 int destroyMapedFileIntervalForcibly = 1663 DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); 1664 if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) { 1665 } 1666 } 1667 } 1668 1669 public String getServiceName() { 1670 return CleanCommitLogService.class.getSimpleName(); 1671 } 1672 1673 private boolean isTimeToDelete() { 1674 String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen(); 1675 if (UtilAll.isItTimeToDo(when)) { 1676 DefaultMessageStore.log.info("it's time to reclaim disk space, " + when); 1677 return true; 1678 } 1679 1680 return false; 1681 } 1682 1683 private boolean isSpaceToDelete() { 1684 double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; 1685 1686 cleanImmediately = false; 1687 1688 { 1689 double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); 1690 if (physicRatio > diskSpaceWarningLevelRatio) { 1691 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); 1692 if (diskok) { 1693 DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full"); 1694 } 1695 1696 cleanImmediately = true; 1697 } else if (physicRatio > diskSpaceCleanForciblyRatio) { 1698 cleanImmediately = true; 1699 } else { 1700 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); 1701 if (!diskok) { 1702 DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok"); 1703 } 1704 } 1705 1706 if (physicRatio < 0 || physicRatio > ratio) { 1707 DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio); 1708 return true; 1709 } 1710 } 1711 1712 { 1713 String storePathLogics = StorePathConfigHelper 1714 .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir()); 1715 double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics); 1716 if (logicsRatio > diskSpaceWarningLevelRatio) { 1717 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); 1718 if (diskok) { 1719 DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full"); 1720 } 1721 1722 cleanImmediately = true; 1723 } else if (logicsRatio > diskSpaceCleanForciblyRatio) { 1724 cleanImmediately = true; 1725 } else { 1726 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); 1727 if (!diskok) { 1728 DefaultMessageStore.log.info("logics disk space OK " + logicsRatio + ", so mark disk ok"); 1729 } 1730 } 1731 1732 if (logicsRatio < 0 || logicsRatio > ratio) { 1733 DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio); 1734 return true; 1735 } 1736 } 1737 1738 return false; 1739 } 1740 1741 public int getManualDeleteFileSeveralTimes() { 1742 return manualDeleteFileSeveralTimes; 1743 } 1744 1745 public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) { 1746 this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes; 1747 } 1748 public boolean isSpaceFull() { 1749 String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); 1750 double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); 1751 double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; 1752 if (physicRatio > ratio) { 1753 DefaultMessageStore.log.info("physic disk of commitLog used: " + physicRatio); 1754 } 1755 if (physicRatio > this.diskSpaceWarningLevelRatio) { 1756 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); 1757 if (diskok) { 1758 DefaultMessageStore.log.error("physic disk of commitLog maybe full soon, used " + physicRatio + ", so mark disk full"); 1759 } 1760 1761 return true; 1762 } else { 1763 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); 1764 1765 if (!diskok) { 1766 DefaultMessageStore.log.info("physic disk space of commitLog OK " + physicRatio + ", so mark disk ok"); 1767 } 1768 1769 return false; 1770 } 1771 } 1772 } 1773 1774 class CleanConsumeQueueService { 1775 private long lastPhysicalMinOffset = 0; 1776 1777 public void run() { 1778 try { 1779 this.deleteExpiredFiles(); 1780 } catch (Throwable e) { 1781 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); 1782 } 1783 } 1784 1785 private void deleteExpiredFiles() { 1786 int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval(); 1787 1788 long minOffset = DefaultMessageStore.this.commitLog.getMinOffset(); 1789 if (minOffset > this.lastPhysicalMinOffset) { 1790 this.lastPhysicalMinOffset = minOffset; 1791 1792 ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; 1793 1794 for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) { 1795 for (ConsumeQueue logic : maps.values()) { 1796 int deleteCount = logic.deleteExpiredFile(minOffset); 1797 1798 if (deleteCount > 0 && deleteLogicsFilesInterval > 0) { 1799 try { 1800 Thread.sleep(deleteLogicsFilesInterval); 1801 } catch (InterruptedException ignored) { 1802 } 1803 } 1804 } 1805 } 1806 1807 DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset); 1808 } 1809 } 1810 1811 public String getServiceName() { 1812 return CleanConsumeQueueService.class.getSimpleName(); 1813 } 1814 } 1815 1816 class FlushConsumeQueueService extends ServiceThread { 1817 private static final int RETRY_TIMES_OVER = 3; 1818 private long lastFlushTimestamp = 0; 1819 1820 private void doFlush(int retryTimes) { 1821 int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages(); 1822 1823 if (retryTimes == RETRY_TIMES_OVER) { 1824 flushConsumeQueueLeastPages = 0; 1825 } 1826 1827 long logicsMsgTimestamp = 0; 1828 1829 int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval(); 1830 long currentTimeMillis = System.currentTimeMillis(); 1831 if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) { 1832 this.lastFlushTimestamp = currentTimeMillis; 1833 flushConsumeQueueLeastPages = 0; 1834 logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp(); 1835 } 1836 1837 ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; 1838 1839 for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) { 1840 for (ConsumeQueue cq : maps.values()) { 1841 boolean result = false; 1842 for (int i = 0; i < retryTimes && !result; i++) { 1843 result = cq.flush(flushConsumeQueueLeastPages); 1844 } 1845 } 1846 } 1847 1848 if (0 == flushConsumeQueueLeastPages) { 1849 if (logicsMsgTimestamp > 0) { 1850 DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp); 1851 } 1852 DefaultMessageStore.this.getStoreCheckpoint().flush(); 1853 } 1854 } 1855 1856 public void run() { 1857 DefaultMessageStore.log.info(this.getServiceName() + " service started"); 1858 1859 while (!this.isStopped()) { 1860 try { 1861 int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue(); 1862 this.waitForRunning(interval); 1863 this.doFlush(1); 1864 } catch (Exception e) { 1865 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); 1866 } 1867 } 1868 1869 this.doFlush(RETRY_TIMES_OVER); 1870 1871 DefaultMessageStore.log.info(this.getServiceName() + " service end"); 1872 } 1873 1874 @Override 1875 public String getServiceName() { 1876 return FlushConsumeQueueService.class.getSimpleName(); 1877 } 1878 1879 @Override 1880 public long getJointime() { 1881 return 1000 * 60; 1882 } 1883 } 1884 1885 class ReputMessageService extends ServiceThread { 1886 1887 private volatile long reputFromOffset = 0; 1888 1889 public long getReputFromOffset() { 1890 return reputFromOffset; 1891 } 1892 1893 public void setReputFromOffset(long reputFromOffset) { 1894 this.reputFromOffset = reputFromOffset; 1895 } 1896 1897 @Override 1898 public void shutdown() { 1899 for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) { 1900 try { 1901 Thread.sleep(100); 1902 } catch (InterruptedException ignored) { 1903 } 1904 } 1905 1906 if (this.isCommitLogAvailable()) { 1907 log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}", 1908 DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset); 1909 } 1910 1911 super.shutdown(); 1912 } 1913 1914 public long behind() { 1915 return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset; 1916 } 1917 1918 private boolean isCommitLogAvailable() { 1919 return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset(); 1920 } 1921 1922 private void doReput() { 1923 if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) { 1924 log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.", 1925 this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset()); 1926 this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset(); 1927 } 1928 for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { 1929 1930 if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() 1931 && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { 1932 break; 1933 } 1934 1935 SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); 1936 if (result != null) { 1937 try { 1938 this.reputFromOffset = result.getStartOffset(); 1939 1940 for (int readSize = 0; readSize < result.getSize() && doNext; ) { 1941 DispatchRequest dispatchRequest = 1942 DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); 1943 int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize(); 1944 1945 if (dispatchRequest.isSuccess()) { 1946 if (size > 0) { 1947 DefaultMessageStore.this.doDispatch(dispatchRequest); 1948 1949 if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() 1950 && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { 1951 DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), 1952 dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, 1953 dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), 1954 dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); 1955 } 1956 1957 this.reputFromOffset += size; 1958 readSize += size; 1959 if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { 1960 DefaultMessageStore.this.storeStatsService 1961 .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet(); 1962 DefaultMessageStore.this.storeStatsService 1963 .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) 1964 .addAndGet(dispatchRequest.getMsgSize()); 1965 } 1966 } else if (size == 0) { 1967 this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); 1968 readSize = result.getSize(); 1969 } 1970 } else if (!dispatchRequest.isSuccess()) { 1971 1972 if (size > 0) { 1973 log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset); 1974 this.reputFromOffset += size; 1975 } else { 1976 doNext = false; 1977 // If user open the dledger pattern or the broker is master node, 1978 // it will not ignore the exception and fix the reputFromOffset variable 1979 if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() || 1980 DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { 1981 log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}", 1982 this.reputFromOffset); 1983 this.reputFromOffset += result.getSize() - readSize; 1984 } 1985 } 1986 } 1987 } 1988 } finally { 1989 result.release(); 1990 } 1991 } else { 1992 doNext = false; 1993 } 1994 } 1995 } 1996 1997 @Override 1998 public void run() { 1999 DefaultMessageStore.log.info(this.getServiceName() + " service started"); 2000 2001 while (!this.isStopped()) { 2002 try { 2003 Thread.sleep(1); 2004 this.doReput(); 2005 } catch (Exception e) { 2006 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); 2007 } 2008 } 2009 2010 DefaultMessageStore.log.info(this.getServiceName() + " service end"); 2011 } 2012 2013 @Override 2014 public String getServiceName() { 2015 return ReputMessageService.class.getSimpleName(); 2016 } 2017 2018 } 2019 }
ReputMessageService 服务启动后的执行过程如下所示:
doReput()方法用于创建索引的入口,通常通过以下几个步骤来创建索引:
第一步:从 CommitLog 中查找未创建索引的消息,将消息组装成 DispatchRequest 对象,该逻辑主要在D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreCommitLog.java 中checkMessageAndReturnSize()方法中实现,代码路径:D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreCommitLog.java。
1 /** 2 * check the message and returns the message size 3 * 4 * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure 5 */ 6 public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, 7 final boolean readBody) { 8 try { 9 // 1 TOTAL SIZE 10 int totalSize = byteBuffer.getInt(); 11 12 // 2 MAGIC CODE 13 int magicCode = byteBuffer.getInt(); 14 switch (magicCode) { 15 case MESSAGE_MAGIC_CODE: 16 break; 17 case BLANK_MAGIC_CODE: 18 return new DispatchRequest(0, true /* success */); 19 default: 20 log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode)); 21 return new DispatchRequest(-1, false /* success */); 22 } 23 24 byte[] bytesContent = new byte[totalSize]; 25 26 int bodyCRC = byteBuffer.getInt(); 27 28 int queueId = byteBuffer.getInt(); 29 30 int flag = byteBuffer.getInt(); 31 32 long queueOffset = byteBuffer.getLong(); 33 34 long physicOffset = byteBuffer.getLong(); 35 36 int sysFlag = byteBuffer.getInt(); 37 38 long bornTimeStamp = byteBuffer.getLong(); 39 40 ByteBuffer byteBuffer1; 41 if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) { 42 byteBuffer1 = byteBuffer.get(bytesContent, 0, 4 + 4); 43 } else { 44 byteBuffer1 = byteBuffer.get(bytesContent, 0, 16 + 4); 45 } 46 47 long storeTimestamp = byteBuffer.getLong(); 48 49 ByteBuffer byteBuffer2; 50 if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) { 51 byteBuffer2 = byteBuffer.get(bytesContent, 0, 4 + 4); 52 } else { 53 byteBuffer2 = byteBuffer.get(bytesContent, 0, 16 + 4); 54 } 55 56 int reconsumeTimes = byteBuffer.getInt(); 57 58 long preparedTransactionOffset = byteBuffer.getLong(); 59 60 int bodyLen = byteBuffer.getInt(); 61 if (bodyLen > 0) { 62 if (readBody) { 63 byteBuffer.get(bytesContent, 0, bodyLen); 64 65 if (checkCRC) { 66 int crc = UtilAll.crc32(bytesContent, 0, bodyLen); 67 if (crc != bodyCRC) { 68 log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC); 69 return new DispatchRequest(-1, false/* success */); 70 } 71 } 72 } else { 73 byteBuffer.position(byteBuffer.position() + bodyLen); 74 } 75 } 76 77 byte topicLen = byteBuffer.get(); 78 byteBuffer.get(bytesContent, 0, topicLen); 79 String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8); 80 81 long tagsCode = 0; 82 String keys = ""; 83 String uniqKey = null; 84 85 short propertiesLength = byteBuffer.getShort(); 86 Map<String, String> propertiesMap = null; 87 if (propertiesLength > 0) { 88 byteBuffer.get(bytesContent, 0, propertiesLength); 89 String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8); 90 propertiesMap = MessageDecoder.string2messageProperties(properties); 91 92 keys = propertiesMap.get(MessageConst.PROPERTY_KEYS); 93 94 uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); 95 96 String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS); 97 if (tags != null && tags.length() > 0) { 98 tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags); 99 } 100 101 // Timing message processing 102 { 103 String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL); 104 if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) { 105 int delayLevel = Integer.parseInt(t); 106 107 if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { 108 delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel(); 109 } 110 111 if (delayLevel > 0) { 112 tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, 113 storeTimestamp); 114 } 115 } 116 } 117 } 118 119 int readLength = calMsgLength(sysFlag, bodyLen, topicLen, propertiesLength); 120 if (totalSize != readLength) { 121 doNothingForDeadCode(reconsumeTimes); 122 doNothingForDeadCode(flag); 123 doNothingForDeadCode(bornTimeStamp); 124 doNothingForDeadCode(byteBuffer1); 125 doNothingForDeadCode(byteBuffer2); 126 log.error( 127 "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}", 128 totalSize, readLength, bodyLen, topicLen, propertiesLength); 129 return new DispatchRequest(totalSize, false/* success */); 130 } 131 132 return new DispatchRequest( 133 topic, 134 queueId, 135 physicOffset, 136 totalSize, 137 tagsCode, 138 storeTimestamp, 139 queueOffset, 140 keys, 141 uniqKey, 142 sysFlag, 143 preparedTransactionOffset, 144 propertiesMap 145 ); 146 } catch (Exception e) { 147 } 148 149 return new DispatchRequest(-1, false /* success */); 150 }
第二步:调用 doDispathc() 方法,该方法会循环多个索引处理器(这里初始化了D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreDefaultMessageStore.CommitLogDispatcherBuildConsumeQueue 和 D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreDefaultMessageStore.CommitLogDispatcherBuildIndex 两个索引处理器)并调用索引处理器的 dispatch() 方法来处理 DispatchRequest。
CommitLogDispatcherBuildConsumeQueue 索引处理器用于构建 Consume Queue,CommitLogDispatcherBuildIndex 用于构建 Index File。
Consume Queue 是必须创建的,Index File 是否需要创建则是通过设置 messageIndexEnable 为 True 或 False 来实现的,默认为 True。
Consume Queue 的索引信息被保存到 Page Cache 后,其持久化的过程和 CommitLog 异步刷盘的过程类似,执行 DefaultMessageStore.FlushConsumeQueueService()服务,代码路径:D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreDefaultMessageStore.java。
2.2 索引创建失败怎么办?
如果消息写入 CommitLog 后 Broker 宕机了,那么 Consume Queue 和 Index File 索引肯定就创建失败了。此时 ReputMessageService 如何保证创建索引的可靠性呢?
Consume Queue 和 Index File 每次刷盘时都会做 Checkpoint 操作,Broker 每次重启的时候可以根据 Checkpoint信息得知哪些消息还未创建索引。
三、索引如何使用
3.1 按照位点查消息
RocketMQ 支持 Pull 和 Push 两种消费模式,Push 模式是基于 Pull 模式的,两种模式都是通过拉取消息进行消费和提交位点的。
Broker 在处理客户端拉取消息请求时是怎么查询消息的呢?
答:代码路径:D:
ocketmq-masterstoresrcmainjavaorgapache
ocketmqstoreDefaultMessageStore.java 中 第555行的 getMessage() 方法介绍了 Broker的代码实现,如下
1 public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, 2 final int maxMsgNums, 3 final MessageFilter messageFilter)
# getMessage()方法的参数说明:
group:消费者组名。
topic:主题的名字,group订阅了 topic 才能拉取消息。
queueId:一般一个 topic 会有很多分区,客户端轮询全部分区,拉取并消费消息。
offset:拉取位点大于等于该值的消息。
maxMsgNums:一次拉取多少消息,在客户端由 pullBatchSize 进行配置。
messageFilter:消息过滤器。
{
4 if (this.shutdown) { 5 log.warn("message store has shutdown, so getMessage is forbidden"); 6 return null; 7 } 8 9 if (!this.runningFlags.isReadable()) { 10 log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits()); 11 return null; 12 } 13 14 long beginTime = this.getSystemClock().now(); 15 16 GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; 17 long nextBeginOffset = offset; 18 long minOffset = 0; 19 long maxOffset = 0; 20 21 GetMessageResult getResult = new GetMessageResult(); 22 23 final long maxOffsetPy = this.commitLog.getMaxOffset(); 24 25 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); 26 if (consumeQueue != null) { 27 minOffset = consumeQueue.getMinOffsetInQueue(); 28 maxOffset = consumeQueue.getMaxOffsetInQueue(); 29 30 if (maxOffset == 0) { 31 status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; 32 nextBeginOffset = nextOffsetCorrection(offset, 0); 33 } else if (offset < minOffset) { 34 status = GetMessageStatus.OFFSET_TOO_SMALL; 35 nextBeginOffset = nextOffsetCorrection(offset, minOffset); 36 } else if (offset == maxOffset) { 37 status = GetMessageStatus.OFFSET_OVERFLOW_ONE; 38 nextBeginOffset = nextOffsetCorrection(offset, offset); 39 } else if (offset > maxOffset) { 40 status = GetMessageStatus.OFFSET_OVERFLOW_BADLY; 41 if (0 == minOffset) { 42 nextBeginOffset = nextOffsetCorrection(offset, minOffset); 43 } else { 44 nextBeginOffset = nextOffsetCorrection(offset, maxOffset); 45 } 46 } else { 47 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); 48 if (bufferConsumeQueue != null) { 49 try { 50 status = GetMessageStatus.NO_MATCHED_MESSAGE; 51 52 long nextPhyFileStartOffset = Long.MIN_VALUE; 53 long maxPhyOffsetPulling = 0; 54 55 int i = 0; 56 final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); 57 final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded(); 58 ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); 59 for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { 60 long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); 61 int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); 62 long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); 63 64 maxPhyOffsetPulling = offsetPy; 65 66 if (nextPhyFileStartOffset != Long.MIN_VALUE) { 67 if (offsetPy < nextPhyFileStartOffset) 68 continue; 69 } 70 71 boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy); 72 73 if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), 74 isInDisk)) { 75 break; 76 } 77 78 boolean extRet = false, isTagsCodeLegal = true; 79 if (consumeQueue.isExtAddr(tagsCode)) { 80 extRet = consumeQueue.getExt(tagsCode, cqExtUnit); 81 if (extRet) { 82 tagsCode = cqExtUnit.getTagsCode(); 83 } else { 84 // can't find ext content.Client will filter messages by tag also. 85 log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}", 86 tagsCode, offsetPy, sizePy, topic, group); 87 isTagsCodeLegal = false; 88 } 89 } 90 91 if (messageFilter != null 92 && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { 93 if (getResult.getBufferTotalSize() == 0) { 94 status = GetMessageStatus.NO_MATCHED_MESSAGE; 95 } 96 97 continue; 98 } 99 100 SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); 101 if (null == selectResult) { 102 if (getResult.getBufferTotalSize() == 0) { 103 status = GetMessageStatus.MESSAGE_WAS_REMOVING; 104 } 105 106 nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); 107 continue; 108 } 109 110 if (messageFilter != null 111 && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) { 112 if (getResult.getBufferTotalSize() == 0) { 113 status = GetMessageStatus.NO_MATCHED_MESSAGE; 114 } 115 // release... 116 selectResult.release(); 117 continue; 118 } 119 120 this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet(); 121 getResult.addMessage(selectResult); 122 status = GetMessageStatus.FOUND; 123 nextPhyFileStartOffset = Long.MIN_VALUE; 124 } 125 126 if (diskFallRecorded) { 127 long fallBehind = maxOffsetPy - maxPhyOffsetPulling; 128 brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind); 129 } 130 131 nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); 132 133 long diff = maxOffsetPy - maxPhyOffsetPulling; 134 long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE 135 * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); 136 getResult.setSuggestPullingFromSlave(diff > memory); 137 } finally { 138 139 bufferConsumeQueue.release(); 140 } 141 } else { 142 status = GetMessageStatus.OFFSET_FOUND_NULL; 143 nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset)); 144 log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: " 145 + maxOffset + ", but access logic queue failed."); 146 } 147 } 148 } else { 149 status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE; 150 nextBeginOffset = nextOffsetCorrection(offset, 0); 151 } 152 153 if (GetMessageStatus.FOUND == status) { 154 this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet(); 155 } else { 156 this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet(); 157 } 158 long elapsedTime = this.getSystemClock().now() - beginTime; 159 this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime); 160 161 getResult.setStatus(status); 162 getResult.setNextBeginOffset(nextBeginOffset); 163 getResult.setMaxOffset(maxOffset); 164 getResult.setMinOffset(minOffset); 165 return getResult; 166 }
getMessage()方法查询消息的过程如下图:
getMessage() 方法查询消息的过程可以分为以下几个步骤:
第一步:拉取前校验,校验 DefaultMessageStore 服务是否已经关闭(正常关闭进程会被关闭),校验 DefaultMessageStore 服务是否可读。
第二步:根据 Topic 和 queueId 查找 ConsumeQueue 索引映射文件。判断根据查找到的 ConsumeQueue 索引文件校验传入的待查询的位点值是否合理,如果不合理,重新计算下一次可以拉去的位点值。
第三步:循环查询满足 maxMsgNums 条数的消息。循环从 ConsumeQueue 中读取消息物理位点、消息大小和消息 Tag 的 Hash 值。先做 Hash 过滤,再使用过滤后的消息物理位点到 CommitLog 中查找消息体,并放入结果列表中。
第四步:监控指标统计,返回拉取的消息结果。
3.2 按照时间段查消息
输入 Topic、起始时间、结束时间可以查到这段时间内的消息。这是一个 Consume Queue 索引查询消息的扩展查询,具体步骤如下:
第一步:查找这个 Topic 的所有 Queue。
第二步:在每一个队列中查询起始时间、结束时间对应的其实 offset 和 最后消息的 offset。
如何根据时间查找物理位点呢?主要在于构建 Consume Queue,这个文件是按照时间顺序来写的,每条消息的索引数据结构大小是固定20字节。可以根据时间做二分折半搜索,找到与时间最接近的一个位点。
第三步:根据起始点、最后消息位点和 Topic,循环拉取所有 Queue 就可以拉取到消息。
3.3 按照key查询消息
如果通过设置 messageIndexEnable=True(默认是True)来开启Index索引服务,那么在写入消息时会根据 key 自动构建 Index File 索引。用户可以通过 Topic 和 key 查询消息,查询方法为 org.apache.rocketmq.store.ConsumeQueue.queryMessage(String Topic, String key, int maxNum, long begin, long end)。