一、ForkJoin
ForkJoin是在Java7提供的一个用于并行执行任务的框架,ForkJoin 从字面上看Fork是分岔的意思,Join是结合的意思,核心思想就是**把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果,**其实现思想与MapReduce有异曲同工之妙
ForkJoin体系中最为关键的就是ForkJoinTask和ForkJoinPool,ForkJoin就是利用分治的思想将大的任务按照一定规则Fork拆分成小任务,再通过Join聚合起来
ForkJoin最经典的一个应用就是Java8中的Stream,我们知道Stream分为串行流和并行流,其中并行流parallelStream就是依赖于ForkJoin来实现并行处理的
Forkjoin主要使用两个类
1、ForkJoinTask
ForkJoinTask : 基本任务,使用fork、join框架必须创建的对象,提供fork,join操作,常用的三个子类如下:
(1)RecursiveAction:无结果返回的任务
(2)RecursiveTask:有返回结果的任务
(3)CountedCompleter:无返回值任务,完成任务后可以触发回调
ForkJoinTask提供了两个重要的方法:
(1)fork:让task异步执行
(2)join:让task同步执行,可以获取返回值
2、ForkJoinPool
ForkJoinPool : 专门用来运行 ForkJoinTask 的线程池,(在实际使用中,也可以接收Runnable/Callable 任务,但在真正运行时,也会把这些任务封装成 ForkJoinTask 类型的任务)。
他是fork/join 框架的核心,是 ExecutorService 的一个实现,用于管理工作线程,并提供了一些工具来帮助获取有关线程池状态和性能的信息。
工作线程一次只能执行一个任务。
ForkJoinPool 线程池并不会为每个子任务创建一个单独的线程,相反,池中的每个线程都有自己的双端队列用于存储任务 ( double-ended queue )。
这种架构使用了一种名为工作窃取( work-stealing )算法来平衡线程的工作负载。
3、工作窃取( work-stealing )算法
简单来说,就是空闲的线程试图从繁忙线程的 deques 中 窃取工作
默认情况下,每个工作线程从其自己的双端队列中获取任务。但如果自己的双端队列中的任务已经执行完毕,双端队列为空时,工作线程就会从另一个忙线程的双端队列尾部或全局入口队列中获取任务,因为这是最大概率可能找到工作的地方。
这种方法最大限度地减少了线程竞争任务的可能性。它还减少了工作线程寻找任务的次数,因为它首先在最大可用的工作块上工作
如下,分别使用 普通for循环,ForkJoin框架,Stream流式计算方式 耗时,相比Stream最简单也最高效
public class MyForkJoin extends RecursiveTask<Long> {
private Long start;
private Long end;
// 临界值
private Long temp = 10000L;
public MyForkJoin(Long start,Long end){
this.start = start;
this.end = end;
}
// 计算方法
@Override
protected Long compute() {
if ((end - start) < temp){
Long sum = 0L;
for (Long i = start;i <= end; i++){
sum += i;
}
return sum;
}else { // forkjoin 递归
Long middle = (start + end) / 2;
MyForkJoin task1 = new MyForkJoin(start,middle);
task1.fork();// 拆分任务,把任务压入线程队列
MyForkJoin task2 = new MyForkJoin(middle + 1,end);
task2.fork();
return task1.join() + task2.join();
}
}
}
/**
* 使用 forkjoin
* 1、forkjoinPool 通过它来执行
* 2、计算任务 forkjoinPool.execute(ForkJoinTask task)
* 3、计算类要继承 ForkJoinTask
*/
public class ForkJoinTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//forMethod(); // 耗时 6370 ms
//forkJoinMethod(); // 耗时 6013 ms
streamMethod(); // 耗时 409 ms
}
// 普通 for循环
public static void forMethod(){
Long sum = 0L;
long start = System.currentTimeMillis();
for (Long i = 1L; i <= 10_0000_0000; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum=" + sum +" 时间:" + (end-start));
}
// ForkJoin
public static void forkJoinMethod() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new MyForkJoin(0L,10_0000_0000L);
// 提交任务
ForkJoinTask<Long> sumbit = forkJoinPool.submit(task);
Long sum = sumbit.get();
long end = System.currentTimeMillis();
System.out.println("sum=" + sum +" 时间:" + (end-start));
}
// stream流
public static void streamMethod(){
long start = System.currentTimeMillis();
// Stream 并行流式计算
long sum = LongStream.rangeClosed(0L,10_0000_0000L).parallel().reduce(0,Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum=" + sum +" 时间:" + (end-start));
}
}
二、异步调用CompletableFuture
在JDK8之前,我们使用的Java多线程变成,主要是 Thread+Runnable 来完成,但是这种方式有个弊端就是没有返回值;Callable+Thread的方法提供多线程和返回值的能力但是在获取返回值的时候会阻塞主线程
JDK8 就引入了 CompletableFuture
CompletableFuture
实现了 Future
接口和 CompletionStage
。因此 CompletableFuture是对 Futrue的功能增强包含了Future的功能
创建 CompletableFuture 的对象的工厂方法
static*CompletableFuture<Void> runAsync(Runnable*runnable)
static*CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync 的参数是 Runnable, 返回值是 CompletableFuture, 意思是工厂方法创建的 CompletableFuture 对象封装的任务没有返回值
suppyAsync 参数类型是 Supplier,返回值是CompletableFuture<U>
, 意思是任务不接受参数,但是会返回结果
如果任务需要返回结果,那么应该选择 suppyAsync;否则可以选择 runAsync
1、runAsync 无返回结果
// 没有返回值的 runAsync 异步回调
private static void noReturnAsync() throws ExecutionException, InterruptedException {
// 没有返回值的 runAsync 异步回调
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() ->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " runAsync====>Void");
});
System.out.println(Thread.currentThread().getName() + " 1");
// 获取阻塞执行结果
completableFuture.get();
}
main 1
ForkJoinPool.commonPool-worker-1 runAsync====>Void
2、suppyAsync 有返回结果
// 有返回值的 runAsync 异步回调
private static void returnAsync() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " supplyAsync====>Integer");
int num = 10/0;
return 200;
});
completableFuture.whenComplete((t,u) -> {
System.out.println("t=>" + t); // 正常的返回结果
System.out.println("u=>" + u); // 错误信息:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 400; // 可以获取到错误的返回结果
}).get();
}
ForkJoinPool.commonPool-worker-1 supplyAsync====>Integer
t=>null
u=>java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
java.lang.ArithmeticException: / by zero