zoukankan      html  css  js  c++  java
  • RocketMQ(4.8.0)——Broker CommitLog 索引机制

    Broker CommitLog 索引机制

      绝大部分存储组件都有索引机制,RocketMQ 也一样,有巨量堆积能力的同时,通过索引可以加快读取和查询。

    一、索引的数据结构:

      索引,是为增加查询速度而设计的一种数据结构。在 RocketMQ 中也是以文件形式保存在 Broker 中的。

      Broker中有2种索引:

      • Consumer Queue
      • Index File

      第一种,Consumer Queue:消费队列,主要用于消费拉取消息、更新消费位点等所用的索引。文件内保存了消息的物理位点、消息体大小、消息Tag的Hash值。

      物理位点:消息在 CommitLog 中的位点值。

      消息体大小:包含消息 Topic 值大小、CRC 值大小、消息体大小等全部数据的总大小,单位是字节。

      Tag 的 Hash 值:由 D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreMessageExtBrokerInner.java 方法计算得来。如果消息有 Tag 值,那么该值可以通过 String 的 Hashcode 获得。

     1 /*
     2  * Licensed to the Apache Software Foundation (ASF) under one or more
     3  * contributor license agreements.  See the NOTICE file distributed with
     4  * this work for additional information regarding copyright ownership.
     5  * The ASF licenses this file to You under the Apache License, Version 2.0
     6  * (the "License"); you may not use this file except in compliance with
     7  * the License.  You may obtain a copy of the License at
     8  *
     9  *     http://www.apache.org/licenses/LICENSE-2.0
    10  *
    11  * Unless required by applicable law or agreed to in writing, software
    12  * distributed under the License is distributed on an "AS IS" BASIS,
    13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  * See the License for the specific language governing permissions and
    15  * limitations under the License.
    16  */
    17 package org.apache.rocketmq.store;
    18 
    19 import org.apache.rocketmq.common.TopicFilterType;
    20 import org.apache.rocketmq.common.message.MessageExt;
    21 
    22 public class MessageExtBrokerInner extends MessageExt {
    23     private static final long serialVersionUID = 7256001576878700634L;
    24     private String propertiesString;
    25     private long tagsCode;
    26 
    27     public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {
    28         if (null == tags || tags.length() == 0) { return 0; }
    29 
    30         return tags.hashCode();
    31     }
    32 
    33     public static long tagsString2tagsCode(final String tags) {
    34         return tagsString2tagsCode(null, tags);
    35     }
    36 
    37     public String getPropertiesString() {
    38         return propertiesString;
    39     }
    40 
    41     public void setPropertiesString(String propertiesString) {
    42         this.propertiesString = propertiesString;
    43     }
    44 
    45     public long getTagsCode() {
    46         return tagsCode;
    47     }
    48 
    49     public void setTagsCode(long tagsCode) {
    50         this.tagsCode = tagsCode;
    51     }
    52 }

      第二种,Index File:是一个 RocketMQ 实现的 Hash 索引,主要在用户用消息 key 查询时使用,该索引是通过 D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreindexIndexFile.java 中 public class IndexFile{} 类实现的。

      在 RocketMQ 中同时存在多个 Index File 文件,这些文件按照消息生产的时间顺序排序,每个Index File 文件包含文件头、Hash槽位、索引数据。每个文件的 Hash 槽位个数、索引数据个数都是固定的。Hash 槽位可以通过 Broker 启动参数 maxHashSlotNum 进行配置,默认值为 500 万。索引数据可以通过 Broker 启动参数 maxIndexNum 进行配置,默认值为 500万*4=2000万,一个Index File 约为400MB。

      Index File 的索引设计在一定程度上参考了 Java的 HashMap 设计,只是当 Index file 遇到 Hash 碰撞时只会用链表,而 Java 8 中在一定情况下链表会转化为 红黑树。

      具体 Index File 的 Hash 槽和索引数据之间是如何处理 Hash 碰撞的呢?

      在 Hash 碰撞时,Hash 槽位中保存的总是最新消息的指针,这是因为在消息队列中,用户最关心的总是最新的数据。

    二、索引的构建过程:

      2.1 创建 Consume Queue 和 Index File

      Consume Queue 和 Index File 两个索引都是由 D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreDefaultMessageStore.java  ,代码如下:

       1 /*
       2  * Licensed to the Apache Software Foundation (ASF) under one or more
       3  * contributor license agreements.  See the NOTICE file distributed with
       4  * this work for additional information regarding copyright ownership.
       5  * The ASF licenses this file to You under the Apache License, Version 2.0
       6  * (the "License"); you may not use this file except in compliance with
       7  * the License.  You may obtain a copy of the License at
       8  *
       9  *     http://www.apache.org/licenses/LICENSE-2.0
      10  *
      11  * Unless required by applicable law or agreed to in writing, software
      12  * distributed under the License is distributed on an "AS IS" BASIS,
      13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      14  * See the License for the specific language governing permissions and
      15  * limitations under the License.
      16  */
      17 package org.apache.rocketmq.store;
      18 
      19 import java.io.File;
      20 import java.io.IOException;
      21 import java.io.RandomAccessFile;
      22 import java.net.Inet6Address;
      23 import java.net.InetSocketAddress;
      24 import java.net.SocketAddress;
      25 import java.nio.ByteBuffer;
      26 import java.nio.channels.FileLock;
      27 import java.util.Collections;
      28 import java.util.HashMap;
      29 import java.util.Iterator;
      30 import java.util.LinkedList;
      31 import java.util.Map;
      32 import java.util.Map.Entry;
      33 import java.util.Set;
      34 import java.util.concurrent.CompletableFuture;
      35 import java.util.concurrent.ConcurrentHashMap;
      36 import java.util.concurrent.ConcurrentMap;
      37 import java.util.concurrent.Executors;
      38 import java.util.concurrent.ScheduledExecutorService;
      39 import java.util.concurrent.TimeUnit;
      40 import java.util.concurrent.atomic.AtomicLong;
      41 import org.apache.rocketmq.common.BrokerConfig;
      42 import org.apache.rocketmq.common.MixAll;
      43 import org.apache.rocketmq.common.ServiceThread;
      44 import org.apache.rocketmq.common.SystemClock;
      45 import org.apache.rocketmq.common.ThreadFactoryImpl;
      46 import org.apache.rocketmq.common.UtilAll;
      47 import org.apache.rocketmq.common.constant.LoggerName;
      48 import org.apache.rocketmq.common.message.MessageDecoder;
      49 import org.apache.rocketmq.common.message.MessageExt;
      50 import org.apache.rocketmq.common.message.MessageExtBatch;
      51 import org.apache.rocketmq.common.running.RunningStats;
      52 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
      53 import org.apache.rocketmq.common.topic.TopicValidator;
      54 import org.apache.rocketmq.logging.InternalLogger;
      55 import org.apache.rocketmq.logging.InternalLoggerFactory;
      56 import org.apache.rocketmq.store.config.BrokerRole;
      57 import org.apache.rocketmq.store.config.MessageStoreConfig;
      58 import org.apache.rocketmq.store.config.StorePathConfigHelper;
      59 import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
      60 import org.apache.rocketmq.store.ha.HAService;
      61 import org.apache.rocketmq.store.index.IndexService;
      62 import org.apache.rocketmq.store.index.QueryOffsetResult;
      63 import org.apache.rocketmq.store.schedule.ScheduleMessageService;
      64 import org.apache.rocketmq.store.stats.BrokerStatsManager;
      65 
      66 public class DefaultMessageStore implements MessageStore {
      67     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
      68 
      69     private final MessageStoreConfig messageStoreConfig;
      70     // CommitLog
      71     private final CommitLog commitLog;
      72 
      73     private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
      74 
      75     private final FlushConsumeQueueService flushConsumeQueueService;
      76 
      77     private final CleanCommitLogService cleanCommitLogService;
      78 
      79     private final CleanConsumeQueueService cleanConsumeQueueService;
      80 
      81     private final IndexService indexService;
      82 
      83     private final AllocateMappedFileService allocateMappedFileService;
      84 
      85     private final ReputMessageService reputMessageService;
      86 
      87     private final HAService haService;
      88 
      89     private final ScheduleMessageService scheduleMessageService;
      90 
      91     private final StoreStatsService storeStatsService;
      92 
      93     private final TransientStorePool transientStorePool;
      94 
      95     private final RunningFlags runningFlags = new RunningFlags();
      96     private final SystemClock systemClock = new SystemClock();
      97 
      98     private final ScheduledExecutorService scheduledExecutorService =
      99         Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
     100     private final BrokerStatsManager brokerStatsManager;
     101     private final MessageArrivingListener messageArrivingListener;
     102     private final BrokerConfig brokerConfig;
     103 
     104     private volatile boolean shutdown = true;
     105 
     106     private StoreCheckpoint storeCheckpoint;
     107 
     108     private AtomicLong printTimes = new AtomicLong(0);
     109 
     110     private final LinkedList<CommitLogDispatcher> dispatcherList;
     111 
     112     private RandomAccessFile lockFile;
     113 
     114     private FileLock lock;
     115 
     116     boolean shutDownNormal = false;
     117 
     118     private final ScheduledExecutorService diskCheckScheduledExecutorService =
     119             Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DiskCheckScheduledThread"));
     120 
     121     public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
     122         final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
     123         this.messageArrivingListener = messageArrivingListener;
     124         this.brokerConfig = brokerConfig;
     125         this.messageStoreConfig = messageStoreConfig;
     126         this.brokerStatsManager = brokerStatsManager;
     127         this.allocateMappedFileService = new AllocateMappedFileService(this);
     128         if (messageStoreConfig.isEnableDLegerCommitLog()) {
     129             this.commitLog = new DLedgerCommitLog(this);
     130         } else {
     131             this.commitLog = new CommitLog(this);
     132         }
     133         this.consumeQueueTable = new ConcurrentHashMap<>(32);
     134 
     135         this.flushConsumeQueueService = new FlushConsumeQueueService();
     136         this.cleanCommitLogService = new CleanCommitLogService();
     137         this.cleanConsumeQueueService = new CleanConsumeQueueService();
     138         this.storeStatsService = new StoreStatsService();
     139         this.indexService = new IndexService(this);
     140         if (!messageStoreConfig.isEnableDLegerCommitLog()) {
     141             this.haService = new HAService(this);
     142         } else {
     143             this.haService = null;
     144         }
     145         this.reputMessageService = new ReputMessageService();
     146 
     147         this.scheduleMessageService = new ScheduleMessageService(this);
     148 
     149         this.transientStorePool = new TransientStorePool(messageStoreConfig);
     150 
     151         if (messageStoreConfig.isTransientStorePoolEnable()) {
     152             this.transientStorePool.init();
     153         }
     154 
     155         this.allocateMappedFileService.start();
     156 
     157         this.indexService.start();
     158 
     159         this.dispatcherList = new LinkedList<>();
     160         this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
     161         this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
     162 
     163         File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
     164         MappedFile.ensureDirOK(file.getParent());
     165         lockFile = new RandomAccessFile(file, "rw");
     166     }
     167 
     168     public void truncateDirtyLogicFiles(long phyOffset) {
     169         ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
     170 
     171         for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
     172             for (ConsumeQueue logic : maps.values()) {
     173                 logic.truncateDirtyLogicFiles(phyOffset);
     174             }
     175         }
     176     }
     177 
     178     /**
     179      * @throws IOException
     180      */
     181     public boolean load() {
     182         boolean result = true;
     183 
     184         try {
     185             boolean lastExitOK = !this.isTempFileExist();
     186             log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
     187 
     188             if (null != scheduleMessageService) {
     189                 result = result && this.scheduleMessageService.load();
     190             }
     191 
     192             // load Commit Log
     193             result = result && this.commitLog.load();
     194 
     195             // load Consume Queue
     196             result = result && this.loadConsumeQueue();
     197 
     198             if (result) {
     199                 this.storeCheckpoint =
     200                     new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
     201 
     202                 this.indexService.load(lastExitOK);
     203 
     204                 this.recover(lastExitOK);
     205 
     206                 log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
     207             }
     208         } catch (Exception e) {
     209             log.error("load exception", e);
     210             result = false;
     211         }
     212 
     213         if (!result) {
     214             this.allocateMappedFileService.shutdown();
     215         }
     216 
     217         return result;
     218     }
     219 
     220     /**
     221      * @throws Exception
     222      */
     223     public void start() throws Exception {
     224 
     225         lock = lockFile.getChannel().tryLock(0, 1, false);
     226         if (lock == null || lock.isShared() || !lock.isValid()) {
     227             throw new RuntimeException("Lock failed,MQ already started");
     228         }
     229 
     230         lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
     231         lockFile.getChannel().force(true);
     232         {
     233             /**
     234              * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
     235              * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
     236              * 3. Calculate the reput offset according to the consume queue;
     237              * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
     238              */
     239             long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
     240             for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
     241                 for (ConsumeQueue logic : maps.values()) {
     242                     if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
     243                         maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
     244                     }
     245                 }
     246             }
     247             if (maxPhysicalPosInLogicQueue < 0) {
     248                 maxPhysicalPosInLogicQueue = 0;
     249             }
     250             if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
     251                 maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
     252                 /**
     253                  * This happens in following conditions:
     254                  * 1. If someone removes all the consumequeue files or the disk get damaged.
     255                  * 2. Launch a new broker, and copy the commitlog from other brokers.
     256                  *
     257                  * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
     258                  * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
     259                  */
     260                 log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
     261             }
     262             log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
     263                 maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
     264             this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
     265             this.reputMessageService.start();
     266 
     267             /**
     268              *  1. Finish dispatching the messages fall behind, then to start other services.
     269              *  2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
     270              */
     271             while (true) {
     272                 if (dispatchBehindBytes() <= 0) {
     273                     break;
     274                 }
     275                 Thread.sleep(1000);
     276                 log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
     277             }
     278             this.recoverTopicQueueTable();
     279         }
     280 
     281         if (!messageStoreConfig.isEnableDLegerCommitLog()) {
     282             this.haService.start();
     283             this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
     284         }
     285 
     286         this.flushConsumeQueueService.start();
     287         this.commitLog.start();
     288         this.storeStatsService.start();
     289 
     290         this.createTempFile();
     291         this.addScheduleTask();
     292         this.shutdown = false;
     293     }
     294 
     295     public void shutdown() {
     296         if (!this.shutdown) {
     297             this.shutdown = true;
     298 
     299             this.scheduledExecutorService.shutdown();
     300             this.diskCheckScheduledExecutorService.shutdown();
     301             try {
     302 
     303                 Thread.sleep(1000);
     304             } catch (InterruptedException e) {
     305                 log.error("shutdown Exception, ", e);
     306             }
     307 
     308             if (this.scheduleMessageService != null) {
     309                 this.scheduleMessageService.shutdown();
     310             }
     311             if (this.haService != null) {
     312                 this.haService.shutdown();
     313             }
     314 
     315             this.storeStatsService.shutdown();
     316             this.indexService.shutdown();
     317             this.commitLog.shutdown();
     318             this.reputMessageService.shutdown();
     319             this.flushConsumeQueueService.shutdown();
     320             this.allocateMappedFileService.shutdown();
     321             this.storeCheckpoint.flush();
     322             this.storeCheckpoint.shutdown();
     323 
     324             if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) {
     325                 this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
     326                 shutDownNormal = true;
     327             } else {
     328                 log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
     329             }
     330         }
     331 
     332         this.transientStorePool.destroy();
     333 
     334         if (lockFile != null && lock != null) {
     335             try {
     336                 lock.release();
     337                 lockFile.close();
     338             } catch (IOException e) {
     339             }
     340         }
     341     }
     342 
     343     public void destroy() {
     344         this.destroyLogics();
     345         this.commitLog.destroy();
     346         this.indexService.destroy();
     347         this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
     348         this.deleteFile(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
     349     }
     350 
     351     public void destroyLogics() {
     352         for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
     353             for (ConsumeQueue logic : maps.values()) {
     354                 logic.destroy();
     355             }
     356         }
     357     }
     358 
     359     private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {
     360         if (msg.getTopic().length() > Byte.MAX_VALUE) {
     361             log.warn("putMessage message topic length too long " + msg.getTopic().length());
     362             return PutMessageStatus.MESSAGE_ILLEGAL;
     363         }
     364 
     365         if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
     366             log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
     367             return PutMessageStatus.MESSAGE_ILLEGAL;
     368         }
     369         return PutMessageStatus.PUT_OK;
     370     }
     371 
     372     private PutMessageStatus checkMessages(MessageExtBatch messageExtBatch) {
     373         if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
     374             log.warn("putMessage message topic length too long " + messageExtBatch.getTopic().length());
     375             return PutMessageStatus.MESSAGE_ILLEGAL;
     376         }
     377 
     378         if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) {
     379             log.warn("PutMessages body length too long " + messageExtBatch.getBody().length);
     380             return PutMessageStatus.MESSAGE_ILLEGAL;
     381         }
     382 
     383         return PutMessageStatus.PUT_OK;
     384     }
     385 
     386     private PutMessageStatus checkStoreStatus() {
     387         if (this.shutdown) {
     388             log.warn("message store has shutdown, so putMessage is forbidden");
     389             return PutMessageStatus.SERVICE_NOT_AVAILABLE;
     390         }
     391 
     392         if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
     393             long value = this.printTimes.getAndIncrement();
     394             if ((value % 50000) == 0) {
     395                 log.warn("broke role is slave, so putMessage is forbidden");
     396             }
     397             return PutMessageStatus.SERVICE_NOT_AVAILABLE;
     398         }
     399 
     400         if (!this.runningFlags.isWriteable()) {
     401             long value = this.printTimes.getAndIncrement();
     402             if ((value % 50000) == 0) {
     403                 log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
     404                     "the broker's disk is full, write to logic queue error, write to index file error, etc");
     405             }
     406             return PutMessageStatus.SERVICE_NOT_AVAILABLE;
     407         } else {
     408             this.printTimes.set(0);
     409         }
     410 
     411         if (this.isOSPageCacheBusy()) {
     412             return PutMessageStatus.OS_PAGECACHE_BUSY;
     413         }
     414         return PutMessageStatus.PUT_OK;
     415     }
     416 
     417     @Override
     418     public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
     419         PutMessageStatus checkStoreStatus = this.checkStoreStatus();
     420         if (checkStoreStatus != PutMessageStatus.PUT_OK) {
     421             return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
     422         }
     423 
     424         PutMessageStatus msgCheckStatus = this.checkMessage(msg);
     425         if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
     426             return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
     427         }
     428 
     429         long beginTime = this.getSystemClock().now();
     430         CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
     431 
     432         putResultFuture.thenAccept((result) -> {
     433             long elapsedTime = this.getSystemClock().now() - beginTime;
     434             if (elapsedTime > 500) {
     435                 log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
     436             }
     437             this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
     438 
     439             if (null == result || !result.isOk()) {
     440                 this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
     441             }
     442         });
     443 
     444         return putResultFuture;
     445     }
     446 
     447     public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
     448         PutMessageStatus checkStoreStatus = this.checkStoreStatus();
     449         if (checkStoreStatus != PutMessageStatus.PUT_OK) {
     450             return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
     451         }
     452 
     453         PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
     454         if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
     455             return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
     456         }
     457 
     458         long beginTime = this.getSystemClock().now();
     459         CompletableFuture<PutMessageResult> resultFuture = this.commitLog.asyncPutMessages(messageExtBatch);
     460 
     461         resultFuture.thenAccept((result) -> {
     462             long elapsedTime = this.getSystemClock().now() - beginTime;
     463             if (elapsedTime > 500) {
     464                 log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
     465             }
     466 
     467             this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
     468 
     469             if (null == result || !result.isOk()) {
     470                 this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
     471             }
     472         });
     473 
     474         return resultFuture;
     475     }
     476 
     477     @Override
     478     public PutMessageResult putMessage(MessageExtBrokerInner msg) {
     479         PutMessageStatus checkStoreStatus = this.checkStoreStatus();
     480         if (checkStoreStatus != PutMessageStatus.PUT_OK) {
     481             return new PutMessageResult(checkStoreStatus, null);
     482         }
     483 
     484         PutMessageStatus msgCheckStatus = this.checkMessage(msg);
     485         if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
     486             return new PutMessageResult(msgCheckStatus, null);
     487         }
     488 
     489         long beginTime = this.getSystemClock().now();
     490         PutMessageResult result = this.commitLog.putMessage(msg);
     491         long elapsedTime = this.getSystemClock().now() - beginTime;
     492         if (elapsedTime > 500) {
     493             log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
     494         }
     495 
     496         this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
     497 
     498         if (null == result || !result.isOk()) {
     499             this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
     500         }
     501 
     502         return result;
     503     }
     504 
     505     @Override
     506     public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
     507         PutMessageStatus checkStoreStatus = this.checkStoreStatus();
     508         if (checkStoreStatus != PutMessageStatus.PUT_OK) {
     509             return new PutMessageResult(checkStoreStatus, null);
     510         }
     511 
     512         PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
     513         if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
     514             return new PutMessageResult(msgCheckStatus, null);
     515         }
     516 
     517         long beginTime = this.getSystemClock().now();
     518         PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
     519         long elapsedTime = this.getSystemClock().now() - beginTime;
     520         if (elapsedTime > 500) {
     521             log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
     522         }
     523 
     524         this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
     525 
     526         if (null == result || !result.isOk()) {
     527             this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
     528         }
     529 
     530         return result;
     531     }
     532 
     533     @Override
     534     public boolean isOSPageCacheBusy() {
     535         long begin = this.getCommitLog().getBeginTimeInLock();
     536         long diff = this.systemClock.now() - begin;
     537 
     538         return diff < 10000000
     539             && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
     540     }
     541 
     542     @Override
     543     public long lockTimeMills() {
     544         return this.commitLog.lockTimeMills();
     545     }
     546 
     547     public SystemClock getSystemClock() {
     548         return systemClock;
     549     }
     550 
     551     public CommitLog getCommitLog() {
     552         return commitLog;
     553     }
     554 
     555     public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
     556         final int maxMsgNums,
     557         final MessageFilter messageFilter) {
     558         if (this.shutdown) {
     559             log.warn("message store has shutdown, so getMessage is forbidden");
     560             return null;
     561         }
     562 
     563         if (!this.runningFlags.isReadable()) {
     564             log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
     565             return null;
     566         }
     567 
     568         long beginTime = this.getSystemClock().now();
     569 
     570         GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
     571         long nextBeginOffset = offset;
     572         long minOffset = 0;
     573         long maxOffset = 0;
     574 
     575         GetMessageResult getResult = new GetMessageResult();
     576 
     577         final long maxOffsetPy = this.commitLog.getMaxOffset();
     578 
     579         ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
     580         if (consumeQueue != null) {
     581             minOffset = consumeQueue.getMinOffsetInQueue();
     582             maxOffset = consumeQueue.getMaxOffsetInQueue();
     583 
     584             if (maxOffset == 0) {
     585                 status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
     586                 nextBeginOffset = nextOffsetCorrection(offset, 0);
     587             } else if (offset < minOffset) {
     588                 status = GetMessageStatus.OFFSET_TOO_SMALL;
     589                 nextBeginOffset = nextOffsetCorrection(offset, minOffset);
     590             } else if (offset == maxOffset) {
     591                 status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
     592                 nextBeginOffset = nextOffsetCorrection(offset, offset);
     593             } else if (offset > maxOffset) {
     594                 status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
     595                 if (0 == minOffset) {
     596                     nextBeginOffset = nextOffsetCorrection(offset, minOffset);
     597                 } else {
     598                     nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
     599                 }
     600             } else {
     601                 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
     602                 if (bufferConsumeQueue != null) {
     603                     try {
     604                         status = GetMessageStatus.NO_MATCHED_MESSAGE;
     605 
     606                         long nextPhyFileStartOffset = Long.MIN_VALUE;
     607                         long maxPhyOffsetPulling = 0;
     608 
     609                         int i = 0;
     610                         final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
     611                         final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
     612                         ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
     613                         for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
     614                             long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
     615                             int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
     616                             long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
     617 
     618                             maxPhyOffsetPulling = offsetPy;
     619 
     620                             if (nextPhyFileStartOffset != Long.MIN_VALUE) {
     621                                 if (offsetPy < nextPhyFileStartOffset)
     622                                     continue;
     623                             }
     624 
     625                             boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
     626 
     627                             if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
     628                                 isInDisk)) {
     629                                 break;
     630                             }
     631 
     632                             boolean extRet = false, isTagsCodeLegal = true;
     633                             if (consumeQueue.isExtAddr(tagsCode)) {
     634                                 extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
     635                                 if (extRet) {
     636                                     tagsCode = cqExtUnit.getTagsCode();
     637                                 } else {
     638                                     // can't find ext content.Client will filter messages by tag also.
     639                                     log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
     640                                         tagsCode, offsetPy, sizePy, topic, group);
     641                                     isTagsCodeLegal = false;
     642                                 }
     643                             }
     644 
     645                             if (messageFilter != null
     646                                 && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
     647                                 if (getResult.getBufferTotalSize() == 0) {
     648                                     status = GetMessageStatus.NO_MATCHED_MESSAGE;
     649                                 }
     650 
     651                                 continue;
     652                             }
     653 
     654                             SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
     655                             if (null == selectResult) {
     656                                 if (getResult.getBufferTotalSize() == 0) {
     657                                     status = GetMessageStatus.MESSAGE_WAS_REMOVING;
     658                                 }
     659 
     660                                 nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
     661                                 continue;
     662                             }
     663 
     664                             if (messageFilter != null
     665                                 && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
     666                                 if (getResult.getBufferTotalSize() == 0) {
     667                                     status = GetMessageStatus.NO_MATCHED_MESSAGE;
     668                                 }
     669                                 // release...
     670                                 selectResult.release();
     671                                 continue;
     672                             }
     673 
     674                             this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
     675                             getResult.addMessage(selectResult);
     676                             status = GetMessageStatus.FOUND;
     677                             nextPhyFileStartOffset = Long.MIN_VALUE;
     678                         }
     679 
     680                         if (diskFallRecorded) {
     681                             long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
     682                             brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
     683                         }
     684 
     685                         nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
     686 
     687                         long diff = maxOffsetPy - maxPhyOffsetPulling;
     688                         long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
     689                             * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
     690                         getResult.setSuggestPullingFromSlave(diff > memory);
     691                     } finally {
     692 
     693                         bufferConsumeQueue.release();
     694                     }
     695                 } else {
     696                     status = GetMessageStatus.OFFSET_FOUND_NULL;
     697                     nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
     698                     log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
     699                         + maxOffset + ", but access logic queue failed.");
     700                 }
     701             }
     702         } else {
     703             status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
     704             nextBeginOffset = nextOffsetCorrection(offset, 0);
     705         }
     706 
     707         if (GetMessageStatus.FOUND == status) {
     708             this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
     709         } else {
     710             this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
     711         }
     712         long elapsedTime = this.getSystemClock().now() - beginTime;
     713         this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
     714 
     715         getResult.setStatus(status);
     716         getResult.setNextBeginOffset(nextBeginOffset);
     717         getResult.setMaxOffset(maxOffset);
     718         getResult.setMinOffset(minOffset);
     719         return getResult;
     720     }
     721 
     722     public long getMaxOffsetInQueue(String topic, int queueId) {
     723         ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
     724         if (logic != null) {
     725             long offset = logic.getMaxOffsetInQueue();
     726             return offset;
     727         }
     728 
     729         return 0;
     730     }
     731 
     732     public long getMinOffsetInQueue(String topic, int queueId) {
     733         ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
     734         if (logic != null) {
     735             return logic.getMinOffsetInQueue();
     736         }
     737 
     738         return -1;
     739     }
     740 
     741     @Override
     742     public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) {
     743         ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
     744         if (consumeQueue != null) {
     745             SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(consumeQueueOffset);
     746             if (bufferConsumeQueue != null) {
     747                 try {
     748                     long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
     749                     return offsetPy;
     750                 } finally {
     751                     bufferConsumeQueue.release();
     752                 }
     753             }
     754         }
     755 
     756         return 0;
     757     }
     758 
     759     public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
     760         ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
     761         if (logic != null) {
     762             return logic.getOffsetInQueueByTime(timestamp);
     763         }
     764 
     765         return 0;
     766     }
     767 
     768     public MessageExt lookMessageByOffset(long commitLogOffset) {
     769         SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4);
     770         if (null != sbr) {
     771             try {
     772                 // 1 TOTALSIZE
     773                 int size = sbr.getByteBuffer().getInt();
     774                 return lookMessageByOffset(commitLogOffset, size);
     775             } finally {
     776                 sbr.release();
     777             }
     778         }
     779 
     780         return null;
     781     }
     782 
     783     @Override
     784     public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
     785         SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4);
     786         if (null != sbr) {
     787             try {
     788                 // 1 TOTALSIZE
     789                 int size = sbr.getByteBuffer().getInt();
     790                 return this.commitLog.getMessage(commitLogOffset, size);
     791             } finally {
     792                 sbr.release();
     793             }
     794         }
     795 
     796         return null;
     797     }
     798 
     799     @Override
     800     public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) {
     801         return this.commitLog.getMessage(commitLogOffset, msgSize);
     802     }
     803 
     804     public String getRunningDataInfo() {
     805         return this.storeStatsService.toString();
     806     }
     807 
     808     private String getStorePathPhysic() {
     809         String storePathPhysic = "";
     810         if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog()) {
     811             storePathPhysic = ((DLedgerCommitLog)DefaultMessageStore.this.getCommitLog()).getdLedgerServer().getdLedgerConfig().getDataStorePath();
     812         } else {
     813             storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
     814         }
     815         return storePathPhysic;
     816     }
     817 
     818     @Override
     819     public HashMap<String, String> getRuntimeInfo() {
     820         HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();
     821 
     822         {
     823             double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
     824             result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio));
     825 
     826         }
     827 
     828         {
     829 
     830             String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
     831             double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
     832             result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio));
     833         }
     834 
     835         {
     836             if (this.scheduleMessageService != null) {
     837                 this.scheduleMessageService.buildRunningStats(result);
     838             }
     839         }
     840 
     841         result.put(RunningStats.commitLogMinOffset.name(), String.valueOf(DefaultMessageStore.this.getMinPhyOffset()));
     842         result.put(RunningStats.commitLogMaxOffset.name(), String.valueOf(DefaultMessageStore.this.getMaxPhyOffset()));
     843 
     844         return result;
     845     }
     846 
     847     @Override
     848     public long getMaxPhyOffset() {
     849         return this.commitLog.getMaxOffset();
     850     }
     851 
     852     @Override
     853     public long getMinPhyOffset() {
     854         return this.commitLog.getMinOffset();
     855     }
     856 
     857     @Override
     858     public long getEarliestMessageTime(String topic, int queueId) {
     859         ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);
     860         if (logicQueue != null) {
     861             long minLogicOffset = logicQueue.getMinLogicOffset();
     862 
     863             SelectMappedBufferResult result = logicQueue.getIndexBuffer(minLogicOffset / ConsumeQueue.CQ_STORE_UNIT_SIZE);
     864             return getStoreTime(result);
     865         }
     866 
     867         return -1;
     868     }
     869 
     870     private long getStoreTime(SelectMappedBufferResult result) {
     871         if (result != null) {
     872             try {
     873                 final long phyOffset = result.getByteBuffer().getLong();
     874                 final int size = result.getByteBuffer().getInt();
     875                 long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size);
     876                 return storeTime;
     877             } catch (Exception e) {
     878             } finally {
     879                 result.release();
     880             }
     881         }
     882         return -1;
     883     }
     884 
     885     @Override
     886     public long getEarliestMessageTime() {
     887         final long minPhyOffset = this.getMinPhyOffset();
     888         final int size = this.messageStoreConfig.getMaxMessageSize() * 2;
     889         return this.getCommitLog().pickupStoreTimestamp(minPhyOffset, size);
     890     }
     891 
     892     @Override
     893     public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {
     894         ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);
     895         if (logicQueue != null) {
     896             SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset);
     897             return getStoreTime(result);
     898         }
     899 
     900         return -1;
     901     }
     902 
     903     @Override
     904     public long getMessageTotalInQueue(String topic, int queueId) {
     905         ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);
     906         if (logicQueue != null) {
     907             return logicQueue.getMessageTotalInQueue();
     908         }
     909 
     910         return -1;
     911     }
     912 
     913     @Override
     914     public SelectMappedBufferResult getCommitLogData(final long offset) {
     915         if (this.shutdown) {
     916             log.warn("message store has shutdown, so getPhyQueueData is forbidden");
     917             return null;
     918         }
     919 
     920         return this.commitLog.getData(offset);
     921     }
     922 
     923     @Override
     924     public boolean appendToCommitLog(long startOffset, byte[] data) {
     925         if (this.shutdown) {
     926             log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
     927             return false;
     928         }
     929 
     930         boolean result = this.commitLog.appendData(startOffset, data);
     931         if (result) {
     932             this.reputMessageService.wakeup();
     933         } else {
     934             log.error("appendToPhyQueue failed " + startOffset + " " + data.length);
     935         }
     936 
     937         return result;
     938     }
     939 
     940     @Override
     941     public void executeDeleteFilesManually() {
     942         this.cleanCommitLogService.excuteDeleteFilesManualy();
     943     }
     944 
     945     @Override
     946     public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
     947         QueryMessageResult queryMessageResult = new QueryMessageResult();
     948 
     949         long lastQueryMsgTime = end;
     950 
     951         for (int i = 0; i < 3; i++) {
     952             QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
     953             if (queryOffsetResult.getPhyOffsets().isEmpty()) {
     954                 break;
     955             }
     956 
     957             Collections.sort(queryOffsetResult.getPhyOffsets());
     958 
     959             queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
     960             queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());
     961 
     962             for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
     963                 long offset = queryOffsetResult.getPhyOffsets().get(m);
     964 
     965                 try {
     966 
     967                     boolean match = true;
     968                     MessageExt msg = this.lookMessageByOffset(offset);
     969                     if (0 == m) {
     970                         lastQueryMsgTime = msg.getStoreTimestamp();
     971                     }
     972 
     973 //                    String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR);
     974 //                    if (topic.equals(msg.getTopic())) {
     975 //                        for (String k : keyArray) {
     976 //                            if (k.equals(key)) {
     977 //                                match = true;
     978 //                                break;
     979 //                            }
     980 //                        }
     981 //                    }
     982 
     983                     if (match) {
     984                         SelectMappedBufferResult result = this.commitLog.getData(offset, false);
     985                         if (result != null) {
     986                             int size = result.getByteBuffer().getInt(0);
     987                             result.getByteBuffer().limit(size);
     988                             result.setSize(size);
     989                             queryMessageResult.addMessage(result);
     990                         }
     991                     } else {
     992                         log.warn("queryMessage hash duplicate, {} {}", topic, key);
     993                     }
     994                 } catch (Exception e) {
     995                     log.error("queryMessage exception", e);
     996                 }
     997             }
     998 
     999             if (queryMessageResult.getBufferTotalSize() > 0) {
    1000                 break;
    1001             }
    1002 
    1003             if (lastQueryMsgTime < begin) {
    1004                 break;
    1005             }
    1006         }
    1007 
    1008         return queryMessageResult;
    1009     }
    1010 
    1011     @Override
    1012     public void updateHaMasterAddress(String newAddr) {
    1013         this.haService.updateMasterAddress(newAddr);
    1014     }
    1015 
    1016     @Override
    1017     public long slaveFallBehindMuch() {
    1018         return this.commitLog.getMaxOffset() - this.haService.getPush2SlaveMaxOffset().get();
    1019     }
    1020 
    1021     @Override
    1022     public long now() {
    1023         return this.systemClock.now();
    1024     }
    1025 
    1026     @Override
    1027     public int cleanUnusedTopic(Set<String> topics) {
    1028         Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator();
    1029         while (it.hasNext()) {
    1030             Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next();
    1031             String topic = next.getKey();
    1032 
    1033             if (!topics.contains(topic) && !topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) {
    1034                 ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue();
    1035                 for (ConsumeQueue cq : queueTable.values()) {
    1036                     cq.destroy();
    1037                     log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned",
    1038                         cq.getTopic(),
    1039                         cq.getQueueId()
    1040                     );
    1041 
    1042                     this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId());
    1043                 }
    1044                 it.remove();
    1045 
    1046                 if (this.brokerConfig.isAutoDeleteUnusedStats()) {
    1047                     this.brokerStatsManager.onTopicDeleted(topic);
    1048                 }
    1049 
    1050                 log.info("cleanUnusedTopic: {},topic destroyed", topic);
    1051             }
    1052         }
    1053 
    1054         return 0;
    1055     }
    1056 
    1057     public void cleanExpiredConsumerQueue() {
    1058         long minCommitLogOffset = this.commitLog.getMinOffset();
    1059 
    1060         Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator();
    1061         while (it.hasNext()) {
    1062             Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next();
    1063             String topic = next.getKey();
    1064             if (!topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) {
    1065                 ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue();
    1066                 Iterator<Entry<Integer, ConsumeQueue>> itQT = queueTable.entrySet().iterator();
    1067                 while (itQT.hasNext()) {
    1068                     Entry<Integer, ConsumeQueue> nextQT = itQT.next();
    1069                     long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset();
    1070 
    1071                     if (maxCLOffsetInConsumeQueue == -1) {
    1072                         log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.",
    1073                             nextQT.getValue().getTopic(),
    1074                             nextQT.getValue().getQueueId(),
    1075                             nextQT.getValue().getMaxPhysicOffset(),
    1076                             nextQT.getValue().getMinLogicOffset());
    1077                     } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) {
    1078                         log.info(
    1079                             "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}",
    1080                             topic,
    1081                             nextQT.getKey(),
    1082                             minCommitLogOffset,
    1083                             maxCLOffsetInConsumeQueue);
    1084 
    1085                         DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(),
    1086                             nextQT.getValue().getQueueId());
    1087 
    1088                         nextQT.getValue().destroy();
    1089                         itQT.remove();
    1090                     }
    1091                 }
    1092 
    1093                 if (queueTable.isEmpty()) {
    1094                     log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic);
    1095                     it.remove();
    1096                 }
    1097             }
    1098         }
    1099     }
    1100 
    1101     public Map<String, Long> getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset,
    1102         SocketAddress storeHost) {
    1103         Map<String, Long> messageIds = new HashMap<String, Long>();
    1104         if (this.shutdown) {
    1105             return messageIds;
    1106         }
    1107 
    1108         ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    1109         if (consumeQueue != null) {
    1110             minOffset = Math.max(minOffset, consumeQueue.getMinOffsetInQueue());
    1111             maxOffset = Math.min(maxOffset, consumeQueue.getMaxOffsetInQueue());
    1112 
    1113             if (maxOffset == 0) {
    1114                 return messageIds;
    1115             }
    1116 
    1117             long nextOffset = minOffset;
    1118             while (nextOffset < maxOffset) {
    1119                 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(nextOffset);
    1120                 if (bufferConsumeQueue != null) {
    1121                     try {
    1122                         int i = 0;
    1123                         for (; i < bufferConsumeQueue.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
    1124                             long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
    1125                             InetSocketAddress inetSocketAddress = (InetSocketAddress) storeHost;
    1126                             int msgIdLength = (inetSocketAddress.getAddress() instanceof Inet6Address) ? 16 + 4 + 8 : 4 + 4 + 8;
    1127                             final ByteBuffer msgIdMemory = ByteBuffer.allocate(msgIdLength);
    1128                             String msgId =
    1129                                 MessageDecoder.createMessageId(msgIdMemory, MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy);
    1130                             messageIds.put(msgId, nextOffset++);
    1131                             if (nextOffset > maxOffset) {
    1132                                 return messageIds;
    1133                             }
    1134                         }
    1135                     } finally {
    1136 
    1137                         bufferConsumeQueue.release();
    1138                     }
    1139                 } else {
    1140                     return messageIds;
    1141                 }
    1142             }
    1143         }
    1144         return messageIds;
    1145     }
    1146 
    1147     @Override
    1148     public boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset) {
    1149 
    1150         final long maxOffsetPy = this.commitLog.getMaxOffset();
    1151 
    1152         ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    1153         if (consumeQueue != null) {
    1154             SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(consumeOffset);
    1155             if (bufferConsumeQueue != null) {
    1156                 try {
    1157                     for (int i = 0; i < bufferConsumeQueue.getSize(); ) {
    1158                         i += ConsumeQueue.CQ_STORE_UNIT_SIZE;
    1159                         long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
    1160                         return checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
    1161                     }
    1162                 } finally {
    1163 
    1164                     bufferConsumeQueue.release();
    1165                 }
    1166             } else {
    1167                 return false;
    1168             }
    1169         }
    1170         return false;
    1171     }
    1172 
    1173     @Override
    1174     public long dispatchBehindBytes() {
    1175         return this.reputMessageService.behind();
    1176     }
    1177 
    1178     @Override
    1179     public long flush() {
    1180         return this.commitLog.flush();
    1181     }
    1182 
    1183     @Override
    1184     public boolean resetWriteOffset(long phyOffset) {
    1185         return this.commitLog.resetOffset(phyOffset);
    1186     }
    1187 
    1188     @Override
    1189     public long getConfirmOffset() {
    1190         return this.commitLog.getConfirmOffset();
    1191     }
    1192 
    1193     @Override
    1194     public void setConfirmOffset(long phyOffset) {
    1195         this.commitLog.setConfirmOffset(phyOffset);
    1196     }
    1197 
    1198     public MessageExt lookMessageByOffset(long commitLogOffset, int size) {
    1199         SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, size);
    1200         if (null != sbr) {
    1201             try {
    1202                 return MessageDecoder.decode(sbr.getByteBuffer(), true, false);
    1203             } finally {
    1204                 sbr.release();
    1205             }
    1206         }
    1207 
    1208         return null;
    1209     }
    1210 
    1211     public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    1212         ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    1213         if (null == map) {
    1214             ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
    1215             ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
    1216             if (oldMap != null) {
    1217                 map = oldMap;
    1218             } else {
    1219                 map = newMap;
    1220             }
    1221         }
    1222 
    1223         ConsumeQueue logic = map.get(queueId);
    1224         if (null == logic) {
    1225             ConsumeQueue newLogic = new ConsumeQueue(
    1226                 topic,
    1227                 queueId,
    1228                 StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
    1229                 this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
    1230                 this);
    1231             ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
    1232             if (oldLogic != null) {
    1233                 logic = oldLogic;
    1234             } else {
    1235                 logic = newLogic;
    1236             }
    1237         }
    1238 
    1239         return logic;
    1240     }
    1241 
    1242     private long nextOffsetCorrection(long oldOffset, long newOffset) {
    1243         long nextOffset = oldOffset;
    1244         if (this.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE || this.getMessageStoreConfig().isOffsetCheckInSlave()) {
    1245             nextOffset = newOffset;
    1246         }
    1247         return nextOffset;
    1248     }
    1249 
    1250     private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) {
    1251         long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
    1252         return (maxOffsetPy - offsetPy) > memory;
    1253     }
    1254 
    1255     private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) {
    1256 
    1257         if (0 == bufferTotal || 0 == messageTotal) {
    1258             return false;
    1259         }
    1260 
    1261         if (maxMsgNums <= messageTotal) {
    1262             return true;
    1263         }
    1264 
    1265         if (isInDisk) {
    1266             if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
    1267                 return true;
    1268             }
    1269 
    1270             if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1) {
    1271                 return true;
    1272             }
    1273         } else {
    1274             if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
    1275                 return true;
    1276             }
    1277 
    1278             if (messageTotal > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1) {
    1279                 return true;
    1280             }
    1281         }
    1282 
    1283         return false;
    1284     }
    1285 
    1286     private void deleteFile(final String fileName) {
    1287         File file = new File(fileName);
    1288         boolean result = file.delete();
    1289         log.info(fileName + (result ? " delete OK" : " delete Failed"));
    1290     }
    1291 
    1292     /**
    1293      * @throws IOException
    1294      */
    1295     private void createTempFile() throws IOException {
    1296         String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    1297         File file = new File(fileName);
    1298         MappedFile.ensureDirOK(file.getParent());
    1299         boolean result = file.createNewFile();
    1300         log.info(fileName + (result ? " create OK" : " already exists"));
    1301     }
    1302 
    1303     private void addScheduleTask() {
    1304 
    1305         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    1306             @Override
    1307             public void run() {
    1308                 DefaultMessageStore.this.cleanFilesPeriodically();
    1309             }
    1310         }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
    1311 
    1312         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    1313             @Override
    1314             public void run() {
    1315                 DefaultMessageStore.this.checkSelf();
    1316             }
    1317         }, 1, 10, TimeUnit.MINUTES);
    1318 
    1319         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    1320             @Override
    1321             public void run() {
    1322                 if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
    1323                     try {
    1324                         if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
    1325                             long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();
    1326                             if (lockTime > 1000 && lockTime < 10000000) {
    1327 
    1328                                 String stack = UtilAll.jstack();
    1329                                 final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-"
    1330                                     + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
    1331                                 MixAll.string2FileNotSafe(stack, fileName);
    1332                             }
    1333                         }
    1334                     } catch (Exception e) {
    1335                     }
    1336                 }
    1337             }
    1338         }, 1, 1, TimeUnit.SECONDS);
    1339 
    1340         // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    1341         // @Override
    1342         // public void run() {
    1343         // DefaultMessageStore.this.cleanExpiredConsumerQueue();
    1344         // }
    1345         // }, 1, 1, TimeUnit.HOURS);
    1346         this.diskCheckScheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    1347             public void run() {
    1348                 DefaultMessageStore.this.cleanCommitLogService.isSpaceFull();
    1349             }
    1350         }, 1000L, 10000L, TimeUnit.MILLISECONDS);
    1351     }
    1352 
    1353     private void cleanFilesPeriodically() {
    1354         this.cleanCommitLogService.run();
    1355         this.cleanConsumeQueueService.run();
    1356     }
    1357 
    1358     private void checkSelf() {
    1359         this.commitLog.checkSelf();
    1360 
    1361         Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator();
    1362         while (it.hasNext()) {
    1363             Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next();
    1364             Iterator<Entry<Integer, ConsumeQueue>> itNext = next.getValue().entrySet().iterator();
    1365             while (itNext.hasNext()) {
    1366                 Entry<Integer, ConsumeQueue> cq = itNext.next();
    1367                 cq.getValue().checkSelf();
    1368             }
    1369         }
    1370     }
    1371 
    1372     private boolean isTempFileExist() {
    1373         String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    1374         File file = new File(fileName);
    1375         return file.exists();
    1376     }
    1377 
    1378     private boolean loadConsumeQueue() {
    1379         File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
    1380         File[] fileTopicList = dirLogic.listFiles();
    1381         if (fileTopicList != null) {
    1382 
    1383             for (File fileTopic : fileTopicList) {
    1384                 String topic = fileTopic.getName();
    1385 
    1386                 File[] fileQueueIdList = fileTopic.listFiles();
    1387                 if (fileQueueIdList != null) {
    1388                     for (File fileQueueId : fileQueueIdList) {
    1389                         int queueId;
    1390                         try {
    1391                             queueId = Integer.parseInt(fileQueueId.getName());
    1392                         } catch (NumberFormatException e) {
    1393                             continue;
    1394                         }
    1395                         ConsumeQueue logic = new ConsumeQueue(
    1396                             topic,
    1397                             queueId,
    1398                             StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
    1399                             this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
    1400                             this);
    1401                         this.putConsumeQueue(topic, queueId, logic);
    1402                         if (!logic.load()) {
    1403                             return false;
    1404                         }
    1405                     }
    1406                 }
    1407             }
    1408         }
    1409 
    1410         log.info("load logics queue all over, OK");
    1411 
    1412         return true;
    1413     }
    1414 
    1415     private void recover(final boolean lastExitOK) {
    1416         long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
    1417 
    1418         if (lastExitOK) {
    1419             this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    1420         } else {
    1421             this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    1422         }
    1423 
    1424         this.recoverTopicQueueTable();
    1425     }
    1426 
    1427     public MessageStoreConfig getMessageStoreConfig() {
    1428         return messageStoreConfig;
    1429     }
    1430 
    1431     public TransientStorePool getTransientStorePool() {
    1432         return transientStorePool;
    1433     }
    1434 
    1435     private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) {
    1436         ConcurrentMap<Integer/* queueId */, ConsumeQueue> map = this.consumeQueueTable.get(topic);
    1437         if (null == map) {
    1438             map = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>();
    1439             map.put(queueId, consumeQueue);
    1440             this.consumeQueueTable.put(topic, map);
    1441         } else {
    1442             map.put(queueId, consumeQueue);
    1443         }
    1444     }
    1445 
    1446     private long recoverConsumeQueue() {
    1447         long maxPhysicOffset = -1;
    1448         for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
    1449             for (ConsumeQueue logic : maps.values()) {
    1450                 logic.recover();
    1451                 if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
    1452                     maxPhysicOffset = logic.getMaxPhysicOffset();
    1453                 }
    1454             }
    1455         }
    1456 
    1457         return maxPhysicOffset;
    1458     }
    1459 
    1460     public void recoverTopicQueueTable() {
    1461         HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
    1462         long minPhyOffset = this.commitLog.getMinOffset();
    1463         for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
    1464             for (ConsumeQueue logic : maps.values()) {
    1465                 String key = logic.getTopic() + "-" + logic.getQueueId();
    1466                 table.put(key, logic.getMaxOffsetInQueue());
    1467                 logic.correctMinOffset(minPhyOffset);
    1468             }
    1469         }
    1470 
    1471         this.commitLog.setTopicQueueTable(table);
    1472     }
    1473 
    1474     public AllocateMappedFileService getAllocateMappedFileService() {
    1475         return allocateMappedFileService;
    1476     }
    1477 
    1478     public StoreStatsService getStoreStatsService() {
    1479         return storeStatsService;
    1480     }
    1481 
    1482     public RunningFlags getAccessRights() {
    1483         return runningFlags;
    1484     }
    1485 
    1486     public ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> getConsumeQueueTable() {
    1487         return consumeQueueTable;
    1488     }
    1489 
    1490     public StoreCheckpoint getStoreCheckpoint() {
    1491         return storeCheckpoint;
    1492     }
    1493 
    1494     public HAService getHaService() {
    1495         return haService;
    1496     }
    1497 
    1498     public ScheduleMessageService getScheduleMessageService() {
    1499         return scheduleMessageService;
    1500     }
    1501 
    1502     public RunningFlags getRunningFlags() {
    1503         return runningFlags;
    1504     }
    1505 
    1506     public void doDispatch(DispatchRequest req) {
    1507         for (CommitLogDispatcher dispatcher : this.dispatcherList) {
    1508             dispatcher.dispatch(req);
    1509         }
    1510     }
    1511 
    1512     public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    1513         ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    1514         cq.putMessagePositionInfoWrapper(dispatchRequest);
    1515     }
    1516 
    1517     @Override
    1518     public BrokerStatsManager getBrokerStatsManager() {
    1519         return brokerStatsManager;
    1520     }
    1521 
    1522     @Override
    1523     public void handleScheduleMessageService(final BrokerRole brokerRole) {
    1524         if (this.scheduleMessageService != null) {
    1525             if (brokerRole == BrokerRole.SLAVE) {
    1526                 this.scheduleMessageService.shutdown();
    1527             } else {
    1528                 this.scheduleMessageService.start();
    1529             }
    1530         }
    1531 
    1532     }
    1533 
    1534     public int remainTransientStoreBufferNumbs() {
    1535         return this.transientStorePool.availableBufferNums();
    1536     }
    1537 
    1538     @Override
    1539     public boolean isTransientStorePoolDeficient() {
    1540         return remainTransientStoreBufferNumbs() == 0;
    1541     }
    1542 
    1543     @Override
    1544     public LinkedList<CommitLogDispatcher> getDispatcherList() {
    1545         return this.dispatcherList;
    1546     }
    1547 
    1548     @Override
    1549     public ConsumeQueue getConsumeQueue(String topic, int queueId) {
    1550         ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    1551         if (map == null) {
    1552             return null;
    1553         }
    1554         return map.get(queueId);
    1555     }
    1556 
    1557     public void unlockMappedFile(final MappedFile mappedFile) {
    1558         this.scheduledExecutorService.schedule(new Runnable() {
    1559             @Override
    1560             public void run() {
    1561                 mappedFile.munlock();
    1562             }
    1563         }, 6, TimeUnit.SECONDS);
    1564     }
    1565 
    1566     class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
    1567 
    1568         @Override
    1569         public void dispatch(DispatchRequest request) {
    1570             final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
    1571             switch (tranType) {
    1572                 case MessageSysFlag.TRANSACTION_NOT_TYPE:
    1573                 case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
    1574                     DefaultMessageStore.this.putMessagePositionInfo(request);
    1575                     break;
    1576                 case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    1577                 case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
    1578                     break;
    1579             }
    1580         }
    1581     }
    1582 
    1583     class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
    1584 
    1585         @Override
    1586         public void dispatch(DispatchRequest request) {
    1587             if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
    1588                 DefaultMessageStore.this.indexService.buildIndex(request);
    1589             }
    1590         }
    1591     }
    1592 
    1593     class CleanCommitLogService {
    1594 
    1595         private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;
    1596         private final double diskSpaceWarningLevelRatio =
    1597             Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
    1598 
    1599         private final double diskSpaceCleanForciblyRatio =
    1600             Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
    1601         private long lastRedeleteTimestamp = 0;
    1602 
    1603         private volatile int manualDeleteFileSeveralTimes = 0;
    1604 
    1605         private volatile boolean cleanImmediately = false;
    1606 
    1607         public void excuteDeleteFilesManualy() {
    1608             this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
    1609             DefaultMessageStore.log.info("executeDeleteFilesManually was invoked");
    1610         }
    1611 
    1612         public void run() {
    1613             try {
    1614                 this.deleteExpiredFiles();
    1615 
    1616                 this.redeleteHangedFile();
    1617             } catch (Throwable e) {
    1618                 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
    1619             }
    1620         }
    1621 
    1622         private void deleteExpiredFiles() {
    1623             int deleteCount = 0;
    1624             long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    1625             int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    1626             int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
    1627 
    1628             boolean timeup = this.isTimeToDelete();
    1629             boolean spacefull = this.isSpaceToDelete();
    1630             boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
    1631 
    1632             if (timeup || spacefull || manualDelete) {
    1633 
    1634                 if (manualDelete)
    1635                     this.manualDeleteFileSeveralTimes--;
    1636 
    1637                 boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
    1638 
    1639                 log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
    1640                     fileReservedTime,
    1641                     timeup,
    1642                     spacefull,
    1643                     manualDeleteFileSeveralTimes,
    1644                     cleanAtOnce);
    1645 
    1646                 fileReservedTime *= 60 * 60 * 1000;
    1647 
    1648                 deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
    1649                     destroyMapedFileIntervalForcibly, cleanAtOnce);
    1650                 if (deleteCount > 0) {
    1651                 } else if (spacefull) {
    1652                     log.warn("disk space will be full soon, but delete file failed.");
    1653                 }
    1654             }
    1655         }
    1656 
    1657         private void redeleteHangedFile() {
    1658             int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();
    1659             long currentTimestamp = System.currentTimeMillis();
    1660             if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) {
    1661                 this.lastRedeleteTimestamp = currentTimestamp;
    1662                 int destroyMapedFileIntervalForcibly =
    1663                     DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
    1664                 if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) {
    1665                 }
    1666             }
    1667         }
    1668 
    1669         public String getServiceName() {
    1670             return CleanCommitLogService.class.getSimpleName();
    1671         }
    1672 
    1673         private boolean isTimeToDelete() {
    1674             String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
    1675             if (UtilAll.isItTimeToDo(when)) {
    1676                 DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);
    1677                 return true;
    1678             }
    1679 
    1680             return false;
    1681         }
    1682 
    1683         private boolean isSpaceToDelete() {
    1684             double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
    1685 
    1686             cleanImmediately = false;
    1687 
    1688             {
    1689                 double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
    1690                 if (physicRatio > diskSpaceWarningLevelRatio) {
    1691                     boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
    1692                     if (diskok) {
    1693                         DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
    1694                     }
    1695 
    1696                     cleanImmediately = true;
    1697                 } else if (physicRatio > diskSpaceCleanForciblyRatio) {
    1698                     cleanImmediately = true;
    1699                 } else {
    1700                     boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
    1701                     if (!diskok) {
    1702                         DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
    1703                     }
    1704                 }
    1705 
    1706                 if (physicRatio < 0 || physicRatio > ratio) {
    1707                     DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
    1708                     return true;
    1709                 }
    1710             }
    1711 
    1712             {
    1713                 String storePathLogics = StorePathConfigHelper
    1714                     .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
    1715                 double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
    1716                 if (logicsRatio > diskSpaceWarningLevelRatio) {
    1717                     boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
    1718                     if (diskok) {
    1719                         DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");
    1720                     }
    1721 
    1722                     cleanImmediately = true;
    1723                 } else if (logicsRatio > diskSpaceCleanForciblyRatio) {
    1724                     cleanImmediately = true;
    1725                 } else {
    1726                     boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
    1727                     if (!diskok) {
    1728                         DefaultMessageStore.log.info("logics disk space OK " + logicsRatio + ", so mark disk ok");
    1729                     }
    1730                 }
    1731 
    1732                 if (logicsRatio < 0 || logicsRatio > ratio) {
    1733                     DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio);
    1734                     return true;
    1735                 }
    1736             }
    1737 
    1738             return false;
    1739         }
    1740 
    1741         public int getManualDeleteFileSeveralTimes() {
    1742             return manualDeleteFileSeveralTimes;
    1743         }
    1744 
    1745         public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) {
    1746             this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes;
    1747         }
    1748         public boolean isSpaceFull() {
    1749             String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
    1750             double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
    1751             double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
    1752             if (physicRatio > ratio) {
    1753                 DefaultMessageStore.log.info("physic disk of commitLog used: " + physicRatio);
    1754             }
    1755             if (physicRatio > this.diskSpaceWarningLevelRatio) {
    1756                 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
    1757                 if (diskok) {
    1758                     DefaultMessageStore.log.error("physic disk of commitLog maybe full soon, used " + physicRatio + ", so mark disk full");
    1759                 }
    1760 
    1761                 return true;
    1762             } else {
    1763                 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
    1764 
    1765                 if (!diskok) {
    1766                     DefaultMessageStore.log.info("physic disk space of commitLog OK " + physicRatio + ", so mark disk ok");
    1767                 }
    1768 
    1769                 return false;
    1770             }
    1771         }
    1772     }
    1773 
    1774     class CleanConsumeQueueService {
    1775         private long lastPhysicalMinOffset = 0;
    1776 
    1777         public void run() {
    1778             try {
    1779                 this.deleteExpiredFiles();
    1780             } catch (Throwable e) {
    1781                 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
    1782             }
    1783         }
    1784 
    1785         private void deleteExpiredFiles() {
    1786             int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
    1787 
    1788             long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    1789             if (minOffset > this.lastPhysicalMinOffset) {
    1790                 this.lastPhysicalMinOffset = minOffset;
    1791 
    1792                 ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
    1793 
    1794                 for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
    1795                     for (ConsumeQueue logic : maps.values()) {
    1796                         int deleteCount = logic.deleteExpiredFile(minOffset);
    1797 
    1798                         if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
    1799                             try {
    1800                                 Thread.sleep(deleteLogicsFilesInterval);
    1801                             } catch (InterruptedException ignored) {
    1802                             }
    1803                         }
    1804                     }
    1805                 }
    1806 
    1807                 DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
    1808             }
    1809         }
    1810 
    1811         public String getServiceName() {
    1812             return CleanConsumeQueueService.class.getSimpleName();
    1813         }
    1814     }
    1815 
    1816     class FlushConsumeQueueService extends ServiceThread {
    1817         private static final int RETRY_TIMES_OVER = 3;
    1818         private long lastFlushTimestamp = 0;
    1819 
    1820         private void doFlush(int retryTimes) {
    1821             int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
    1822 
    1823             if (retryTimes == RETRY_TIMES_OVER) {
    1824                 flushConsumeQueueLeastPages = 0;
    1825             }
    1826 
    1827             long logicsMsgTimestamp = 0;
    1828 
    1829             int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
    1830             long currentTimeMillis = System.currentTimeMillis();
    1831             if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
    1832                 this.lastFlushTimestamp = currentTimeMillis;
    1833                 flushConsumeQueueLeastPages = 0;
    1834                 logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
    1835             }
    1836 
    1837             ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
    1838 
    1839             for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
    1840                 for (ConsumeQueue cq : maps.values()) {
    1841                     boolean result = false;
    1842                     for (int i = 0; i < retryTimes && !result; i++) {
    1843                         result = cq.flush(flushConsumeQueueLeastPages);
    1844                     }
    1845                 }
    1846             }
    1847 
    1848             if (0 == flushConsumeQueueLeastPages) {
    1849                 if (logicsMsgTimestamp > 0) {
    1850                     DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
    1851                 }
    1852                 DefaultMessageStore.this.getStoreCheckpoint().flush();
    1853             }
    1854         }
    1855 
    1856         public void run() {
    1857             DefaultMessageStore.log.info(this.getServiceName() + " service started");
    1858 
    1859             while (!this.isStopped()) {
    1860                 try {
    1861                     int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
    1862                     this.waitForRunning(interval);
    1863                     this.doFlush(1);
    1864                 } catch (Exception e) {
    1865                     DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
    1866                 }
    1867             }
    1868 
    1869             this.doFlush(RETRY_TIMES_OVER);
    1870 
    1871             DefaultMessageStore.log.info(this.getServiceName() + " service end");
    1872         }
    1873 
    1874         @Override
    1875         public String getServiceName() {
    1876             return FlushConsumeQueueService.class.getSimpleName();
    1877         }
    1878 
    1879         @Override
    1880         public long getJointime() {
    1881             return 1000 * 60;
    1882         }
    1883     }
    1884 
    1885     class ReputMessageService extends ServiceThread {
    1886 
    1887         private volatile long reputFromOffset = 0;
    1888 
    1889         public long getReputFromOffset() {
    1890             return reputFromOffset;
    1891         }
    1892 
    1893         public void setReputFromOffset(long reputFromOffset) {
    1894             this.reputFromOffset = reputFromOffset;
    1895         }
    1896 
    1897         @Override
    1898         public void shutdown() {
    1899             for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
    1900                 try {
    1901                     Thread.sleep(100);
    1902                 } catch (InterruptedException ignored) {
    1903                 }
    1904             }
    1905 
    1906             if (this.isCommitLogAvailable()) {
    1907                 log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
    1908                     DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
    1909             }
    1910 
    1911             super.shutdown();
    1912         }
    1913 
    1914         public long behind() {
    1915             return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
    1916         }
    1917 
    1918         private boolean isCommitLogAvailable() {
    1919             return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
    1920         }
    1921 
    1922         private void doReput() {
    1923             if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
    1924                 log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
    1925                     this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
    1926                 this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    1927             }
    1928             for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
    1929 
    1930                 if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
    1931                     && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
    1932                     break;
    1933                 }
    1934 
    1935                 SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
    1936                 if (result != null) {
    1937                     try {
    1938                         this.reputFromOffset = result.getStartOffset();
    1939 
    1940                         for (int readSize = 0; readSize < result.getSize() && doNext; ) {
    1941                             DispatchRequest dispatchRequest =
    1942                                 DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
    1943                             int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
    1944 
    1945                             if (dispatchRequest.isSuccess()) {
    1946                                 if (size > 0) {
    1947                                     DefaultMessageStore.this.doDispatch(dispatchRequest);
    1948 
    1949                                     if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
    1950                                         && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
    1951                                         DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
    1952                                             dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
    1953                                             dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
    1954                                             dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
    1955                                     }
    1956 
    1957                                     this.reputFromOffset += size;
    1958                                     readSize += size;
    1959                                     if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
    1960                                         DefaultMessageStore.this.storeStatsService
    1961                                             .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
    1962                                         DefaultMessageStore.this.storeStatsService
    1963                                             .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
    1964                                             .addAndGet(dispatchRequest.getMsgSize());
    1965                                     }
    1966                                 } else if (size == 0) {
    1967                                     this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
    1968                                     readSize = result.getSize();
    1969                                 }
    1970                             } else if (!dispatchRequest.isSuccess()) {
    1971 
    1972                                 if (size > 0) {
    1973                                     log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
    1974                                     this.reputFromOffset += size;
    1975                                 } else {
    1976                                     doNext = false;
    1977                                     // If user open the dledger pattern or the broker is master node,
    1978                                     // it will not ignore the exception and fix the reputFromOffset variable
    1979                                     if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
    1980                                         DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
    1981                                         log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
    1982                                             this.reputFromOffset);
    1983                                         this.reputFromOffset += result.getSize() - readSize;
    1984                                     }
    1985                                 }
    1986                             }
    1987                         }
    1988                     } finally {
    1989                         result.release();
    1990                     }
    1991                 } else {
    1992                     doNext = false;
    1993                 }
    1994             }
    1995         }
    1996 
    1997         @Override
    1998         public void run() {
    1999             DefaultMessageStore.log.info(this.getServiceName() + " service started");
    2000 
    2001             while (!this.isStopped()) {
    2002                 try {
    2003                     Thread.sleep(1);
    2004                     this.doReput();
    2005                 } catch (Exception e) {
    2006                     DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
    2007                 }
    2008             }
    2009 
    2010             DefaultMessageStore.log.info(this.getServiceName() + " service end");
    2011         }
    2012 
    2013         @Override
    2014         public String getServiceName() {
    2015             return ReputMessageService.class.getSimpleName();
    2016         }
    2017 
    2018     }
    2019 }
    View Code

      ReputMessageService 服务启动后的执行过程如下所示:

      doReput()方法用于创建索引的入口,通常通过以下几个步骤来创建索引

    第一步:从 CommitLog 中查找未创建索引的消息,将消息组装成 DispatchRequest 对象,该逻辑主要在D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreCommitLog.java 中checkMessageAndReturnSize()方法中实现,代码路径:D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreCommitLog.java。

      1     /**
      2      * check the message and returns the message size
      3      *
      4      * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
      5      */
      6     public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
      7         final boolean readBody) {
      8         try {
      9             // 1 TOTAL SIZE
     10             int totalSize = byteBuffer.getInt();
     11 
     12             // 2 MAGIC CODE
     13             int magicCode = byteBuffer.getInt();
     14             switch (magicCode) {
     15                 case MESSAGE_MAGIC_CODE:
     16                     break;
     17                 case BLANK_MAGIC_CODE:
     18                     return new DispatchRequest(0, true /* success */);
     19                 default:
     20                     log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode));
     21                     return new DispatchRequest(-1, false /* success */);
     22             }
     23 
     24             byte[] bytesContent = new byte[totalSize];
     25 
     26             int bodyCRC = byteBuffer.getInt();
     27 
     28             int queueId = byteBuffer.getInt();
     29 
     30             int flag = byteBuffer.getInt();
     31 
     32             long queueOffset = byteBuffer.getLong();
     33 
     34             long physicOffset = byteBuffer.getLong();
     35 
     36             int sysFlag = byteBuffer.getInt();
     37 
     38             long bornTimeStamp = byteBuffer.getLong();
     39 
     40             ByteBuffer byteBuffer1;
     41             if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
     42                 byteBuffer1 = byteBuffer.get(bytesContent, 0, 4 + 4);
     43             } else {
     44                 byteBuffer1 = byteBuffer.get(bytesContent, 0, 16 + 4);
     45             }
     46 
     47             long storeTimestamp = byteBuffer.getLong();
     48 
     49             ByteBuffer byteBuffer2;
     50             if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
     51                 byteBuffer2 = byteBuffer.get(bytesContent, 0, 4 + 4);
     52             } else {
     53                 byteBuffer2 = byteBuffer.get(bytesContent, 0, 16 + 4);
     54             }
     55 
     56             int reconsumeTimes = byteBuffer.getInt();
     57 
     58             long preparedTransactionOffset = byteBuffer.getLong();
     59 
     60             int bodyLen = byteBuffer.getInt();
     61             if (bodyLen > 0) {
     62                 if (readBody) {
     63                     byteBuffer.get(bytesContent, 0, bodyLen);
     64 
     65                     if (checkCRC) {
     66                         int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
     67                         if (crc != bodyCRC) {
     68                             log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
     69                             return new DispatchRequest(-1, false/* success */);
     70                         }
     71                     }
     72                 } else {
     73                     byteBuffer.position(byteBuffer.position() + bodyLen);
     74                 }
     75             }
     76 
     77             byte topicLen = byteBuffer.get();
     78             byteBuffer.get(bytesContent, 0, topicLen);
     79             String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8);
     80 
     81             long tagsCode = 0;
     82             String keys = "";
     83             String uniqKey = null;
     84 
     85             short propertiesLength = byteBuffer.getShort();
     86             Map<String, String> propertiesMap = null;
     87             if (propertiesLength > 0) {
     88                 byteBuffer.get(bytesContent, 0, propertiesLength);
     89                 String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);
     90                 propertiesMap = MessageDecoder.string2messageProperties(properties);
     91 
     92                 keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);
     93 
     94                 uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
     95 
     96                 String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
     97                 if (tags != null && tags.length() > 0) {
     98                     tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
     99                 }
    100 
    101                 // Timing message processing
    102                 {
    103                     String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    104                     if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
    105                         int delayLevel = Integer.parseInt(t);
    106 
    107                         if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
    108                             delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
    109                         }
    110 
    111                         if (delayLevel > 0) {
    112                             tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
    113                                 storeTimestamp);
    114                         }
    115                     }
    116                 }
    117             }
    118 
    119             int readLength = calMsgLength(sysFlag, bodyLen, topicLen, propertiesLength);
    120             if (totalSize != readLength) {
    121                 doNothingForDeadCode(reconsumeTimes);
    122                 doNothingForDeadCode(flag);
    123                 doNothingForDeadCode(bornTimeStamp);
    124                 doNothingForDeadCode(byteBuffer1);
    125                 doNothingForDeadCode(byteBuffer2);
    126                 log.error(
    127                     "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
    128                     totalSize, readLength, bodyLen, topicLen, propertiesLength);
    129                 return new DispatchRequest(totalSize, false/* success */);
    130             }
    131 
    132             return new DispatchRequest(
    133                 topic,
    134                 queueId,
    135                 physicOffset,
    136                 totalSize,
    137                 tagsCode,
    138                 storeTimestamp,
    139                 queueOffset,
    140                 keys,
    141                 uniqKey,
    142                 sysFlag,
    143                 preparedTransactionOffset,
    144                 propertiesMap
    145             );
    146         } catch (Exception e) {
    147         }
    148 
    149         return new DispatchRequest(-1, false /* success */);
    150     }

    第二步:调用 doDispathc() 方法,该方法会循环多个索引处理器(这里初始化了D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreDefaultMessageStore.CommitLogDispatcherBuildConsumeQueue 和 D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreDefaultMessageStore.CommitLogDispatcherBuildIndex 两个索引处理器)并调用索引处理器的 dispatch() 方法来处理 DispatchRequest。

      CommitLogDispatcherBuildConsumeQueue 索引处理器用于构建 Consume QueueCommitLogDispatcherBuildIndex 用于构建 Index File

      Consume Queue 是必须创建的,Index File 是否需要创建则是通过设置 messageIndexEnable 为 True 或 False 来实现的,默认为 True。

      Consume Queue 的索引信息被保存到 Page Cache 后,其持久化的过程和 CommitLog 异步刷盘的过程类似,执行 DefaultMessageStore.FlushConsumeQueueService()服务,代码路径:D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreDefaultMessageStore.java。

      2.2 索引创建失败怎么办?

      如果消息写入 CommitLog 后 Broker 宕机了,那么 Consume Queue 和 Index File 索引肯定就创建失败了。此时 ReputMessageService 如何保证创建索引的可靠性呢?

      Consume Queue 和 Index File 每次刷盘时都会做 Checkpoint 操作,Broker 每次重启的时候可以根据 Checkpoint信息得知哪些消息还未创建索引。

    三、索引如何使用

      3.1 按照位点查消息

      RocketMQ 支持 Pull 和 Push 两种消费模式,Push 模式是基于 Pull 模式的,两种模式都是通过拉取消息进行消费和提交位点的。

      Broker 在处理客户端拉取消息请求时是怎么查询消息的呢?

      答:代码路径:D: ocketmq-masterstoresrcmainjavaorgapache ocketmqstoreDefaultMessageStore.java 中 第555行的 getMessage() 方法介绍了 Broker的代码实现,如下

      1     public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
      2         final int maxMsgNums,
      3         final MessageFilter messageFilter) 
    # getMessage()方法的参数说明:
              group:消费者组名。
              topic:主题的名字,group订阅了 topic 才能拉取消息。
              queueId:一般一个 topic 会有很多分区,客户端轮询全部分区,拉取并消费消息。
              offset:拉取位点大于等于该值的消息。
              maxMsgNums:一次拉取多少消息,在客户端由 pullBatchSize 进行配置。
              messageFilter:消息过滤器。
    {
      4         if (this.shutdown) {
      5             log.warn("message store has shutdown, so getMessage is forbidden");
      6             return null;
      7         }
      8 
      9         if (!this.runningFlags.isReadable()) {
     10             log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
     11             return null;
     12         }
     13 
     14         long beginTime = this.getSystemClock().now();
     15 
     16         GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
     17         long nextBeginOffset = offset;
     18         long minOffset = 0;
     19         long maxOffset = 0;
     20 
     21         GetMessageResult getResult = new GetMessageResult();
     22 
     23         final long maxOffsetPy = this.commitLog.getMaxOffset();
     24 
     25         ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
     26         if (consumeQueue != null) {
     27             minOffset = consumeQueue.getMinOffsetInQueue();
     28             maxOffset = consumeQueue.getMaxOffsetInQueue();
     29 
     30             if (maxOffset == 0) {
     31                 status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
     32                 nextBeginOffset = nextOffsetCorrection(offset, 0);
     33             } else if (offset < minOffset) {
     34                 status = GetMessageStatus.OFFSET_TOO_SMALL;
     35                 nextBeginOffset = nextOffsetCorrection(offset, minOffset);
     36             } else if (offset == maxOffset) {
     37                 status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
     38                 nextBeginOffset = nextOffsetCorrection(offset, offset);
     39             } else if (offset > maxOffset) {
     40                 status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
     41                 if (0 == minOffset) {
     42                     nextBeginOffset = nextOffsetCorrection(offset, minOffset);
     43                 } else {
     44                     nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
     45                 }
     46             } else {
     47                 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
     48                 if (bufferConsumeQueue != null) {
     49                     try {
     50                         status = GetMessageStatus.NO_MATCHED_MESSAGE;
     51 
     52                         long nextPhyFileStartOffset = Long.MIN_VALUE;
     53                         long maxPhyOffsetPulling = 0;
     54 
     55                         int i = 0;
     56                         final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
     57                         final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
     58                         ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
     59                         for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
     60                             long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
     61                             int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
     62                             long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
     63 
     64                             maxPhyOffsetPulling = offsetPy;
     65 
     66                             if (nextPhyFileStartOffset != Long.MIN_VALUE) {
     67                                 if (offsetPy < nextPhyFileStartOffset)
     68                                     continue;
     69                             }
     70 
     71                             boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
     72 
     73                             if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
     74                                 isInDisk)) {
     75                                 break;
     76                             }
     77 
     78                             boolean extRet = false, isTagsCodeLegal = true;
     79                             if (consumeQueue.isExtAddr(tagsCode)) {
     80                                 extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
     81                                 if (extRet) {
     82                                     tagsCode = cqExtUnit.getTagsCode();
     83                                 } else {
     84                                     // can't find ext content.Client will filter messages by tag also.
     85                                     log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
     86                                         tagsCode, offsetPy, sizePy, topic, group);
     87                                     isTagsCodeLegal = false;
     88                                 }
     89                             }
     90 
     91                             if (messageFilter != null
     92                                 && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
     93                                 if (getResult.getBufferTotalSize() == 0) {
     94                                     status = GetMessageStatus.NO_MATCHED_MESSAGE;
     95                                 }
     96 
     97                                 continue;
     98                             }
     99 
    100                             SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
    101                             if (null == selectResult) {
    102                                 if (getResult.getBufferTotalSize() == 0) {
    103                                     status = GetMessageStatus.MESSAGE_WAS_REMOVING;
    104                                 }
    105 
    106                                 nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
    107                                 continue;
    108                             }
    109 
    110                             if (messageFilter != null
    111                                 && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
    112                                 if (getResult.getBufferTotalSize() == 0) {
    113                                     status = GetMessageStatus.NO_MATCHED_MESSAGE;
    114                                 }
    115                                 // release...
    116                                 selectResult.release();
    117                                 continue;
    118                             }
    119 
    120                             this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
    121                             getResult.addMessage(selectResult);
    122                             status = GetMessageStatus.FOUND;
    123                             nextPhyFileStartOffset = Long.MIN_VALUE;
    124                         }
    125 
    126                         if (diskFallRecorded) {
    127                             long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
    128                             brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
    129                         }
    130 
    131                         nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
    132 
    133                         long diff = maxOffsetPy - maxPhyOffsetPulling;
    134                         long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
    135                             * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
    136                         getResult.setSuggestPullingFromSlave(diff > memory);
    137                     } finally {
    138 
    139                         bufferConsumeQueue.release();
    140                     }
    141                 } else {
    142                     status = GetMessageStatus.OFFSET_FOUND_NULL;
    143                     nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
    144                     log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
    145                         + maxOffset + ", but access logic queue failed.");
    146                 }
    147             }
    148         } else {
    149             status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
    150             nextBeginOffset = nextOffsetCorrection(offset, 0);
    151         }
    152 
    153         if (GetMessageStatus.FOUND == status) {
    154             this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
    155         } else {
    156             this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
    157         }
    158         long elapsedTime = this.getSystemClock().now() - beginTime;
    159         this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
    160 
    161         getResult.setStatus(status);
    162         getResult.setNextBeginOffset(nextBeginOffset);
    163         getResult.setMaxOffset(maxOffset);
    164         getResult.setMinOffset(minOffset);
    165         return getResult;
    166     }

    getMessage()方法查询消息的过程如下图:

    getMessage() 方法查询消息的过程可以分为以下几个步骤:

      第一步:拉取前校验,校验 DefaultMessageStore 服务是否已经关闭(正常关闭进程会被关闭),校验 DefaultMessageStore 服务是否可读。

      第二步:根据 Topic 和 queueId 查找 ConsumeQueue 索引映射文件。判断根据查找到的 ConsumeQueue 索引文件校验传入的待查询的位点值是否合理,如果不合理,重新计算下一次可以拉去的位点值。

      第三步:循环查询满足 maxMsgNums 条数的消息。循环从 ConsumeQueue 中读取消息物理位点、消息大小和消息 Tag 的 Hash 值。先做 Hash 过滤,再使用过滤后的消息物理位点到 CommitLog 中查找消息体,并放入结果列表中。

      第四步:监控指标统计,返回拉取的消息结果。

      3.2 按照时间段查消息

      输入 Topic、起始时间、结束时间可以查到这段时间内的消息。这是一个 Consume Queue 索引查询消息的扩展查询,具体步骤如下:

      第一步:查找这个 Topic 的所有 Queue。

      第二步:在每一个队列中查询起始时间、结束时间对应的其实 offset 和 最后消息的 offset。

      如何根据时间查找物理位点呢?主要在于构建 Consume Queue,这个文件是按照时间顺序来写的,每条消息的索引数据结构大小是固定20字节。可以根据时间做二分折半搜索,找到与时间最接近的一个位点。

      第三步:根据起始点、最后消息位点和 Topic,循环拉取所有 Queue 就可以拉取到消息。

      3.3 按照key查询消息

      如果通过设置 messageIndexEnable=True(默认是True)来开启Index索引服务,那么在写入消息时会根据 key 自动构建 Index File 索引。用户可以通过 Topic 和 key 查询消息,查询方法为 org.apache.rocketmq.store.ConsumeQueue.queryMessage(String Topic, String key, int maxNum, long begin, long end)。

  • 相关阅读:
    Hadoop集群VSFTP和SecureCRT安装配置
    Hadoop集群完全分布式坏境搭建
    Hadoop集群坏境搭建配置
    Hadoop集群坏境CentOS安装
    Pr视频剪辑基础技巧学习
    SAN和NAS之间的基本区别
    原始容量、可用容量和有效容量的区别
    解释一下什么是网盘与云盘
    纠错技术之FEC(向前纠错)
    分布式存储的冗余类型(N+2:1)
  • 原文地址:https://www.cnblogs.com/zuoyang/p/14464317.html
Copyright © 2011-2022 走看看