13、JUC源码分析:Semaphore源码分析

JUC-AQS原理篇
JUC-AQS源码篇
JUC-AQS的Condition之await和signal源码解析

JUC-CountDownLatch基础篇
JUC-CountDownLatch源码分析
JUC-Semaphore基础篇
JUC-Semaphore源码分析
JUC-ReentrantReadWriteLock锁基础篇
JUC-ReentrantReadWriteLock锁源码分析
JUC-ReentrantLock锁基础篇
JUC-ReentrantLock锁源码分析
JUC-CyclicBarrier基础篇
JUC-CyclicBarrier源码分析

文章目录

    1. Semaphore类结构图
    • 1.1 Semaphore的构造方法
  • 2.获取信号量
    • 2.1 tryAcquireShared方法源码
  • 3.释放信号量
    • 3.1 tryReleaseShared方法源码

建议阅读Semaphore源码分析之前,先阅读JUC-Semaphore基础篇,了解一下Semaphore用处以及基本用法。

我们知道Semaphore是一个信号量,在java并发编程中,Semaphore本质上是一个共享锁的,用来限制多线程对共享资源的并发访问的。

1. Semaphore类结构图

*
Semaphore内部有一个类Sync,Sync是一个继承了AbstractQueuedSynchronizer的静态抽象类Sync,还有一个负责处理公平策略的内部类FairSync,一个处理非公平策略的内部类NonfairSync,它们两个都是Sync的子类。

1.1 Semaphore的构造方法

public Semaphore(int permits) {
   
     
        sync = new NonfairSync(permits);
    }
