
    Solr 4.8.0 Source Code Analysis (3): Thread Pool Management for Indexing

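    When building an index, Solr limits how many threads can index at the same time. The limit is controlled by <maxIndexingThreads>8</maxIndexingThreads> in solrconfig.xml. A value of 8 means that at most 8 threads can be running updateDocument concurrently, and any additional indexing threads have to wait.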
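    To make the effect of the limit concrete, here is a small self-contained Java simulation. It is not Solr or Lucene code. A fixed array of ReentrantLocks stands in for the maxIndexingThreads ThreadStates, and many more worker threads than locks try to index. The names IndexingCapDemo and peakConcurrency are hypothetical. The way a worker picks a lock is deliberately naive and is not Lucene's selection logic. The simulation measures how many workers were inside a locked state at the same time.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical simulation, not Solr/Lucene code: a fixed array of locks plays
// the role of the maxIndexingThreads ThreadStates.
class IndexingCapDemo {

    // Starts `threads` workers that each "index" `docsPerThread` documents and
    // returns the peak number of workers that were indexing at the same time.
    static int peakConcurrency(int maxThreadStates, int threads, int docsPerThread) {
        ReentrantLock[] states = new ReentrantLock[maxThreadStates];
        for (int i = 0; i < states.length; i++) {
            states[i] = new ReentrantLock();
        }
        AtomicInteger indexing = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            workers[t] = new Thread(() -> {
                for (int d = 0; d < docsPerThread; d++) {
                    // Naive choice of state; if it is taken, lock() waits.
                    ReentrantLock state = states[(id + d) % states.length];
                    state.lock();
                    try {
                        // Only counted as "indexing" while holding a state.
                        peak.accumulateAndGet(indexing.incrementAndGet(), Math::max);
                        Thread.yield(); // stand-in for adding one document
                        indexing.decrementAndGet();
                    } finally {
                        state.unlock();
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // 32 indexing threads competing for 8 states.
        System.out.println("peak = " + peakConcurrency(8, 32, 100));
    }
}
```

    The peak can never exceed the number of states. A worker only counts itself as indexing while it holds one of the locks, and each lock has at most one owner.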

    So how is this thread pool managed during indexing? Mainly through Lucene's ThreadAffinityDocumentsWriterThreadPool, which extends DocumentsWriterPerThreadPool. Its structure is not complicated, and its key method is getAndLock().

    When indexing, i.e. in updateDocument(s), Solr first calls getAndLock to obtain a ThreadState lock. ThreadAffinityDocumentsWriterThreadPool records which ThreadState each thread is bound to in threadBindings, a Map<Thread, ThreadState>.
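    The calling pattern (acquire a ThreadState, update, release) can be sketched as follows. This is an illustrative assumption that follows the contract stated in the ThreadState javadoc below: lock first, then unlock in a finally block. The class names and the single-state pool are hypothetical and much simpler than Lucene's DocumentsWriter.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the caller-side contract (names are illustrative,
// not Lucene's API): lock a ThreadState, update, always unlock in finally.
class CallerContractDemo {

    // Stand-in for ThreadState: a lock plus the per-thread data it guards.
    @SuppressWarnings("serial")
    static final class State extends ReentrantLock {
        int bufferedDocs; // only read or written while the lock is held
    }

    // Stand-in pool with a single state; the real pool chooses among several
    // and uses requestingThread for thread affinity.
    static final State SHARED_STATE = new State();

    static State getAndLock(Thread requestingThread) {
        SHARED_STATE.lock(); // waits if another thread is indexing
        return SHARED_STATE;
    }

    // Stand-in for updateDocument(): touch the state only while holding it.
    static void updateDocument(String doc) {
        State state = getAndLock(Thread.currentThread());
        try {
            if (doc == null) {
                throw new IllegalArgumentException("doc must not be null");
            }
            state.bufferedDocs++;
        } finally {
            state.unlock(); // released even when the update throws
        }
    }

    public static void main(String[] args) {
        updateDocument("doc-1");
        try {
            updateDocument(null);
        } catch (IllegalArgumentException expected) {
            // the failed update still released the state
        }
        System.out.println(SHARED_STATE.bufferedDocs + " docs, locked=" + SHARED_STATE.isLocked());
    }
}
```

    The finally block matters. If an update throws while the state is still locked, every other indexing thread that picks this state would wait forever.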

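    First, let's look at what a ThreadState lock actually is.

    ThreadState is an inner class of DocumentsWriterPerThreadPool. It extends ReentrantLock, which is why it is called a lock. It holds a DocumentsWriterPerThread (DWPT) instance together with some state fields (flushPending, bytesUsed, isActive). The DWPT is the per-thread document writer handed out by the pool; it is not itself a java.lang.Thread, and it does the actual indexing into an in-memory segment. ThreadState itself is fairly simple, so I won't go through it in detail. Its source is as follows: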

    /**
     * {@link ThreadState} references and guards a
     * {@link DocumentsWriterPerThread} instance that is used during indexing to
     * build a in-memory index segment. {@link ThreadState} also holds all flush
     * related per-thread data controlled by {@link DocumentsWriterFlushControl}.
     * <p>
     * A {@link ThreadState}, its methods and members should only accessed by one
     * thread a time. Users must acquire the lock via {@link ThreadState#lock()}
     * and release the lock in a finally block via {@link ThreadState#unlock()}
     * before accessing the state.
     */
    @SuppressWarnings("serial")
    final static class ThreadState extends ReentrantLock {
      DocumentsWriterPerThread dwpt;
      // TODO this should really be part of DocumentsWriterFlushControl
      // write access guarded by DocumentsWriterFlushControl
      volatile boolean flushPending = false;
      // TODO this should really be part of DocumentsWriterFlushControl
      // write access guarded by DocumentsWriterFlushControl
      long bytesUsed = 0;
      // guarded by Reentrant lock
      private boolean isActive = true;

      ThreadState(DocumentsWriterPerThread dpwt) {
        this.dwpt = dpwt;
      }

      /**
       * Resets the internal {@link DocumentsWriterPerThread} with the given one.
       * if the given DWPT is <code>null</code> this ThreadState is marked as inactive and should not be used
       * for indexing anymore.
       * @see #isActive()
       */

      private void deactivate() {
        assert this.isHeldByCurrentThread();
        isActive = false;
        reset();
      }

      private void reset() {
        assert this.isHeldByCurrentThread();
        this.dwpt = null;
        this.bytesUsed = 0;
        this.flushPending = false;
      }

      /**
       * Returns <code>true</code> if this ThreadState is still open. This will
       * only return <code>false</code> iff the DW has been closed and this
       * ThreadState is already checked out for flush.
       */
      boolean isActive() {
        assert this.isHeldByCurrentThread();
        return isActive;
      }

      boolean isInitialized() {
        assert this.isHeldByCurrentThread();
        return isActive() && dwpt != null;
      }

      /**
       * Returns the number of currently active bytes in this ThreadState's
       * {@link DocumentsWriterPerThread}
       */
      public long getBytesUsedPerThread() {
        assert this.isHeldByCurrentThread();
        // public for FlushPolicy
        return bytesUsed;
      }

      /**
       * Returns this {@link ThreadState}s {@link DocumentsWriterPerThread}
       */
      public DocumentsWriterPerThread getDocumentsWriterPerThread() {
        assert this.isHeldByCurrentThread();
        // public for FlushPolicy
        return dwpt;
      }

      /**
       * Returns <code>true</code> iff this {@link ThreadState} is marked as flush
       * pending otherwise <code>false</code>
       */
      public boolean isFlushPending() {
        return flushPending;
      }
    }
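    The lifecycle rules of ThreadState can be mirrored in a small standalone class. Every accessor requires the lock; reset() clears the writer and the byte count; deactivate() additionally marks the state as unusable. This is a hypothetical miniature, not the Lucene class. A plain Object stands in for DocumentsWriterPerThread, and flushPending is omitted. The original assert statements are replaced by an explicit check, so the rule also holds when assertions are disabled.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical miniature of ThreadState (not Lucene code). "Object writer"
// stands in for the DocumentsWriterPerThread; flushPending is left out.
@SuppressWarnings("serial")
final class MiniThreadState extends ReentrantLock {
    private Object writer;          // guarded by this lock
    private long bytesUsed;         // guarded by this lock
    private boolean active = true;  // guarded by this lock

    // The original uses assert; this check also works without -ea.
    private void checkHeld() {
        if (!isHeldByCurrentThread()) {
            throw new IllegalStateException("lock this ThreadState first");
        }
    }

    void attach(Object newWriter, long bytes) {
        checkHeld();
        writer = newWriter;
        bytesUsed = bytes;
    }

    boolean isActive() {
        checkHeld();
        return active;
    }

    // Same rule as isInitialized(): active and a writer is attached.
    boolean isInitialized() {
        checkHeld();
        return active && writer != null;
    }

    long bytesUsed() {
        checkHeld();
        return bytesUsed;
    }

    // Like reset(): drop the writer and its accounting; the state stays usable.
    void reset() {
        checkHeld();
        writer = null;
        bytesUsed = 0;
    }

    // Like deactivate(): never index with this state again, then reset.
    void deactivate() {
        checkHeld();
        active = false;
        reset();
    }
}
```

    Callers would use it exactly like ThreadState: s.lock(); try { ... } finally { s.unlock(); }. Calling any accessor without holding the lock fails immediately instead of silently racing.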

    The source of ThreadAffinityDocumentsWriterThreadPool is as follows:

    /**
     * A {@link DocumentsWriterPerThreadPool} implementation that tries to assign an
     * indexing thread to the same {@link ThreadState} each time the thread tries to
     * obtain a {@link ThreadState}. Once a new {@link ThreadState} is created it is
     * associated with the creating thread. Subsequently, if the threads associated
     * {@link ThreadState} is not in use it will be associated with the requesting
     * thread. Otherwise, if the {@link ThreadState} is used by another thread
     * {@link ThreadAffinityDocumentsWriterThreadPool} tries to find the currently
     * minimal contended {@link ThreadState}.
     */
    class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerThreadPool {
      private Map<Thread, ThreadState> threadBindings = new ConcurrentHashMap<>();

      /**
       * Creates a new {@link ThreadAffinityDocumentsWriterThreadPool} with a given maximum of {@link ThreadState}s.
       */
      public ThreadAffinityDocumentsWriterThreadPool(int maxNumPerThreads) {
        super(maxNumPerThreads);
        assert getMaxThreadStates() >= 1;
      }

      @Override
      public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
        ThreadState threadState = threadBindings.get(requestingThread);
        if (threadState != null && threadState.tryLock()) {
          return threadState;
        }
        ThreadState minThreadState = null;

        /* TODO -- another thread could lock the minThreadState we just got while
         we should somehow prevent this. */
        // Find the state that has minimum number of threads waiting
        minThreadState = minContendedThreadState();
        if (minThreadState == null || minThreadState.hasQueuedThreads()) {
          final ThreadState newState = newThreadState(); // state is already locked if non-null
          if (newState != null) {
            assert newState.isHeldByCurrentThread();
            threadBindings.put(requestingThread, newState);
            return newState;
          } else if (minThreadState == null) {
            /*
             * no new threadState available we just take the minContented one
             * This must return a valid thread state since we accessed the
             * synced context in newThreadState() above.
             */
            minThreadState = minContendedThreadState();
          }
        }
        assert minThreadState != null: "ThreadState is null";

        minThreadState.lock();
        return minThreadState;
      }

      @Override
      public ThreadAffinityDocumentsWriterThreadPool clone() {
        ThreadAffinityDocumentsWriterThreadPool clone = (ThreadAffinityDocumentsWriterThreadPool) super.clone();
        clone.threadBindings = new ConcurrentHashMap<>();
        return clone;
      }
    }

    Back to the ThreadAffinityDocumentsWriterThreadPool class. The main flow of getAndLock is as follows:

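    1. The requesting thread, requestingThread, wants to perform updateDocument. It first looks up its own ThreadState in threadBindings and tries to lock it with tryLock(), which does not wait. If that succeeds, the thread gets back the same ThreadState it used before. This is the best case.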

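    2. Sometimes the thread has no ThreadState bound yet, or tryLock on its own ThreadState fails because another requesting thread currently holds it. In either case requestingThread has to look for a different ThreadState. The candidates are the active entries of the threadStates array, not the entries of threadBindings. The choice is made by minContendedThreadState(), whose source is shown below.

    minContendedThreadState iterates over all active ThreadStates. It returns the one with the shortest wait queue, i.e. the fewest threads waiting for its lock according to getQueueLength(), which ReentrantLock documents as an estimate. That ThreadState becomes minThreadState.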

      /**
       * Returns the ThreadState with the minimum estimated number of threads
       * waiting to acquire its lock or <code>null</code> if no {@link ThreadState}
       * is yet visible to the calling thread.
       */
      ThreadState minContendedThreadState() {
        ThreadState minThreadState = null;
        final int limit = numThreadStatesActive;
        for (int i = 0; i < limit; i++) {
          final ThreadState state = threadStates[i];
          if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
            minThreadState = state;
          }
        }
        return minThreadState;
      }

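    3. requestingThread asks for a new ThreadState in two situations: minThreadState == null (typically when the very first ThreadState is requested), or minThreadState already has threads queued on it (the normal situation under concurrent load). Asking for a new ThreadState means trying to claim one of the maxIndexingThreads slots. The source is below.

    threadStates is an array of ThreadState objects whose length is maxIndexingThreads. A new ThreadState can be handed out as long as the number of active ThreadStates (numThreadStatesActive) is smaller than threadStates.length. newThreadState() is synchronized, so two threads cannot claim the same slot. It returns the new ThreadState already locked.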

      /**
       * Returns a new {@link ThreadState} iff any new state is available otherwise
       * <code>null</code>.
       * <p>
       * NOTE: the returned {@link ThreadState} is already locked iff non-
       * <code>null</code>.
       *
       * @return a new {@link ThreadState} iff any new state is available otherwise
       *         <code>null</code>
       */
      synchronized ThreadState newThreadState() {
        if (numThreadStatesActive < threadStates.length) {
          final ThreadState threadState = threadStates[numThreadStatesActive];
          threadState.lock(); // lock so nobody else will get this ThreadState
          boolean unlock = true;
          try {
            if (threadState.isActive()) {
              // unreleased thread states are deactivated during DW#close()
              numThreadStatesActive++; // increment will publish the ThreadState
              assert threadState.dwpt == null;
              unlock = false;
              return threadState;
            }
            // unlock since the threadstate is not active anymore - we are closed!
            assert assertUnreleasedThreadStatesInactive();
            return null;
          } finally {
            if (unlock) {
              // in any case make sure we unlock if we fail
              threadState.unlock();
            }
          }
        }
        return null;
      }

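    4. If newThreadState() returns a ThreadState, requestingThread is bound to it in threadBindings and the new state is returned. If it returns null, the threadStates array is already fully allocated. In that case, if minThreadState was null, the requesting thread calls minContendedThreadState() again, and this time it must find a ThreadState.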

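    5. Finally, the requesting thread calls lock() on minThreadState. If another thread holds that lock, the requesting thread blocks until it can acquire it. This is the worst case. Note that this path does not update threadBindings.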
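    The five steps can be combined into a simplified, runnable model. It is a sketch under stated assumptions, not Lucene's implementation. State, AffinityPoolSketch, minContended() and newState() are hypothetical stand-ins for ThreadState, ThreadAffinityDocumentsWriterThreadPool, minContendedThreadState() and newThreadState(). DocumentsWriterPerThread, isActive() and the close path are left out. Comments mark which step each branch corresponds to.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of getAndLock() (not Lucene code): no
// DocumentsWriterPerThread, no isActive()/close handling.
class AffinityPoolSketch {

    @SuppressWarnings("serial")
    static final class State extends ReentrantLock {
        final int id;

        State(int id) {
            this.id = id;
        }
    }

    private final State[] states;        // like threadStates
    private volatile int numActive = 0;  // like numThreadStatesActive
    private final Map<Thread, State> bindings = new ConcurrentHashMap<>(); // like threadBindings

    AffinityPoolSketch(int maxStates) {
        if (maxStates < 1) {
            throw new IllegalArgumentException("need at least one state");
        }
        states = new State[maxStates];
        for (int i = 0; i < maxStates; i++) {
            states[i] = new State(i);
        }
    }

    // Step 2: active state with the fewest estimated waiters, or null if none yet.
    State minContended() {
        State min = null;
        final int limit = numActive;
        for (int i = 0; i < limit; i++) {
            State s = states[i];
            if (min == null || s.getQueueLength() < min.getQueueLength()) {
                min = s;
            }
        }
        return min;
    }

    // Step 3: claim the next unused slot, returned already locked; null when full.
    synchronized State newState() {
        if (numActive < states.length) {
            State s = states[numActive];
            s.lock();
            numActive++; // publishes s to minContended()
            return s;
        }
        return null;
    }

    State getAndLock(Thread requestingThread) {
        // Step 1: reuse this thread's own state if it is free right now.
        State bound = bindings.get(requestingThread);
        if (bound != null && bound.tryLock()) {
            return bound;
        }
        State min = minContended();
        if (min == null || min.hasQueuedThreads()) {
            State fresh = newState();
            if (fresh != null) {
                // Step 4: bind the new state to the requesting thread.
                bindings.put(requestingThread, fresh);
                return fresh;
            } else if (min == null) {
                // Step 4: every slot is taken now, so this cannot be null.
                min = minContended();
            }
        }
        // Step 5: wait for the least contended state (worst case).
        min.lock();
        return min;
    }

    int numActive() {
        return numActive;
    }
}
```

    One consequence is easy to miss. An unbound caller locks the least contended existing state whenever nobody is queued on it, even if that state is currently held. New states are therefore only claimed once some state has waiters. Reading the quoted code the same way, threads that take turns without overlapping keep using a single ThreadState.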

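    Finally, the TODO in the source points out a gap. After the requesting thread has picked minThreadState, another thread may lock that same ThreadState before the requesting thread does. Nothing currently prevents this, so it is a known shortcoming of the implementation.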
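    One possible way to narrow that window is to try every active state once with tryLock(), which never waits, before falling back to blocking on the least contended one. This is only a hypothetical sketch, not what Lucene does. The names TryFirstSelector, lockLeastContended and demoPickedIndex are illustrative. The sketch narrows the race rather than removing it: the fallback branch can still lose the chosen state to another thread, and it ignores thread affinity.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical mitigation, not Lucene code: before blocking, try every active
// state once with tryLock(), so a state taken in the meantime is just skipped.
final class TryFirstSelector {
    private TryFirstSelector() {}

    // `active` must be non-empty.
    static ReentrantLock lockLeastContended(ReentrantLock[] active) {
        // Pass 1: tryLock() never waits; take the first state that is free.
        for (ReentrantLock s : active) {
            if (s.tryLock()) {
                return s;
            }
        }
        // Pass 2: all busy; block on the fewest estimated waiters, as
        // minContendedThreadState() plus lock() do. The race still exists here.
        ReentrantLock min = active[0];
        for (ReentrantLock s : active) {
            if (s.getQueueLength() < min.getQueueLength()) {
                min = s;
            }
        }
        min.lock();
        return min;
    }

    // Another thread holds state 0; returns the index of the state we got.
    static int demoPickedIndex() {
        ReentrantLock[] states = {new ReentrantLock(), new ReentrantLock()};
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            states[0].lock();
            try {
                held.countDown();
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                states[0].unlock();
            }
        });
        holder.start();
        try {
            held.await(); // state 0 now belongs to the holder thread
            ReentrantLock got = lockLeastContended(states);
            int index = (got == states[0]) ? 0 : 1;
            got.unlock();
            return index;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown(); // let the holder thread finish
        }
    }
}
```

    demoPickedIndex() holds state 0 in a second thread. The caller gets state 1 immediately instead of queueing behind state 0.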

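    The <maxIndexingThreads>8</maxIndexingThreads> setting has a considerable impact on indexing performance. If it is too small, indexing threads spend more time waiting for a ThreadState. If it is too large, it adds load on the server, since every ThreadState carries its own DocumentsWriterPerThread building its own in-memory segment. Choose a value by weighing both effects.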

    When reposting, please credit the source: http://www.cnblogs.com/rcfeng/
  Original article: https://www.cnblogs.com/rcfeng/p/3915797.html