
    Solr 4.8.0 Source Code Analysis (23): SolrCloud's Recovery Strategy (Part 4)

    Preface: The original plan was three articles on SolrCloud's Recovery strategy, but Recovery turned out to have more to it than expected. The previous three parts covered the principle and overall flow of Recovery, the PeerSync strategy, and the Replication strategy. This part covers several Recovery problems I have run into in production, plus a few points left out earlier.

    I. The log repeatedly shows "Stopping recovery for zkNodeName=..."

            In my company's production environment I kept seeing long runs of "WARN: Stopping recovery for zkNodeName=..." or "INFO: Starting recovery process. core=..." in the logs (the actual logs cannot leave the company, so these are paraphrased).

            The cause: as discussed earlier, one trigger for Recovery is the leader forwarding an update request to a replica and not receiving a success response back. In that case the leader sends a RequestRecovery request to the replica, commanding it to recover. That is one failed forward. When a leader starts failing to forward updates it rarely fails just once, so the leader ends up sending RequestRecovery to the replica multiple times.

            A replica's recovery starts in DefaultSolrCoreState's doRecovery() method, and doRecovery() first cancels any recovery already in progress. So the root cause of the log pattern above is cancelRecovery(). Note that DefaultSolrCoreState's doRecovery() is called not only in response to a RequestRecovery request, but also when leader election fails.

      @Override
      public void cancelRecovery() {
        synchronized (recoveryLock) {
          if (recoveryStrat != null && recoveryRunning) {
            recoveryStrat.close();
            while (true) {
              try {
                recoveryStrat.join();
              } catch (InterruptedException e) {
                // not interruptible - keep waiting
                continue;
              }
              break;
            }

            recoveryRunning = false;
            recoveryLock.notifyAll();
          }
        }
      }
      @Override
      public void close() {
        close = true;
        try {
          prevSendPreRecoveryHttpUriRequest.abort();
        } catch (NullPointerException e) {
          // okay
        }
        log.warn("Stopping recovery for zkNodeName=" + coreZkNodeName + "core=" + coreName);
      }
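The interplay above can be boiled down to: every new recovery request first cancels (and logs the stop of) the attempt already running. Here is a minimal self-contained model of that behavior; the class and method names are hypothetical, not Solr's actual ones, and the threading is stripped out to keep only the logging logic:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of why the WARN/INFO pairs repeat: each doRecovery()
// cancels the previous recovery before starting a new one.
public class RecoveryModel {
    private final Object recoveryLock = new Object();
    private boolean recoveryRunning = false;
    final List<String> log = new ArrayList<>();

    public void doRecovery(String core) {
        synchronized (recoveryLock) {
            cancelRecovery(core);               // stop the previous attempt first
            log.add("Starting recovery process. core=" + core);
            recoveryRunning = true;
        }
    }

    public void cancelRecovery(String core) {
        synchronized (recoveryLock) {
            if (recoveryRunning) {
                log.add("Stopping recovery for core=" + core);
                recoveryRunning = false;
            }
        }
    }

    public static void main(String[] args) {
        RecoveryModel m = new RecoveryModel();
        // Three RequestRecovery commands arriving in quick succession
        for (int i = 0; i < 3; i++) {
            m.doRecovery("collection1_shard1_replica2");
        }
        m.log.forEach(System.out::println);
    }
}
```

Three back-to-back doRecovery() calls produce three "Starting" lines interleaved with two "Stopping" lines, which is exactly the alternating pattern seen in the production logs.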

    II. Rollback during Recovery

          A reader, @从前, left me a comment describing this situation: "While continuously submitting data to SolrCloud, optimize was called; index file synchronization then failed and recovery never completed." This is caused roughly by the following two points:

    • optimize is essentially forceMerge in the merge policy. By default, once forceMerge is triggered, Solr merges all segments into a single segment. Imagine merging tens or even hundreds of GB of data into one segment: how heavy would that load be? Worse still, once forceMerge has been triggered, any real-time data that arrives is merged in as well, so the merge keeps absorbing new data and never stops. forceMerge will be covered in detail in the upcoming article on merging.
    • As mentioned when introducing the Replication strategy, if isFullCopyNeeded is false, Solr calls closeIndexWriter:
        if (!isFullCopyNeeded) {
          // rollback - and do it before we download any files
          // so we don't remove files we thought we didn't need
          // to download later
          solrCore.getUpdateHandler().getSolrCoreState()
              .closeIndexWriter(core, true);
        }

             It is easy to overlook the true argument passed to closeIndexWriter. When it is true, Solr performs a rollback while closing the IndexWriter: the IndexWriter is returned to its state as of the last commit, discarding all documents added since that commit.

        if (indexWriter != null) {
          if (!rollback) {
            try {
              log.info("Closing old IndexWriter... core=" + coreName);
              indexWriter.close();
            } catch (Exception e) {
              SolrException.log(log, "Error closing old IndexWriter. core="
                  + coreName, e);
            }
          } else {
            try {
              log.info("Rollback old IndexWriter... core=" + coreName);
              indexWriter.rollback();
            } catch (Exception e) {
              SolrException.log(log, "Error rolling back old IndexWriter. core="
                  + coreName, e);
            }
          }
        }

            The problem lies in the rollback. When Lucene's IndexWriter rolls back, it tries to shut down the running merge policy and merge scheduler, and if any segment merge is still in progress it waits for it to finish. So while optimize (forceMerge) is running and real-time data keeps arriving, Recovery just sits there waiting until it times out.

      /** Wait for any running merge threads to finish. This call is not interruptible as used by {@link #close()}. */
      public void sync() {
        boolean interrupted = false;
        try {
          while (true) {
            MergeThread toSync = null;
            synchronized (this) {
              for (MergeThread t : mergeThreads) {
                if (t.isAlive()) {
                  toSync = t;
                  break;
                }
              }
            }
            if (toSync != null) {
              try {
                toSync.join();
              } catch (InterruptedException ie) {
                // ignore this Exception, we will retry until all threads are dead
                interrupted = true;
              }
            } else {
              break;
            }
          }
        } finally {
          // finally, restore interrupt status:
          if (interrupted) Thread.currentThread().interrupt();
        }
      }
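The idiom in sync() above — keep waiting through InterruptedException, but restore the caller's interrupt flag on the way out — is worth recognizing on its own. A small self-contained sketch of the same pattern (helper name is mine, not Lucene's):

```java
// Join a thread without being interruptible, but preserve the
// caller's interrupt status, mirroring ConcurrentMergeScheduler.sync().
public class JoinUninterruptibly {
    public static void joinUninterruptibly(Thread t) {
        boolean interrupted = false;
        try {
            while (t.isAlive()) {
                try {
                    t.join();
                } catch (InterruptedException ie) {
                    // swallow it for now; keep waiting until the thread dies
                    interrupted = true;
                }
            }
        } finally {
            // re-assert the interrupt so callers further up still see it
            if (interrupted) Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Thread worker = new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) {}
        });
        worker.start();
        joinUninterruptibly(worker);
        System.out.println("worker finished: " + !worker.isAlive());
    }
}
```

This is also why the wait cannot be broken from outside: interrupting the recovery thread only sets a flag, and the loop goes right back to joining the live merge thread.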

            There are two ways around this:

    • Make sure no real-time data is being indexed while optimize runs.
    • Change the forceMerge policy so it only merges the segments that existed when forceMerge started, ignoring segments created afterwards (the approach my company took).
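The second workaround can be sketched with a toy model. This is not Lucene's MergePolicy API — segments are just strings here — but it shows the key idea: snapshot the segment set when forceMerge is invoked, so segments flushed by concurrent indexing cannot keep extending the merge:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of a snapshot-based forceMerge: only segments present at
// the moment forceMerge starts take part; later flushes are ignored.
public class SnapshotForceMerge {
    final List<String> segments = new ArrayList<>();

    String forceMerge() {
        // Snapshot the current segment set; concurrent flushes that
        // happen after this point are not pulled into the merge.
        Set<String> snapshot = new HashSet<>(segments);
        segments.removeIf(snapshot::contains);
        String merged = "merged(" + snapshot.size() + ")";
        segments.add(0, merged);
        return merged;
    }

    public static void main(String[] args) {
        SnapshotForceMerge idx = new SnapshotForceMerge();
        idx.segments.add("_0");
        idx.segments.add("_1");
        idx.forceMerge();
        idx.segments.add("_2");  // flushed during/after the merge: left alone
        System.out.println(idx.segments); // [merged(2), _2]
    }
}
```

With this policy the merge has a fixed amount of work and is guaranteed to terminate, so IndexWriter.rollback() no longer waits forever during Recovery.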

    III. The three places that trigger Recovery

          Recovery is triggered from three places, i.e. the three call sites of doRecovery() mentioned above:

    • The RequestRecovery request discussed throughout this series:
      protected void handleRequestRecoveryAction(SolrQueryRequest req,
          SolrQueryResponse rsp) throws IOException {
        final SolrParams params = req.getParams();
        log.info("It has been requested that we recover");
        Thread thread = new Thread() {
          @Override
          public void run() {
            String cname = params.get(CoreAdminParams.CORE);
            if (cname == null) {
              cname = "";
            }
            try (SolrCore core = coreContainer.getCore(cname)) {

              if (core != null) {
                // try to publish as recovering right away
                try {
                  coreContainer.getZkController().publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
                }  catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  SolrException.log(log, "", e);
                } catch (Throwable e) {
                  SolrException.log(log, "", e);
                  if (e instanceof Error) {
                    throw (Error) e;
                  }
                }

                core.getUpdateHandler().getSolrCoreState().doRecovery(coreContainer, core.getCoreDescriptor());
              } else {
                SolrException.log(log, "Could not find core to call recovery:" + cname);
              }
            }
          }
        };

        thread.start();
      }
    • When leader election fails, the node first recovers and then rejoins the election:
      private void rejoinLeaderElection(String leaderSeqPath, SolrCore core)
          throws InterruptedException, KeeperException, IOException {
        // remove our ephemeral and re join the election
        if (cc.isShutDown()) {
          log.info("Not rejoining election because CoreContainer is shutdown");
          return;
        }

        log.info("There may be a better leader candidate than us - going back into recovery");

        cancelElection();

        core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());

        leaderElector.joinElection(this, true);
      }
    • When a shard is registered, Solr checks whether it should go into recovery; if the conditions are met, recovery is triggered:
      /**
       * Returns whether or not a recovery was started
       */
      private boolean checkRecovery(String coreName, final CoreDescriptor desc,
          boolean recoverReloadedCores, final boolean isLeader,
          final CloudDescriptor cloudDesc, final String collection,
          final String shardZkNodeName, String shardId, ZkNodeProps leaderProps,
          SolrCore core, CoreContainer cc) {
        if (SKIP_AUTO_RECOVERY) {
          log.warn("Skipping recovery according to sys prop solrcloud.skip.autorecovery");
          return false;
        }
        boolean doRecovery = true;
        if (!isLeader) {

          if (core.isReloaded() && !recoverReloadedCores) {
            doRecovery = false;
          }

          if (doRecovery) {
            log.info("Core needs to recover:" + core.getName());
            core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
            return true;
          }
        } else {
          log.info("I am the leader, no recovery necessary");
        }

        return false;
      }

     

    IV. recoverFromLog

            Earlier articles described how Recovery calls applyBufferedUpdates after replication to replay the requests buffered in the UpdateLog. Besides applyBufferedUpdates there is another way to replay the requests saved in the UpdateLog: recoverFromLog. The difference is that recoverFromLog is mainly used when Solr runs in standalone (non-Cloud) mode, and it is triggered when a core is created:

      /**
       * Creates a new core based on a descriptor but does not register it.
       *
       * @param dcore a core descriptor
       * @return the newly created core
       */
      public SolrCore create(CoreDescriptor dcore) {

        if (isShutDown) {
          throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Solr has shutdown.");
        }

        try {

          ConfigSet coreConfig = coreConfigService.getConfig(dcore);
          log.info("Creating SolrCore '{}' using configuration from {}", dcore.getName(), coreConfig.getName());
          SolrCore core = new SolrCore(dcore, coreConfig);
          solrCores.addCreated(core);

          // always kick off recovery if we are in non-Cloud mode
          if (!isZooKeeperAware() && core.getUpdateHandler().getUpdateLog() != null) {
            core.getUpdateHandler().getUpdateLog().recoverFromLog();
          }

          return core;

        }
        catch (Exception e) {
          throw recordAndThrow(dcore.getName(), "Unable to create core: " + dcore.getName(), e);
        }

      }
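Conceptually, recoverFromLog replays the updates recorded after the last hard commit so the index catches up with the transaction log on startup. A much-simplified model of that replay (not Solr's UpdateLog classes; entries are plain "id=value" strings here):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of transaction-log replay on core startup:
// every uncommitted update is re-applied to the index in log order,
// so a later update to the same id wins.
public class TlogReplay {
    final List<String> tlog = new ArrayList<>();        // uncommitted updates
    final Map<String, String> index = new LinkedHashMap<>();

    void recoverFromLog() {
        for (String entry : tlog) {                     // replay in log order
            String[] kv = entry.split("=", 2);
            index.put(kv[0], kv[1]);
        }
        tlog.clear();                                   // log has been applied
    }

    public static void main(String[] args) {
        TlogReplay core = new TlogReplay();
        core.tlog.add("doc1=v1");
        core.tlog.add("doc1=v2");                       // later update wins
        core.tlog.add("doc2=v1");
        core.recoverFromLog();
        System.out.println(core.index); // {doc1=v2, doc2=v1}
    }
}
```

In SolrCloud mode this replay is done by applyBufferedUpdates at the end of recovery instead, because there the updates to replay are the ones buffered while the replica was copying the index from the leader.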

    Summary:

         This part listed several problems encountered during Recovery and filled in content omitted earlier. The next article, the last of the Recovery series, will cover configuring Replication in master-slave mode.

  • Original post: https://www.cnblogs.com/rcfeng/p/4152183.html