上一节我们了解了Lock接口的一些简单的说明,知道Lock锁的常用形式,那么这节我们正式开始进入JUC锁(java.util.concurrent包下的锁,简称JUC锁)。下面我们来看一下Lock最常用的实现类ReentrantLock。
1.ReentrantLock简介
由单词意思我们可以知道这是可重入的意思。那么可重入对于锁而言到底意味着什么呢?简单来说,它有一个与锁相关的获取计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,然后锁需要被释放两次才能获得真正释放。这模仿了 synchronized 的语义;如果线程进入由线程已经拥有的监控器保护的 synchronized 块,就允许线程继续进行,当线程退出第二个(或者后续) synchronized 块的时候,不释放锁,只有线程退出它进入的监控器保护的第一个 synchronized 块时,才释放锁。
1.1公平锁与非公平锁
我们查看ReentrantLock的源码可以看到无参构造函数是这样的:
public ReentrantLock() {
sync = new NonfairSync();
}
NonfairSync()方法为一个非公平锁的实现方法,另外Reentrantlock还有一个有参的构造方法:
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
它允许您选择想要一个 公平(fair)锁,还是一个 不公平(unfair)锁。公平锁使线程按照请求锁的顺序依次获得锁;而不公平锁则允许直接获取锁,在这种情况下,线程有时可以比先请求锁的其他线程先得到锁。
为什么我们不让所有的锁都公平呢?毕竟,公平是好事,不公平是不好的,不是吗?(当孩子们想要一个决定时,总会叫嚷“这不公平”。我们认为公平非常重要,孩子们也知道。)在现实中,公平保证了锁是非常健壮的锁,有很大的性能成本。要确保公平所需要的记帐(bookkeeping)和同步,就意味着被争夺的公平锁要比不公平锁的吞吐率更低。作为默认设置,应当把公平设置为 false ,除非公平对您的算法至关重要,需要严格按照线程排队的顺序对其进行服务。
下面我们先来看一个例子:
public class TestReentrantLock implements Runnable{
ReentrantLock lock = new ReentrantLock();
public void get() {
lock.lock();
System.out.println(Thread.currentThread().getId());
set();
lock.unlock();
}
public void set() {
lock.lock();
System.out.println(Thread.currentThread().getId());
lock.unlock();
}
@Override
public void run() {
get();
}
public static void main(String[] args) {
TestReentrantLock ss = new TestReentrantLock();
new Thread(ss).start();
new Thread(ss).start();
new Thread(ss).start();
}
}
运行结果:
10
10
12
12
11
11
Process finished with exit code 0
由结果我们可以看出同一个线程进入了同一个ReentrantLock锁两次。
2.condition条件变量
我们知道根类 Object 包含某些特殊的方法,用来在线程的 wait() 、 notify() 和 notifyAll() 之间进行通信。那么为了在对象上 wait 或 notify ,您必须持有该对象的锁。就像 Lock 是同步的概括一样, Lock 框架包含了对 wait 和 notify 的概括,这个概括叫作 条件(Condition)。 Condition 的方法与 wait 、 notify 和 notifyAll 方法类似,分别命名为 await 、 signal 和signalAll ,因为它们不能覆盖 Object 上的对应方法。
首先我们来计算一道题:
我们要打印1到9这9个数字,由A线程先打印1,2,3,然后由B线程打印4,5,6,然后再由A线程打印7,8,9. 这道题有很多种解法,我们先用Object的wait,notify方法来实现:
public class WaitNotifyDemo {
private volatile int val = 1;
private synchronized void printAndIncrease() {
System.out.println(Thread.currentThread().getName() +"prints " + val);
val++;
}
// print 1,2,3 7,8,9
public class PrinterA implements Runnable {
@Override
public void run() {
while (val <= 3) {
printAndIncrease();
}
// print 1,2,3 then notify printerB
synchronized (WaitNotifyDemo.this) {
System.out.println("PrinterA printed 1,2,3; notify PrinterB");
WaitNotifyDemo.this.notify();
}
try {
while (val <= 6) {
synchronized (WaitNotifyDemo.this) {
System.out.println("wait in printerA");
WaitNotifyDemo.this.wait();
}
}
System.out.println("wait end printerA");
} catch (InterruptedException e) {
e.printStackTrace();
}
while (val <= 9) {
printAndIncrease();
}
System.out.println("PrinterA exits");
}
}
// print 4,5,6 after printA print 1,2,3
public class PrinterB implements Runnable {
@Override
public void run() {
while (val < 3) {
synchronized (WaitNotifyDemo.this) {
try {
System.out
.println("printerB wait for printerA printed 1,2,3");
WaitNotifyDemo.this.wait();
System.out
.println("printerB waited for printerA printed 1,2,3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
while (val <= 6) {
printAndIncrease();
}
System.out.println("notify in printerB");
synchronized (WaitNotifyDemo.this) {
WaitNotifyDemo.this.notify();
}
System.out.println("notify end printerB");
System.out.println("PrinterB exits.");
}
}
public static void main(String[] args) {
WaitNotifyDemo demo = new WaitNotifyDemo();
demo.doPrint();
}
private void doPrint() {
PrinterA pa = new PrinterA();
PrinterB pb = new PrinterB();
Thread a = new Thread(pa);
a.setName("printerA");
Thread b = new Thread(pb);
b.setName("printerB");
// 必须让b线程先执行,否则b线程有可能得不到锁,执行不了wait,而a线程一直持有锁,会先notify了
b.start();
a.start();
}
}
运行结果为:
printerB wait for printerA printed 1,2,3
printerA prints 1
printerA prints 2
printerA prints 3
PrinterA printed 1,2,3; notify PrinterB
wait in printerA
printerB waited for printerA printed 1,2,3
printerB prints 4
printerB prints 5
printerB prints 6
notify in printerB
notify end printerB
wait end printerA
printerA prints 7
printerA prints 8
printerA prints 9
PrinterA exits
PrinterB exits.
Process finished with exit code 0
我们来分析一下上面的程序:
1、 首先在main方法中我们看到是先启动了B线程,因为B线程持有wait()对象,而A线程则持有notify(),如果先启动A有可能会造成死锁的状态;
B线程启动以后进入run()方法:
while (val < 3) {
synchronized (WaitNotifyDemo.this) {
try {
System.out.println("printerB wait for printerA printed 1,2,3");
WaitNotifyDemo.this.wait();
System.out.println("printerB waited for printerA printed 1,2,3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
while (val <= 6) {
printAndIncrease();
}
这里有一个while循环,如果val的值小于3,那么在WaitNotifyDemo的实例的同步块中调用WaitNotifyDemo.this.wait()方法,这里要注意无论是wait,还是notify,notifyAll方法都需要在其实例对象的同步块中执行,这样当前线程才能获得同步实例的同步控制权,如果不在同步块中执行wait或者notify方法会出java.lang.IllegalMonitorStateException异常。另外还要注意在wait方法两边的同步块会在wait执行完毕之后释放对象锁。
这样PrinterB就进入了等待状态,我们再看下PrinterA的run方法:
while (val <= 3) {
printAndIncrease();
}
// print 1,2,3 then notify printerB
synchronized (WaitNotifyDemo.this) {
System.out.println("PrinterA printed 1,2,3; notify PrinterB");
WaitNotifyDemo.this.notify();
}
try {
while (val <= 6) {
synchronized (WaitNotifyDemo.this) {
System.out.println("wait in printerA");
WaitNotifyDemo.this.wait();
}
}
System.out.println("wait end printerA");
} catch (InterruptedException e) {
e.printStackTrace();
}
这里首先打印了1、2、3,然后在同步块中调用了WaitNotifyDemo实例的notify方法,这样PrinterB就得到了继续执行的通知,然后PrinterA进入等待状态,等待PrinterB通知。
我们再看下PrinterB run方法剩下的代码:
while (val <= 6) {
printAndIncrease();
}
System.out.println("notify in printerB");
synchronized (WaitNotifyDemo.this) {
WaitNotifyDemo.this.notify();
}
System.out.println("notify end printerB");
System.out.println("PrinterB exits.");
PrinterB首先打印了4、5、6,然后在同步块中调用了notify方法,通知PrinterA开始执行。
PrinterA得到通知后,停止等待,打印剩下的7、8、9三个数字,如下是PrinterA run方法中剩下的代码:
while (val <= 9) {
printAndIncrease();
}
整个程序就分析完了,下面我们再来使用Condition来做这道题:
public class TestCondition {
static class NumberWrapper {
public int value = 1;
}
public static void main(String[] args) {
//初始化可重入锁
final Lock lock = new ReentrantLock();
//第一个条件当屏幕上输出到3
final Condition reachThreeCondition = lock.newCondition();
//第二个条件当屏幕上输出到6
final Condition reachSixCondition = lock.newCondition();
//NumberWrapper只是为了封装一个数字,一边可以将数字对象共享,并可以设置为final
//注意这里不要用Integer, Integer 是不可变对象
final NumberWrapper num = new NumberWrapper();
//初始化A线程
Thread threadA = new Thread(new Runnable() {
@Override
public void run() {
//需要先获得锁
lock.lock();
try {
System.out.println("threadA start write");
//A线程先输出前3个数
while (num.value <= 3) {
System.out.println(num.value);
num.value++;
}
//输出到3时要signal,告诉B线程可以开始了
reachThreeCondition.signal();
} finally {
lock.unlock();
}
lock.lock();
try {
//等待输出6的条件
reachSixCondition.await();
System.out.println("threadA start write");
//输出剩余数字
while (num.value <= 9) {
System.out.println(num.value);
num.value++;
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
});
Thread threadB = new Thread(new Runnable() {
@Override
public void run() {
try {
lock.lock();
while (num.value <= 3) {
//等待3输出完毕的信号
reachThreeCondition.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
try {
lock.lock();
//已经收到信号,开始输出4,5,6
System.out.println("threadB start write");
while (num.value <= 6) {
System.out.println(num.value);
num.value++;
}
//4,5,6输出完毕,告诉A线程6输出完了
reachSixCondition.signal();
} finally {
lock.unlock();
}
}
});
//启动两个线程
threadB.start();
threadA.start();
}
}
基本思路就是首先要A线程先写1,2,3,这时候B线程应该等待reachThredCondition信号,而当A线程写完3之后就通过signal告诉B线程“我写到3了,该你了”,这时候A线程要等嗲reachSixCondition信号,同时B线程得到通知,开始写4,5,6,写完4,5,6之后B线程通知A线程reachSixCondition条件成立了,这时候A线程就开始写剩下的7,8,9了。
我们可以看到上例中我们创建了两个Condition,在不同的情况下可以使用不同的Condition,与wait和notify相比提供了更细致的控制。
3.线程阻塞原语–LockSupport
我们一再提线程、锁等概念,但锁是如果实现的呢?又是如何知道当前阻塞线程的又是哪个对象呢?LockSupport是JDK中比较底层的类,用来创建锁和其他同步工具类的基本线程阻塞原语。
java锁和同步器框架的核心 AQS: AbstractQueuedSynchronizer,就是通过调用 LockSupport .park()和 LockSupport .unpark()实现线程的阻塞和唤醒 的。 LockSupport 很类似于二元信号量(只有1个许可证可供使用),如果这个许可还没有被占用,当前线程获取许可并继 续 执行;如果许可已经被占用,当前线 程阻塞,等待获取许可。
LockSupport是针对特定线程来进行阻塞和解除阻塞操作的;而Object的wait()/notify()/notifyAll()是用来操作特定对象的等待集合的。
LockSupport的两个主要方法是park()和Unpark(),我们来看一下他们的实现:
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
unsafe.park(false, 0L);
setBlocker(t, null);
}
public static void park() {
unsafe.park(false, 0L);
}
public static void unpark(Thread thread) {
if (thread != null)
unsafe.unpark(thread);
}
由源码我们可见在park方法内部首先获得当前线程然后阻塞当前线程,unpark方法传入一个可配置的线程来为该线程解锁。以“线程”作为方法的参数, 语义更清晰,使用起来也更方便。而wait/notify的实现使得“线程”的阻塞/唤醒对线程本身来说是被动的,要准确的控制哪个线程、什么时候阻塞/唤醒很困难, 要不随机唤醒一个线程(notify)要不唤醒所有的(notifyAll)。
下面我们来看一个例子:
public class TestLockSupport {
public static Object u = new Object();
static ChangeObjectThread t1 = new ChangeObjectThread("t1");
static ChangeObjectThread t2 = new ChangeObjectThread("t2");
public static class ChangeObjectThread extends Thread {
public ChangeObjectThread(String name) {
super.setName(name);
}
public void run() {
synchronized (u) {
System.out.println("in" + getName());
LockSupport.park();
}
}
}
public static void main(String[] args) throws InterruptedException {
t1.start();
Thread.sleep(2000);
t2.start();
LockSupport.unpark(t1);
LockSupport.unpark(t2);
t1.join();
t2.join();
}
}
当我们把”LockSupport.unpark(t1);”这一句注掉的话我们会发现程序陷入死锁。而且我们看到再main方法中unpark是在t1和t2启动之后才执行,但是为什么t1启动之后,t2也启动了呢?注意,**unpark函数可以先于park调用。比如线程B调用unpark函数,给线程A发了一个“许可”,那么当线程A调用park时,它发现已经有“许可”了,那么它会马上再继续运行。**unpark函数为线程提供“许可(permit)”,线程调用park函数则等待“许可”。这个有点像信号量,但是这个“许可”是不能叠加的,“许可”是一次性的。比如线程B连续调用了三次unpark函数,当线程A调用park函数就使用掉这个“许可”,如果线程A再次调用park,则进入等待状态。
除了有定时阻塞的功能外,还支持中断影响,但是和其他接收中断函数不一样,他不会抛出
InterruptedException异常,他只会默默的返回,但是我们可以从Thread.Interrupted()等方法获得中断标记.
我们来看一个例子:
public class TestLockSupport {
public static Object u = new Object();
static ChangeObjectThread t1 = new ChangeObjectThread("t1");
static ChangeObjectThread t2 = new ChangeObjectThread("t2");
public static class ChangeObjectThread extends Thread {
public ChangeObjectThread(String name) {
super.setName(name);
}
public void run() {
synchronized (u) {
System.out.println("in " + getName());
LockSupport.park();
if (Thread.interrupted()) {
System.out.println(getName() + " 被中断了!");
}
}
System.out.println(getName() + " 执行结束");
}
}
public static void main(String[] args) throws InterruptedException {
t1.start();
Thread.sleep(100);
t2.start();
t1.interrupt();
LockSupport.unpark(t2);
}
}
输出:
in t1
t1 被中断了!
t1 执行结束
in t2
t2 执行结束
Process finished with exit code 0
由run方法中的终端异常捕获我们可以看到线程在中断时并没有抛出异常而是正常执行下去了。
关于LockSupport其实要介绍的东西还是很多,因为这个类实现了底层的一些方法,各种的锁实现都是这个基础上发展而来的。以后会专门用一个篇章来学习jdk内部的阻塞机制。说前面我们讲到Object的wait和notify,讲到Condition条件,讲到jdk中不对外部暴露的LockSupport阻塞原语,那么在JUC包中还有另外一个阻塞机制—信号量机制(Semaphore),下一节我们一起探讨一下。