文章目录
- 什么是异步
- Future
- 异步编排
- 1串行关系执行
- thenRun
- thenApply
- thenAccept
- thenCompose
- 2聚合AND
- thenCombine
- thenAcceptBoth
- runAfterBoth
- 3OR聚合
- applyToEiter
- acceptEither
- runAfterEither
- 4异常处理
- exceptionally
- whenComplete
- handle
- 异步开启
- 1RunAsync:没有使用自定义线程池,默认使用的线程池ForkJoinPool.commonPool
- 2RunAsync使用自己的线程池
- 3supplyAsync: 有返回值的,关注返回值
什么是异步
异步调用实现一个不需要被等待的方法的返回值;让调用者继续执行(异步执行);在java中,简单的讲就是开启另一个线程完成程序计算,使得调用者继续执行,不需要等待计算的结果,但是调用者仍然需要获取线程的计算结果(不需要同步阻塞等待)。
Future
Future也是一个异步计算结果返回接口,目的获取返回值结果。但是future在获取返回值结果的时候,方法必须同步阻塞等待返回值结果。
Get:获取结果(等待,阻塞)
Get(timeout):获取结果,指定等待时间
Cancel: 取消当前任务
isDone:判断任务是否已经完成(轮询)
future对于结果获取不是很方便,只能通过同步阻塞的方式获取结果,或者是轮询的方式获取到结果;阻塞的方式获取返回值结果与异步思想相违背,轮询方式又很占用cpu资源,也不能及时得到我们结果。
异步编排
CompletableFuture 可以帮助我们简化异步编程复杂性,提供了函数式编程的能力,可以通过回调函数的方式处理计算结果。
public class CompletableFuture<T> implements Future<T>,CompletionStage<T>
CompletableFuture具有Future的特性,还实现了CompletionStage接口,具备CompletionStage接口的特性:串行执行,并行执行,聚合(AND聚合,OR聚合)
1串行关系执行
串行关系执行: then- 然后,也就是表示下一步,所以通常是一个串行的关系体现,then后面的单词(比如run/apply/accept)就是函数式接口中抽象方法名称;
串行关系执行:利用上一步的执行结果,去进行下一步任务执行,任务执行具有先后顺序,因此把这种操作叫做串行关系。
thenRun
public CompletionStage<Void> thenRun(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
package com.xd.cubemall.juc;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;/*** 并发编程中异步编排*/
@Slf4j
public class AsyncFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {log.info("主线程start...................");//1.public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);// thenRun 没有返回值,也不关心上一步执行结果,只和上一步执行具有顺序关系//2.public CompletionStage<Void> thenRun(Runnable action);CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {log.info("子线程future线程start...................");int i = 10 / 2;log.info("线程名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);log.info("子线程future线程end...................");return i;}).thenRun(() -> {log.info("thenRun子线程开始运行...................");});// 调用执行future.get();log.info("主线程end...................");}}
thenApply
thenApply此方法具有返回值,上一步直接的结果当成传参传递给thenApply,T就是参数类型,U就是返回值类型
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
package com.xd.cubemall.juc;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;/*** 并发编程中异步编排*/
@Slf4j
public class AsyncFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//1.public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);// thenApply 有返回值,返回值类型是U,跟上一步执行结果有关系,上一步执行结果会被当成参数传递给下一步,参数类型为T//2.public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);// 多线程异步编排CompletableFuture<Long> thenApply = CompletableFuture.supplyAsync(() -> {log.info("子线程future线程start...................");int i = 10 / 2;log.info("线程名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);log.info("子线程future线程end...................");return i;}).thenApply((t) -> {log.info("thenApply子线程开始执行,参数是:{}", t);long res = t * 5;log.info("计算结果:{}", res);return res;});// 调用异步方法Long aLong = thenApply.get();log.info("最终计算结果:{}",aLong);}}
thenAccept
thenAccept:没有返回值,跟上一步执行结果有关系,上一步执行结果将会被下一步消费,参数类型T
public CompletionStage<Void> thenAccept(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
package com.xd.cubemall.juc;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;/*** 并发编程中异步编排*/
@Slf4j
public class AsyncFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//1.public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);//thenAccept:没有返回值,跟上一步执行结果有关系,上一步执行结果将会被下一步消费,参数类型T//2.public CompletionStage<Void> thenAccept(Consumer<? super T> action);log.info("主线程start...................");CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {log.info("子线程future线程start...................");int i = 10 / 2;log.info("线程名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);log.info("子线程future线程end...................");return i;}).thenAccept((t) -> {log.info("thenAccept子线程开始执行,参数是:{}", t);long res = t * 5;log.info("计算结果:{}", res);});// 调用异步方法future.get();log.info("主线程end...................");}}
thenCompose
- 有返回值,返回值类型U
- 依赖于上一步的返回值结果,上一步返回值结果将会作为参数被传递
- 允许对2个CompletionStage流水线进行操作,第一个操作完成时,将第一个操作结果传递第二个CompletionStage
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);public <U> CompletionStage<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn,Executor executor);
package com.xd.cubemall.juc;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;
import java.util.function.Function;/*** 并发编程中异步编排*/
@Slf4j
public class AsyncFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//1.public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);//thenCompose://1) 有返回值,返回值类型U//2) 依赖于上一步的返回值结果,上一步返回值结果将会作为参数被传递//3) 允许对2个CompletionStage流水线进行操作,第一个操作完成时,将第一个操作结果传递第二个CompletionStage//2.public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);CompletableFuture<Long> thenCompose = CompletableFuture.supplyAsync(() -> {log.info("子线程future线程start...................");int i = 10 / 2;log.info("线程名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);log.info("子线程future线程end...................");return i;}).thenCompose(new Function<Integer, CompletionStage<Long>>() {@Overridepublic CompletionStage<Long> apply(Integer t) {// 第二次执行CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {log.info("thenCompose子线程开始执行,参数是:{}", t);long res = t * 5;log.info("计算结果:{}", res);return res;});return future;}});//调用异步方法Long aLong = thenCompose.get();log.info("最终计算结果: {}",aLong);}}
2聚合AND
Combine…with… 和 both… and … 都是要求两者都必须满足,也就是and且的关系。
thenCombine
thenCombine:
1) 有返回值
2) thenCombine会把两个CompletionStage的任务都执行完毕后,把两个任务的结果一块交给thenCombine去处理
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
package com.xd.cubemall.juc;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;
import java.util.function.Function;/*** 并发编程中异步编排*/
@Slf4j
public class AsyncFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//1.public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);//thenCombine://1) 有返回值//2) thenCombine会把两个CompletionStage的任务都执行完毕后,把两个任务的结果一块交给thenCombine去处理//2.public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);log.info("主线程start...................");//第一个:CompletionStageCompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {log.info("子线程future线程start...................");int i = 10 / 4;log.info("线程名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);log.info("子线程future线程end...................");return i;});//第二个:CompletionStageCompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {log.info("子线程future线程start...................");int i = 10 / 3;log.info("线程名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);log.info("子线程future线程end...................");return i;});// 利用thenCombine方法对f1,f2进行合并操作CompletableFuture<Integer> thenCombine = f1.thenCombine(f2, (t, u) -> {log.info("第一个 CompletableFuture 执行结果{}", t);log.info("第二个 CompletableFuture 执行结果{}", u);return t + u;});//调用异步方法Integer integer = thenCombine.get();log.info("最终异步的调用结果:{}",integer);log.info("主线程end...................");}}
thenAcceptBoth
当2个阶段的CompletionStage都执行完毕后,把结果一块交给thenAcceptBoth进行执行,没有返回值
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T,? super U> action);public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,Executor executor);
package com.xd.cubemall.juc;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;
import java.util.function.Function;/*** 并发编程中异步编排*/
@Slf4j
public class AsyncFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//1.public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);//thenAcceptBoth: 当2个阶段的CompletionStage都执行完毕后,把结果一块交给thenAcceptBoth进行执行,没有返回值//2.public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T,? super U> action);log.info("主线程start...................");//第一个:CompletionStageCompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {log.info("子线程future线程start...................");int i = 10 / 4;log.info("线程名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);log.info("子线程future线程end...................");return i;});//第二个:CompletionStageCompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {log.info("子线程future线程start...................");int i = 10 / 3;log.info("线程名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);log.info("子线程future线程end...................");return i;});// 利用thenAcceptBoth方法对f1,f2进行合并操作CompletableFuture<Void> f = f1.thenAcceptBoth(f2, (t, u) -> {log.info("第一个 CompletableFuture 执行结果{}", t);log.info("第二个 CompletableFuture 执行结果{}", u);});//调用异步方法f.get();log.info("主线程end...................");}}
runAfterBoth
无返回值,当2个阶段的CompletionStage都执行完毕后,才会执行下一步操作
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
package com.xd.cubemall.juc;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;
import java.util.function.Function;/*** 并发编程中异步编排*/
@Slf4j
public class AsyncFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//1.public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);//runAfterBoth: 无返回值,当2个阶段的CompletionStage都执行完毕后,才会执行下一步操作//2.public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);log.info("主线程start...................");//第一个:CompletionStageCompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {log.info("子线程future线程start...................");int i = 10 / 4;log.info("线程名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);log.info("子线程future线程end...................");return i;});//第二个:CompletionStageCompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {log.info("子线程future线程start...................");int i = 10 / 3;log.info("线程名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);log.info("子线程future线程end...................");return i;});// 利用runAfterBoth方法对f1,f2进行合并操作CompletableFuture<Void> f = f1.runAfterBoth(f2, () -> {log.info("有个任务在执行:runAfterBoth方法正在运行......");});f.get();log.info("主线程end...................");}}
3OR聚合
applyToEiter
1.针对于2个阶段CompletionStage,将计算速度最快的那个CompletionStage的结果作为下一步处理的消费;
2.有返回值
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T,U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T,U> fn,Executor executor);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T,U> fn);
package com.xd.cubemall.juc;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;
import java.util.function.Function;/*** 并发编程中异步编排*/
@Slf4j
public class AsyncFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T,U> fn);// 1.applyToEither特点:针对于2个阶段CompletionStage,将计算速度最快的那个CompletionStage的结果作为下一步处理的消费;// 2.有返回值log.info("主线程start...................");//第一个:CompletionStageCompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {log.info("子线程future线程start...................");int i = 10 / 4;log.info("线程名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);log.info("子线程future线程end...................");return i;});//第二个:CompletionStageCompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {log.info("子线程future线程start...................");int i = 10 / 3;log.info("线程名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);log.info("子线程future线程end...................");return i;});// 利用applyToEither方法对f1,f2进行合并操作,合并操作关系:ORCompletableFuture<Integer> f = f1.applyToEither(f2, result -> {log.info("applyToEither子线程开始执行,参数是:{}", result);return result;});Integer r1 = f.get();log.info("最终计算结果: {}",r1);log.info("主线程end...................");}}
acceptEither
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
runAfterEither
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
4异常处理
exceptionally
public CompletionStage<T> exceptionally(Function<Throwable,? extends T> fn);
package com.xd.cubemall.juc;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;
import java.util.function.Function;/*** 并发编程中异步编排*/
@Slf4j
public class AsyncFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//public CompletionStage<T> exceptionally(Function<Throwable,? extends T> fn);log.info("主线程start...................");//第一个:CompletionStageCompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> {log.info("子线程future线程start...................");int i = 10 / 0;log.info("线程名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);log.info("子线程future线程end...................");return i;}).exceptionally((t)->{log.info("业务执行失败:{}",t.getMessage());return null;});Integer r1 = f.get();log.info("主线程end...................");}}
whenComplete
public CompletionStage<T> whenComplete(BiConsumer<? super T,? super Throwable> action);public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action,Executor executor);
package com.xd.cubemall.juc;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;
import java.util.function.Function;/*** 并发编程中异步编排*/
@Slf4j
public class AsyncFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//public CompletionStage<T> whenComplete(BiConsumer<? super T,? super Throwable> action);log.info("主线程start...................");//第一个:CompletionStageCompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> {log.info("子线程future线程start...................");int i = 10 / 0;log.info("线程名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);log.info("子线程future线程end...................");return i;}).whenComplete((t, u) -> {log.info("上一步执行结果:{}",t);//判断if(u!=null){log.info("执行错误,有异常:{}",u.getMessage());}});Integer integer = f.get();log.info("最终执行结果:{}",integer);log.info("主线程end...................");}}
handle
try{}finally{}:对上一步执行结果进行处理,还可以处理异常任务
public <U> CompletionStage<U> handle(BiFunction<? super T,Throwable,? extends U> fn);public <U> CompletionStage<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn);public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable,? extends U> fn, Executor executor);
package com.xd.cubemall.juc;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;
import java.util.function.Function;/*** 并发编程中异步编排*/
@Slf4j
public class AsyncFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//public <U> CompletionStage<U> handle(BiFunction<? super T,Throwable,? extends U> fn);//try{}finally{}:对上一步执行结果进行处理,还可以处理异常任务log.info("主线程start...................");//第一个:CompletionStageCompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> {log.info("子线程future线程start...................");int i = 10 / 2;log.info("线程名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);log.info("子线程future线程end...................");return i;}).handle((t, u) -> {int res = -1;if (u != null) {log.info("执行错误:{}", u.getMessage());} else {res = t * 5;}return res;});Integer integer = f.get();log.info("最终执行结果:{}",integer);log.info("主线程end...................");}}
异步开启
CompletableFuture提供了4个静态的方法,来创建一个异步操作(异步开启,从这4个静态的方法开始即可)
runAsync:没有返回值的方法,不关注返回值
public static CompletableFuture<Void> runAsync(Runnable runnable);public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor);
supplyAsync:有返回值,关注返回值的。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor);
1RunAsync:没有使用自定义线程池,默认使用的线程池ForkJoinPool.commonPool
package com.xd.cubemall.juc;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** 并发编程中异步编排*/
@Slf4j
public class AsyncFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {log.info("主线程start...................");//runAsync: 实现异步编排,没有返回值//public static CompletableFuture<Void> runAsync(Runnable runnable);CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {log.info("子线程future线程start...................");int i = 10/2;log.info("线程名称:{},线程执行结果:{}",Thread.currentThread().getName(),i);log.info("子线程future线程end...................");});// 调用异步任务future.get();log.info("主线程end...................");}
}
2RunAsync使用自己的线程池
package com.xd.cubemall.juc;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;/*** 并发编程中异步编排*/
@Slf4j
public class AsyncFutureDemo {public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),9,3,TimeUnit.SECONDS,new LinkedBlockingDeque<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {log.info("主线程start...................");//runAsync: 实现异步编排,没有返回值//public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor);CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {log.info("子线程future线程start...................");int i = 10/2;log.info("线程名称:{},线程执行结果:{}",Thread.currentThread().getName(),i);log.info("子线程future线程end...................");},threadPool);// 调用异步任务future.get();log.info("主线程end...................");}
}
3supplyAsync: 有返回值的,关注返回值
package com.xd.cubemall.juc;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;/*** 并发编程中异步编排*/
@Slf4j
public class AsyncFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {log.info("主线程start...................");//supplyAsync:实现异步编排,有返回值//public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);CompletableFuture<Integer> uCompletableFuture = CompletableFuture.supplyAsync(() -> {log.info("子线程future线程start...................");int i = 10 / 2;log.info("线程名称:{},线程执行结果:{}", Thread.currentThread().getName(), i);log.info("子线程future线程end...................");return i;});// 调用异步编排futureInteger integer = uCompletableFuture.get();log.info("supplyAsync异步编排的返回值结果:{}",integer);log.info("主线程end...................");}}