11、JUC源码分析:CyclicBarrier源码分析

JUC-AQS原理篇
JUC-AQS源码篇
JUC-AQS的Condition之await和signal源码解析

JUC-CountDownLatch基础篇
JUC-CountDownLatch源码分析
JUC-Semaphore基础篇
JUC-Semaphore源码分析
JUC-ReentrantReadWriteLock锁基础篇
JUC-ReentrantReadWriteLock锁源码分析
JUC-ReentrantLock锁基础篇
JUC-ReentrantLock锁源码分析
JUC-CyclicBarrier基础篇
JUC-CyclicBarrier源码分析

文章目录

  • 1.前言
  • 2.构造方法
    • 2.1 nextGenerationa重置屏障
  • 2.2 breakBarrier打破屏障
  • 2.3 reset屏障重置为初始状态。
  • 3.dowait方法

1.前言

CyclicBarrier可以建立一个循环屏障,这个屏障可以阻塞一个线程直到指定的所有线程都达到屏障。就大巴车等乘客,只有所有的乘客都到了,大巴车才可以出发。它可以实现让一组线程互相等待共同到达某个状态之后再全部同时执行,叫做“循环”是因为CyclicBarrier可以被重复使用。

具体用法举例可以参考JUC-CyclicBarrier基础篇 ReentrantLock用法可以参考JUC-ReentrantLock锁 AQS的Condition源码可以参考JUC-AQS的Condition之await和signal源码解析

2.构造方法

public class CyclicBarrier {
   
     
	//使用ReentrantLock做同步锁,
	//操作,修改全局变量count,generation时需要获取这把锁
 	private final ReentrantLock lock = new ReentrantLock();
 	//到达屏障但是不能放行的线程在trip条件变量上等待
 	//也就是说当前线程调用了await方法到达了屏障,但是还有其他的线程没有到达,那就让当前线程在条件队列上等待。
 	private final Condition trip = lock.newCondition();
 	//屏障点数
 	//正常通过屏障时所需要的线程数量,就是构造方法的入参,设置之后不会改变
 	private final int parties;
 	//count表示要正常通过一次屏障时还需要的线程数量,初始化值等于parties
 	//每当一个线程调用一次await方法时,count减一
 	//当count变成0时,代表所需要的所有线程都已经到达屏障了,本轮屏障可以正常通过了。
 	//然后,count值又被赋值为parties,代表开始了一个新的屏障。
 	//使用parties,count两个变量的原因,就是为了CyclicBarrier可以循环使用
 	private int count;
 	//当所需要的线程都到达屏障后执行的回调任务,可以不指定
 	private final Runnable barrierCommand;
 	//代表屏障,CyclicBarrier可以循环使用,每重置一次CyclicBarrier时就new 出一个新的generation 
 	private Generation generation = new Generation();
 	
 	//Generation内部类代表屏障
	private static class Generation {
   
     
		//表示当前屏障是否打破
        boolean broken = false;
	}
	//构造函数,创建一个新的CyclicBarrier
	//parties代表正常通过屏障时所需要的线程数量
	//barrierAction代表正常通过屏障时所要执行的任务
	public CyclicBarrier(int parties, Runnable barrierAction) {
   
     
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

//与上面的构造函数不同的是,当正常通过屏障时不需要再执行额外的任务
public CyclicBarrier(int parties) {
   
     
    //调用每一个构造器,barrierCommand传递null
    this(parties, null);
}
}

三个关键属性:parties,count和generation。parties是在创建CyclicBarrier的时候指定的值,后面不能更改,代表屏障点数。count的初始值是parties,每当一个线程到达屏障时调用await方法,count就会减一,当count变成0时,代表所需要的所有线程都到了屏障,此时屏障可以正常通过了*。此时本轮的屏障结束了,进入下一轮新的屏障,count值又被赋值为parties。就是这两个变量实现CyclicBarrier 的可复用性。generation变量在每一轮的屏障中都是会new 一个新的值出来。通过这个可以来判断当一个线程调用await方法阻塞了,当它被唤醒时可以通过它所保留的Generation类型的g引用与CyclicBarrier的全局generation引用是否相等,来判断屏障是否被重置了
CyclicBarrier屏障被重置了的情况有

