CompletableFuture实现多任务异步执行
CountDownLatch可以实现多任务并行,最后收集结果。业务中更广泛的使用CompletableFuture来实现多任务并行
描述
CompletableFuture.supplyAsync() 是 Java 中用于异步执行任务的一种方法。它返回一个CompletableFuture 对象,该对象表示一个异步计算的结果。supplyAsync() 方法接受一个 Supplier 函数式接口作为参数,该接口表示一个无参数、有返回值的函数。
使用
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>supplyAsync(Supplier<U> supplier, Executor executor)
第一个方法使用默认的 ForkJoinPool.commonPool() 作为执行器。
第二个方法允许你指定一个自定义的 Executor
使用demo
public static void main(String[] args) {List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();long t1 =System.currentTimeMillis();CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {int time = 6;try {//处理业务TimeUnit.SECONDS.sleep(time);} catch (InterruptedException e) {throw new RuntimeException(e);}//返回业务结果return time;},threadPool);CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {int time = 8;try {TimeUnit.SECONDS.sleep(time);} catch (InterruptedException e) {throw new RuntimeException(e);}return time;},threadPool);CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> {int time = 10;try {TimeUnit.SECONDS.sleep(time);} catch (InterruptedException e) {throw new RuntimeException(e);}return time;});CompletableFuture<Integer> completableFuture4 = CompletableFuture.supplyAsync(() -> {int time = 13;try {TimeUnit.SECONDS.sleep(time);} catch (InterruptedException e) {throw new RuntimeException(e);}return time;},threadPool);CompletableFuture<Integer> completableFuture5 = CompletableFuture.supplyAsync(() -> {int time = 20;try {TimeUnit.SECONDS.sleep(time);} catch (InterruptedException e) {throw new RuntimeException(e);}return time;},threadPool);CompletableFuture<Integer> completableFuture6 = CompletableFuture.supplyAsync(() -> {int time = 25;try {TimeUnit.SECONDS.sleep(time);} catch (InterruptedException e) {throw new RuntimeException(e);}return time;},threadPool);completableFutureList.add(completableFuture1);completableFutureList.add(completableFuture2);completableFutureList.add(completableFuture3);completableFutureList.add(completableFuture4);completableFutureList.add(completableFuture5);completableFutureList.add(completableFuture6);//用于等待一组 CompletableFuture 全部完成的方法.join() 方法用于阻塞当前主线程,直到 CompletableFuture 完成CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0])).join();// 汇总所有结果List<Integer> dataList = new ArrayList<>();for (CompletableFuture<Integer> future : completableFutureList) {try {Integer time = future.get();dataList.addAll(Collections.singleton(time));} catch (InterruptedException | ExecutionException e) {}}long t2 =System.currentTimeMillis();System.out.println("6个任务总用时:"+(t2-t1)+"毫秒");System.out.println(dataList);}
线程池的定义
static Executor threadPool = new ThreadPoolExecutor(10,//核心活跃线程数,类比银行两个柜台一直保持营业30 ,//线程池最大大小,类比银行共25个柜台可以营业2L,//超时回收空闲的线程,类比有三个非活跃线程处于活跃状态,在一定时间还未接到任务就进入非活跃状态(就是不营业了)TimeUnit.SECONDS,//时间单位new ArrayBlockingQueue<>(3),//存放等待任务的队列,类比为银行的候客区,不指定大小的话就是最大整数Executors.defaultThreadFactory(),// 线程工厂,不修改!用来创建new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略,如果线程满了,线程池就会使用拒绝策略);
最终输出:
6个任务总用时:25029毫秒
[6, 8, 10, 13, 20, 25]
注意:
①join是为了让主线程等待所有子任务都执行完
②线程池的定义要注意,核心线程太小的话,会在核心线程用完时,进入队列等待。除非队列装满才会唤醒非核心线程