08、JUC源码分析:locksAQS condition

AQS的conditionObject实现类似object的wait/notify/notify的功能,功能大概是:

1、 object维护一个监视器和一个等待队列,condition对于一个lock可以有多个condition,对于每个condition维护一个条件队列;

2、 提供wait/signal/signalall功能;

来个入门demo:

public class ConditionTest {

    private static ReentrantLock lock = new ReentrantLock();

    private static Condition condition = lock.newCondition();
    
    public static void main(String[] args) {

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock.lock();
                    System.out.println(Thread.currentThread()+ "等待条件完成");
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(Thread.currentThread()+ "终于等到条件完成了,gogogo");
                    lock.unlock();
                }
            }
        }).start();

        Thread b = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    lock.lock();
                    condition.signalAll();
                    System.out.println(Thread.currentThread()+ "条件完成了,释放吧");
                } finally {
                    lock.unlock();
                }
            }
        });
        b.start();
    }
}

ConditionObject实现 Condition接口,Condition提供的方法定义:

*有没有很熟悉的感觉。

ConditionObject每次new都会维护一个条件队列,通过node的nextWaiter串起来

/** 条件队列的第一个节点 */
private transient Node firstWaiter;
/** 条件队列的最后一个节点 */
private transient Node lastWaiter;

/**
 * 空的构造,看下ReentrantLock.newCondition()每次都会new ConditionObject()可以维护多个条件队列
 */
public ConditionObject() { }

看下响应中断的await()流程

/**
响应中断的await
能调用await的方法的线程肯定获得锁
*/
public final void await() throws InterruptedException {
		//线程中断直接异常
    if (Thread.interrupted())
        throw new InterruptedException();
		//将当前线程封装加入condition的条件队列        
    Node node = addConditionWaiter();
    //释放AQS同步等待队列中的节点
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //看节点是否还在AQS的同步等待队列,因为signal/signalall调用的话会把节点加入到AQS的等待队列,如果没在那就说明需要park
    while (!isOnSyncQueue(node)) {
    		//不在的话那就应该在条件队列了,那么park吧
        LockSupport.park(this);
        //被signal/signalall唤醒后,检查中断状态,如果被中断,break,没有的话while
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //这里说明已经加入到AQS的队列,重新acquire,注意的是acquireQueued返回值为是否中断,返回true肯定是中断,返回false
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)//中断时,直接throw还是设置中断状态
        reportInterruptAfterWait(interruptMode);
}
/**
先判断lastWaiter的状态,如果不是condition就过一遍条件队列,将所有状态不为condition的都去掉
然后将节点加入到lastWaiter(类似AQS中的tail)的nextWaiter,如果last为null,就将first和last的nextWaiter都指向新节点
最后将lastWaiter指向新加入节点
*/
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
/**
从firstWaiter开始,过滤掉所有状态不为condition的节点
基本上按trail-t-next逐个节点向后移动,t从firstWaiter开始
当时看的时候,拿纸画了一遍才清楚
*/
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}
/**
这里是释放掉AQS同步等待队列中的节点
返回释放前的state值
有异常的话就将节点的状态改为cancelled
*/
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
/**
看节点是否还在AQS的队列中
*/
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     之前分析过AQS加入节点的顺序enq(),pre-tail-next,pre加入了,但是并不能说明这个节点就真正在AQS的等待队列,
     所以需要从tail往前过滤一遍看是否存在
     */
    return findNodeFromTail(node);
}
/**
从tail往前判断节点是否在队列中,找到返回true
*/
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

/** 
2个状态,表示await被唤醒后,如果检查线程是中断的,就需要判断是在什么时候被中断,然后判断怎么返回这个中断,是直接异常还是设置中断状态
 */
private static final int REINTERRUPT =  1;
private static final int THROW_IE    = -1;

