Solr 4.8.0 Source Code Analysis (3): Thread Pool Management for Indexing
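Solr limits the maximum number of threads that can index concurrently. The limit is configured in solrconfig.xml (the maxIndexingThreads setting referred to later in this article).
So how does Solr manage the thread pool while indexing? The work is done mainly by ThreadAffinityDocumentsWriterThreadPool, which extends DocumentsWriterPerThreadPool. Its structure is not complicated; the key method is getAndLock().
When indexing, that is, during an updateDocuments call, Solr first calls getAndLock to obtain a ThreadState lock. These ThreadState locks are kept in the threadBindings map of ThreadAffinityDocumentsWriterThreadPool, which serves as the thread pool here.
First, let's look at what the ThreadState lock is.
ThreadState is a nested class of DocumentsWriterPerThreadPool. It holds a DocumentsWriterPerThread instance together with its state control. A DocumentsWriterPerThread acts as one worker of the pool, and its main job is to build the index. The class is fairly simple, so it is not covered in detail here. The source is as follows: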
/**
 * {@link ThreadState} references and guards a
 * {@link DocumentsWriterPerThread} instance that is used during indexing to
 * build a in-memory index segment. {@link ThreadState} also holds all flush
 * related per-thread data controlled by {@link DocumentsWriterFlushControl}.
 * <p>
 * A {@link ThreadState}, its methods and members should only accessed by one
 * thread a time. Users must acquire the lock via {@link ThreadState#lock()}
 * and release the lock in a finally block via {@link ThreadState#unlock()}
 * before accessing the state.
 */
@SuppressWarnings("serial")
final static class ThreadState extends ReentrantLock {
  DocumentsWriterPerThread dwpt;
  // TODO this should really be part of DocumentsWriterFlushControl
  // write access guarded by DocumentsWriterFlushControl
  volatile boolean flushPending = false;
  // TODO this should really be part of DocumentsWriterFlushControl
  // write access guarded by DocumentsWriterFlushControl
  long bytesUsed = 0;
  // guarded by Reentrant lock
  private boolean isActive = true;

  ThreadState(DocumentsWriterPerThread dpwt) {
    this.dwpt = dpwt;
  }

  /**
   * Resets the internal {@link DocumentsWriterPerThread} with the given one.
   * if the given DWPT is <code>null</code> this ThreadState is marked as inactive and should not be used
   * for indexing anymore.
   * @see #isActive()
   */

  private void deactivate() {
    assert this.isHeldByCurrentThread();
    isActive = false;
    reset();
  }

  private void reset() {
    assert this.isHeldByCurrentThread();
    this.dwpt = null;
    this.bytesUsed = 0;
    this.flushPending = false;
  }

  /**
   * Returns <code>true</code> if this ThreadState is still open. This will
   * only return <code>false</code> iff the DW has been closed and this
   * ThreadState is already checked out for flush.
   */
  boolean isActive() {
    assert this.isHeldByCurrentThread();
    return isActive;
  }

  boolean isInitialized() {
    assert this.isHeldByCurrentThread();
    return isActive() && dwpt != null;
  }

  /**
   * Returns the number of currently active bytes in this ThreadState's
   * {@link DocumentsWriterPerThread}
   */
  public long getBytesUsedPerThread() {
    assert this.isHeldByCurrentThread();
    // public for FlushPolicy
    return bytesUsed;
  }

  /**
   * Returns this {@link ThreadState}s {@link DocumentsWriterPerThread}
   */
  public DocumentsWriterPerThread getDocumentsWriterPerThread() {
    assert this.isHeldByCurrentThread();
    // public for FlushPolicy
    return dwpt;
  }

  /**
   * Returns <code>true</code> iff this {@link ThreadState} is marked as flush
   * pending otherwise <code>false</code>
   */
  public boolean isFlushPending() {
    return flushPending;
  }
}
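The Javadoc above asks callers to take the lock before touching a ThreadState and to release it in a finally block. Below is a minimal sketch of that discipline. SlotState, addBytes and LockDisciplineDemo are hypothetical names invented for this illustration; they stand in for ThreadState and are not part of Lucene or Solr.

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustration only: all names here are hypothetical stand-ins, not Lucene classes.
class LockDisciplineDemo {

    // Like ThreadState, the state object is itself the lock that guards its fields.
    @SuppressWarnings("serial")
    static class SlotState extends ReentrantLock {
        long bytesUsed = 0; // only read or written while the lock is held

        long getBytesUsed() {
            // Same idea as the isHeldByCurrentThread() asserts in ThreadState,
            // but enforced even when Java assertions are disabled.
            if (!isHeldByCurrentThread()) {
                throw new IllegalStateException("lock must be held by the caller");
            }
            return bytesUsed;
        }
    }

    // Acquire, work, and always release in finally.
    static long addBytes(SlotState state, long n) {
        state.lock();
        try {
            state.bytesUsed += n;
            return state.getBytesUsed();
        } finally {
            state.unlock();
        }
    }

    public static void main(String[] args) {
        SlotState state = new SlotState();
        System.out.println("bytes used: " + addBytes(state, 128));
        System.out.println("still locked: " + state.isLocked());
    }
}
```

Making the state object extend ReentrantLock keeps the lock and the data it guards in one place, which is the same design choice ThreadState makes.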
The source of ThreadAffinityDocumentsWriterThreadPool is as follows:
/**
 * A {@link DocumentsWriterPerThreadPool} implementation that tries to assign an
 * indexing thread to the same {@link ThreadState} each time the thread tries to
 * obtain a {@link ThreadState}. Once a new {@link ThreadState} is created it is
 * associated with the creating thread. Subsequently, if the threads associated
 * {@link ThreadState} is not in use it will be associated with the requesting
 * thread. Otherwise, if the {@link ThreadState} is used by another thread
 * {@link ThreadAffinityDocumentsWriterThreadPool} tries to find the currently
 * minimal contended {@link ThreadState}.
 */
class ThreadAffinityDocumentsWriterThreadPool extends DocumentsWriterPerThreadPool {
  private Map<Thread, ThreadState> threadBindings = new ConcurrentHashMap<>();

  /**
   * Creates a new {@link ThreadAffinityDocumentsWriterThreadPool} with a given maximum of {@link ThreadState}s.
   */
  public ThreadAffinityDocumentsWriterThreadPool(int maxNumPerThreads) {
    super(maxNumPerThreads);
    assert getMaxThreadStates() >= 1;
  }

  @Override
  public ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter) {
    ThreadState threadState = threadBindings.get(requestingThread);
    if (threadState != null && threadState.tryLock()) {
      return threadState;
    }
    ThreadState minThreadState = null;


    /* TODO -- another thread could lock the minThreadState we just got while
     we should somehow prevent this. */
    // Find the state that has minimum number of threads waiting
    minThreadState = minContendedThreadState();
    if (minThreadState == null || minThreadState.hasQueuedThreads()) {
      final ThreadState newState = newThreadState(); // state is already locked if non-null
      if (newState != null) {
        assert newState.isHeldByCurrentThread();
        threadBindings.put(requestingThread, newState);
        return newState;
      } else if (minThreadState == null) {
        /*
         * no new threadState available we just take the minContented one
         * This must return a valid thread state since we accessed the
         * synced context in newThreadState() above.
         */
        minThreadState = minContendedThreadState();
      }
    }
    assert minThreadState != null: "ThreadState is null";

    minThreadState.lock();
    return minThreadState;
  }

  @Override
  public ThreadAffinityDocumentsWriterThreadPool clone() {
    ThreadAffinityDocumentsWriterThreadPool clone = (ThreadAffinityDocumentsWriterThreadPool) super.clone();
    clone.threadBindings = new ConcurrentHashMap<>();
    return clone;
  }
}
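Now back to the ThreadAffinityDocumentsWriterThreadPool class. The main flow of getAndLock is as follows:
1. The requesting thread (requestingThread) needs to perform an updateDocument operation. It first looks up its own ThreadState in threadBindings and tries to lock it with tryLock(). If that succeeds, it gets back the same ThreadState it used before, which is the best case.
2. If the thread has no binding yet, or tryLock() fails because the ThreadState is currently held by another thread, the requesting thread has to pick a different ThreadState from the pool. The selection rule is minContendedThreadState(), whose source is shown below.
minContendedThreadState() iterates over all active ThreadStates and returns the one whose lock queue is shortest, that is, the one with the fewest threads waiting for it. That ThreadState becomes minThreadState.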
/**
 * Returns the ThreadState with the minimum estimated number of threads
 * waiting to acquire its lock or <code>null</code> if no {@link ThreadState}
 * is yet visible to the calling thread.
 */
ThreadState minContendedThreadState() {
  ThreadState minThreadState = null;
  final int limit = numThreadStatesActive;
  for (int i = 0; i < limit; i++) {
    final ThreadState state = threadStates[i];
    if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
      minThreadState = state;
    }
  }
  return minThreadState;
}
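To see the selection rule in isolation, here is a small sketch that applies the same strict "fewer queued waiters wins" comparison to plain ReentrantLock objects. MinContendedDemo and minContended are hypothetical names for this illustration; only ReentrantLock.getQueueLength() and hasQueuedThreads() are real JDK calls, and getQueueLength() is documented as an estimate.

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustration only: a stand-alone version of the "least contended lock" scan.
class MinContendedDemo {

    // Returns the lock with the fewest queued waiters among the first `active`
    // entries, or null when active == 0. Because the comparison is strict,
    // ties go to the lowest index.
    static ReentrantLock minContended(ReentrantLock[] locks, int active) {
        ReentrantLock min = null;
        for (int i = 0; i < active; i++) {
            ReentrantLock candidate = locks[i];
            if (min == null || candidate.getQueueLength() < min.getQueueLength()) {
                min = candidate;
            }
        }
        return min;
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLock[] locks = { new ReentrantLock(), new ReentrantLock() };
        locks[0].lock();                       // main holds lock 0 ...
        Thread waiter = new Thread(() -> {     // ... so this thread queues up behind it
            locks[0].lock();
            locks[0].unlock();
        });
        waiter.start();
        while (!locks[0].hasQueuedThreads()) { // wait until the waiter is queued
            Thread.sleep(1);
        }
        System.out.println("picked lock 1: " + (minContended(locks, 2) == locks[1]));
        locks[0].unlock();
        waiter.join();
    }
}
```

Note that the scan counts waiters only. A lock that is held but has nobody queued behind it looks just as attractive as a free one.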
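3. If minThreadState == null (typically when the very first ThreadState is requested) or minThreadState already has other threads waiting on it (the usual case under concurrent indexing), requestingThread asks for a new ThreadState, drawn from the maxIndexingThreads available slots. The source is shown below.
threadStates is an array of ThreadState objects. A new ThreadState can be handed out as long as the number of ThreadStates already in use (numThreadStatesActive, the active ones) is smaller than the length of threadStates (maxIndexingThreads).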
/**
 * Returns a new {@link ThreadState} iff any new state is available otherwise
 * <code>null</code>.
 * <p>
 * NOTE: the returned {@link ThreadState} is already locked iff non-
 * <code>null</code>.
 *
 * @return a new {@link ThreadState} iff any new state is available otherwise
 *         <code>null</code>
 */
synchronized ThreadState newThreadState() {
  if (numThreadStatesActive < threadStates.length) {
    final ThreadState threadState = threadStates[numThreadStatesActive];
    threadState.lock(); // lock so nobody else will get this ThreadState
    boolean unlock = true;
    try {
      if (threadState.isActive()) {
        // unreleased thread states are deactivated during DW#close()
        numThreadStatesActive++; // increment will publish the ThreadState
        assert threadState.dwpt == null;
        unlock = false;
        return threadState;
      }
      // unlock since the threadstate is not active anymore - we are closed!
      assert assertUnreleasedThreadStatesInactive();
      return null;
    } finally {
      if (unlock) {
        // in any case make sure we unlock if we fail
        threadState.unlock();
      }
    }
  }
  return null;
}
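The core of newThreadState() is a bounded hand-out: a fixed array of slots and a counter that only grows. The sketch below keeps just that part and deliberately omits the isActive() check that the real method uses to handle a closed writer. BoundedSlotsDemo, newSlot and activeCount are hypothetical names for this illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustration only: a fixed pool of lock "slots" handed out one at a time.
class BoundedSlotsDemo {
    private final ReentrantLock[] slots;
    private int active = 0; // slots handed out so far; guarded by `this`

    BoundedSlotsDemo(int maxSlots) {
        slots = new ReentrantLock[maxSlots];
        for (int i = 0; i < maxSlots; i++) {
            slots[i] = new ReentrantLock();
        }
    }

    // Returns the next unused slot, already locked by the calling thread,
    // or null once every slot has been handed out.
    synchronized ReentrantLock newSlot() {
        if (active < slots.length) {
            ReentrantLock slot = slots[active];
            slot.lock();   // lock before publishing so nobody else can grab it
            active++;      // the slot now counts as active
            return slot;
        }
        return null;
    }

    synchronized int activeCount() {
        return active;
    }

    public static void main(String[] args) {
        BoundedSlotsDemo pool = new BoundedSlotsDemo(2);
        System.out.println("first slot handed out: " + (pool.newSlot() != null));
        System.out.println("second slot handed out: " + (pool.newSlot() != null));
        System.out.println("third request is null: " + (pool.newSlot() == null));
    }
}
```

Returning the slot already locked means the caller never has to race other threads for a slot that was created on its behalf.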
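4. If newThreadState() succeeds, threadBindings is updated so that the requesting thread is bound to the new ThreadState. If it fails, the threadStates array has been fully allocated; in that case, if minThreadState was null, the requesting thread calls minContendedThreadState() again, and otherwise it keeps the minThreadState it already found.
5. Finally, the requesting thread calls lock() on minThreadState. If the lock is not immediately available, the thread blocks until it acquires it. This is the worst case.
As a last point, a TODO comment in the source notes that another thread may lock the same minThreadState between the moment the requesting thread selects it and the moment it locks it. For now this is a known shortcoming.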
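Putting the five steps together, the following is a simplified sketch of the same control flow using plain ReentrantLock objects instead of ThreadState. AffinityPoolDemo and its members are hypothetical names for this illustration, the sketch always binds the calling thread (Thread.currentThread()), and it leaves out everything related to flushing and closing.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Illustration only: a reduced model of the getAndLock() flow described above.
class AffinityPoolDemo {
    private final ReentrantLock[] slots;
    private volatile int active = 0; // written only inside newSlot()
    private final Map<Thread, ReentrantLock> bindings = new ConcurrentHashMap<>();

    AffinityPoolDemo(int maxSlots) {
        if (maxSlots < 1) {
            throw new IllegalArgumentException("maxSlots must be >= 1");
        }
        slots = new ReentrantLock[maxSlots];
        for (int i = 0; i < maxSlots; i++) {
            slots[i] = new ReentrantLock();
        }
    }

    private synchronized ReentrantLock newSlot() {
        if (active < slots.length) {
            ReentrantLock slot = slots[active];
            slot.lock();
            active++;
            return slot;
        }
        return null;
    }

    private ReentrantLock minContended() {
        ReentrantLock min = null;
        final int limit = active;
        for (int i = 0; i < limit; i++) {
            if (min == null || slots[i].getQueueLength() < min.getQueueLength()) {
                min = slots[i];
            }
        }
        return min;
    }

    // Returns a slot locked by the calling thread.
    ReentrantLock getAndLock() {
        Thread requester = Thread.currentThread();
        ReentrantLock bound = bindings.get(requester);
        if (bound != null && bound.tryLock()) {       // step 1: reuse own slot
            return bound;
        }
        ReentrantLock min = minContended();            // step 2: least contended
        if (min == null || min.hasQueuedThreads()) {
            ReentrantLock fresh = newSlot();           // step 3: try a new slot
            if (fresh != null) {
                bindings.put(requester, fresh);        // step 4: remember binding
                return fresh;
            } else if (min == null) {
                min = minContended();
            }
        }
        min.lock();                                    // step 5: may block
        return min;
    }

    public static void main(String[] args) {
        AffinityPoolDemo pool = new AffinityPoolDemo(2);
        ReentrantLock first = pool.getAndLock();
        first.unlock();
        ReentrantLock second = pool.getAndLock();
        second.unlock();
        System.out.println("same slot reused: " + (first == second));
    }
}
```

One consequence of the condition in step 3, visible in both the original and this sketch: a new slot is requested only when the least contended slot already has queued waiters, so a second thread arriving while the only active slot is held, but has no waiters, goes straight to lock() on that slot and waits.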