public Semaphore(int permits, boolean fair) {
   
     
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

可以看到有一个带一个参数permits的构造方法,默认是采用非公平锁策略,还有一个是带有两个参数permits,fair的构造方法,根据传入的参数来决定是否采用公平策略。
如果采用公平策略的话,就调用FairSync的构造方法:

 FairSync(int permits) {
   
     
            super(permits);
        }

FairSync构造方法又调用父类Sync的构造方法:

Sync(int permits) {
   
     
            setState(permits);
        }

在这里给state变量赋初始值为permits,代表此信号量一开始的个数。

如果采用非公平策略的话,就调用NonfairSync的构造方法:

NonfairSync(int permits) {
   
     
            super(permits);
        }

同样调用父类Sync的构造方法,给state变量赋初始值为permits

2.获取信号量

获取一个信号量的流程大致如下:

调用Semaphore类的的acquire()方法—>调用AQS的acquireSharedInterruptibly(1)方法—>调用Sync类的tryAcquireShared(1)方法—>如果tryAcquireShared返回值小于0,代表已经没有信号量可以获取了,则调用AQS的doAcquireSharedInterruptibly(1)方法,将当前线程加入到同步队列中阻塞住

获取一个信号量的入口方法是semaphore.acquire()

//此acquire方法是Semaphore类里面的
public void acquire() throws InterruptedException {
   
     
        sync.acquireSharedInterruptibly(1);
    }

再调用AQS的acquireSharedInterruptibly方法

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
   
     
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

此方法中如果tryAcquireShared方法的返回值小于0,代表已经没有信号量可以获取了,则就调用AQS的doAcquireSharedInterruptibly方法将当前线程加入到同步队列中并且阻塞住(doAcquireSharedInterruptibly源码实现可以参考AQS源码篇中doAcquireShared方法)
而tryAcquireShared方法在AQS中是没有实现的,由Semaphore中继承了AQS类的内部类Sync提供了实现。

2.1 tryAcquireShared方法源码

tryAcquireShared方法作用主要是判断Semaphore的信号量还有没有,如果有的话,那就将当前信号量数值减一,表示当前线程用掉了一个信号量,让后返回剩下没有用的信号量数值,如果没有那就返回-1,那么当前线程就要到同步队列上面阻塞住了。分为两个实现,在公平策略下与非公平策略下。

非公平策略的tryAcquireShared方法实现源码

protected int tryAcquireShared(int acquires) {
   
     
            return nonfairTryAcquireShared(acquires);
        }

直接调用nonfairTryAcquireShared方法

 final int nonfairTryAcquireShared(int acquires) {
   
     
            for (;;) {
   
     
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

nonfairTryAcquireShared方法的大致流程如下:

1、 先开始一个死循环;
2、 获取剩余的可以的信号量数值,赋值给变量available;
3、 然后将available减一,代表当前线程要消耗掉一个信号量,然后赋值给remaining,然后看消耗掉了一个信号量,剩余的信号量值是多少;
4、 如果remaining值小于0,代表available减一之前,剩余的信号量为0,没有可用的信号量给当前线程消耗,那么直接返回-1;
5、 如果remaining值大于等于0,那么代表有可用的信号量给当前线程消耗然后通过CAS操作将state值更新为当前还剩余的信号量并且返回当前还剩余的信号量数值;

公平策略的tryAcquireShared方法实现源码

 protected int tryAcquireShared(int acquires) {
   
     
            for (;;) {
   
     
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

此方法大致流程如下:

1、 先开始一个死循环;
2、 然后通过hasQueuedPredecessors()方法来判断,是不是已经有其他线程在排队等着获取信号量了,如果是的话,为了体现先来先到的公平原则,当前线程不能去获取剩余的信号量了,直接返回-1老老实实的排队等着(hasQueuedPredecessors方法源码分析可以参考这里);
3、 如果没有其他的线程在排队等着获取信号量,那么就接着执行后面的逻辑后面的逻辑就是nonfairTryAcquireShared方法里面的逻辑了,参考上面流程就行;

3.释放信号量

释放一个信号量的流程大致如下:

调用Semaphore类的的release()方法—>调用AQS的releaseShared(1)方法—>调用Sync类的tryReleaseShared(1)方法—>如果tryReleaseShared返回值为true,代表成功释放了一个信号量,则调用AQS的doReleaseShared()方法,唤醒同步队列中head节点的后继节点线程

释放一个信号量的入口方法是semaphore.release()

//此release方法是Semaphore类里面的
public void release() {
   
     
        sync.releaseShared(1);
    }

再调用AQS的releaseShared方法

 public final boolean releaseShared(int arg) {
   
     
        if (tryReleaseShared(arg)) {
   
     
            doReleaseShared();
            return true;
        }
        return false;
    }

如果tryReleaseShared方法的返回值为true,代表成功释放了一个信号量,则就调用AQS的doReleaseShared方法将同步队列head节点的后继节点唤醒(doReleaseShared方法的源码可以参考AQS源码篇)
而tryReleaseShared方法在AQS中是没有实现的,由Semaphore中继承了AQS类的内部类Sync提供了实现。

3.1 tryReleaseShared方法源码

 protected final boolean tryReleaseShared(int releases) {
   
     
            for (;;) {
   
     
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

tryReleaseShared方法的大致流程:

1、 开始一个死循环;
2、 获取剩余信号量的个数,赋值给current;
3、 将剩余信号量的个数+1赋值给next变量;
4、 如果next变量的值还小于current表示超出最大许可的信号量,则直接返回false;
5、 否则,将信号量的值更新为next,然后返回true,代表释放信号量成功;
注意:Semaphore的tryReleaseShared方法与JUC中其他锁的tryReleaseShared方法最大的不同点是,调用Semaphore的tryReleaseShared方法来释放信号量,不需要当前线程之前调用acquire方法成功获取了一个信号量,当前线程才有资格来释放一个信号量。也就是说,我一个线程不需要获取一个信号量,我就直接来释放一个信号量,这个导致Semaphore的信号量最大值并不是new Semaphore(permits)时传递的permits初始值,它是可以变的,可以一直增加,直到超出了int类型的最大值,就变成了负数,然后if (next < current)条件生效,直接抛出throw new Error(“Maximum permit count exceeded”)异常了。