/**
	检查中断状态,0:未中断,
	THROW_IE:
	REINTERRUPT:
 */
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
/**
检查中断是在什么时候发生的,是在signal前还是signal后
*/
final boolean transferAfterCancelledWait(Node node) {
	//如果调用signal的话,先把节点的状态设置成0,再把节点从条件队列转移(enq)到AQS的等待队列
	//所以下面这个cas成功,那么这个中断肯定是发生在signal前
  if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  		//把节点放入AQS队列,保证后面acquireQueued执行
      enq(node);
      return true;
  }
  /*
   * If we lost out to a signal(), then we can't proceed
   * until it finishes its enq().  Cancelling during an
   * incomplete transfer is both rare and transient, so just
   * spin.
   */
	/**
	到这里的话,肯定是已经发生了signal,但是signal的enq没有完成,所以自旋,让signal的enq完成,返回false
	*/   
  while (!isOnSyncQueue(node))
      Thread.yield();
  return false;
}
/**
这是根据之前的标识判断怎么处理中断
signal前就抛出,signal后就设置中断状态
*/
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

整个await的流程为:

1、 判断线程中断,中断直接抛出异常;
2、 将节点加入condition条件队列;
3、 释放AQS队列中的锁;
4、 while判断是否在AQS等待队列;
5、 如果不在AQS队列中,就park;
6、 唤醒后检查是被signal唤醒还是中断唤醒;
7、 中断唤醒要判断signal前还是signal后,设置怎么处理中断,signal前的话还需要将节点enq到AQS的等待队列,转到4;
8、 如果在就acquireQueued,重新获取,这里判断acquire返回,为true则为中断,然后设置中断处理方式;
9、 如果节点的nextWaiter不为null,就清理下condition的条件队列,清除所有状态不为condition的节点;
10、 最后看是否需要处理中断,如有,signal前的中断直接抛出,signal后设置中断状态;

awaitNanos/awaitUntil/await(long time, TimeUnit unit)基本流程跟响应中断的await差不多,只不过多了超时时间处理,跟前面讲过的响应超时没什么区别,都是底层unsafe的那些。

看下signal/signalAll:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
/**子类实现判断是否是自己拥有*/
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}
/**
这里从first开始释放一个condition状态的节点
*/
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    //设置节点状态为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    //将节点加入AQS的等待队列,返回的是加入节点的pre
    Node p = enq(node);
    int ws = p.waitStatus;
    //设置节点状态为SIGNAL,如果失败直接unpark新加入的节点
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

signal只释放first的开始第一个状态为condition的节点,然后将节点加入到AQS的同步等待队列,设置新加入节点的pre的状态为SIGNAL。看下signalAll的释放:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

看到signalAll最终是处理所有condition节点。

其实不管是await还是signal/signalAll都是模拟object.wait跟notify/notifyAll,可以对比来看。

AQS大概就这么多了,还有个AbstractQueuedLongSynchronizer这个类,跟AQS差不多,只是state状态采用的是long类型:

    private volatile long state;

AQS采用的是:

    private volatile int state;

注意:

await会有虚假唤醒的情况,即使没有signal,await的线程也可能被唤醒。参考:多线程编程中条件变量和虚假唤醒(spurious wakeup)的讨论 http://siwind.iteye.com/blog/1469216,最后的建议就是使用 while判断条件而不是使用if判断:

while(条件不满足){
condition_wait(cond, mutex);
}
而不是:
If(条件不满足 ){
Condition_wait(cond,mutex);
}

说实话最后我也没看懂什么原因导致虚假唤醒,后来去stackoverflow查询了下,这是解释,自己研究吧

http://stackoverflow.com/questions/1050592/do-spurious-wakeups-actually-happen,还有这篇http://blog.sina.com.cn/s/blog_e59371cc0102v29b.html

参考:

http://blog.csdn.net/yuenkin/article/details/50867530

http://brokendreams.iteye.com/blog/2250372

怎么理解Condition