JUC-CountDownLatch基础篇
JUC-CountDownLatch源码分析
JUC-Semaphore基础篇
JUC-Semaphore源码分析
JUC-ReentrantReadWriteLock锁基础篇
JUC-ReentrantReadWriteLock锁源码分析
JUC-ReentrantLock锁基础篇
JUC-ReentrantLock锁源码分析
JUC-CyclicBarrier基础篇
JUC-CyclicBarrier源码分析
文章目录
-
- Semaphore类结构图
-
- 1.1 Semaphore的构造方法
- 2.获取信号量
-
- 2.1 tryAcquireShared方法源码
- 3.释放信号量
-
- 3.1 tryReleaseShared方法源码
建议阅读Semaphore源码分析之前,先阅读JUC-Semaphore基础篇,了解一下Semaphore用处以及基本用法。
我们知道Semaphore是一个信号量,在java并发编程中,Semaphore本质上是一个共享锁的,用来限制多线程对共享资源的并发访问的。
1. Semaphore类结构图
Semaphore内部有一个类Sync,Sync是一个继承了AbstractQueuedSynchronizer的静态抽象类Sync,还有一个负责处理公平策略的内部类FairSync,一个处理非公平策略的内部类NonfairSync,它们两个都是Sync的子类。
1.1 Semaphore的构造方法
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
可以看到有一个带一个参数permits的构造方法,默认是采用非公平锁策略,还有一个是带有两个参数permits,fair的构造方法,根据传入的参数来决定是否采用公平策略。
如果采用公平策略的话,就调用FairSync的构造方法:
FairSync(int permits) {
super(permits);
}
FairSync构造方法又调用父类Sync的构造方法:
Sync(int permits) {
setState(permits);
}
在这里给state变量赋初始值为permits,代表此信号量一开始的个数。
如果采用非公平策略的话,就调用NonfairSync的构造方法:
NonfairSync(int permits) {
super(permits);
}
同样调用父类Sync的构造方法,给state变量赋初始值为permits
2.获取信号量
获取一个信号量的流程大致如下:
调用Semaphore类的的acquire()方法—>调用AQS的acquireSharedInterruptibly(1)方法—>调用Sync类的tryAcquireShared(1)方法—>如果tryAcquireShared返回值小于0,代表已经没有信号量可以获取了,则调用AQS的doAcquireSharedInterruptibly(1)方法,将当前线程加入到同步队列中阻塞住
获取一个信号量的入口方法是semaphore.acquire()
//此acquire方法是Semaphore类里面的
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
再调用AQS的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
此方法中如果tryAcquireShared方法的返回值小于0,代表已经没有信号量可以获取了,则就调用AQS的doAcquireSharedInterruptibly方法将当前线程加入到同步队列中并且阻塞住(doAcquireSharedInterruptibly源码实现可以参考AQS源码篇中doAcquireShared方法)
而tryAcquireShared方法在AQS中是没有实现的,由Semaphore中继承了AQS类的内部类Sync提供了实现。
2.1 tryAcquireShared方法源码
tryAcquireShared方法作用主要是判断Semaphore的信号量还有没有,如果有的话,那就将当前信号量数值减一,表示当前线程用掉了一个信号量,让后返回剩下没有用的信号量数值,如果没有那就返回-1,那么当前线程就要到同步队列上面阻塞住了。分为两个实现,在公平策略下与非公平策略下。
非公平策略的tryAcquireShared方法实现源码
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
直接调用nonfairTryAcquireShared方法
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
nonfairTryAcquireShared方法的大致流程如下:
1、 先开始一个死循环;
2、 获取剩余的可以的信号量数值,赋值给变量available;
3、 然后将available减一,代表当前线程要消耗掉一个信号量,然后赋值给remaining,然后看消耗掉了一个信号量,剩余的信号量值是多少;
4、 如果remaining值小于0,代表available减一之前,剩余的信号量为0,没有可用的信号量给当前线程消耗,那么直接返回-1;
5、 如果remaining值大于等于0,那么代表有可用的信号量给当前线程消耗然后通过CAS操作将state值更新为当前还剩余的信号量并且返回当前还剩余的信号量数值;
公平策略的tryAcquireShared方法实现源码
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
此方法大致流程如下:
1、 先开始一个死循环;
2、 然后通过hasQueuedPredecessors()方法来判断,是不是已经有其他线程在排队等着获取信号量了,如果是的话,为了体现先来先到的公平原则,当前线程不能去获取剩余的信号量了,直接返回-1老老实实的排队等着(hasQueuedPredecessors方法源码分析可以参考这里);
3、 如果没有其他的线程在排队等着获取信号量,那么就接着执行后面的逻辑后面的逻辑就是nonfairTryAcquireShared方法里面的逻辑了,参考上面流程就行;
3.释放信号量
释放一个信号量的流程大致如下:
调用Semaphore类的的release()方法—>调用AQS的releaseShared(1)方法—>调用Sync类的tryReleaseShared(1)方法—>如果tryReleaseShared返回值为true,代表成功释放了一个信号量,则调用AQS的doReleaseShared()方法,唤醒同步队列中head节点的后继节点线程
释放一个信号量的入口方法是semaphore.release()
//此release方法是Semaphore类里面的
public void release() {
sync.releaseShared(1);
}
再调用AQS的releaseShared方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
如果tryReleaseShared方法的返回值为true,代表成功释放了一个信号量,则就调用AQS的doReleaseShared方法将同步队列head节点的后继节点唤醒(doReleaseShared方法的源码可以参考AQS源码篇)
而tryReleaseShared方法在AQS中是没有实现的,由Semaphore中继承了AQS类的内部类Sync提供了实现。
3.1 tryReleaseShared方法源码
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
tryReleaseShared方法的大致流程:
1、 开始一个死循环;
2、 获取剩余信号量的个数,赋值给current;
3、 将剩余信号量的个数+1赋值给next变量;
4、 如果next变量的值还小于current表示超出最大许可的信号量,则直接返回false;
5、 否则,将信号量的值更新为next,然后返回true,代表释放信号量成功;
注意:Semaphore的tryReleaseShared方法与JUC中其他锁的tryReleaseShared方法最大的不同点是,调用Semaphore的tryReleaseShared方法来释放信号量,不需要当前线程之前调用acquire方法成功获取了一个信号量,当前线程才有资格来释放一个信号量。也就是说,我一个线程不需要获取一个信号量,我就直接来释放一个信号量,这个导致Semaphore的信号量最大值并不是new Semaphore(permits)时传递的permits初始值,它是可以变的,可以一直增加,直到超出了int类型的最大值,就变成了负数,然后if (next < current)条件生效,直接抛出throw new Error(“Maximum permit count exceeded”)异常了。