03、JUC源码分析:多线程之同步辅助CountDownLatch,CyclicBarrier,Semaphore

一、CountDownLatch

CountDownLatch的作用:就是一个或者一组线程在开始执行操作之前,必须要等到其他线程执行完才可以

*

/**
 * CountDownLatch的作用:就是一个或者一组线程在开始执行操作之前,必须要等到其他线程执行完才可以
 * 计数器
 * 如:一扇门,所有人都出去之后才可以锁上
 */
public class CountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
        int num = 5;
        // 总数是5,必须要执行任务的时候,再使用!
        CountDownLatch countDownLatch = new CountDownLatch(num);

        for (int i = 1; i <= num; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "出去了!");

                // 数量 -1
                countDownLatch.countDown();
            },String.valueOf(i)).start();
        }

        // 等待计数器归零,然后再向下执行
        countDownLatch.await();
        System.out.println("可以锁门了!");
    }
}

输出如下

2出去了!
5出去了!
3出去了!
1出去了!
4出去了!
可以锁门了!

main线程一直阻塞直到所有的线程执行结束

(一)CountDownLatch原理分析

CountDownLatch是AQS的共享模式的实现,其内部也有一个静态内部类Sync继承了AbstractQueuedSynchronizer

当我们通过构造函数创建CountDownLatch对象时,其实是指定了AQS的同步状态state的值,所以state在CountDownLatch代表的即使计数器的个数

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
 //有参构造器,指定state的值
    Sync(int count) {
        setState(count);
    }
 //获取state状态值
    int getCount() {
        return getState();
    }
 //实现了AQS的共享模式的加锁方法
    protected int tryAcquireShared(int acquires) {
        //如果state为0,则返回大于0的数值,不等于0则返回小于0的数值
        return (getState() == 0) ? 1 : -1;
    }
 //通过死循环的方式释放锁
    protected boolean tryReleaseShared(int releases) {       
        for (;;) {
            //获取状态值
            int c = getState();
            //如果此时状态值已经为0了,说明计数器已经减到0了不能再减了
            if (c == 0)
                return false;
            //否则就将计数器减1,并通过CAS的方式赋值给state,因为此时还会有其他线程修改状态
            //这里与ReentrantLock的独占解锁方式不同,独占是直接setState,因为它不会有其他线程竞争
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                //如果计数器为0,则返回true,唤醒同步队列中的等待线程
                //不等于0则返回false
                return nextc == 0;
        }
    }
}

CountDownLatch是共享模式的加锁和解锁方式,await()表示获取操作,countDown()表示释放操作

state为0则表示可以加锁,不等于0的时候则线程会调用AQS提供的doAcquireSharedInterruptibly加入同步队列。每次解锁都只释放一个同步器状态,如果计数器为0的时候则会唤醒同步队列中的等待线程

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public void countDown() {
    sync.releaseShared(1);
}

(二)CountDownLatch方法

1、 CountDownLatch(intcount):构造函数,需要指定一个不小于0的int数值;

2、 await():当前线程调用该方法会进入阻塞状态,直到同步器状态为0时被其他线程唤醒或者被其他线程中断也即将计数器减为0返回true的线程负责唤醒阻塞的线程当计数器为0时,调用await()方法将立即返回;

3、 await(longtimeout,TimeUnitunit):该方法与await()作用一样,只是添加了等待的时间,如果超过等待时间还没有被唤醒或者被中断,那么阻塞线程将退出阻塞状态;

4、 countDown():该方法主要是将指定的计数器减1,当计数器已经是0了调用该方法将会被忽略,也就是说计数器的值最小只能是0;为了保证计数器一定会减1,一般要在finally语句块中执行countDown操作;

二、CyclicBarrier

CyclicBarrier是一个可循环的屏障,它允许多个线程在执行完相应的操作后彼此等待共同到达一个point,等所有线程都到达后再继续执行

/**
 * 打王者,开局前所有人都必须要加载到100%才可以进入。否则所有玩家都相互等待
 */
public class CyclicBarrierTest {
    public static void main(String[] args) {
        int num = 5;

        CyclicBarrier cyclicBarrier = new CyclicBarrier(num,() -> {
            System.out.println("五名队员都已加载100%,可以开始游戏!");
        });

        for (int i = 1; i <= num; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "加载中......");

                // 等待
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }
}

(一)CyclicBarrier原理分析

CyclicBarrier内部维护了独占锁ReentrantLock,并且关联了一个Condition。

