06、Java并发-线程间的通信机制

参考博客:Java并发编程的艺术(六)——线程间的通信

多条线程之间有时需要数据交互,下面介绍五种线程间数据交互的方式,他们的使用场景各有不同。

1. volatile、synchronized关键字

关于volatile的详细介绍:volatile关键字

1.1 如何实现通信?

这两种方式都采用了同步机制实现多条线程间的数据通信。与其说是“通信”,倒不如说是“共享变量”来的恰当。当一个共享变量被volatile修饰 或 被同步块包裹后,他们的读写操作都会直接操作共享内存,从而各个线程都能看到共享变量最新的值,也就是实现了内存的可见性。

1.2 特点

  • 这种方式本质上是“共享数据”,而非“传递数据”;只是从结果来看,数据好像是从写线程传递到了读线程;
  • 这种通信方式无法指定特定的接收线程。当数据被修改后究竟哪条线程最先访问到,这由操作系统随机决定。
  • 总的来说,这种方式并不是真正意义上的“通信”,而是“共享”。

1.3 使用场景

这种方式能“传递”变量。当需要传递一些公用的变量时就可以使用这种方式。如:传递boolean flag,用于表示状态、传递一个存储所有任务的队列等。

1.4 例子

用这种方式实现线程的开关控制。

// 用于控制线程当前的执行状态
private volatile boolean running = false;

// 开启一条线程
Thread thread = new Thread(new Runnable(){
    void run(){
        // 开关
        while(!running){
            Thread.sleep(1000);
        }
        // 执行线程任务
        doSometing();
    }
}).start();

// 开始执行
public void start(){
    running = true;
}

2. 等待/通知机制

等待/通知机制,是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。上述两个线程通过对象O来完成交互,而对象上的wait()和notify/notifyAll()的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。

2.1 如何实现?

等待/通知机制的实现由Java完成,我们只需调用Object类的几个方法即可。

  • wait():将当前线程的状态改为“等待态”,加入等待队列,释放锁;直到当前线程发生中断或调用了notify方法,这条线程才会被从等待队列转移到同步队列,此时可以开始竞争锁。
  • wait(long):和wait()功能一样,只不过多了个超时动作。一旦超时,就会继续执行wait之后的代码,它不会抛超时异常!
  • notify():将等待队列中的一条线程转移到同步队列中去。
  • notifyAll():将等待队列中的所有线程都转移到同步队列中去。

2.2 注意点

  • 以上方法都必须放在同步块中;
  • 并且以上方法都只能由所处同步块的锁对象调用;
  • 锁对象A.notify()/notifyAll()只能唤醒由锁对象A wait的线程;
  • 调用notify/notifyAll函数后仅仅是将线程从等待队列转移到阻塞队列,只有当该线程竞争到锁后,才能从wait方法中返回,继续执行接下来的代码;
  • 为什么wait必须放在同步块中调用? 因为等待/通知机制需要和共享状态变量配合使用,一般是先检查状态,若状态为true则执行wait,即包含“先检查后执行”,因此需要把这一过程加锁,确保其原子执行。

2.3 具体例子

创建了两个线程——WaitThread和NotifyThread,前者检查flag值是否为false,如果符合要求,进行后续操作,否则在lock上等待,后者在睡眠了一段时间后对lock进行通知,代码如下所示:

