CompletableFuture异步编排

简介:

CompletableFuture异步编排
什么是CompletableFuture#
CompletableFuture是JDK8提供的Future增强类。CompletableFuture异步任务执行线程池,默认是把异步任务都放在ForkJoinPool中执行。

在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务。

Future存在的问题#
Future实际采用FutureTask实现,该对象相当于是消费者和生产者的桥梁,消费者通过 FutureTask 存储任务的处理结果,更新任务的状态:未开始、正在处理、已完成等。而生产者拿到的 FutureTask 被转型为 Future 接口,可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状态。

使用#
runAsync 和 supplyAsync方法#
CompletableFuture 提供了四个静态方法来创建一个异步操作。

Copy
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
public static CompletableFuture supplyAsync(Supplier supplier)
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

runAsync方法不支持返回值。
supplyAsync可以支持返回值。
计算完成时回调方法#
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

Copy
public CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);

public CompletableFuture exceptionally(Function fn);
whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。BiConsumer<? super T,? super Throwable>可以定义处理业务

whenComplete 和 whenCompleteAsync 的区别:
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

代码示例:

Copy
public class CompletableFutureDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {
        @Override
        public Object get() {
            System.out.println(Thread.currentThread().getName() + "\t completableFuture");
            int i = 10 / 0;
            return 1024;
        }
    }).whenComplete(new BiConsumer<Object, Throwable>() {
        @Override
        public void accept(Object o, Throwable throwable) {
            System.out.println("-------o=" + o.toString());
            System.out.println("-------throwable=" + throwable);
        }
    }).exceptionally(new Function<Throwable, Object>() {
        @Override
        public Object apply(Throwable throwable) {
            System.out.println("throwable=" + throwable);
            return 6666;
        }
    });
    System.out.println(future.get());
}

}
handle 方法#
handle 是执行任务完成时对结果的处理。
handle 是在任务完成后再执行,还可以处理异常的任务。

Copy
public CompletionStage handle(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
线程串行化方法#
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作

带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。

Copy
public CompletableFuture thenApply(Function<? super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

public CompletionStage thenAccept(Consumer<? super T> action);
public CompletionStage thenAcceptAsync(Consumer<? super T> action);
public CompletionStage thenAcceptAsync(Consumer<? super T> action,Executor executor);

public CompletionStage thenRun(Runnable action);
public CompletionStage thenRunAsync(Runnable action);
public CompletionStage thenRunAsync(Runnable action,Executor executor);
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型

代码演示:

Copy
public static void main(String[] args) throws ExecutionException, InterruptedException {

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
    @Override
    public Integer get() {
        System.out.println(Thread.currentThread().getName() + "\t completableFuture");
        //int i = 10 / 0;
        return 1024;
    }
}).thenApply(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer o) {
        System.out.println("thenApply方法,上次返回结果:" + o);
        return  o * 2;
    }
}).whenComplete(new BiConsumer<Integer, Throwable>() {
    @Override
    public void accept(Integer o, Throwable throwable) {
        System.out.println("-------o=" + o);
        System.out.println("-------throwable=" + throwable);
    }
}).exceptionally(new Function<Throwable, Integer>() {
    @Override
    public Integer apply(Throwable throwable) {
        System.out.println("throwable=" + throwable);
        return 6666;
    }
}).handle(new BiFunction<Integer, Throwable, Integer>() {
    @Override
    public Integer apply(Integer integer, Throwable throwable) {
        System.out.println("handle o=" + integer);
        System.out.println("handle throwable=" + throwable);
        return 8888;
    }
});
System.out.println(future.get());

}
两任务组合 - 都要完成#
两个任务必须都完成,触发该任务。

thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值

thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。

runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务。

Copy
public CompletableFuture thenCombine(

CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);

public CompletableFuture thenCombineAsync(

CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);

public CompletableFuture thenCombineAsync(

CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor);

public CompletableFuture thenAcceptBoth(

CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);

public CompletableFuture thenAcceptBothAsync(

CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);

public CompletableFuture thenAcceptBothAsync(

CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor);

public CompletableFuture runAfterBoth(CompletionStage<?> other,

                                        Runnable action);

public CompletableFuture runAfterBothAsync(CompletionStage<?> other,

                                             Runnable action);

public CompletableFuture runAfterBothAsync(CompletionStage<?> other,

                                             Runnable action,
                                             Executor executor);

测试案例:

