05、JUC源码分析:多线程之线程池ThreadPoolExecutor

创建Java线程需要给线程分配堆栈内存以及初始化内存,还需要进行系统调用。创建线程和销毁线程的花销也是比较大的(手动new Thread 类),创建和消耗线程的时间有可能比处理业务的时间还要长。频繁地创建和销毁线程比较消耗资源,会大大降低系统的运行效率

线程池(thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程****,对线程统一管理

线程池就是存放线程的池子,池子里存放了很多可以复用的线程

线程池优点

1、 提高效率,提升系统响应速度:创建好一定数量的线程放在池中,等需要使用的时候就从池中拿一个,这要比需要的时候创建一个线程对象要快的多;

2、 降低资源的消耗:减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务;

3、 线程的可管理性:线程是稀缺资源,如果无限制的创建会严重影响系统效率,线程池可以对线程进行管理、监控、调优;

一、Executors类(不推荐)

Executors类属于java.util.concurrent包,Executors(静态Executor工厂)用于创建线程池。工厂和工具方法Executor , ExecutorService , ScheduledExecutorService , ThreadFactory和Callable在此包中定义的类

1、newSingleThreadExecutor

创建一个单线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行

        ExecutorService threadPool = Executors.newSingleThreadExecutor();

        for (int i = 1; i <= 5; i++) {
            threadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " OK");
            });
        }
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK

2、newFixedThread

创建一个定长的线程池,可控制最大并发数,超出的线程进行队列等待

ExecutorService threadPool = Executors.newFixedThreadPool(5);

        for (int i = 1; i <= 10; i++) {
            threadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " OK");
            });
        }
pool-1-thread-1 OK
pool-1-thread-5 OK
pool-1-thread-1 OK
pool-1-thread-4 OK
pool-1-thread-3 OK
pool-1-thread-2 OK
pool-1-thread-3 OK
pool-1-thread-4 OK
pool-1-thread-1 OK
pool-1-thread-5 OK

3、newScheduleThreadPool

可以创建定长的、支持定时任务,周期任务执行

ExecutorService threadPool = Executors.newScheduledThreadPool(5);

        for (int i = 1; i <= 10; i++) {
            threadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " OK");
            });
        }

4、newCacheTreadPool

创建一个可以缓存的线程池,如果线程池长度超过处理需要,可以灵活回收空闲线程,没回收的话就新建线程

ExecutorService threadPool = Executors.newCachedThreadPool();

        for (int i = 1; i <= 100; i++) {
            threadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " OK");
            });
        }

*

/**
 * Executors 工具类
 */
public class ExecutorsTest {
    public static void main(String[] args) {
        //ExecutorService threadPool = Executors.newSingleThreadExecutor();
        //ExecutorService threadPool = Executors.newFixedThreadPool(5);
        //ExecutorService threadPool = Executors.newScheduledThreadPool(5);
        ExecutorService threadPool = Executors.newCachedThreadPool();

        try {
            for (int i = 1; i <= 100; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " OK");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 线程池用完,程序结束,关闭线程池
            threadPool.shutdown();
        }
    }
}

但是不推荐 Executors 工具类创建线程池,可以在阿里巴巴开发手册中找到答案

线程资源必须通过线程池提供,不允许在应用中自行显式创建线程

使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资

源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者 “过度切换 ” 的问题

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样

的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明: Executors 返回的线程池对象的弊端如下:

1) FixedThreadPool 和 SingleThreadPool :

允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM 。

2) CachedThreadPool 和 ScheduledThreadPool :

允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM

*

二、线程池标准创建方式ThreadPoolExecutor

new ThreadPoolExecutor**()** 的写法创建线程池,这样写线程数量更灵活,开发中多数用这个类创建线程

ThreadPoolExecutor 源码以及核心参数


    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

(一)核心参数说明

参数         含义 说明
corePoolSize 线程池中的核心线程数 核心线程生命周期无限,即使空闲也不会死亡
maximumPoolSize 线程池中最大线程数

