AQS的conditionObject实现类似object的wait/notify/notify的功能,功能大概是:
1、 object维护一个监视器和一个等待队列,condition对于一个lock可以有多个condition,对于每个condition维护一个条件队列;
2、 提供wait/signal/signalall功能;
来个入门demo:
public class ConditionTest {
private static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
lock.lock();
System.out.println(Thread.currentThread()+ "等待条件完成");
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread()+ "终于等到条件完成了,gogogo");
lock.unlock();
}
}
}).start();
Thread b = new Thread(new Runnable() {
@Override
public void run() {
try {
lock.lock();
condition.signalAll();
System.out.println(Thread.currentThread()+ "条件完成了,释放吧");
} finally {
lock.unlock();
}
}
});
b.start();
}
}
ConditionObject实现 Condition接口,Condition提供的方法定义:
有没有很熟悉的感觉。
ConditionObject每次new都会维护一个条件队列,通过node的nextWaiter串起来
/** 条件队列的第一个节点 */
private transient Node firstWaiter;
/** 条件队列的最后一个节点 */
private transient Node lastWaiter;
/**
* 空的构造,看下ReentrantLock.newCondition()每次都会new ConditionObject()可以维护多个条件队列
*/
public ConditionObject() { }
看下响应中断的await()流程
/**
响应中断的await
能调用await的方法的线程肯定获得锁
*/
public final void await() throws InterruptedException {
//线程中断直接异常
if (Thread.interrupted())
throw new InterruptedException();
//将当前线程封装加入condition的条件队列
Node node = addConditionWaiter();
//释放AQS同步等待队列中的节点
int savedState = fullyRelease(node);
int interruptMode = 0;
//看节点是否还在AQS的同步等待队列,因为signal/signalall调用的话会把节点加入到AQS的等待队列,如果没在那就说明需要park
while (!isOnSyncQueue(node)) {
//不在的话那就应该在条件队列了,那么park吧
LockSupport.park(this);
//被signal/signalall唤醒后,检查中断状态,如果被中断,break,没有的话while
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//这里说明已经加入到AQS的队列,重新acquire,注意的是acquireQueued返回值为是否中断,返回true肯定是中断,返回false
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)//中断时,直接throw还是设置中断状态
reportInterruptAfterWait(interruptMode);
}
/**
先判断lastWaiter的状态,如果不是condition就过一遍条件队列,将所有状态不为condition的都去掉
然后将节点加入到lastWaiter(类似AQS中的tail)的nextWaiter,如果last为null,就将first和last的nextWaiter都指向新节点
最后将lastWaiter指向新加入节点
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/**
从firstWaiter开始,过滤掉所有状态不为condition的节点
基本上按trail-t-next逐个节点向后移动,t从firstWaiter开始
当时看的时候,拿纸画了一遍才清楚
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
/**
这里是释放掉AQS同步等待队列中的节点
返回释放前的state值
有异常的话就将节点的状态改为cancelled
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
/**
看节点是否还在AQS的队列中
*/
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
之前分析过AQS加入节点的顺序enq(),pre-tail-next,pre加入了,但是并不能说明这个节点就真正在AQS的等待队列,
所以需要从tail往前过滤一遍看是否存在
*/
return findNodeFromTail(node);
}
/**
从tail往前判断节点是否在队列中,找到返回true
*/
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
/**
2个状态,表示await被唤醒后,如果检查线程是中断的,就需要判断是在什么时候被中断,然后判断怎么返回这个中断,是直接异常还是设置中断状态
*/
private static final int REINTERRUPT = 1;
private static final int THROW_IE = -1;
/**
检查中断状态,0:未中断,
THROW_IE:
REINTERRUPT:
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/**
检查中断是在什么时候发生的,是在signal前还是signal后
*/
final boolean transferAfterCancelledWait(Node node) {
//如果调用signal的话,先把节点的状态设置成0,再把节点从条件队列转移(enq)到AQS的等待队列
//所以下面这个cas成功,那么这个中断肯定是发生在signal前
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//把节点放入AQS队列,保证后面acquireQueued执行
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
/**
到这里的话,肯定是已经发生了signal,但是signal的enq没有完成,所以自旋,让signal的enq完成,返回false
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
/**
这是根据之前的标识判断怎么处理中断
signal前就抛出,signal后就设置中断状态
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
整个await的流程为:
1、 判断线程中断,中断直接抛出异常;
2、 将节点加入condition条件队列;
3、 释放AQS队列中的锁;
4、 while判断是否在AQS等待队列;
5、 如果不在AQS队列中,就park;
6、 唤醒后检查是被signal唤醒还是中断唤醒;
7、 中断唤醒要判断signal前还是signal后,设置怎么处理中断,signal前的话还需要将节点enq到AQS的等待队列,转到4;
8、 如果在就acquireQueued,重新获取,这里判断acquire返回,为true则为中断,然后设置中断处理方式;
9、 如果节点的nextWaiter不为null,就清理下condition的条件队列,清除所有状态不为condition的节点;
10、 最后看是否需要处理中断,如有,signal前的中断直接抛出,signal后设置中断状态;
awaitNanos/awaitUntil/await(long time, TimeUnit unit)基本流程跟响应中断的await差不多,只不过多了超时时间处理,跟前面讲过的响应超时没什么区别,都是底层unsafe的那些。
看下signal/signalAll:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/**子类实现判断是否是自己拥有*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
/**
这里从first开始释放一个condition状态的节点
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//设置节点状态为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//将节点加入AQS的等待队列,返回的是加入节点的pre
Node p = enq(node);
int ws = p.waitStatus;
//设置节点状态为SIGNAL,如果失败直接unpark新加入的节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
signal只释放first的开始第一个状态为condition的节点,然后将节点加入到AQS的同步等待队列,设置新加入节点的pre的状态为SIGNAL。看下signalAll的释放:
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
看到signalAll最终是处理所有condition节点。
其实不管是await还是signal/signalAll都是模拟object.wait跟notify/notifyAll,可以对比来看。
AQS大概就这么多了,还有个AbstractQueuedLongSynchronizer这个类,跟AQS差不多,只是state状态采用的是long类型:
private volatile long state;
AQS采用的是:
private volatile int state;
注意:
await会有虚假唤醒的情况,即使没有signal,await的线程也可能被唤醒。参考:多线程编程中条件变量和虚假唤醒(spurious wakeup)的讨论 http://siwind.iteye.com/blog/1469216,最后的建议就是使用 while判断条件而不是使用if判断:
while(条件不满足){
condition_wait(cond, mutex);
}
而不是:
If(条件不满足 ){
Condition_wait(cond,mutex);
}
说实话最后我也没看懂什么原因导致虚假唤醒,后来去stackoverflow查询了下,这是解释,自己研究吧
http://stackoverflow.com/questions/1050592/do-spurious-wakeups-actually-happen,还有这篇http://blog.sina.com.cn/s/blog_e59371cc0102v29b.html
参考:
http://blog.csdn.net/yuenkin/article/details/50867530
http://brokendreams.iteye.com/blog/2250372