JUC-CountDownLatch基础篇
JUC-CountDownLatch源码分析
JUC-Semaphore基础篇
JUC-Semaphore源码分析
JUC-ReentrantReadWriteLock锁基础篇
JUC-ReentrantReadWriteLock锁源码分析
JUC-ReentrantLock锁基础篇
JUC-ReentrantLock锁源码分析
JUC-CyclicBarrier基础篇
JUC-CyclicBarrier源码分析
上一篇讲了AQS的实现原理这一篇讲讲AQS的的具体源码实现
文章目录
- 1.AQS中的代码主要是干什么的?
- 2.共享锁实现的源码解读
-
- 2.1获取共享锁的源码
-
- 2.1.1 acquireShared(int arg)方法
- 2.1.2 tryAcquireShared方法(arg):
- 2.1.3 doAcquireShared(int arg)方法
- 2.1.4 addWaiter(Node mode)方法
- 2.1.5 compareAndSetTail方法
- 2.1.6 enq(final Node node)方法
- 2.1.7 setHeadAndPropagate(Node node, int propagate)方法
- 2.1.8 doReleaseShared()方法
- 2.1.9 unparkSuccessor(Node node)方法
- 2.1.10 selfInterrupt()方法
- 2.1.11 shouldParkAfterFailedAcquire(Node pred, Node node)方法
- 2.1.12 parkAndCheckInterrupt() 方法
- 2.1.13 cancelAcquire(Node node)方法
- 2.2释放共享锁的源码
-
- 2.2.1 releaseShared(int arg)方法
- 2.2.1 tryReleaseShared(arg)方法
- 3.独占锁(排它锁)实现的源码解读
-
- 3.1 获取独占锁的源码
-
- 3.1.1 acquire(int arg)方法
- 3.1.2 acquireQueued(final Node node, int arg)方法
- 3.2释放独占锁的源码
-
- 3.2.1 release(int arg)方法
- 3.2.2 tryRelease(arg)方法
1.AQS中的代码主要是干什么的?
AQS这个抽象类中的代码其实主要就是干了两件事,一个是获取锁,另一个就是释放锁了。
获取锁:主要就是调用由AQS子类来实现的能不能获取到锁的方法,如果获取到了,则当前线程接着执行它后面的代码,如果获取失败了,则就将当前线程放入到同步队列中阻塞住,等待其他的释放锁的线程来把它唤醒。
获取锁的大致方法调用链如下:
共享锁:
acquireShared(arg)方法—>tryAcquireShared(arg)方法—>doAcquireShared(arg)方法
排他锁(独占锁):
acquire(arg)方法—>tryAcquire(arg)方法—>acquireQueued(node, arg)方法
上面的acquireShared(arg)方法和acquire(arg)方法是获取锁的入口代码,由具体AQS的子类来调用它们。tryAcquireShared(arg)方法和tryAcquire(arg)方法是用来判断锁有没有获取成功的,它们在AQS中是没有实现的,具体是由AQS的子类来实现的(这一点很重要,因为每个AQS的子类它们的作用是不同的,一把锁,怎么样才算获取成功,怎么样才算获取失败,每个AQS的子类的逻辑应该是不同的,所以才会将实现交给具体的子类来完成)。doAcquireShared(arg)方法和acquireQueued(node, arg)方法是在获取锁失败后,将当前线程放入到同步队列中并且阻塞住的。
释放锁: 主要就是调用由AQS子类来实现的能不能把锁释放成功的方法。如果释放成功,则就将同步队列中阻塞的线程唤醒(是head头部的后继节点这个线程)。如果释放失败,那就看具体的业务代码怎么处理这种情况了。
释放锁的大致方法调用链如下:
共享锁:
releaseShared(arg)方法—>tryReleaseShared(arg)方法—>doReleaseShared()方法
排他锁(独占锁):
release(arg)方法—>tryRelease(arg)方法—>unparkSuccessor(h)方法
上面的releaseShared(arg)方法和 release(arg)方法是释放锁的入口代码,由具体AQS的子类来调用它们。tryReleaseShared(arg)方法和tryRelease(arg)方法是用来判断锁有没有释放成功的,它们在AQS中是没有实现的,具体是由AQS的子类来实现的(这一点很重要,因为每个AQS的子类它们的作用是不同的,一把锁,怎么样才算释放成功,怎么样才算释放失败,每个AQS的子类的逻辑应该是不同的,所以才会将实现交给具体的子类来完成)。doReleaseShared()方法和unparkSuccessor(h)方法是在释放锁成功情况下,将同步队列中阻塞的线程唤醒的。unparkSuccessor(h)是在独占锁释放成功的情况下调用的,直接将同步队列中head头节点的后继节点唤醒。doReleaseShared()方法是在共享锁释放成功的情况下调用的,有人会问,不是直接调用unparkSuccessor(h)方法唤醒head头节点的后继节点不就行了吗?这里面涉及一个共享锁的锁释放传播PROPAGATE的知识点。要知道,独占锁只能有一个线程持有,所有不存在同一时刻会有多个线程同时释放独占锁的事情发生。而对于共享锁来说,共享锁可以被多个线程同时持有,所以存在在同一时刻会有多个线程同时释放共享锁的事情发生。如果直接调用unparkSuccessor(h)方法,都是唤醒head头节点的后继节点。本来是一个是唤醒head头节点的后继节点,一个是唤醒head头节点的后继节点的后继节点。那么不就丢失了一次唤醒机会么,所有对于共享锁的唤醒是调用doReleaseShared()方法,这个方法里面处理了这种情况。当然这个方法里面还是最终来调用unparkSuccessor(h)方法来释放阻塞住的节点。
AQS的设计模式是基于模板⽅法的,它有⼀些⽅法必须要⼦类去实现的,它们主要有:
- tryAcquireShared(int): 共享模式,尝试获取资源(共享锁),返回值
<0,表示没有资源了,获取锁失败,=0表示资源刚好等于0,获取锁成功。>
0表示资源还有剩余的,获取锁成功。 - tryAcquire(int): 独占模式,尝试获取资源(独占锁),返回值为true代表获取成功,为false代表获取失败。
- tryReleaseShared(int): 共享模式,尝试释放资源(共享锁),返回值为true代表释放成功,为false代表释放失败。
- tryRelease(int): 独占模式,尝试释放资源(独占锁),返回值为true代表释放成功,为false代表释放失败。
- isHeldExclusively():该线程是否正在独占资源。只有⽤到condition才需要去实现它
这些方法在AQS中都是没有具体实现的,直接抛出UnsupportedOperationException异常,由具体的AQS子类来实现。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
2.共享锁实现的源码解读
2.1获取共享锁的源码
获取共享锁的入口方法是acquireShared(int arg)方法。
2.1.1 acquireShared(int arg)方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
上面arg参数代表需要获取资源的个数,调用AQS子类的实现方法tryAcquireShared(arg)尝试获取锁,如果此方法返回值<0,代表获取锁失败,则接着调用doAcquireShared(arg)方法,将当前线程放入到同步队列中阻塞住(注意:并不是立马就将当前线程阻塞住,而是在阻塞之前还有尝试获取锁的机会)。
2.1.2 tryAcquireShared方法(arg):
此方法是由AQS的子类来具体实现的,尝试去获取锁,返回值为资源剩余的可用个数。
2.1.3 doAcquireShared(int arg)方法
private void doAcquireShared(int arg) {
//将当前线程加入到同步队列中,Node节点的nextWaiter=Node.SHARED
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取前驱节点
final Node p = node.predecessor();
//如果前驱节点是头结点的话,就尝试再一次获取锁
//此if判断也确保了,如果从同步队列中唤醒一个节点,那必然是head节点的后继节点
if (p == head) {
//再次尝试获取一把锁
int r = tryAcquireShared(arg);
//获取锁成功的话
if (r >= 0) {
//将此节点设置成新的head头节点并且传播Propagate状态
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
//如果在此期间,发生了中断,则调用此方法
selfInterrupt();
//走到这一步说明,没有发生什么失败情况,将失败标志设置成false
failed = false;
return;
}
}
//一般情况下shouldParkAfterFailedAcquire方法会调用两遍。
//第一遍是将node节点的前驱节点的waitStatus值设置成-1,返回值为false。
//调用第二遍时,node节点的前驱节点的waitStatus值已经是-1,直接返回true,接着执行parkAndCheckInterrupt方法。
//parkAndCheckInterrupt方法才是真正实现将当前线程阻塞住的代码,阻塞住后,等待唤醒。如果唤醒后,发现此线程被中断了,就将interrupted 标识设置成true。
//注意:阻塞住的线程有可能是被正常唤醒的,也有可能是此线程被别的线程中断了,唤醒的
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//发生以上逻辑没有正常的走下去,就说明发生了异常情况。
//则执行失败移除,或通知后续节点继续
cancelAcquire(node);
}
}
此方法先是调用addWaiter方法将当前线程封装进Node节点中,并以尾加入的方式加入到同步队列当中去。然后进入到死循环for中,在每次for循环时,先判断此节点的前驱节点是不是head头节点。如果是的话,则就再次尝试的去获取锁,如果获取成功的话,就将当前节点设置成新的head头节点并且将Propagate状态传播下去。如果在阻塞期间检测到中断的话,就调用selfInterrupt()。接着就直接return返回了。如果此节点的前驱节点不是head头节点,那么就接着往下执行。一般情况下shouldParkAfterFailedAcquire方法需要执行两遍。第一遍发现此节点的前驱节点的waitStatus不是-1(因为一般情况下新建一个Node节点后,此节点的waitStatus初始值为0,在Condition情况下,例外初始值为-2)就将此节点的前驱节点的waitStatus设置为-1。第二遍的时候发现此节点的前驱节点的waitStatus已经是-1,直接返回true(shouldParkAfterFailedAcquire方法也可以执行一遍直接返回true,然后直接执行parkAndCheckInterrupt方法,那就是此线程之前已经被阻塞了,已经将它的前驱节点的waitStatus值赋值为-1了,然后被唤醒了,才又执行一次for循环的内容,很不幸只是又没成功获取锁,又得被阻塞了,这种情况也是有可能发生的)接着执行parkAndCheckInterrupt方法将当前线程park阻塞住,此时当前线程的代码执行到这一步就不继续执行后面的代码,等待着其他的线程unpark唤醒它。等它被唤醒后,会检查此线程有没有被中断过,如果有parkAndCheckInterrupt返回true,将中断标志interrupted 设置成true。接着又会走以上流程,继续执行for循环的那一部分代码。如果在执行for循环代码的期间,发生了什么异常,导致for循环的那部分代码不是以正常的情况下退出死循环的。那么在执行finally代码块的时候就会执行cancelAcquire方法了。
2.1.4 addWaiter(Node mode)方法
private Node addWaiter(Node mode) {
//新建一个Node节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
//如果同步队列已经初始化了,则将新建的Node节点以尾插入的方式入队,并将尾指针tail执行新建的Node节点
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果同步队列没有初始化,则调用enq方法进行初始化并且将新建的Node节点以尾插入的方式入队,并将尾指针tail执行新建的Node节点
enq(node);
return node;
}
addWaiter方法主要是将当前线程封装成一个新的Node节点,并以尾插入的方式入队,并将尾指针tail执行新建的Node节点。如果同步队列没有初始化的话,顺便初始化它。
2.1.5 compareAndSetTail方法
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
compareAndSetTail方法主要是利用unsafe类的native compareAndSwapObject方法以CAS的方式将同步队列的尾节点更新为新建的Node节点。
2.1.6 enq(final Node node)方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq方法主要是将node节点加入到同步队列中去,并且通过CAS的方式将尾指针tail指向node节点。如果同步队列没有初始化的话,就先初始化它,先通过CAS的方式将头指针head指向一个无参构造器new出来的Node节点。让后尾指针tail也指向head节点。然后再通过CAS的方式将尾指针tail指向参数node节点
2.1.7 setHeadAndPropagate(Node node, int propagate)方法
private void setHeadAndPropagate(Node node, int propagate) {
//先保存一下旧head头节点
Node h = head;
//将node设置为新的head头节点
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
//如果if条件满足,获取新head节点的后续节点
Node s = node.next;
if (s == null || s.isShared())
//如果这个后续节点为null或者是共享节点的话,就执行一次唤醒动作
doReleaseShared();
}
}
setHeadAndPropagate方法做的事情
- 保存一下旧的head节点
- 将node节点设置成新的head节点
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
- 判断if条件是否成立,如果成立的话并且满足if (s == null || s.isShared())条件的话执行doReleaseShared方法继续唤醒同步队列中的节点。
- 如果if条件成立的话,获取新head节点的后续节点,如果这个后续节点为null或者是共享节点类型的话,就执行一次唤醒动作doReleaseShared(),来唤醒这个新head节点的后继节点。
来分析一下if条件成立的情况有哪些: - 如果参数propagate大于0的话,if条件直接成立,不用管后面的判断了。我们知道参数propagate的值实际上传的是tryAcquireShared方法返回的值,而tryAcquireShared方法返回的值代表的是剩余可以用资源的个数,如果大于0了,说明还有剩余的资源可以去获取,也就是说这把共享锁虽然已经被调用setHeadAndPropagate方法的线程获取了,但是这把共享锁还可以接着被其他的线程继续获取(因为共享锁是可以被多个线程同时持有的),也就是说此时去获取共享锁可能会获取到。那么我们应该就接着去唤醒下一个因为要想获取共享锁而阻塞住的线程,来继续尝试获取这把共享锁。
- 再来说一下h == null || h.waitStatus < 0这种情况,首先小编认为h等于null的这种情况是不存在的,因为h指向的是旧head节点,而旧head节点不可能被gc掉,因为有强引用h指向它,那么就只有可能在h=head时,head就已经是null了,但是在执行setHeadAndPropagate方法之前一定执行过addWaiter方法,而addWaiter有判断如果同步队列为null,就初始化它一下,所以同步队列不可能为空的,所以旧head一定不可能为null。那么h等于null这种情况就不存在了。如果说h等于null是为了防止NPE的,那么为什么不判断一下h!=null呢?对此小编不是很明白,如果有读者明白的话,欢迎留言告诉小编。既然h==null不可能成立的话,那就看h.waitStatus < 0这种情况了。
首先旧head的waitStatus 取值有0,-3,-1
(1)旧head的waitStatus 取值为0的情况有,情况一:当doAcquireShared方法的for循环代码刚刚开始执行第一遍的时候,刚好有其他的线程释放了锁,此时执行for循环中的tryAcquireShared方法时,就获取到了锁,不用调用shouldParkAfterFailedAcquire方法将head节点的waitStatus 设置成-1,进行阻塞了,此时head的waitStatus值就是初始值0。情况二:当有其他的线程被唤醒了,执行了自己的setHeadAndPropagate方法将自己的节点设置成head头节点,同时执行了setHeadAndPropagate方法里面的doReleaseShared方法来唤醒自己的后继节点时,会将自己的节点(此时是head头节点)的waitStatus值从-1变成了0,用来唤醒自己的后继节点。此时这个后继节点执行自己的setHeadAndPropagate方法是就会发现head的waitStatus值变成了0。
(2)旧head的waitStatus 取值为-3的情况是一个线程执行了doReleaseShared方法释放了锁,导致head的值从-1变成了0,此时head的后继节点被唤醒了,继续执行doAcquireShared方法的那个for循环代码,但是还没来的及执行到setHeadAndPropagate方法来改变head时,又有一个线程执行了doReleaseShared方法释放了锁,导致head的值从0变成了-3,此时那个被唤醒的后继节点执行到setHeadAndPropagate方法里面时就会发现head的值为-3了。
(3)旧head的waitStatus 取值为-1的情况是当doAcquireShared方法的for循环代码已经执行完第一遍将head的waitStatus 值从0变成-1,执行第二遍的时候,刚好有其他的线程释放了锁,此时执行for循环中的tryAcquireShared方法时,就获取到了锁,不用再调用parkAndCheckInterrupt方法进行阻塞了,此时调用setHeadAndPropagate方法时发现head的waitStatus 值为-1。
- 再来说一下(h = head) == null || h.waitStatus < 0。这是首先将新head赋值给h引用,同样小编也认为新head也是不可能可null值的,理由同上。那就看h.waitStatus < 0这种情况了。
新head的waitStatus 取值有0,-3,-1
(1)新head的waitStatus 取值为0的情况有:情况一:此时同步队列中没有新的Node节点了,也就是说新head后面已经没有Node节点了,新head的waitStatus 值就是初始值0了。情况二:当新head后面有新的Node节点加入进来时,但是这个新加入的Node节点还没有执行shouldParkAfterFailedAcquire方法将它的前驱节点的waitStatus 从0变成-1的时候,此时新head的waitStatus还是0。
(2)新head的waitStatus 取值为-3的情况是新head有后继节点刚好入队,还没来的及将waitStatus 的值从0变成-1,此时刚好有其他的线程释放了锁调用doReleaseShared方法,将新head的waitStatus 的值从0变成-3。
(3)新head的waitStatus 取值为-1的情况是新head有后继节点入队,并且已经调用了shouldParkAfterFailedAcquire方法将waitStatus 从0变成-1。
来分析一下if (s == null || s.isShared())条件的成立: - s==null成立的情况是:我们知道新的Node节点入队的方式是尾方式入队的。当入队执行addWaiter方法时,还没来的及执行pred.next = node;这一行代码时,那么node.next的值就是null了。也就是说如果我们不知道node.next后面的值是什么,我们一律调用doReleaseShared方法,哪怕会造成不必要的唤醒。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;//还没来的及执行这行代码
return node;
}
}
enq(node);
return node;
}
- s.isShared()成立的情况是:此节点是共享节点,只有共享节点才有传播属性这一说法
final boolean isShared() {
return nextWaiter == SHARED;
}
2.1.8 doReleaseShared()方法
private void doReleaseShared() {
for (;;) {
Node h = head;
//确保同步队列初始化了,并且同步队列中除了head头节点还有其他的节点
//不然同步队列中只有一个head头节点(tail也指向这个head节点),那就没有唤醒的必要的
if (h != null && h != tail) {
int ws = h.waitStatus;
//确保了只有一个节点的前驱节点的waitStatus为-1,它才有资格被唤醒
if (ws == Node.SIGNAL) {
//唤醒一个节点之前,必须先将它的前驱节点的waitStatus变成0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒h的后继节点
unparkSuccessor(h);
}
//上面setHeadAndPropagate中head的WaitStatus值变成-3的赋值操作就是因为执行了这行代码
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果head头节点没有改变则就跳出死循环
if (h == head) // loop if head changed
break;
}
}
首先调用doReleaseShared方法可能是一个线程释放了锁调用了releaseShared方法,releaseShared方法里面调用了doReleaseShared方法。还有可能是前面讲到的setHeadAndPropagate里面调用的doReleaseShared方法,来继续确保共享锁的传播。一个是释放了共享锁,所以需要调用doReleaseShared,一个是因为获取了共享锁,需求确保共享锁的传播而调用doReleaseShared。
着重看一下if (h != null && h != tail) 条件里面
- if (ws == Node.SIGNAL) 和if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))确保了只有一个节点的前驱节点的waitStatus值为-1,它才有资格被唤醒。同时一个节点如果被唤醒了,那么它的前驱节点的waitStatus值肯定变成了0。这些通过CAS操作保证的。
- ws == 0的情况有:
(1)head有后继节点入队了,但是这个后继节点还没有来的及执行shouldParkAfterFailedAcquire来将head的waitStatus值从0变成-1,此时head的waitStatus值就是0.
(2)有线程刚释放了锁,刚执行了unparkSuccessor里的if (ws < 0) compareAndSetWaitStatus(node, ws, 0);把head的状态设置为了0,然后还没有把head的后继节点设置成新的head节点之前(因为一个线程释放了锁,唤醒一个节点之后,会把这个节点成功获取了锁,就会把这个节点设置成新的head头节点)。
(3)当一个线程执行了doReleaseShared方法把head的waitStatus值变成0,还没来的及改变head时,又有一个新的线程执行了doReleaseShared方法,对于这个新的线程此时head的waitStatus值就是0.
当检测到在执行doReleaseShared方法时发现head的waitStatus值是0,那么就代表head的后继节点已经有前面的线程唤醒了,那么为了确保这次的doReleaseShared唤醒操作不丢失,那么就将head的waitStatus设置成-3.这样由前面的线程唤醒的head的后继节点在执行它的setHeadAndPropagate方法时就会检测到这一次的唤醒操作,那么就会在setHeadAndPropagate方法里面再调用一次doReleaseShared方法,这也就确保了这次的唤醒操作没有丢失了。 - if(h == head) break;保证了,只要在某个循环的过程中有线程刚获取了锁且设置了新head,就会再次循环。目的当然是为了再次执行unparkSuccessor(h),即唤醒队列中第一个等待的线程。注意:如果head变化了,那么一定是因为有节点被唤醒了,并且它成功获取了锁,所有就将自己设置成新的head头节点。那么既然这个被唤醒的节点已经被唤醒了,那么调用doReleaseShared方法的唤醒操作,自然就要唤醒它的后继节点了,所有就得继续执行for循环代码了,来继续唤醒新head头节点的后继节点了。
注意:doReleaseShared方法主要是为了唤醒head节点的后继节点并将head节点的waitStatus值变成0。如果发现在本次doReleaseShared方法还没有执行完时,又有新的线程释放锁执行了doReleaseShared方法,就将head的waitStatus值变成-3,确保新的线程释放锁来唤醒同步队列中阻塞的节点这一操作不会丢失,与setHeadAndPropagate方法中的 if ( propagate > 0 || h = = null || h.waitStatus < 0 || (h = head) = = null || h.waitStatus < 0)遥相呼应。
2.1.9 unparkSuccessor(Node node)方法
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
//要唤醒node节点的后继节点,所以将node节点的waitStatus值变成0
//对于共享锁来说,因为前面的doReleaseShared方法已经将head节点的waitStatus值变成0了,
//所以这操作主要是针对独占锁的,因为独占锁是直接调用unparkSuccessor方法的
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//若后继结点为空,或状态为CANCEL(已失效),
//则从后尾部往前遍历找到最前的一个处于正常阻塞状态的结点进行唤醒
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//调用UNSAFE.unpark唤醒这个线程
LockSupport.unpark(s.thread);
}
此方法主要是唤醒node节点的后继节点,如果node节点的waitStatus值小于0的话,就把它变成0,因为一个节点被唤醒的话,那么它的前驱节点的waitStatus值必须变成0
来看一下if (s == null || s.waitStatus > 0)条件成立的情况
- s为null的情况:因为我们知道新建节点Node入队是以尾方式入队的。
也就是说会存在一个线程执行完了node.prev = t;和compareAndSetTail(t, node)代码还没来的及执行t.next = node代码时,cpu时间片发生切换,刚好让调用unparkSuccessor的线程执行,此时这个线程就会发现它的node.next是null了,因为前面的t.next = node代码还没有执行,所有需要从tail尾部想前遍历找到最前的一个处于正常阻塞状态的结点,直到节点重合(即等于当前节点)。这样解释了为什么不是从head头部开始从前往后遍历,因为node.next已经是null,找不到后面的节点了,所以必须是尾向前的方式遍历,因为compareAndSetTail设置tail指针是以cas方式进行的。保证了tail指针设置成功时node.prev = t;代码一定是成功执行了的。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// Must initialize
//队列为空需要初始化,创建空的头节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//set尾部节点
if (compareAndSetTail(t, node)) {
//当前节点置为尾部
t.next = node; //前驱节点的next指针指向当前节点
return t;
}
}
}
}
- s.waitStatus > 0是代表这个节点被取消了,所以得以尾向前的方式重新找一个最前的一个处于正常阻塞状态的结点,直到节点重合(即等于当前节点)。
2.1.10 selfInterrupt()方法
static void selfInterrupt() {
//其实就是当在执行doAcquireShared方法中的 parkAndCheckInterrupt
//把当前线程park阻塞住,在阻塞期间,如果当前线程被中断了,被唤醒后,接着执行parkAndCheckInterrupt方法的Thread.interrupted()时会把当前线程的中断标识位置位false并且parkAndCheckInterrupt方法返回true
//那么此处方法会重新将当前线程的中断标识位置位true
Thread.currentThread().interrupt();
}
2.1.11 shouldParkAfterFailedAcquire(Node pred, Node node)方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
//如果pred节点是被取消了的,就向前查找一个正常的节点并把被取消的节点踢出同步队列中。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
此方法主要是判断参数pred的waitStatus值,如果是-1的话,就返回true,如果不是的话,就把pred的waitStatus值置为-1并返回false。如果发现pred节点是被取消了的,就一直向前查找一个正常的节点并把所有被取消的节点踢出同步队列中。
为什么将pred的waitStatus值置为-1不直接返回true呢?第一返回false后继续执行doAcquireShared方法中for循环的tryAcquireShared方法,增加获取锁的可能性。第二,这样可以确保在执行parkAndCheckInterrupt方法阻塞node节点时,node节点的前驱节点的waitStatus一定是-1。
2.1.12 parkAndCheckInterrupt() 方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//当前线程执行到这一行代码时就阻塞住了
//不再继续往下执行了。
//当此线程被唤醒时,就执行这一行代码,如果在阻塞期间被中断了
//返回true并檫除线程的中断标识位,没有被中断就返回false。
return Thread.interrupted();
}
2.1.13 cancelAcquire(Node node)方法
private void cancelAcquire(Node node) {
//node为null的情况只能是addWaiter时,new Node(Thread.currentThread(), mode)发生异常导致node没有创建成功的情况
if (node == null)
return;
//先将持有线程设置为 null
node.thread = null;
//取得node的前驱节点
Node pred = node.prev;
//跳过所有被终止或失败的前驱节点,去找正常的前驱节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//取得前驱节点的后继节点
Node predNext = pred.next;
//表明此节点被取消了
node.waitStatus = Node.CANCELLED;
//如果当前node就是尾节点,就以CAS方式把刚才找到的前驱结点设置为新的尾节点
if (node == tail && compareAndSetTail(node, pred)) {
//同时以CAS的方式将上面设置为新的尾节点的后置节点置为null
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//前驱结点不是头节点而且成功设置了"信号状态"的之后,就把它的后置节点指向即将要取消的node节点的后置节点
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//否则让整个同步队列的node节点重新接上,通过唤醒取消节点的后继节点的方式来实现
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
此方法的作用主要是
- 取消当前节点也就是将node的waitStatus 设置成1。
- 将当前取消节点的前驱非取消节点与后继非取消节点链接起来,也就是保证整个同步队列链接不会断了
- 让整个同步队列的node节点重新接上,以保证节点能够被正常的唤醒,通过唤醒取消节点的后继节点的方式来实现。
着重来说一下if判断的情况 - 首先pred==head和pred.thread == null都是说当前node节点的前驱节点是head节点的话,那么有这样一种情况,这个取消的node节点还没来的及将head节点的waitStatus值设置成-1,并且这个取消的node节点还没有执行node.waitStatus = Node.CANCELLED;这一行代码,并且这个取消的node节点的后面已经有节点链接上来了。那么如果这个node节点被踢出同步队列了,但是head节点的waitStatus确实0,那岂不是同步队列中剩下的节点永远不会被唤醒了,于是就unparkSuccessor唤醒这个取消节点node的后继节点,让它来将head的waitStatus值设置成-1。
- (ws = pred.waitStatus) == Node.SIGNAL不成立时,也执行unparkSuccessor方法,也是同上一样原因,取消节点node还没有来的及将它的前驱节点head的waitStatus 设置成-1,它自己有后继节点链接上并把自己的waitStatus设置成-1.如果自己被取消了,那么它的后继节点怎么被唤醒呢,所有就调用一下unparkSuccessor方法唤醒一下自己后继节点,让整个同步队列链接上
- (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))不成立时,也执行unparkSuccessor方法。翻译一下就是node节点的前置节点是取消节点了或者compareAndSetWaitStatus失败,而compareAndSetlWaiStatus 失败则发生在高并发时,前置线程突然释放锁那么这种情况下也需要唤醒一下node的后继节点,让它重新链接上同步队列中,不然node的后继节点就可能永远不会被唤醒了。
2.2释放共享锁的源码
释放共享锁的入口方法是releaseShared(int arg)方法。
2.2.1 releaseShared(int arg)方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
上面arg参数代表需要释放资源的个数,调用AQS子类的实现方法tryReleaseShared(arg)尝试释放锁,如果此方法返回值为true,代表释放锁成功,则接着调用doReleaseShared(arg)方法,来唤醒同步队列中head头节点的后继节点,并且return true,否则return false;
2.2.1 tryReleaseShared(arg)方法
此方法是由AQS的子类来具体实现的,尝试去释放锁,返回值为布尔值代表锁是否释放成功。
doReleaseShared()方法 见上面讲解。
3.独占锁(排它锁)实现的源码解读
3.1 获取独占锁的源码
获取独占锁的入口方法是acquire(int arg)方法。
3.1.1 acquire(int arg)方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上面arg参数代表需要获取资源的个数,调用AQS子类的实现方法tryAcquire(arg)尝试获取锁,此方法返回值为布尔值,true代表锁获取成功,false代表获取锁失败,如果锁获取失败,则接着调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,将当前线程放入到同步队列中阻塞住(注意:并不是立马就将当前线程阻塞住,而是在阻塞之前还有尝试获取锁的机会)。并且在阻塞的过程中发现了当前线程被中断了就调用selfInterrupt()方法(selfInterrupt()方法源码见上面分析)
addWaiter(Node mode)方法 见上面讲解,只不过在独占锁中新建节点的nextWaiter属性值为Node.EXCLUSIVE。
3.1.2 acquireQueued(final Node node, int arg)方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取前驱节点
final Node p = node.predecessor();
//如果前驱节点是头结点的话,就尝试再一次获取锁
//此if判断也确保了,如果从同步队列中唤醒一个节点,那必然是head节点的后继节点
if (p == head && tryAcquire(arg)) {
//获取锁成功,就将自己设置成新的head头节点
setHead(node);
p.next = null; // help GC
failed = false;
//返回当前线程是否被中断了
return interrupted;
}
//一般情况下shouldParkAfterFailedAcquire方法会调用两遍。
//第一遍是将node节点的前驱节点的waitStatus值设置成-1,返回值为false。
//调用第二遍时,node节点的前驱节点的waitStatus值已经是-1,直接返回true,接着执行parkAndCheckInterrupt方法。
//parkAndCheckInterrupt方法才是真正实现将当前线程阻塞住的代码,阻塞住后,等待唤醒。如果唤醒后,发现此线程被中断了,就将interrupted 标识设置成true。
//注意:阻塞住的线程有可能是被正常唤醒的,也有可能是此线程被别的线程中断了,唤醒的
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
此方法的参数arg代表需要获取资源的个数,node代表新建的独占模式的Node节点。整体逻辑跟上面的doAcquireShared方法基本一致,区别就是当获取锁成功后调用的是setHead方法而不是setHeadAndPropagate方法,因为独占锁没有传播Propagate属性这一说法。其他各个方法的源码解析都可以见上面讲解。
3.2释放独占锁的源码
释放独占锁的入口方法是release(int arg)方法。
3.2.1 release(int arg)方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
上面arg参数代表需要释放资源的个数,调用AQS子类的实现方法tryRelease(arg)尝试释放锁,如果此方法返回值为true,代表释放锁成功,则直接调用unparkSuccessor,来唤醒同步队列中head头节点的后继节点,并且return true,否则return false。
unparkSuccessor方法的源码解析见上面讲解。
3.2.2 tryRelease(arg)方法
此方法是由AQS的子类来具体实现的,尝试去释放锁,返回值为布尔值代表锁是否释放成功。