Copy
public static void main(String[] args) {

CompletableFuture.supplyAsync(() -> {
    return "hello";
}).thenApplyAsync(t -> {
    return t + " world!";
}).thenCombineAsync(CompletableFuture.completedFuture(" CompletableFuture"), (t, u) -> {
    return t + u;
}).whenComplete((t, u) -> {
    System.out.println(t);
});

}
输出:hello world! CompletableFuture

两任务组合 - 一个完成#
当两个任务中,任意一个future任务完成的时候,执行任务。

applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。

acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。

runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。

Copy
public CompletableFuture applyToEither(

CompletionStage<? extends T> other, Function<? super T, U> fn);

public CompletableFuture applyToEitherAsync(

CompletionStage<? extends T> other, Function<? super T, U> fn);

public CompletableFuture applyToEitherAsync(

CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor);

public CompletableFuture acceptEither(

CompletionStage<? extends T> other, Consumer<? super T> action);

public CompletableFuture acceptEitherAsync(

CompletionStage<? extends T> other, Consumer<? super T> action);

public CompletableFuture acceptEitherAsync(

CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor);

public CompletableFuture runAfterEither(CompletionStage<?> other,

                                          Runnable action);

public CompletableFuture runAfterEitherAsync(CompletionStage<?> other,

                                               Runnable action);

public CompletableFuture runAfterEitherAsync(CompletionStage<?> other,

                                               Runnable action,
                                               Executor executor);

多任务组合#
Copy
public static CompletableFuture allOf(CompletableFuture<?>... cfs);

public static CompletableFuture

anyOf:只要有一个任务完成

Copy
public static void main(String[] args) {

List<CompletableFuture> futures = Arrays.asList(CompletableFuture.completedFuture("hello"),
                                                CompletableFuture.completedFuture(" world!"),
                                                CompletableFuture.completedFuture(" hello"),
                                                CompletableFuture.completedFuture("java!"));
final CompletableFuture<Void> allCompleted = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
allCompleted.thenRun(() -> {
    futures.stream().forEach(future -> {
        try {
            System.out.println("get future at:"+System.currentTimeMillis()+", result:"+future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    });
});

}
作者: ingxx

出处:https://www.cnblogs.com/ingxx/p/12598414.html

相关文章
|
1月前
|
Java
异步技巧之CompletableFuture
异步技巧之CompletableFuture
43 2
|
16天前
|
Java
JAVA线程&线程池&异步编排
JAVA线程&线程池&异步编排
24 0
|
4月前
|
Java
CompletableFuture 异步编排、案例及应用小案例1
CompletableFuture 异步编排、案例及应用小案例
54 0
|
4月前
|
Java
CompletableFuture 异步编排、案例及应用小案例2
CompletableFuture 异步编排、案例及应用小案例
30 0
|
6月前
|
Java 数据库 数据安全/隐私保护
【CompletableFuture事件驱动异步回调】
【CompletableFuture事件驱动异步回调】
|
安全 Java
任务编排:CompletableFuture从入门到精通
最近遇到了一个业务场景,涉及到多数据源之间的请求的流程编排,正好看到了一篇某团介绍CompletableFuture原理和使用的技术文章,主要还是涉及使用层面。网上很多文章涉及原理的部分讲的不是特别详细且比较抽象。因为涉及到多线程的工具必须要理解原理,不然一旦遇到问题排查起来就只能凭玄学,正好借此梳理一下CompletableFuture的工作原理
286 0
|
9月前
|
设计模式 JavaScript 前端开发
CompletableFuture 异步编排
CompletableFuture 异步编排
|
9月前
|
存储 SpringCloudAlibaba Java
Java新特性:异步编排CompletableFuture
CompletableFuture由Java 8提供,是实现异步化的工具类,上手难度较低,且功能强大,支持通过函数式编程的方式对各类操作进行组合编排。 CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步[回调](https://so.csdn.net/so/search?q=回调&spm=1001.2101.3001.7020)、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
171 1
Java新特性:异步编排CompletableFuture
|
9月前
|
JavaScript Java UED
CompletableFuture 异步处理
CompletableFuture 异步处理
79 0
|
Java API
CompletableFuture实现异步编排
场景:电商系统中获取一个完整的商品信息可能分为以下几步:①获取商品基本信息 ②获取商品图片信息 ③获取商品促销活动信息 ④获取商品各种类的基本信息 等操作,如果使用串行方式去执行这些操作,假设每个操作执行1s,那么用户看到完整的商品详情就需要4s的时间,如果使用并行方式执行这些操作,可能只需要1s就可以完成。所以这就是异步执行的好处。
129 0
CompletableFuture实现异步编排

热门文章

最新文章