zoukankan      html  css  js  c++  java
  • Solr4.8.0源码分析(22)之SolrCloud的Recovery策略(三)



    一. Replication策略介绍


    • 开始Replication的时候,首先进行一次commitOnLeader操作,即发送commit命令到leader。它的作用就是将leader的update中的数据刷入到索引文件中,使得快照snap完整。
     1   private void commitOnLeader(String leaderUrl) throws SolrServerException,
     2       IOException {
     3     HttpSolrServer server = new HttpSolrServer(leaderUrl);
     4     try {
     5       server.setConnectionTimeout(30000);
     6       UpdateRequest ureq = new UpdateRequest();
     7       ureq.setParams(new ModifiableSolrParams());
     8       ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
     9       ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
    10       ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
    11           server);
    12     } finally {
    13       server.shutdown();
    14     }
    15   }
    • 获取leader的lastVersion与lastGeneration,同本分片的进行比较来确定是否需要进行同步。
     1       //get the current 'replicateable' index version in the master
     2       NamedList response = null;
     3       try {
     4         response = getLatestVersion();
     5       } catch (Exception e) {
     6         LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage());
     7         return false;
     8       }
     9       long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
    10       long latestGeneration = (Long) response.get(GENERATION);
    • 检查本分片是否打开IndexWriter,如果没有则Recovery失败。这是因为没有打开indexWriter就无法获取索引的generation以及version信息,replication无法进行下去。
     1       // TODO: make sure that getLatestCommit only returns commit points for the main index (i.e. no side-car indexes)
     2       IndexCommit commit = core.getDeletionPolicy().getLatestCommit();
     3       if (commit == null) {
     4         // Presumably the IndexWriter hasn't been opened yet, and hence the deletion policy hasn't been updated with commit points
     5         RefCounted<SolrIndexSearcher> searcherRefCounted = null;
     6         try {
     7           searcherRefCounted = core.getNewestSearcher(false);
     8           if (searcherRefCounted == null) {
     9             LOG.warn("No open searcher found - fetch aborted");
    10             return false;
    11           }
    12           commit = searcherRefCounted.get().getIndexReader().getIndexCommit();
    13         } finally {
    14           if (searcherRefCounted != null)
    15             searcherRefCounted.decref();
    16         }
    17       }
    • 如果获取的leader的lastestVersion为0,则表示leader没有索引数据,那么根本就不需要进行replication。所以返回true结果。
     1       if (latestVersion == 0L) {
     2         if (forceReplication && commit.getGeneration() != 0) {
     3           // since we won't get the files for an empty index,
     4           // we just clear ours and commit
     5           RefCounted<IndexWriter> iw = core.getUpdateHandler().getSolrCoreState().getIndexWriter(core);
     6           try {
     7             iw.get().deleteAll();
     8           } finally {
     9             iw.decref();
    10           }
    11           SolrQueryRequest req = new LocalSolrQueryRequest(core,
    12               new ModifiableSolrParams());
    13           core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
    14         }
    16         //there is nothing to be replicated
    17         successfulInstall = true;
    18         return true;
    19       }
    • 我们还需要通过比较分片的lastestVersion和leader的lastestVersion来确定是否需要继续进行replication,因为两者相等同样没必要进行replication,除非进行的时forceReplication
    1       if (!forceReplication && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
    2         //master and slave are already in sync just return
    3         LOG.info("Slave in sync with master.");
    4         successfulInstall = true;
    5         return true;
    6       }
    • 获取leader节点的lastestGeneration的索引文件列表以及相关文件信息,以及配置文件列表以及信息。如果文件列表为空,则退出replication。
     1       // get the list of files first
     2       fetchFileList(latestGeneration);
     3       // this can happen if the commit point is deleted before we fetch the file list.
     4       if(filesToDownload.isEmpty()) return false;
     6  private void fetchFileList(long gen) throws IOException {
     7     ModifiableSolrParams params = new ModifiableSolrParams();
     8     params.set(COMMAND,  CMD_GET_FILE_LIST);
     9     params.set(GENERATION, String.valueOf(gen));
    10     params.set(CommonParams.WT, "javabin");
    11     params.set(CommonParams.QT, "/replication");
    12     QueryRequest req = new QueryRequest(params);
    13     HttpSolrServer server = new HttpSolrServer(masterUrl, myHttpClient);  //XXX modify to use shardhandler
    14     try {
    15       server.setSoTimeout(60000);
    16       server.setConnectionTimeout(15000);
    17       NamedList response = server.request(req);
    19       List<Map<String, Object>> files = (List<Map<String,Object>>) response.get(CMD_GET_FILE_LIST);
    20       if (files != null)
    21         filesToDownload = Collections.synchronizedList(files);
    22       else {
    23         filesToDownload = Collections.emptyList();
    24         LOG.error("No files to download for index generation: "+ gen);
    25       }
    27       files = (List<Map<String,Object>>) response.get(CONF_FILES);
    28       if (files != null)
    29         confFilesToDownload = Collections.synchronizedList(files);
    31     } catch (SolrServerException e) {
    32       throw new IOException(e);
    33     } finally {
    34       server.shutdown();
    35     }
    36   }
    • 建立临时的index目录来存放同步过来的数据,临时index目录的格式是index.timestamp。它存放在data目录下。
    1 String tmpIdxDirName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
    2       tmpIndex = createTempindexDir(core, tmpIdxDirName);
    4       tmpIndexDir = core.getDirectoryFactory().get(tmpIndex, DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
    6       // cindex dir...
    7       indexDirPath = core.getIndexDir();
    8       indexDir = core.getDirectoryFactory().get(indexDirPath, DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
    • 判断isFullCopyNeeded是否为true来决定是否需要关闭IndexWriter。如果本分片(slave)的数据的version或者generation新于master(leader)或者是forceReplication,那么必须进行所有数据的完整同步。
     1 // if the generation of master is older than that of the slave , it means they are not compatible to be copied
     2       // then a new index directory to be created and all the files need to be copied
     3       boolean isFullCopyNeeded = IndexDeletionPolicyWrapper
     4           .getCommitTimestamp(commit) >= latestVersion
     5           || commit.getGeneration() >= latestGeneration || forceReplication;
     7         if (isIndexStale(indexDir)) {
     8           isFullCopyNeeded = true;
     9         }
    11         if (!isFullCopyNeeded) {
    12           // rollback - and do it before we download any files
    13           // so we don't remove files we thought we didn't need
    14           // to download later
    15           solrCore.getUpdateHandler().getSolrCoreState()
    16           .closeIndexWriter(core, true);
    17         }
    • 现在才开始真正的下载不同的索引文件,Replication是根据索引文件的大小来判断是否发生过变化.下载文件时候,Replication是以packet的大小为单位进行下载的,这可以在SolrConfig.xml中设置,下一篇文章将具体介绍这个。
     1   private void downloadIndexFiles(boolean downloadCompleteIndex,
     2       Directory indexDir, Directory tmpIndexDir, long latestGeneration)
     3       throws Exception {
     4     if (LOG.isDebugEnabled()) {
     5       LOG.debug("Download files to dir: " + Arrays.asList(indexDir.listAll()));
     6     }
     7     for (Map<String,Object> file : filesToDownload) {
     8       if (!slowFileExists(indexDir, (String) file.get(NAME))
     9           || downloadCompleteIndex) {
    10         dirFileFetcher = new DirectoryFileFetcher(tmpIndexDir, file,
    11             (String) file.get(NAME), false, latestGeneration);
    12         currentFile = file;
    13         dirFileFetcher.fetchFile();
    14         filesDownloaded.add(new HashMap<>(file));
    15       } else {
    16         LOG.info("Skipping download for " + file.get(NAME)
    17             + " because it already exists");
    18       }
    19     }
    20   }
    22  /**
    23      * The main method which downloads file
    24      */
    25     void fetchFile() throws Exception {
    26       try {
    27         while (true) {
    28           final FastInputStream is = getStream();
    29           int result;
    30           try {
    31             //fetch packets one by one in a single request
    32             result = fetchPackets(is);
    33             if (result == 0 || result == NO_CONTENT) {
    35               return;
    36             }
    37             //if there is an error continue. But continue from the point where it got broken
    38           } finally {
    39             IOUtils.closeQuietly(is);
    40           }
    41         }
    42       } finally {
    43         cleanup();
    44         //if cleanup suceeds . The file is downloaded fully. do an fsync
    45         fsyncService.submit(new Runnable(){
    46           @Override
    47           public void run() {
    48             try {
    49               copy2Dir.sync(Collections.singleton(saveAs));
    50             } catch (IOException e) {
    51               fsyncException = e;
    52             }
    53           }
    54         });
    55       }
    56     }
    • 到这里已经完成了索引文件的同步,但是整一个同步过程才进行了一半。接下来要获取已经发生过修改的配置文件,如果没有修改过的配置文件则不需要下载配置文件。而比较配置文件是否发生变化则是比较文件的checksum信息。下载配置文件的过程downloadConfFiles()与下载索引文件的过程类似,就不具体介绍了。
     1     //get the details of the local conf files with the same alias/name
     2     List<Map<String, Object>> localFilesInfo = replicationHandler.getConfFileInfoFromCache(names, confFileInfoCache);
     3     //compare their size/checksum to see if
     4     for (Map<String, Object> fileInfo : localFilesInfo) {
     5       String name = (String) fileInfo.get(NAME);
     6       Map<String, Object> m = nameVsFile.get(name);
     7       if (m == null) continue; // the file is not even present locally (so must be downloaded)
     8       if (m.get(CHECKSUM).equals(fileInfo.get(CHECKSUM))) {
     9         nameVsFile.remove(name); //checksums are same so the file need not be downloaded
    10       }
    11     }
     1   private void downloadConfFiles(List<Map<String, Object>> confFilesToDownload, long latestGeneration) throws Exception {
     2     LOG.info("Starting download of configuration files from master: " + confFilesToDownload);
     3     confFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
     4     File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(), "conf." + getDateAsStr(new Date()));
     5     try {
     6       boolean status = tmpconfDir.mkdirs();
     7       if (!status) {
     8         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
     9                 "Failed to create temporary config folder: " + tmpconfDir.getName());
    10       }
    11       for (Map<String, Object> file : confFilesToDownload) {
    12         String saveAs = (String) (file.get(ALIAS) == null ? file.get(NAME) : file.get(ALIAS));
    13         localFileFetcher = new LocalFsFileFetcher(tmpconfDir, file, saveAs, true, latestGeneration);
    14         currentFile = file;
    15         localFileFetcher.fetchFile();
    16         confFilesDownloaded.add(new HashMap<>(file));
    17       }
    18       // this is called before copying the files to the original conf dir
    19       // so that if there is an exception avoid corrupting the original files.
    20       terminateAndWaitFsyncService();
    21       copyTmpConfFiles2Conf(tmpconfDir);
    22     } finally {
    23       delTree(tmpconfDir);
    24     }
    25   }
    • 下载完索引数据以及配置文件后,现在需要处理临时的索引数据了。不同于索引文件的下载,配置文件在下载的过程中就已经替换了原先的配置文件了,这是在copyTmpConfFiles2Conf过程中。而索引数据的替换则需要根据isFullCopyNeeded这个参数,如果该值为true,则临时的索引文件将全部替换旧的索引文件,否则只是部分的文件的替换,他们的实现分别为:modifyIndexProps和moveIndexFiles。
    1             if (isFullCopyNeeded) {
    2               successfulInstall = modifyIndexProps(tmpIdxDirName);
    3               deleteTmpIdxDir = false;
    4             } else {
    5               successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
    6             }

          接下来要重点介绍下modifyIndexProps和moveIndexFiles的实现。前文讲到,同步索引文件时候,下载下来的数据会存放在data目录下,以index. 加上同步开始时间的时间戳结构的目录下。当下载数据完成后,Replication会在同级目录下新建index.property文件。该文件内只会放入一句内容,index= index.2014XXXXXXXXXX,这样做的目的就是将索引目录index重定向到index.2014XXXXXXXXXX上,这个时候相当于index.2014XXXXXXXXXX成为了index目录。然后就可以删除原来的index目录了。


    • 将Replication的统计信息存于Replication.properties文件当中。统计信息较多,这里就不介绍了。
    • 如果配置文件发生变化,需要进行reloadcore操作才能使得配置生效。
    • 最后进行一次openNewSearcherAndUpdateCommitPoint,重新打开searcher以及更新commit信息。



    1. 因为一旦有新数据写入旧索引文件中,索引文件发送变化了,那么下载好后的数据(索引文件)就不好替换旧的索引文件。

    2. 在同步过程中,如果isFullCopyNeeded是false,那么就会close indexwriter,既然关闭了indexwriter就无法写入新的数据。而如果isFullCopyNeeded是true的话,因为整个index目录替换,所以是否关闭indexeriter也没啥意义。

    3. 在recovery过程中,当同步replication结束后,会进行replay过程,该过程就是将ulog中的请求重新进行一遍。



     2       if (indexWriter != null) {
     3         if (!rollback) {
     4           try {
     5             log.info("Closing old IndexWriter... core=" + coreName);
     6             indexWriter.close();
     7           } catch (Exception e) {
     8             SolrException.log(log, "Error closing old IndexWriter. core="
     9                 + coreName, e);
    10           }
    11         } else {
    12           try {
    13             log.info("Rollback old IndexWriter... core=" + coreName);
    14             indexWriter.rollback();
    15           } catch (Exception e) {
    16             SolrException.log(log, "Error rolling back old IndexWriter. core="
    17                 + coreName, e);
    18           }
    19         }
    20       }


     1       try {
     2         if (indexWriter != null) {
     3           if (!rollback) {
     4             try {
     5               log.info("Closing old IndexWriter... core=" + coreName);
     6               indexWriter.close();
     7             } catch (Exception e) {
     8               SolrException.log(log, "Error closing old IndexWriter. core="
     9                   + coreName, e);
    10             }
    11           } else {
    12             try {
    13               log.info("Rollback old IndexWriter... core=" + coreName);
    14               indexWriter.rollback();
    15             } catch (Exception e) {
    16               SolrException.log(log, "Error rolling back old IndexWriter. core="
    17                   + coreName, e);
    18             }
    19           }
    20         }
    21         indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2");
    22         log.info("New IndexWriter is ready to be used.");
    23         // we need to null this so it picks up the new writer next get call
    24         refCntWriter = null;
    25       } finally {
    27         pauseWriter = false;
    28         writerPauseLock.notifyAll();
    29       }

    二. Replay过程


    • LogReplayer是以transactionlog为单位的。
    1         for(;;) {
    2           TransactionLog translog = translogs.pollFirst();
    3           if (translog == null) break;
    4           doReplay(translog);
    5         }
    • doReplay会重新获取索引链,读取transctionlog的update命令然后重新走一遍索引链三步骤,这些内容在<Solr4.8.0源码分析(14)之SolrCloud索引深入(1)>已经介绍过了,这里就不再介绍了。需要指出的是在进行doReplay时候会设置updatecmd为replay,而一旦cmd=UpdateCmd.Replay,因为无法获取到nodes所以不会分发给其他分片包括leader,所以doReplay只会对本分片有效,且不会记录ulog中。
    1         tlogReader = translog.getReader(recoveryInfo.positionOfStart);
    3         // NOTE: we don't currently handle a core reload during recovery.  This would cause the core
    4         // to change underneath us.
    6         UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(null);
    7         UpdateRequestProcessor proc = processorChain.createProcessor(req, rsp);
          if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
            isLeader = false;     // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
            forwardToLeader = false;
            return nodes;
    • LogReplayer主要用于applyBufferedUpdates(replication策略中)以及recoverFromLog(单机模式下的recovery,即从ulog进行recovery)。



  • 相关阅读:
    实验五 Web项目开发
    实验三 一个标准的Windows应用程序
  • 原文地址:https://www.cnblogs.com/rcfeng/p/4148733.html
Copyright © 2011-2022 走看看