await()方法主要是判断count的数量来决定线程进入阻塞状态还是唤醒所有的阻塞线程。count是初始化时parties的值,parties的值一经赋值不会改变,count会随着线程到达障点而减到0

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
//阻塞方法
 private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        //获取独占锁
        final ReentrantLock lock = this.lock;
        //加锁,之后的代码都是属于同步代码
        lock.lock();
        try {
            final Generation g = generation;
            //broken默认false,已经broken的barrier不能再次使用了
            if (g.broken)
                throw new BrokenBarrierException();
   //如果线程被打断了,那么将唤醒所有的阻塞线程
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //count值减1
            int index = --count;
            //如果index值为0,则表示所有的线程都到达了障点
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    //获取Runnable执行单元,如果不为空则执行逻辑
                    //此处就可以明白为什么Runnable逻辑优先执行了吧
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //唤醒阻塞的所有线程,重置count
                    //此处可以明白CyclicBarrier为什么可以循环利用了吧
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // 如果index不为0,则表示还有线程没有达到障点
            //死循环一直等待唤醒
            for (;;) {
                try {
                    //如果没有设置超时时间,则调用Condition的await()方法
                    //await方法线程释放锁并加入等待队列
                    //是不是又到了AQS了
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        //如果设置了超时时间,则调用Condition的awaitNanos()方法
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {                        
                        Thread.currentThread().interrupt();
                    }
                }
 
                if (g.broken)
                    throw new BrokenBarrierException();
 
                if (g != generation)
                    return index;
 
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            //释放锁
            lock.unlock();
        }
    }
 

在await()方法中,很多个分支调用了breakBarrier方法,此方法主要用于异常分支下的线程唤醒和count重置,但是broken被设置为true的CyclicBarrier已经不能再使用了,必须使用reset方法重置它

private void breakBarrier() {
    //设置broken为true
    generation.broken = true;
    //重置count
    count = parties;
    //唤醒所有阻塞的线程
    trip.signalAll();
}

//重置CyclicBarrier,break现有的generation,重新生成新的generation
public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

nextGeneration用于唤醒所有阻塞的线程,并重置count和generation

private void nextGeneration() {
    // 唤醒所有阻塞的线程
    trip.signalAll();
    // 设置count
    count = parties;
    generation = new Generation();
}

(二)CyclicBarrier方法

1、 CyclicBarrier(intparties):构造器指定不能小于0的parties,该值不会发生改变;

2、 CyclicBarrier(intparties,RunnablebarrierAction):构造器指定parties和一个Runnable接口,当所有的线程到达障点之后Runnable接口会被调用;

3、 await():当前线程调用该方法之后会进入阻塞状态,直到所有的线程都调用await()方法到达障点才会被唤醒当CyclicBarrier内部的count为0时,调用await()方法不会进入阻塞状态;

4、 await(longtimeout,TimeUnitunit):该方法与await()方法作用一样,只是可以设置阻塞等待的时间,超时没有被唤醒将退出阻塞状态;

5、 isBroken():返回barrier的broken状态,某个线程执行await()方法进入阻塞状态,如果被中断了isBroken()方法将返回true也即是线程的中断将会导致CyclicBarrier被broken,被broken的CyclicBarrier此时不能再使用必须reset,如果此时线程调用了await()方法将抛出异常BrokenBarrierException;

6、 reset():中断当前barrier,并重新生成Generation;

CyclicBarrier和CountDownLatch区别

1、 CountDownLatch:一个线程(或者多个),等待另外N个线程完成某个事情之后才能执行;

CyclicBarrier : N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待

关键点其实就在于那N个线程

(1)CountDownLatch里面N个线程就是学生,学生做完了试卷就可以走了,不用等待其他的学生是否完成

(2)CyclicBarrier 里面N个线程就是所有的游戏玩家,一个游戏玩家加载到100%还不可以,必须要等到其他的游戏玩家都加载到100%才可以开局

2、 CountDownLatch的await()线程会等待计数器减为0,而执行CyclicBarrier的await()方法会使线程进入阻塞等待其他线程到达障点;

3、 CountDownLatch计数器不能重置,CyclicBarrier可以重置循环利用;

4、 CountDownLatch是基于AQS的共享模式实现的,CyclicBarrier是基于ReentrantLock和Condition实现的;

5、 CountDownLatch不会让子线程进入阻塞,CyclicBarrier会使所有子线程进入阻塞;

三、Semaphore

