设计网站有哪些,做食品网站需要什么条件,手机主题制作网站,唐山建设造价信息网的网站文章目录 1. 三种实现接口2. 链式调用#xff1a;保证链的顺序性与异步性3. CompletableFuture创建CompletionStage子任务4. 处理异常a. 创建回调钩子b. 调用handle()方法统一处理异常和结果 5. 如何选择线程池#xff1a;不同的业务选择不同的线程池 CompletableFuture是JDK… 文章目录 1. 三种实现接口2. 链式调用保证链的顺序性与异步性3. CompletableFuture创建CompletionStage子任务4. 处理异常a. 创建回调钩子b. 调用handle()方法统一处理异常和结果 5. 如何选择线程池不同的业务选择不同的线程池 CompletableFuture是JDK 1.8引入的实现类该类实现了Future和CompletionStage两个接口。该类的实例作为一个异步任务可以在自己异步执行完成之后触发一些其他的异步任务从而达到异步回调的效果。
CompletionStage代表异步计算过程中的某一个阶段一个阶段完成以后可能会进入另一个阶段。一个阶段可以理解为一个子任务每一个子任务会包装一个Java函数式接口实例表示该子任务所要执行的操作。 1. 三种实现接口
每个CompletionStage子任务所包装的可以是一个Function、Consumer或者Runnable函数式接口实例。
这三个常用的函数式接口的特点如下
被包装接口功能描述FunctionFunction接口的特点是有输入、有输出。包装了Function实例的CompletionStage子任务需要一个输入参数并会产生一个输出结果到下一步。RunnableRunnable接口的特点是无输入、无输出。包装了Runnable实例的CompletionStage子任务既不需要任何输入参数又不会产生任何输出。ConsumerConsumer接口的特点是有输入、无输出。包装了Consumer实例的CompletionStage子任务需要一个输入参数但不会产生任何输出。 2. 链式调用保证链的顺序性与异步性
多个CompletionStage构成了一条任务流水线一个环节执行完成了可以将结果移交给下一个环节子任务。多个CompletionStage子任务之间可以使用链式调用。
下面是一个顺序调用的例子 使用 CompletionStage 及其方法构建了一个异步任务链thenApply 用于对前一个阶段的结果进行计算并传递结果thenAccept 用于消费前一个阶段的结果并执行操作thenRun 用于执行无输入输出的操作。 oneStage//被thenApply包装CompletionStage子任务,由输入输出.thenApply(x - square(x)) //消耗上游输出但是没有输出.thenAccept(y - System.out.println(y)) //不消耗上一个子任务的输出又不产生结果.thenRun(() - System.out.println())
这种链式操作可以方便地将多个异步操作连接起来同时保证了操作的顺序性和异步性提高了代码的可维护性和并发性能。 接下来是一个异步调用的例子 在这个例子中task2 和 task3 都依赖于 task1 完成后执行但它们可能并行执行也就是说task2 和 task3 的执行顺序是不确定的它们不一定会按照 thenRunAsync 的顺序执行。 CompletableFutureVoid task1 CompletableFuture.runAsync(() - {System.out.println(Task 1);
});CompletableFutureVoid task2 task1.thenRunAsync(() - {System.out.println(Task 2);
});CompletableFutureVoid task3 task1.thenRunAsync(() - {System.out.println(Task 3);
}); 3. CompletableFuture创建CompletionStage子任务
CompletableFuture定义了一组方法用于创建CompletionStage子任务或者阶段性任务基础的方法如下
//子任务包装一个Runnable实例并调用ForkJoinPool.commonPool()线程池来执行
public static CompletableFutureVoid runAsync(Runnable runnable)//子任务包装一个Runnable实例并调用指定的executor线程池来执行
public static CompletableFutureVoid runAsync(Runnable runnable, Executor executor)//子任务包装一个Supplier实例并调用ForkJoinPool.commonPool()线程池来执行
public static U CompletableFutureU supplyAsync(SupplierU supplier)//子任务包装一个Supplier实例并使用指定的executor线程池来执行
public static U CompletableFutureU supplyAsync(SupplierU supplier, Executor executor)
其中主要注意的信息是 Supplier 表示一个无参数但有返回值的函数Runnable表示无惨无返回值的函数在使用CompletableFuture创建CompletionStage子任务时如果没有指定Executor线程池在默认情况下CompletionStage会使用公共的ForkJoinPool线程池。它们都会交给线程池执行get方法会堵塞主线程等待执行结果。 给一个例子
//无返回值异步调用
Test
public void runAsyncDemo() throws Exception { CompletableFutureVoid future CompletableFuture.runAsync(() - { sleepSeconds(1);//模拟执行1秒 Print.tco(run end ...); }); //等待异步任务执行完成,最多等待2秒 future.get(2, TimeUnit.SECONDS);
} //有返回值异步调用
Test
public void supplyAsyncDemo() throws Exception { CompletableFutureLong future CompletableFuture.supplyAsync(() - { long start System.currentTimeMillis(); sleepSeconds(1);//模拟执行1秒 Print.tco(run end ...); return System.currentTimeMillis() - start; }); //等待异步任务执行完成,现时等待2秒 long time future.get(2, TimeUnit.SECONDS); Print.tco(异步执行耗时秒 time / 1000);
}4. 处理异常
a. 创建回调钩子
可以为CompletionStage子任务设置特定的回调钩子当计算结果完成或者抛出异常的时候执行这些特定的回调钩子。
//设置子任务完成时的回调钩子
public CompletableFutureT whenComplete(BiConsumer? super T,? super Throwable action)//设置子任务完成时的回调钩子可能不在同一线程执行
public CompletableFutureT whenCompleteAsync(BiConsumer? super T,? super Throwable action)//设置子任务完成时的回调钩子提交给线程池executor执行
public CompletableFutureT whenCompleteAsync(BiConsumer? super T,? super Throwable action,Executor executor)//设置异常处理的回调钩子
public CompletableFutureT exceptionally(FunctionThrowable,? extends T fn)Test
public void whenCompleteDemo() throws Exception { CompletableFutureVoid future CompletableFuture.runAsync(() - { sleepSeconds(1);//模拟执行1秒 Print.tco(抛出异常); throw new RuntimeException(发生异常); //Print.tco(run end ...); }); //设置执行完成后的回调钩子 future.whenComplete(new BiConsumerVoid, Throwable() { Override public void accept(Void t, Throwable action) { Print.tco(执行完成); } }); //设置发生异常后的回调钩子 future.exceptionally(new FunctionThrowable, Void() { Override public Void apply(Throwable t) { Print.tco(执行失败 t.getMessage()); return null; } }); future.get();
}[ForkJoinPool.commonPool-worker-9]抛出异常
[main]执行完成
[ForkJoinPool.commonPool-worker-9]执行失败java.lang.RuntimeException: 发生异常
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 发生异常有如下几个注意点 调用cancel()方法取消CompletableFuture时任务被视为异常完成completeExceptionally()方法所设置的异常回调钩子也会被执行。如果没有设置异常回调钩子发生内部异常时会有两种情况发生 在调用get()时如果遇到内部异常get()方法就会抛出ExecutionException执行异常。在调用join()和getNow(T)启动任务时如果遇到内部异常join()和getNow(T)方法就会抛出CompletionException。 b. 调用handle()方法统一处理异常和结果
//在执行任务的同一个线程中处理异常和结果
public U CompletionStageU handle(BiFunction? super T, Throwable, ? extends U fn);//可能不在执行任务的**同一个线程**中处理异常和结果
public U CompletionStageU handleAsync(BiFunction? super T, Throwable, ? extends U fn);//在指定线程池executor中处理异常和结果
public U CompletionStageU handleAsync(BiFunction? super T, Throwable, ? extends U fn,Executor executor);Test
public void handleDemo() throws Exception { CompletableFutureVoid future CompletableFuture.runAsync(() - { sleepSeconds(1);//模拟执行1秒 Print.tco(抛出异常); throw new RuntimeException(发生异常); //Print.tco(run end ...); }); //设置执行完成后的回调钩子 future.handle(new BiFunctionVoid, Throwable, Void() { Override public Void apply(Void input, Throwable throwable) { if (throwable null) { Print.tcfo(没有发生异常); } else { Print.tcfo(sorry,发生了异常); } return null; } }); future.get();
}//
//[ForkJoinPool.commonPool-worker-1]抛出异常
//[ForkJoinPool.commonPool-worker-1|CompletableFutureDemo$3.apply]: sorry,发生了异常5. 如何选择线程池不同的业务选择不同的线程池
默认情况下通过静态方法runAsync()、supplyAsync()创建的CompletableFuture任务会使用公共的ForkJoinPool线程池默认的线程数是CPU的核数。当然它的线程数可以通过以下JVM参数设置 option:-Djava.util.concurrent.ForkJoinPool.common.parallelism如果所有CompletableFuture共享一个线程池那么一旦有任务执行一些很慢的IO操作就会导致线程池中的所有线程都阻塞在IO操作上造成线程饥饿进而影响整个系统的性能。所以强烈建议大家根据不同的业务类型创建不同的线程池以避免互相干扰。