任务队列满了以后当有新任务进来则会增加一个线程来处理新任务,

(线程总数 < maximumPoolSize)

keepAliveTime 闲置超时时间 当线程数大于核心线程数时,经过keepAliveTime时间将会回收非核心线程
unit

超时时间的单位

(时/分/秒等)

workQueue 线程池中的任务队列 存放任务(Runnable)的容器
threadFactory 为线程池提供创建新线程的线程工厂
rejectedExecutionHandler 拒绝策略 新增一个任务到线程池,如果线程池任务队列超过最大值之后,并且已经开启到最大线程数时,默认为抛出ERROR异常

查看电脑CPU核数以及处理器个数

1、 任务管理器----性能----逻辑处理器;

*

2、 我的电脑-管理----设备管理器----处理器;

*

最大线程数定义

1、 CPU密集型;

处理器个数就是最大并发量

2、 IO密集型;

判断程序中消耗IO的线程设置2倍数量最大线程

(二)ThreadPoolExecutor 工作原理

*

(三)四种拒绝策略

四种拒绝策略,在ThreadPoolExecutor是四个内部类

AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();

DiscardPolicy discardPolicy = new ThreadPoolExecutor.DiscardPolicy();

DiscardOldestPolicy discardOldestPolicy =
        new ThreadPoolExecutor.DiscardOldestPolicy();

CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();

1、 AbortPolicy

当任务添加到线程池中被拒绝时,直接丢弃任务,并抛出RejectedExecutionException异常

2、DiscardPolicy

当任务添加到线程池中被拒绝时,丢弃被拒绝的任务,不抛异常

3、DiscardOldestPolicy

当任务添加到线程池中被拒绝时,**丢弃任务队列中最旧的未处理任务,**然后将被拒绝的任务添加到等待队列中

4、CallerRunsPolicy

被拒绝任务的处理程序,直接在execute方法的调用线程中运行被拒绝的任务(被拒绝的任务,直接在主线程中运行,不再进入线程池)

1、 AbortPolicy

当任务添加到线程池中被拒绝时,直接丢弃任务,并抛出RejectedExecutionException异常

/**
 * new ThreadPoolExecutor.AbortPolicy()// 当任务添加到线程池中被拒绝时,直接丢弃任务,并抛出RejectedExecutionException异常
 * new ThreadPoolExecutor.CallerRunsPolicy() // 被拒绝任务的处理程序,直接在execute方法的调用线程中运行被拒绝的任务(被拒绝的任务,直接在主线程中运行,不再进入线程池)
 * new ThreadPoolExecutor.DiscardPolicy() // 当任务添加到线程池中被拒绝时,丢弃被拒绝的任务,不抛异常
 * new ThreadPoolExecutor.DiscardOldestPolicy() // 当任务添加到线程池中被拒绝时,丢弃任务队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中
 */
public class ThreadPoolExecutorTest {
    public static void main(String[] args) {
        // 最大线程到底该如何定义
        // 1、CPU 密集型,几核,就是几,可以保持CPu的效率最高!(1)任务管理器-性能-逻辑处理器;(2)我的电脑-管理-设备管理器-处理器
        // 2、IO  密集型   > 判断你程序中十分耗IO的线程,设置2倍线程
        // 程序   15个大型任务  io十分占用资源!

        // 获取CPU核数
        int cpuNum = Runtime.getRuntime().availableProcessors();

        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                cpuNum,
                2,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                //RejectedExecutionHandler 4种拒绝策略实现类
                new ThreadPoolExecutor.AbortPolicy()
        );

