Solr 4.8.0 Source Code Analysis (3): Thread Pool Management for Indexing
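Solr limits the number of threads that may index concurrently. The limit is set in solrconfig.xml; later in this article it is referred to as maxIndexingThreads.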
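So how does Solr manage this thread pool during indexing? The work is done by ThreadAffinityDocumentsWriterThreadPool, which extends DocumentsWriterPerThreadPool. The class is not complicated; its one important method is getAndLock().
During indexing, that is, in updateDocuments, Solr first calls getAndLock to obtain a ThreadState lock. These ThreadState locks are kept in the threadBindings map of ThreadAffinityDocumentsWriterThreadPool.
First, what is a ThreadState lock? The source is shown below.
ThreadState is a nested class of DocumentsWriterPerThreadPool. It extends ReentrantLock and holds a DocumentsWriterPerThread instance together with some state flags. A DocumentsWriterPerThread is not a thread itself; as the Javadoc below puts it, it is the object used during indexing to build an in-memory index segment. The class is simple enough that it needs no detailed walkthrough.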
  /**
   * {@link ThreadState} references and guards a
   * {@link DocumentsWriterPerThread} instance that is used during indexing to
   * build a in-memory index segment. {@link ThreadState} also holds all flush
   * related per-thread data controlled by {@link DocumentsWriterFlushControl}.
   * <p>
   * A {@link ThreadState}, its methods and members should only accessed by one
   * thread a time. Users must acquire the lock via {@link ThreadState#lock()}
   * and release the lock in a finally block via {@link ThreadState#unlock()}
   * before accessing the state.
   */
  @SuppressWarnings("serial")
  final static class ThreadState extends ReentrantLock {
    DocumentsWriterPerThread dwpt;
    // TODO this should really be part of DocumentsWriterFlushControl
    // write access guarded by DocumentsWriterFlushControl
    volatile boolean flushPending = false;
    // TODO this should really be part of DocumentsWriterFlushControl
    // write access guarded by DocumentsWriterFlushControl
    long bytesUsed = 0;
    // guarded by Reentrant lock
    private boolean isActive = true;

    ThreadState(DocumentsWriterPerThread dpwt) {
      this.dwpt = dpwt;
    }

    /**
     * Resets the internal {@link DocumentsWriterPerThread} with the given one. 
     * if the given DWPT is <code>null</code> this ThreadState is marked as inactive and should not be used
     * for indexing anymore.
     * @see #isActive()  
     */

    private void deactivate() {
      assert this.isHeldByCurrentThread();
      isActive = false;
      reset();
    }

    private void reset() {
      assert this.isHeldByCurrentThread();
      this.dwpt = null;
      this.bytesUsed = 0;
      this.flushPending = false;
    }

    /**
     * Returns <code>true</code> if this ThreadState is still open. This will
     * only return <code>false</code> iff the DW has been closed and this
     * ThreadState is already checked out for flush.
     */
    boolean isActive() {
      assert this.isHeldByCurrentThread();
      return isActive;
    }

    boolean isInitialized() {
      assert this.isHeldByCurrentThread();
      return isActive() && dwpt != null;
    }

    /**
     * Returns the number of currently active bytes in this ThreadState's
     * {@link DocumentsWriterPerThread}
     */
    public long getBytesUsedPerThread() {
      assert this.isHeldByCurrentThread();
      // public for FlushPolicy
      return bytesUsed;
    }

    /**
     * Returns this {@link ThreadState}s {@link DocumentsWriterPerThread}
     */
    public DocumentsWriterPerThread getDocumentsWriterPerThread() {
      assert this.isHeldByCurrentThread();
      // public for FlushPolicy
      return dwpt;
    }

    /**
     * Returns <code>true</code> iff this {@link ThreadState} is marked as flush
     * pending otherwise <code>false</code>
     */
    public boolean isFlushPending() {
      return flushPending;
    }
  }
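
The Javadoc above asks callers to take the lock and release it in a finally block. The sketch below illustrates that pattern with a made-up class: GuardedCounter and GuardedCounterDemo are hypothetical stand-ins invented for this example, not Lucene or Solr classes, and the only thing borrowed from ThreadState is the idea of extending ReentrantLock so that the lock and the state it guards are one object.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for ThreadState: the lock and the guarded state are one object.
@SuppressWarnings("serial")
final class GuardedCounter extends ReentrantLock {
  private long bytesUsed = 0; // guarded by this lock

  void add(long bytes) {
    assert isHeldByCurrentThread(); // same style of check as in ThreadState
    bytesUsed += bytes;
  }

  long bytesUsed() {
    assert isHeldByCurrentThread();
    return bytesUsed;
  }
}

final class GuardedCounterDemo {
  // Acquire, touch the state, and always release in finally.
  static long addSafely(GuardedCounter state, long bytes) {
    state.lock();
    try {
      state.add(bytes);
      return state.bytesUsed();
    } finally {
      state.unlock();
    }
  }

  public static void main(String[] args) {
    GuardedCounter state = new GuardedCounter();
    addSafely(state, 128);
    System.out.println(addSafely(state, 64)); // prints 192
    System.out.println(state.isLocked());     // prints false
  }
}
```

Because the unlock sits in finally, the lock is released even if the guarded work throws, which is why the counter is reported as unlocked after both calls.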

The source of ThreadAffinityDocumentsWriterThreadPool is as follows:
/**
 * A {@link DocumentsWriterPerThreadPool} implementation that tries to assign an
 * indexing thread to the same {@link ThreadState} each time the thread tries to
 * obtain a {@link ThreadState}. Once a new {@link ThreadState} is created it is
 * associated with the creating thread. Subsequently, if the threads associated
 * {@link ThreadState} is not in use it will be associated with the requesting
 * thread. Otherwise, if the {@link ThreadState} is used by another thread
 * {@link ThreadAffinityDocumentsWriterThreadPool} tries to find the currently
 * minimal contended {@link ThreadState}.
 */
class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerThreadPool {
  private Map<Thread, ThreadState> threadBindings = new ConcurrentHashMap<>();

  /**
   * Creates a new {@link ThreadAffinityDocumentsWriterThreadPool} with a given maximum of {@link ThreadState}s.
   */
  public ThreadAffinityDocumentsWriterThreadPool(int maxNumPerThreads) {
    super(maxNumPerThreads);
    assert getMaxThreadStates() >= 1;
  }

  @Override
  public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
    ThreadState threadState = threadBindings.get(requestingThread);
    if (threadState != null && threadState.tryLock()) {
      return threadState;
    }
    ThreadState minThreadState = null;


    /* TODO -- another thread could lock the minThreadState we just got while 
     we should somehow prevent this. */
    // Find the state that has minimum number of threads waiting
    minThreadState = minContendedThreadState();
    if (minThreadState == null || minThreadState.hasQueuedThreads()) {
      final ThreadState newState = newThreadState(); // state is already locked if non-null
      if (newState != null) {
        assert newState.isHeldByCurrentThread();
        threadBindings.put(requestingThread, newState);
        return newState;
      } else if (minThreadState == null) {
        /*
         * no new threadState available we just take the minContented one
         * This must return a valid thread state since we accessed the 
         * synced context in newThreadState() above.
         */
        minThreadState = minContendedThreadState();
      }
    }
    assert minThreadState != null: "ThreadState is null";

    minThreadState.lock();
    return minThreadState;
  }

  @Override
  public ThreadAffinityDocumentsWriterThreadPool clone() {
    ThreadAffinityDocumentsWriterThreadPool clone = (ThreadAffinityDocumentsWriterThreadPool) super.clone();
    clone.threadBindings = new ConcurrentHashMap<>();
    return clone;
  }
}
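Now back to ThreadAffinityDocumentsWriterThreadPool. The main flow of getAndLock is as follows:
1. The requesting thread, requestingThread, needs to run an updateDocument operation. It first looks up its own ThreadState in threadBindings and calls tryLock on it. If that succeeds, the thread gets back the ThreadState it used before. This is the best case.
2. If the thread has no binding yet, or tryLock fails because another thread currently holds that ThreadState, the requesting thread has to pick a different ThreadState. The selection rule is minContendedThreadState(), whose source is shown below.
minContendedThreadState iterates over all active ThreadStates and returns the one whose lock has the fewest queued threads, that is, the fewest threads waiting to acquire it. This is minThreadState. If no ThreadState is active yet, it returns null.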
  /**
   * Returns the ThreadState with the minimum estimated number of threads
   * waiting to acquire its lock or <code>null</code> if no {@link ThreadState}
   * is yet visible to the calling thread.
   */
  ThreadState minContendedThreadState() {
    ThreadState minThreadState = null;
    final int limit = numThreadStatesActive;
    for (int i = 0; i < limit; i++) {
      final ThreadState state = threadStates[i];
      if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
        minThreadState = state;
      }
    }
    return minThreadState;
  }
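
To see the selection rule in isolation, the following sketch applies the same loop to plain ReentrantLock objects. MinContended, pick and demo are names made up for this illustration; only getQueueLength() and hasQueuedThreads() are real ReentrantLock methods. The demo parks one thread on slot 0 so that slot 1 becomes the least contended choice.

```java
import java.util.concurrent.locks.ReentrantLock;

final class MinContended {
  // Same shape as the loop above: strict '<' means ties go to the earliest slot.
  static ReentrantLock pick(ReentrantLock[] states, int activeCount) {
    ReentrantLock min = null;
    for (int i = 0; i < activeCount; i++) {
      ReentrantLock state = states[i];
      if (min == null || state.getQueueLength() < min.getQueueLength()) {
        min = state;
      }
    }
    return min;
  }

  // Returns the index that pick() chooses while one thread is queued on slot 0.
  static int demo() {
    final ReentrantLock[] states = { new ReentrantLock(), new ReentrantLock() };
    states[0].lock(); // the calling thread holds slot 0
    Thread waiter = new Thread(() -> {
      states[0].lock(); // blocks until the caller releases slot 0
      states[0].unlock();
    });
    waiter.start();
    try {
      while (!states[0].hasQueuedThreads()) {
        Thread.sleep(1); // wait until the waiter is really queued
      }
      ReentrantLock picked = pick(states, states.length);
      return picked == states[1] ? 1 : 0;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      states[0].unlock(); // lets the waiter finish
      try {
        waiter.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints 1
  }
}
```

Note that the queue length counts waiting threads only, not the current holder, so a lock that is held but has no waiters still looks uncontended to this rule.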
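3. If minThreadState == null (typically the very first request for a ThreadState) or minThreadState already has other threads waiting on it (normally some thread is waiting), requestingThread asks for a new ThreadState by calling newThreadState(), which hands out one of the maxIndexingThreads slots. The source is shown below.
threadStates is an array of ThreadState whose length is maxIndexingThreads. A new ThreadState can be handed out as long as numThreadStatesActive, the number of ThreadStates already in use, is smaller than the length of threadStates.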
  /**
   * Returns a new {@link ThreadState} iff any new state is available otherwise
   * <code>null</code>.
   * <p>
   * NOTE: the returned {@link ThreadState} is already locked iff non-
   * <code>null</code>.
   * 
   * @return a new {@link ThreadState} iff any new state is available otherwise
   *         <code>null</code>
   */
  synchronized ThreadState newThreadState() {
    if (numThreadStatesActive < threadStates.length) {
      final ThreadState threadState = threadStates[numThreadStatesActive];
      threadState.lock(); // lock so nobody else will get this ThreadState
      boolean unlock = true;
      try {
        if (threadState.isActive()) {
          // unreleased thread states are deactivated during DW#close()
          numThreadStatesActive++; // increment will publish the ThreadState
          assert threadState.dwpt == null;
          unlock = false;
          return threadState;
        }
        // unlock since the threadstate is not active anymore - we are closed!
        assert assertUnreleasedThreadStatesInactive();
        return null;
      } finally {
        if (unlock) {
          // in any case make sure we unlock if we fail 
          threadState.unlock();
        }
      }
    }
    return null;
  }
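
The core idea of newThreadState(), handing out pre-allocated slots one at a time until the array is exhausted, can be sketched without any Lucene types. SlotPool below is a hypothetical class written for this illustration; it deliberately leaves out the isActive() check that the real method performs for a closed writer.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded pool: slots are created up front and handed out lazily.
final class SlotPool {
  private final ReentrantLock[] slots;
  private int active = 0; // guarded by 'this'

  SlotPool(int maxSlots) {
    slots = new ReentrantLock[maxSlots];
    for (int i = 0; i < maxSlots; i++) {
      slots[i] = new ReentrantLock();
    }
  }

  // Returns the next unused slot, already locked by the caller, or null if none is left.
  synchronized ReentrantLock newSlot() {
    if (active < slots.length) {
      ReentrantLock slot = slots[active];
      slot.lock(); // lock before publishing so nobody else can grab it
      active++;    // the slot now counts as active
      return slot;
    }
    return null;
  }

  synchronized int activeCount() {
    return active;
  }

  public static void main(String[] args) {
    SlotPool pool = new SlotPool(2);
    System.out.println(pool.newSlot() != null); // prints true
    System.out.println(pool.newSlot() != null); // prints true
    System.out.println(pool.newSlot() != null); // prints false
    System.out.println(pool.activeCount());     // prints 2
  }
}
```

Once the counter reaches the array length, every further request returns null, which is the signal getAndLock uses to fall back to an existing ThreadState.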
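4. If newThreadState() succeeds, threadBindings is updated to map the requesting thread to the new ThreadState, which is returned already locked. If it returns null, the threadStates array has been fully allocated; when minThreadState is still null at that point, the requesting thread calls minContendedThreadState() again.
5. Finally, the requesting thread calls lock() on minThreadState. If the lock is currently held, the thread blocks until it acquires the lock. This is the worst case.
As the TODO comment in the source points out, another thread may lock minThreadState between the moment the requesting thread selects it and the moment it calls lock(). For now this is a known shortcoming.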
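
The five steps can be put together in one small runnable sketch. AffinityPoolSketch and AffinityPoolDemo are hypothetical classes written for this article, with plain ReentrantLock objects standing in for ThreadState; the branch structure follows the getAndLock source quoted earlier, but flush handling and the closed-writer check are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified pool: plain ReentrantLocks stand in for ThreadState.
final class AffinityPoolSketch {
  private final ReentrantLock[] states;
  private volatile int active = 0; // written only inside newState()
  private final Map<Thread, ReentrantLock> bindings = new ConcurrentHashMap<>();

  AffinityPoolSketch(int max) {
    if (max < 1) throw new IllegalArgumentException("max must be >= 1");
    states = new ReentrantLock[max];
    for (int i = 0; i < max; i++) states[i] = new ReentrantLock();
  }

  private ReentrantLock minContended() {
    ReentrantLock min = null;
    final int limit = active;
    for (int i = 0; i < limit; i++) {
      if (min == null || states[i].getQueueLength() < min.getQueueLength()) min = states[i];
    }
    return min;
  }

  private synchronized ReentrantLock newState() {
    if (active >= states.length) return null;
    ReentrantLock s = states[active];
    s.lock();
    active++;
    return s;
  }

  ReentrantLock getAndLock(Thread requester) {
    ReentrantLock bound = bindings.get(requester);
    if (bound != null && bound.tryLock()) return bound; // step 1: affinity hit
    ReentrantLock min = minContended();                 // step 2: least contended
    if (min == null || min.hasQueuedThreads()) {
      ReentrantLock fresh = newState();                 // step 3: try a new slot
      if (fresh != null) {                              // step 4: bind and return
        bindings.put(requester, fresh);
        return fresh;
      }
      if (min == null) min = minContended();
    }
    min.lock();                                         // step 5: may block
    return min;
  }

  int activeCount() {
    return active;
  }
}

final class AffinityPoolDemo {
  // Caller side: obtain a state, do the work, release in finally.
  static ReentrantLock indexOnce(AffinityPoolSketch pool) {
    ReentrantLock state = pool.getAndLock(Thread.currentThread());
    try {
      return state; // real code would index a document here
    } finally {
      state.unlock();
    }
  }

  public static void main(String[] args) {
    AffinityPoolSketch pool = new AffinityPoolSketch(2);
    ReentrantLock first = indexOnce(pool);
    ReentrantLock second = indexOnce(pool);
    System.out.println(first == second);    // prints true
    System.out.println(pool.activeCount()); // prints 1
  }
}
```

With a single caller, the second request hits the affinity binding from the first, so only one of the two slots is ever activated. In this sketch a second slot is handed out only when the least contended state already has waiting threads.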