Semaphore ,一个计数信号量,通常叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源

通常用于那些资源有明确访问数量限制的场景,常用于限流。多个共享资源互斥,并发限流,控制最大的线程数

如:

1、 数据库连接池,同时进行连接的线程有数量限制,连接不能超过一定的数量,当连接达到了限制数量后,后面的线程只能排队等前面的线程释放了数据库连接才能获得数据库连接;

2、 停车场场景,车位数量有限,同时只能容纳多少台车,车位满了之后只有等里面的车离开停车场外面的车才可以进入;

*

public class SemaphoreTest {
    public static void main(String[] args) {
        // 线程数量:3个停车位! 限流!
        Semaphore semaphore = new Semaphore(3);

        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                // 从该信号量获取许可证,阻止直到可用
                try {
                    semaphore.acquire();

                    System.out.println(Thread.currentThread().getName() + "抢到车位");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + "离开车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 释放许可证,将其返回到信号量
                    semaphore.release();
                }
            },String.valueOf(i)).start();
        }
    }
}

4抢到车位
3抢到车位
1抢到车位
3离开车位
4离开车位
1离开车位
2抢到车位
5抢到车位
6抢到车位
5离开车位
6离开车位
2离开车位

(一)Semaphore原理分析

1、 Semaphore初始化

Semaphore semaphore=new Semaphore(2);

(1)当调用new Semaphore(2) 方法时,默认会创建一个非公平的锁的同步阻塞队列。

(2)把初始令牌数量赋值给同步队列的state状态,state的值就代表当前所剩余的令牌数量

2、获取令牌

semaphore.acquire();

(1)当前线程会尝试去同步队列获取一个令牌,获取令牌的过程也就是使用原子的操作去修改同步队列的state ,获取一个令牌则修改为state=state-1

(2)当计算出来的state<0,则代表令牌数量不足,此时会创建一个Node节点加入阻塞队列,挂起当前线程

(3)当计算出来的state>=0,则代表获取令牌成功

/**
     *  获取1个令牌
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

/**
     * 共享模式下获取令牌,获取成功则返回,失败则加入阻塞队列,挂起线程
     * @param arg
     * @throws InterruptedException
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //尝试获取令牌,arg为获取令牌个数,当可用令牌数减当前令牌数结果小于0,则创建一个节点加入阻塞队列,挂起当前线程。
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

/**
     * 1、创建节点,加入阻塞队列,
     * 2、重双向链表的head,tail节点关系,清空无效节点
     * 3、挂起当前节点线程
     * @param arg
     * @throws InterruptedException
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //创建节点加入阻塞队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //获得当前节点pre节点
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);//返回锁的state
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //重组双向链表,清空无效节点,挂起当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

3、 释放令牌

semaphore.release();

当调用semaphore.release() 方法时

(1)线程会尝试释放一个令牌,释放令牌的过程也就是把同步队列的state修改为state=state+1的过程

(2)释放令牌成功之后,同时会唤醒同步队列中的一个线程

(3)0被唤醒的节点会重新尝试去修改state=state-1 的操作,如果state>=0则获取令牌成功,否则重新进入阻塞队列,挂起线程

/**
     * 释放令牌
     */
    public void release() {
        sync.releaseShared(1);
    }
/**
     *释放共享锁,同时会唤醒同步队列中的一个线程。
     * @param arg
     * @return
     */
    public final boolean releaseShared(int arg) {
        //释放共享锁
        if (tryReleaseShared(arg)) {
            //唤醒所有共享节点线程
            doReleaseShared();
            return true;
        }
        return false;
    }

/**
     * 唤醒同步队列中的一个线程
     */
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//是否需要唤醒后继节点
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改状态为初始0
                        continue;
                    unparkSuccessor(h);//唤醒h.nex节点线程
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE));
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

**(二)**Semaphore方法

acquire()  
获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态。
​
acquire(int permits)  
获取一个令牌,在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态。
    
acquireUninterruptibly() 
获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断)。
    
tryAcquire()
尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。
​
tryAcquire(long timeout, TimeUnit unit)
尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。
​
release()
释放一个令牌,唤醒一个获取令牌不成功的阻塞线程。
​
hasQueuedThreads()
等待队列里是否还存在等待线程。
​
getQueueLength()
获取等待队列里阻塞的线程数。
​
drainPermits()
清空令牌把可用令牌数置为0,返回清空令牌的数量。
​
availablePermits()
返回可用的令牌数量