  • 正常通过屏障所需要的所有线程都到达了,屏障正常结束,调用nextGeneration()方法重置屏障。
  • 调用了reset方法强制重置屏障。

2.1 nextGenerationa重置屏障

private void nextGeneration() {
   
     
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

在屏障所需要的所有线程都到达了,屏障正常完成了的时候调用的方法或者是reset方法强制重置屏障时调用的方法。因为此方法涉及到修改全景变量count 和generation所以调用此方法的调用者必须先获取到锁lock。
此方法所做的事情:

  • 唤醒所有在trip条件变量上等待的线程
  • count重置为parties
  • 重新初始化一个Generation对象,赋给generation,这就是新的一个屏障

2.2 breakBarrier打破屏障

private void breakBarrier() {
   
     
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

在出现异常情况或者reset方法强制重置屏障时调用的方法。因为此方法涉及到修改全景变量count 和generation所以调用此方法的调用者必须先获取到锁lock。用于打破当前屏障,表明这个屏障已经失效了。
此方法所做的事情:

  • 打破当前屏障,将generation.broken设置成true
  • count重置为parties
  • 唤醒所有在trip条件变量上等待的线程

2.3 reset屏障重置为初始状态。

public void reset() {
   
     
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
   
     
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
   
     
            lock.unlock();
        }
    }

此方法所做的事情:

1、 先将当前的屏障打破;
2、 重置屏障,开启一个新的屏障;

3.dowait方法

public int await() throws InterruptedException, BrokenBarrierException {
   
     
        try {
   
     
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
   
     
            throw new Error(toe); // cannot happen
        }
    }

 public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
   
     
        return dowait(true, unit.toNanos(timeout));
    }

dowait方法是由await方法调用的,一个是支持超时等待的await方法一个是不支持超时等待的await方法。

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
   
     
        final ReentrantLock lock = this.lock;
        //首先获取锁
        //因为下面的代码中会调用一些必须要获取到锁才能调用的方法
        //比如breakBarrier,nextGeneration方法以及
        //会调用CAS的Condition对象的await方法
        //我们知道调用Condition对象的await方法的前提是必须先获取到lock锁
        lock.lock();
        try {
   
     
            //获取当前的屏障,保存在当前的线程中
            final Generation g = generation;

            //先判断一下当前屏障有没有被打破,如果打破了就没有往下执行的必要了
            //如果当前的屏障已经被其他的线程打破了,那么直接抛出异常
            if (g.broken)
                throw new BrokenBarrierException();

            //判断当前线程是否被中断了,如果中断了,就打破当前屏障并唤醒其他线程,抛出遇到的异常
            //我们知道调用lock.lock()时可能存在当前线程被中断,而lock方法又不会抛出中断异常,只是会将线程的中断标识设置为true,所以这里需要判断一下。
            if (Thread.interrupted()) {
   
     
                breakBarrier();
                throw new InterruptedException();
            }

             //count自减1,表示正常通过屏障时所需要的线程又来了一个
            int index = --count;
            if (index == 0) {
   
       // tripped
                //index==0表示当前线程是最后一个达到屏障的线程,所需要的所有线程都到达了屏障点
                //当前屏障可以通过了
                //ranAction表示执行,当屏障通过时所需要执行的任务是否执行成功
                boolean ranAction = false;
                try {
   
     
                   //执行当屏障通过时所需要执行的任务
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //当前屏障正常通过时,唤醒其他的所有其他等待的线程
                    //并且重置屏障开始新的一轮
                    nextGeneration();
                    return 0;
                } finally {
   
     
                    if (!ranAction)
                        //如果执行当屏障通过时所需要执行的任务失败
                        //就打破当前屏障,并唤醒其他线程,随后抛出遇到的异常
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            //表示index不为0,那么表示当前线程不是最后一个达到屏障的线程,可能需要等待,开启一个死循环
            for (;;) {
   
     
                try {
   
     
                    if (!timed)
                         //不需要超时等待的,但是这一步会释放锁
                        trip.await();
                    else if (nanos > 0L)
                       //需要超时等待的,这一步会释放锁
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
   
     
                    //在等待过程中抛出了中断异常
                    if (g == generation && ! g.broken) {
   
     
                        //如果屏障还是当前屏障并且没有被打破的话
                        //就打破屏障并且抛出异常
                        breakBarrier();
                        throw ie;
                    } else {
   
     
                      //情况一:最后正常通过屏障的最后一个线程已经到达了
                      //调用了nextGeneration函数,导致g != generation
                      //但是还没来的及唤醒此线程,此线程就被中断了,
                      //这种情况算是屏障正常通过了,所以只是保留当前线程的中断状态。
                      //情况二:有其他线程强制调用了reset()函数导致调用了nextGeneration函数,导致g != generation,同时reset()函数也打破了屏障
                      //那么保留一下当前线程的中断状态,由下面的代码抛出BrokenBarrierException异常
                      //情况三:当前屏障被其他线程因为中断的原因所打破了,那么也是保留一下当前线程的中断状态由下面的代码抛出BrokenBarrierException异常
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                //这一步表示当前屏障被别的线程因为某种原因打破了,导致唤醒了当前线程,那么就抛出BrokenBarrierException异常
                //原因一:其他的线程被中断了或者超时了,导致打破了屏障。
                //原因二:最后一个到达的线程执行command任务出现异常,导致打破了屏障。
                //原因三:其他线程执行了reset函数打破了屏障
                    throw new BrokenBarrierException();

                if (g != generation)
                //走到这一步,说明屏障没有被打破同时当前屏障不在是之前的那个屏障了,那只能是屏障正常通过调用nextGeneration了。
                //这是正常的结束状态,那么就返回index
                    return index;

                if (timed && nanos <= 0L) {
   
     
                //走到这一步只能是await等待超时了
                //那么就打破屏障,抛出TimeoutException
                    breakBarrier();
                    throw new TimeoutException();
                }
                //走到这一步进行下一次for循环,说明屏障既没有被打破
                //也没有被替换也没用超时,那就只能是“虚假唤醒了”
            }
        } finally {
   
     
            //释放锁
            lock.unlock();
        }
    }

dowait方法大致流程:
1、 首先获取锁lock.lock()因为下面的代码中会调用一些必须要获取到锁才能调用的方法比如breakBarrier,nextGeneration方法以及会调用CAS的Condition对象的await方法我们知道调用Condition对象的await方法的前提是必须先获取到lock锁;
2、 获取当前的屏障,保存在当前的线程中;
3、 先判断一下当前屏障有没有被打破,如果打破了就没有往下执行的必要了如果当前的屏障已经被其他的线程打破了,那么直接抛出BrokenBarrierException异常;
4、 判断当前线程是否被中断了,如果中断了,就打破当前屏障并唤醒其他线程,抛出InterruptedException异常我们知道调用lock.lock()时可能存在当前线程被中断,而lock方法又不会抛出中断异常,只是会将线程的中断标识设置为true,所以这里需要判断一下;
5、 index=count减一,表示正常通过屏障时所需要的线程又来了一个;
6、 如果index=0;表示当前线程是最后一个达到屏障的线程,所需要的所有线程都到达了屏障点;
6、 1.如果存在Runnable任务,就执行它如果正常执行就调用nextGeneration重置屏障,并唤醒其他线程,返回0,方法正常结束如果执行Runnable任务出现异常就打破当前屏障;
6、 2.如果不存在Runnable任务就调用nextGeneration重置屏障,并唤醒其他线程,返回0,方法正常结束;
7、 如果index!=0;那么表示当前线程不是最后一个达到屏障的线程,可能需要等待,开启一个死循环;
8、 如果是非超时等待调用trip.await()执行等待;如果是超时等待就调用trip.awaitNanos(nanos)执行等待;;
9、 如果执行等待的过程中当前线程被中断唤醒了;
9、 1.如果是屏障还是当前屏障并且没有被打破的话就打破屏障并且抛出异常;
9、 2.否则保留一下当前线程的中断状态出现这种结果的原因有:;
情况一:最后正常通过屏障的最后一个线程已经到达了调用了nextGeneration函数,导致g != generation但是还没来的及唤醒此线程,此线程就被中断了,这种情况算是屏障正常通过了,所以只是保留当前线程的中断状态。
情况二:有其他线程强制调用了reset()函数导致调用了nextGeneration函数,导致g != generation,同时reset()函数也打破了屏障那么保留一下当前线程的中断状态,由下面的代码抛出BrokenBarrierException异常
情况三:当前屏障被其他线程因为中断的原因所打破了,那么也是保留一下当前线程的中断状态由下面的代码抛出BrokenBarrierException异常。
10、 当等待线程被唤醒了;
10、 1如果if(g.broken)成立,表示当前屏障被别的线程因为某种原因打破了,导致唤醒了当前线程,那么就抛出BrokenBarrierException异常这种原因有:;
原因一:其他的线程被中断了或者超时了,导致打破了屏障。
原因二:最后一个到达的线程执行command任务出现异常,导致打破了屏障。
原因三:其他线程执行了reset函数打破了屏障
10、 2如果if(g!=generation)成立,走到这一步,说明屏障没有被打破同时当前屏障不在是之前的那个屏障了,那只能是屏障正常通过调用nextGeneration了这是正常的结束状态,那么就返回index;
10、 3如果if(timed&&nanos<=0L)成立,走到这一步只能是await等待超时了那么就打破屏障,抛出TimeoutException;
10、 4走到这一步进行下一次for循环,说明屏障既没有被打破也没有被替换也没用超时,那就只能是“虚假唤醒了”;
11最终需要在finally中释放lock锁。
说一下执行trip.await()或者trip.awaitNanos(nanos)等待时,线程被唤醒的情况:
1、 是当前线程被中断唤醒导致的;
2、 是当前线程等待超时导致的;
3、 是其他线程调用了signalAll()方法导致的;
调用了signalAll()方法的情况有:
3、 1.最后一个线程到达了,屏障正常通过了,调用了signalAll()方法来唤醒所有线程;
3、 2.其他线程因为自身的原因打破屏障了,调用了trip.signalAll()方法来唤醒所有线程;
其他线程因为自身原因需要打破屏障的情况有:
3、 2.1:其他线程因为自己被中断了,需要打破屏障;
3、 2.2:执行Runnable任务出现异常,需要打破屏障;
3、 2.3:等待超时,需要打破屏障;
3、 3.是其他线程强制调用reset方法,调用了trip.signalAll()方法来唤醒所有线程;
4、 还有一种唤醒是**“虚假唤醒”**导致,具体这是一种什么样的场景才会出现小编也没想出来,欢迎大家留言告诉小编;
注意:当执行dowait方法时既有线程中断异常,又有屏障被打破异常,那么优先抛出屏障被打破异常。这一点可以从下面这段代码体现出来

 try {
   
     
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
   
     
                    if (g == generation && ! g.broken) {
   
     
                        breakBarrier();
                        throw ie;
                    } else {
   
     
                        Thread.currentThread().interrupt();
                    }
                }