背景
上一篇文章我们看了FutureTask,分析了他的问题,异步编程并不方便。
- 问题1: FutureTask获取执行结果前,主线程需要通过get()方法一直阻塞等待子线程执行完成call方法,才可以拿到返回结果
- 问题2:如果不通过get挂起线程,通过while循环,不停的判断任务的状态是否结束,结束后,再拿结果。如果任务长时间没有执行完毕,CPU会一直调度查看任务状态的方法,浪费CPU资源。
CompletableFuture在一定程度上提高了各种异步非阻塞的方案,并且响应式变成,代码编写效果上,效率更高。
1、相关API
1.1 等在前置结果再执行当前任务的API
-
supplyAsync(Supplier supplier) 异步执行任务,有返回结果
-
runAsync(Runnable runnable) 异步执行任务,无返回结果
在不指定线程池的前提下,两个异步任务都交给ForkJoinPool去执行,而ForkJoinPool内部是守护线程,守护线程是主线程结束后就结束了。
-
thenApply(Function fn) 等待前一个任务结束,拿到前一个方法的结果并处理然后返回结果。使用和前一个任务想同的线程
thenApplyAsync(Function fn) 和上面的一样,但是使用不同的线程执行
thenApplyAsync(Function fn, Executor executor) 和上面一样,这里需要自定义线程池
CompletableFuture中的大部分方法都有三个重载,(不带Async、带Async、带Async和线程池)
-
thenAccept(Consumer action) 等待前一个任务结束,拿到前一个方法的结果并处理,没有返回值
-
thenRun(Runnable action) 等待前一个任务结束,再处理,不需要前面的结果,没有返回值
1.2 等待多个并行任务完成后,并拿到结果再执行当前任务API
-
thenCombine(CompletionStage other, BiFunction fn) 等待前两个并行任务完成后,拿到结果再执行当前任务,并返回结果
thenCombineAsync(CompletionStage other, BiFunction fn) 等待前两个并行任务完成后,拿到结果,并在新线程执行当前任务,并返回结果
thenCombineAsync(CompletionStage other, BiFunction fn, Executor executor) 等待前两个并行任务完成后,拿到结果,并在新线程执行当前任务(自定义线程池),并返回结果
-
thenAcceptBoth(CompletionStage other, BiConsumer action) 等待前两个并行任务结束,拿到结果并处理,没有返回值
thenAcceptBothAsync(CompletionStage other, BiConsumer action)
thenAcceptBothAsync(CompletionStage other, BiConsumer action, Executor executor)
-
runAfterBoth(CompletionStage other, Runnable action) 让两个并行任务结束,再执行,不需要前面的结果,没有返回值
runAfterBothAsync
1.3 两个任务一起执行,有一个任务返回结果后,拿到结果就可以处理API
-
applyToEither(CompletionStage other, Function fn) 等待前两个并行任务执行,任意一个有返回结果,拿到结果执行,并返回结果
Async方法同样如上
-
acceptEither(CompletionStage other, Consumer action) 等待前两个并行任务执行,任意一个有返回结果,拿到结果执行,无返回
-
runAfterEither(CompletionStage other, Runnable action) 等待前两个并行任务执行结束,再执行,无返回
1.4 等前置任务执行完成,再处理,后续返回结果为CompletableStage
-
thenCompose(Function> fn) 等待前置任务执行完成,拿到结果并执行,返回CompletableStage
thenApply(Function fn) 用Apply执行就够了,等同于Apply
1.5 异常和其他处理
- exceptionally(Function fn) 异常处理
- whenComplete(BiConsumer action) 可以拿到上一个任务的返回结果和异常,当前处理不会返回结果
- handle(BiFunction fn) 可以拿到上一个任务的返回结果和异常,同时当前处理可以返回结果
1.6 总结
上面的是常用API,CompletableFuture这么多API很难记,但是有规律可循:
- Async结尾的是异步执行的API,通常有带线程池和不带线程池的版本
- run开头的是无参方法,没有返回值
- supply开头的是有参方法,有返回值
- 以Accept开头或者结尾的方法,有参数,没有返回值
- 以Apply开头或者结尾的方法,有参数,有返回值
- 带有either后缀的方法,表示谁先完成就消费谁
2、如何使用?
基于上面API做应用案例
2.1 一个简单案例
小连要回家干饭, 小严做饭, 小连看电视, 等小严做完, 小连干饭。
我们把能同时执行的并发执行,必须分开的顺序执行,设计如下:
/** * 小连要回家干饭, 小严做饭, 小连看电视, 等小严做完, 小连干饭 * * main线程 小连 * 异步线程 小严 */
public static void main(String[] args) throws ExecutionException, InterruptedException { print("小连回家干饭"); CompletableFuture<String> future= CompletableFuture.supplyAsync(()->{ print("小严做饭"); sleep(2);print("小严做完"); return "锅包肉"; }); print("小连看电视"); print("小连干饭,"+future.join());
}
输出结果:
可以看到默认线程池是ForkJoinPool
main: 小连回家干饭
main: 小连看电视
ForkJoinPool.commonPool-worker-1: 小严做饭
ForkJoinPool.commonPool-worker-1: 小严做完
main: 小连干饭,锅包肉
2.2 稍微复杂的案例
小连要回家干饭, 小严炒菜, 小李焖饭, 小连看电视, 等小严小李做完, 小陈端菜和饭给小连, 小连干饭。
我们把能同时执行的并发执行,必须分开的顺序执行,设计如下:
/** * 小连要回家干饭, 小严炒菜, 小李焖饭, 小连看电视, 等小严小李做完, 小陈端菜和饭给小连, 小连干饭 ** main线程 小连* 异步线程 小严, 小李,小陈**/
public static void main(String[] args) throws ExecutionException, InterruptedException {print("小连回家干饭"); CompletableFuture<String> future= CompletableFuture.supplyAsync(()->{ print("小严做菜"); sleep(2); print("小严做完"); return "锅包肉"; }, executor).thenCombineAsync(CompletableFuture.supplyAsync(()->{ print("小李焖饭"); sleep(3); print("饭好了"); return "大米饭"; }, executor), (r1, r2)->{ print("饭菜好了,小李端菜"); sleep(1); return r1+", "+r2; }, executor); print("小连看电视"); print("小连干饭,"+future.join()); executor.shutdown();
}
执行结果:
这里指定了线程池
main: 小连回家干饭
pool-1-thread-1: 小严做菜
pool-1-thread-2: 小李焖饭
main: 小连看电视
pool-1-thread-1: 小严做完
pool-1-thread-2: 饭好了
pool-1-thread-3: 饭菜好了,小李端菜
main: 小连干饭,锅包肉, 大米饭
3、源码分析
3.1 runAsync源码
3.1.1 从runAsync进入,来到asyncRunStage方法
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {if (f == null) throw new NullPointerException();//声明当前任务的CompletableFuture对象//任务执行和后续任务的触发是两个操作,这里的d是为了触发后续任务的执行CompletableFuture<Void> d = new CompletableFuture<Void>();//将任务和CompletableFuture封装到一起,作为Async对象//将Async交给线程池执行e.execute(new AsyncRun(d, f));return d;
}
3.1.2 进入new AsyncRun
封装任务和CompletableFuture,作为Async对象,将Async交给线程池执行
static final class AsyncRun extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {CompletableFuture<Void> dep; Runnable fn;//存储CompletableFuture以及当前任务AsyncRun(CompletableFuture<Void> dep, Runnable fn) {this.dep = dep; this.fn = fn;}public final Void getRawResult() { return null; }public final void setRawResult(Void v) {}public final boolean exec() { run(); return false; }public void run() {CompletableFuture<Void> d; Runnable f;//将成员变量做临时存储if ((d = dep) != null && (f = fn) != null) {// help gcdep = null; fn = null;//当前任务是否已经有返回结果if (d.result == null) {try {//线程池异步执行任务f.run();//当前Runnable是没有返回结果的,所以直接封装一个null值d.completeNull();} catch (Throwable ex) {d.completeThrowable(ex);}}//执行后续任务d.postComplete();}}
}
3.1.3 completeXXX系列
completeXXX系列方法都差不多,如下,还有completeThrowable等等
//不需要返回值的时候封装null
final boolean completeNull() {//CAS设置result值return UNSAFE.compareAndSwapObject(this, RESULT, null, NIL);
}
//需要返回值的时候,封装结果
final boolean completeValue(T t) {return UNSAFE.compareAndSwapObject(this, RESULT, null,(t == null) ? NIL : t);
}
3.1.4 postComplete后续任务的触发方式
当前任务执行完毕后,触发的后续处理。即触发后续任务执行。
final void postComplete() {/** On each step, variable f holds current dependents to pop* and run. It is extended along only one path at a time,* pushing others to avoid unbounded recursion.*///h 是栈顶CompletableFuture<?> f = this; Completion h;//f.stack存储后续任务的栈while ((h = f.stack) != null ||(f != this && (h = (f = this).stack) != null)) {CompletableFuture<?> d; Completion t;//栈结构中有后续需要处理的任务,进入while循环,没循环一次,h指针会后移。//casStack将h后移,栈顶出栈if (f.casStack(h, t = h.next)) {if (t != null) {if (f != this) {pushStack(h);continue;}h.next = null; // detach}//执行栈顶任务f = (d = h.tryFire(NESTED)) == null ? this : d;}}
}
3.2 thenRun源码
后续任务触发的方式有两种:
- 一种是基于前置任务执行完毕,执行postComplete方法触发
- 另一种是后续任务在压栈之前和之后,会尝试执行后续任务,只要前置任务执行结束快,后续任务就可以直接执行,不需要前置任务触发
3.2.1 thenRun系列
几个方法如下,都是走的uniRunStage,同步执行的线程池参数是null。
public CompletableFuture<Void> thenRun(Runnable action) {return uniRunStage(null, action);
}public CompletableFuture<Void> thenRunAsync(Runnable action) {return uniRunStage(asyncPool, action);
}public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor) {return uniRunStage(screenExecutor(executor), action);
}
3.2.2 从thenRun进入uniRunStage
//e 线程池执行器,如果是Async异步调用,会传递线程池
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {if (f == null) throw new NullPointerException();CompletableFuture<Void> d = new CompletableFuture<Void>();//如果传递了线程池,代表异步执行,直接走if代码块执行//如果没有传递线程池,先执行d.uniRun同步执行,如果d.uniRun返回false,继续向下if (e != null || !d.uniRun(this, f, null)) {//如果前置任务没有执行完成,那就压栈//将线程池、后续任务、前置任务,封装成cUniRun<T> c = new UniRun<T>(e, d, this, f);//将封装好的c压栈//不确保一定压到栈中//在这个位置,可能出现前置任务已经执行完毕,导致无法压到栈中push(c);//尝试执行后续任务c.tryFire(SYNC);}return d;
}final void push(UniCompletion<?,?> c) {if (c != null) {//result是前置任务的结果//只有前置任务还没有执行完成,才能将封装好的UniRun对象压栈while (result == null && !tryPushStack(c))lazySetNext(c, null); // clear on failure}
}
3.2.3 uniRun方法,参数c==null表示同步执行,否则异步执行(进入claim执行)
//尝试执行后续任务,a是前置任务, f:后续任务
final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {Object r; Throwable x;//如果前置任务没有执行完,直接走后面的后续任务//只看第二个判断,如果前置任务没有执行完成,直接返回falseif (a == null || (r = a.result) == null || f == null)return false;//说明前置任务已经完成,要执行后续任务,但是要先判断后续任务执行了么if (result == null) {//后续任务还没有执行//如果前置任务异常结束,那么后续任务不需要执行了if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)completeThrowable(x, r);else//前置任务正常结束,尝试执行后继任务try {//c==null,同步执行,否则同步执行c.claim if (c != null && !c.claim())//异步执行完毕return false;//同步执行f.run();completeNull(); //已分析,见3.1.3} catch (Throwable ex) {completeThrowable(ex);//已分析,见3.1.3}}return true;
}abstract static class UniCompletion<T,V> extends Completion {//异步执行任务final boolean claim() {Executor e = executor;//判断当前任务标记,是否执行if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {if (e == null)//线程池为null,代表同步执行,直接返回falsereturn true;//异步执行,使用线程池执行即可executor = null; // disablee.execute(this);}return false;}
}
3.2.4 UniRun类(和uniRun方法没关系)
这个类将线程池、后续任务、前置任务、后续具体任务(Runable实现)封装,此类继承了UniCompletion
//后续任务执行,以及将前置任务封装成UniRun对象
static final class UniRun<T> extends UniCompletion<T,Void> {Runnable fn;UniRun(Executor executor, CompletableFuture<Void> dep,CompletableFuture<T> src, Runnable fn) {super(executor, dep, src); this.fn = fn;}//dep 后续任务//src 前置任务//fn 后续具体任务(Runable实现)final CompletableFuture<Void> tryFire(int mode) {//d 后续任务, a 前置任务CompletableFuture<Void> d; CompletableFuture<T> a;if ((d = dep) == null ||//mode>0是同步!d.uniRun(a = src, fn, mode > 0 ? null : this))return null;dep = null; src = null; fn = null;return d.postFire(a, mode);}
}