        try {
            for (int i = 1; i <= 15; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " OK");
                });
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            // 线程池用完,程序结束,关闭线程池
            threadPool.shutdown();
        }
    }
}
pool-1-thread-2 OK
pool-1-thread-5 OK
pool-1-thread-2 OK
pool-1-thread-4 OK
pool-1-thread-4 OK
pool-1-thread-3 OK
pool-1-thread-6 OK
pool-1-thread-1 OK
pool-1-thread-7 OK
pool-1-thread-5 OK
pool-1-thread-8 OK
java.util.concurrent.RejectedExecutionException: Task juc.threadpool.ThreadPoolExecutorTest$$Lambda$1/668386784@b4c966a rejected from java.util.concurrent.ThreadPoolExecutor@2f4d3709[Running, pool size = 8, active threads = 4, queued tasks = 0, completed tasks = 7]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at juc.threadpool.ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:30)

2、DiscardPolicy

当任务添加到线程池中被拒绝时,丢弃被拒绝的任务,不抛异常

/**
 * new ThreadPoolExecutor.AbortPolicy()// 当任务添加到线程池中被拒绝时,直接丢弃任务,并抛出RejectedExecutionException异常
 * new ThreadPoolExecutor.CallerRunsPolicy() // 被拒绝任务的处理程序,直接在execute方法的调用线程中运行被拒绝的任务(被拒绝的任务,直接在主线程中运行,不再进入线程池)
 * new ThreadPoolExecutor.DiscardPolicy() // 当任务添加到线程池中被拒绝时,丢弃被拒绝的任务,不抛异常
 * new ThreadPoolExecutor.DiscardOldestPolicy() // 当任务添加到线程池中被拒绝时,丢弃任务队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中
 */
public class ThreadPoolExecutorTest {
    public static void main(String[] args) {

        // 获取CPU核数
        int cpuNum = Runtime.getRuntime().availableProcessors();

        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                cpuNum,
                2,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                //RejectedExecutionHandler 4种拒绝策略实现类
                new ThreadPoolExecutor.DiscardPolicy()
        );

        try {
            for (int i = 1; i <= 15; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " OK");
                });
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            // 线程池用完,程序结束,关闭线程池
            threadPool.shutdown();
        }
    }
}

只有11条输出

pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-6 OK
pool-1-thread-5 OK
pool-1-thread-3 OK
pool-1-thread-4 OK
pool-1-thread-8 OK
pool-1-thread-7 OK

3、DiscardOldestPolicy

当任务添加到线程池中被拒绝时,**丢弃任务队列中最旧的未处理任务,**然后将被拒绝的任务添加到等待队列中(尝试和最早的线程竞争,也不会抛出异常)

4、CallerRunsPolicy

被拒绝任务的处理程序,直接在execute方法的调用线程中运行被拒绝的任务(被拒绝的任务,直接在主线程中运行,不再进入线程池)

/**
 * new ThreadPoolExecutor.AbortPolicy()// 当任务添加到线程池中被拒绝时,直接丢弃任务,并抛出RejectedExecutionException异常
 * new ThreadPoolExecutor.CallerRunsPolicy() // 被拒绝任务的处理程序,直接在execute方法的调用线程中运行被拒绝的任务(被拒绝的任务,直接在主线程中运行,不再进入线程池)
 * new ThreadPoolExecutor.DiscardPolicy() // 当任务添加到线程池中被拒绝时,丢弃被拒绝的任务,不抛异常
 * new ThreadPoolExecutor.DiscardOldestPolicy() // 当任务添加到线程池中被拒绝时,丢弃任务队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中
 */
public class ThreadPoolExecutorTest {
    public static void main(String[] args) {

        // 获取CPU核数
        int cpuNum = Runtime.getRuntime().availableProcessors();

        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                cpuNum,
                2,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                //RejectedExecutionHandler 4种拒绝策略实现类
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        try {
            for (int i = 1; i <= 15; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " OK");
                });
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            // 线程池用完,程序结束,关闭线程池
            threadPool.shutdown();
        }
    }
}
main OK
main OK
main OK
main OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-5 OK
pool-1-thread-5 OK
pool-1-thread-5 OK
pool-1-thread-2 OK
pool-1-thread-6 OK
pool-1-thread-3 OK
pool-1-thread-4 OK
pool-1-thread-7 OK
pool-1-thread-8 OK