public class WaitNotify {
    static boolean flag = true;
    static Object lock = new Object();
    public static void main(String[] args) throws Exception {
        Thread waitThread = new Thread(new Wait(), "WaitThread");
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);
        Thread notifyThread = new Thread(new Notify(), "NotifyThread");
        notifyThread.start();
    }
    
    static class Wait implements Runnable {
        public void run() {
            // 加锁,拥有lock的Monitor
            synchronized (lock) {
            // 当条件不满足时,继续wait,同时释放了lock的锁
                while (flag) {
                    try {
                        System.out.println(Thread.currentThread() + " flag is true. wait@" + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                        lock.wait();
                    } catch (InterruptedException e) {
                    }
                }
                //条件满足时,完成工作
                System.out.println(Thread.currentThread() + " flag is false. running@ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            }
        }
    }

    static class Notify implements Runnable {
        public void run() {
            // 加锁,拥有lock的Monitor
            synchronized (lock) {
                // 获取lock的锁,然后进行通知,通知时不会释放lock的锁,
                // 直到当前线程释放了lock后,WaitThread才能从wait方法中返回
                System.out.println(Thread.currentThread() + " hold lock. notify@ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                lock.notifyAll();
                flag = false;
                SleepUtils.second(5);
            }
        
            // 再次加锁
            synchronized (lock) {
                System.out.println(Thread.currentThread() + " hold lock again. sleep@ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                SleepUtils.second(5);
            }
        }
    }
}

输出如下(输出内容可能不同,主要区别在时间上)

Thread[WaitThread,5,main] flag is true. wait @ 22:23:03
Thread[NotifyThread,5,main] hold lock. notify @ 22:23:04
Thread[NotifyThread,5,main] hold lock again. sleep @ 22:23:09
Thread[WaitThread,5,main] flag is false. running @ 22:23:14

上述第3行和第4行输出的顺序可能会互换,而上述例子主要说明了调用wait()、notify()以及notifyAll()时需要注意的细节:

  • 使用wait()、notify()和notifyAll()时需要先对调用对象加锁。
  • 调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。
  • notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。
  • notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED。
  • 从wait()方法返回的前提是获得了调用对象的锁。

从上述细节中可以看到,等待/通知机制依托于同步机制,其目的就是确保等待线程从wait()方法返回时能够感知到通知线程对变量做出的修改。上述示例的流程图如下所示:

*

WaitThread首先获取了对象的锁,然后调用对象的wait()方法,从而放弃了锁并进入了对象的等待队列WaitQueue中,进入等待状态。由于WaitThread释放了对象的锁,NotifyThread随后获取了对象的锁,并调用对象的notify()方法,将WaitThread从WaitQueue移到SynchronizedQueue中,此时WaitThread的状态变为阻塞状态。NotifyThread释放了锁之后,WaitThread再次获取到锁并从wait()方法返回继续执行。

2.4 生产者和消费者模式

  • 生产者
synchronized(锁A){
    flag = true;// 或者:list.add(xx);
    锁A.notify();
}
  • 消费者
synchronized(锁A){
    // 不满足条件
    while(!flag){ // 或者:list.isEmpty()
        锁A.wait();
    }

    // doSometing……
}

2.5 超时等待模式

在之前的生产者-消费者模式中,如果生产者没有发出通知,那么消费者将永远等待下去。为了避免这种情况,我们可以给消费者增加超时等待功能。该功能依托于wait(long)方法,只需在wait前的检查条件中增加超时标识位,实现如下:

public void get(long mills){
    synchronized( list ){
        // 不加超时功能
        if ( mills <= 0 ) {
            while( list.isEmpty() ){
                list.wait();
            }
        }

        // 添加超时功能
        else {
            boolean isTimeout = false;
            while(list.isEmpty() && isTimeout){
                list.wait(mills);
                isTimeout = true;
            }

            // doSometing……
        }
    }
}

3. 管道流

3.1 作用

管道流用于在两个线程之间进行字节流或字符流的传递。

3.2 特点

  • 管道流的实现依靠PipedOutputStream、PipedInputStream、PipedWriter、PipedReader。分别对应字节流和字符流。
  • 他们与IO流的区别是:IO流是在硬盘、内存、Socket之间流动,而管道流仅在内存中的两条线程间流动。

3.3 实现

步骤如下:
1、 在一条线程中分别创建输入流和输出流;
2、 将输入流和输出流连接起来;
3、 将输入流和输出流分别传递给两条线程;
4、 调用read和write方法就可以实现线程间通信;

// 创建输入流与输出流对象
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();

// 连接输入输出流
out.connect(in);

// 创建写线程
class WriteThread extends Thread{
    private PipedWriter out;

    public WriteThread(PipedWriter out){
        this.out = out;
    }

    public void run(){
        out.write("hello concurrent world!");
    }
}

// 创建读线程
class ReaderThread extends Thread{
    private PipedReader in;

    public ReaderThread(PipedReader in){
        this.in = in;
    }

    public void run(){
        in.read();
    }
}

// 

4. join

4.1 作用

  • join能将并发执行的多条线程串行执行;
  • join函数属于Thread类,通过一个thread对象调用。当在线程B中执行threadA.join()时,线程B将会被阻塞(底层调用wait方法),等到threadA线程运行结束后才会返回join方法。
  • 被等待的那条线程可能会执行很长时间,因此join函数会抛出InterruptedException。当调用threadA.interrupt()后,join函数就会抛出该异常。

4.2 实现

public static void main(String[] args){

    // 开启一条线程
    Thread t = new Thread(new Runnable(){
        public void run(){
            // doSometing
        }
    }).start();

    // 调用join,等待t线程执行完毕
    try{
        t.join();
    }catch(InterruptedException e){
        // 中断处理……
    }

}