Java8 CompletableFuture 编程

简介: Java8 CompletableFuture 编程阅读目录一、简介二、CompletableFuture 使用回到顶部一、简介 所谓异步调用其实就是实现一个无需等待被调用函数的返回值而让操作继续运行的方法。

Java8 CompletableFuture 编程
阅读目录

一、简介
二、CompletableFuture 使用
回到顶部
一、简介
 所谓异步调用其实就是实现一个无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要取线程的计算结果。

 JDK5新增了 Future 接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。

private static final ExecutorService POOL = Executors.newFixedThreadPool(TASK_THRESHOLD, new ThreadFactory() {

    AtomicInteger atomicInteger = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "demo15-" + atomicInteger.incrementAndGet());
    }
});

public static void main(String[] args) throws ExecutionException, InterruptedException {
    Future<Integer> submit = POOL.submit(() -> 123);
    // 1. get() 方法用户返回计算结果,如果计算还没有完成,则在get的时候会进行阻塞,直到获取到结果为止
    Integer get = submit.get();
    // 2. isDone() 方法用于判断当前Future是否执行完成。
    boolean done = submit.isDone();
    // 3. cancel(boolean mayInterruptIfRunning) 取消当前线程的执行。参数表示是否在线程执行的过程中阻断。
    boolean cancel = submit.cancel(true);
    // 4. isCancelled() 判断当前task是否被取消.
    boolean cancelled = submit.isCancelled();
    // 5. invokeAll 批量执行任务
    Callable<String> callable = () -> "Hello Future";
    List<Callable<String>> callables = Lists.newArrayList(callable, callable, callable, callable);
    List<Future<String>> futures = POOL.invokeAll(callables);
}

 在Java8中,CompletableFuture 提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。

tips: CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段。

回到顶部
二、CompletableFuture 使用

  1. runAsync、supplyAsync
    // 无返回值

public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
// 有返回值
public static CompletableFuture supplyAsync(Supplier supplier)
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
runAsync、supplyAsync 方法是 CompletableFuture 提供的创建异步操作的方法。需要注意的是,如果没有指定 Executor 作为线程池,将会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码;如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

public class Demo1 {

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> System.out.println(123));

    CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "CompletableFuture");
    System.out.println(supplyAsync.get());
}

}

  1. whenComplete、exceptionally
    // 执行完成时,当前任务的线程执行继续执行 whenComplete 的任务。

public CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action)
// 执行完成时,把 whenCompleteAsync 这个任务提交给线程池来进行执行。
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture exceptionally(Function fn)
当 CompletableFuture 的计算完成时,会执行 whenComplete 方法;当 CompletableFuture 计算中抛出异常时,会执行 exceptionally 方法。

public class Demo2 {

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> runAsync = CompletableFuture.supplyAsync(() -> 123456);
    runAsync.whenComplete((t, throwable) -> {
        System.out.println(t);
        if (throwable != null) {
            throwable.printStackTrace();
        }
    });
    runAsync.whenCompleteAsync((t, throwable) -> {
        System.out.println(t);
        if (throwable != null) {
            throwable.printStackTrace();
        }
    });
    runAsync.exceptionally((throwable) -> {
        if (throwable != null) {
            throwable.printStackTrace();
        }
        return null;
    });
    TimeUnit.SECONDS.sleep(2);
}

}

  1. thenApply、handle
    // T:上一个任务返回结果的类型

// U:当前任务的返回值类型

public CompletableFuture thenApply(Function<? super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

public CompletionStage handle(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化

handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。

public class Demo3 {

public static void main(String[] args) throws ExecutionException, InterruptedException {
    // thenApply
    CompletableFuture<Integer> thenApply = CompletableFuture.supplyAsync(() -> 123).thenApply(t -> t * t);
    System.out.println(thenApply.get());

   // handle
    CompletableFuture<Integer> handle = CompletableFuture.supplyAsync(() -> {
        int i = 10 / 0;
        return new Random().nextInt(10);
    }).handle((t, throwable) -> {
        if (throwable != null) {
            throwable.printStackTrace();
            return -1;
        }
        return t * t;
    });
    System.out.println(handle.get());
}

}

  1. thenAccept、thenRun
  2. CompletionStage thenAccept(Consumer<? super T> action);

public CompletionStage thenAcceptAsync(Consumer<? super T> action);
public CompletionStage thenAcceptAsync(Consumer<? super T> action,Executor executor);

public CompletionStage thenRun(Runnable action);
public CompletionStage thenRunAsync(Runnable action);
public CompletionStage thenRunAsync(Runnable action,Executor executor);
thenAccept 接收任务的处理结果,并消费处理。无返回结果。

thenRun 跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenRun。

public class Demo4 {

public static void main(String[] args) {
    // thenAccept
    CompletableFuture<Void> thenAccept = CompletableFuture.supplyAsync(() -> new Random().nextInt(10)).thenAccept(System.out::println);

   // thenRun
    CompletableFuture<Void> thenRun = CompletableFuture.supplyAsync(() -> new Random().nextInt(10)).thenRun(() -> System.out.println(123));
}

}

  1. thenCombine、thenAcceptBoth
    // T 表示第一个 CompletionStage 的返回结果类型

// U 表示第二个 CompletionStage 的返回结果类型
// V表示 thenCombine/thenAcceptBoth 处理结果类型
public CompletionStage thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public CompletionStage thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public CompletionStage thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

public CompletionStage thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public CompletionStage thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public CompletionStage thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
thenCombine、thenAcceptBoth 都是用来合并任务 —— 等待两个 CompletionStage 的任务都执行完成后,把两个任务的结果一并来处理。区别在于 thenCombine 有返回值;thenAcceptBoth 无返回值。

public class Demo5 {

public static void main(String[] args) throws ExecutionException, InterruptedException {
    // thenCombine
    CompletableFuture<String> thenCombine = CompletableFuture.supplyAsync(() -> new Random().nextInt(10))
            .thenCombine(CompletableFuture.supplyAsync(() -> "str"),
                    // 第一个参数是第一个 CompletionStage 的处理结果
                    // 第二个参数是第二个 CompletionStage 的处理结果
                    (i, s) -> i + s
            );
    System.out.println(thenCombine.get());

    // thenAcceptBoth 
    CompletableFuture<Void> thenAcceptBoth = CompletableFuture.supplyAsync(() -> new Random().nextInt(10))
            .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "str"), 
                    (i, s) -> System.out.println(i + s));
}

}

  1. applyToEither、acceptEither、runAfterEither、runAfterBoth
    applyToEither:两个 CompletionStage,谁执行返回的结果快,就用那个 CompletionStage 的结果进行下一步的处理,有返回值。

acceptEither:两个 CompletionStage,谁执行返回的结果快,就用那个 CompletionStage 的结果进行下一步的处理,无返回值。
runAfterEither:两个 CompletionStage,任何一个完成了,都会执行下一步的操作(Runnable),无返回值。
runAfterBoth:两个 CompletionStage,都完成了计算才会执行下一步的操作(Runnable),无返回值。
由于这几个方法含义相近,使用更加类似,我们就以 applyToEither 来介绍...

// T 两个 CompletionStage 组合运算后的结果类型
// U 下一步处理运算的结果返回值类型
public CompletionStage applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public CompletionStage applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public CompletionStage applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
public class Demo6 {

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> applyToEither = CompletableFuture.supplyAsync(() -> {
        int nextInt = new Random().nextInt(10);
        try {
            Thread.sleep(nextInt);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("f1=" + nextInt);
        return nextInt;
    }).applyToEither(CompletableFuture.supplyAsync(() -> {
        int nextInt = new Random().nextInt(10);
        try {
            Thread.sleep(nextInt);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("f2=" + nextInt);
        return nextInt;
    }), i -> i);

    System.out.println(applyToEither.get());
}

}

  1. thenCompose
  2. CompletableFuture thenCompose(Function<? super T, ? extends CompletionStage> fn);

public CompletableFuture thenComposeAsync(Function<? super T, ? extends CompletionStage> fn) ;
public CompletableFuture thenComposeAsync(Function<? super T, ? extends CompletionStage> fn, Executor executor) ;
thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。

public class Demo7 {

public static void main(String[] args) throws ExecutionException, InterruptedException {

    CompletableFuture<Integer> thenCompose = CompletableFuture.supplyAsync(() -> new Random().nextInt(10))
            .thenCompose(i -> CompletableFuture.supplyAsync(() -> i * i));
    System.out.println(thenCompose.get());

}

}

参考博文:https://www.jianshu.com/p/6bac52527ca4
原文地址https://www.cnblogs.com/jmcui/p/11335445.html

相关文章
|
10天前
|
安全 Java 开发者
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第9天】本文将深入探讨Java并发编程的核心概念,包括线程安全和性能优化。我们将详细解析Java中的同步机制,包括synchronized关键字、Lock接口以及并发集合等,并探讨它们如何影响程序的性能。此外,我们还将讨论Java内存模型,以及它如何影响并发程序的行为。最后,我们将提供一些实用的并发编程技巧和最佳实践,帮助开发者编写出既线程安全又高效的Java程序。
22 3
|
13天前
|
Java 调度
Java并发编程:深入理解线程池的原理与实践
【4月更文挑战第6天】本文将深入探讨Java并发编程中的重要概念——线程池。我们将从线程池的基本原理入手,逐步解析其工作过程,以及如何在实际开发中合理使用线程池以提高程序性能。同时,我们还将关注线程池的一些高级特性,如自定义线程工厂、拒绝策略等,以帮助读者更好地掌握线程池的使用技巧。
|
17天前
|
存储 NoSQL Java
Java数据库编程指南:实现高效数据存储与访问
【4月更文挑战第2天】Java开发者必须掌握数据库编程,尤其是JDBC,它是连接数据库的标准接口。使用Spring JDBC或JPA能简化操作。选择合适的JDBC驱动,如MySQL Connector/J,对性能至关重要。最佳实践包括事务管理、防SQL注入、优化索引和数据库设计。NoSQL数据库如MongoDB也日益重要,Java有对应的驱动支持。理解这些概念和技术是构建高效数据库应用的基础。
Java数据库编程指南:实现高效数据存储与访问
|
13天前
|
设计模式 安全 Java
Java并发编程实战:使用synchronized关键字实现线程安全
【4月更文挑战第6天】Java中的`synchronized`关键字用于处理多线程并发,确保共享资源的线程安全。它可以修饰方法或代码块,实现互斥访问。当用于方法时,锁定对象实例或类对象;用于代码块时,锁定指定对象。过度使用可能导致性能问题,应注意避免锁持有时间过长、死锁,并考虑使用`java.util.concurrent`包中的高级工具。正确理解和使用`synchronized`是编写线程安全程序的关键。
|
15天前
|
Java
深入理解Java并发编程:线程池的应用与优化
【4月更文挑战第3天】 在Java并发编程中,线程池是一种重要的资源管理工具,它能有效地控制和管理线程的数量,提高系统性能。本文将深入探讨Java线程池的工作原理、应用场景以及优化策略,帮助读者更好地理解和应用线程池。
|
11天前
|
Java
Java 并发编程:深入理解线程池
【4月更文挑战第8天】本文将深入探讨 Java 中的线程池技术,包括其工作原理、优势以及如何使用。线程池是 Java 并发编程的重要工具,它可以有效地管理和控制线程的执行,提高系统性能。通过本文的学习,读者将对线程池有更深入的理解,并能在实际开发中灵活运用。
|
7天前
|
安全 算法 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第11天】 在Java中,高效的并发编程是提升应用性能和响应能力的关键。本文将探讨Java并发的核心概念,包括线程安全、锁机制、线程池以及并发集合等,同时提供实用的编程技巧和最佳实践,帮助开发者在保证线程安全的前提下,优化程序性能。我们将通过分析常见的并发问题,如竞态条件、死锁,以及如何利用现代Java并发工具来避免这些问题,从而构建更加健壮和高效的多线程应用程序。
|
11天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第7天】在现代软件开发中,多线程编程已经成为一种不可或缺的技术。为了提高程序性能和资源利用率,Java提供了线程池这一强大工具。本文将深入探讨Java线程池的原理、使用方法以及如何根据实际需求定制线程池,帮助读者更好地理解和应用线程池技术。
15 0
|
13天前
|
缓存 安全 Java
Java并发编程进阶:深入理解Java内存模型
【4月更文挑战第6天】Java内存模型(JMM)是多线程编程的关键,定义了线程间共享变量读写的规则,确保数据一致性和可见性。主要包括原子性、可见性和有序性三大特性。Happens-Before原则规定操作顺序,内存屏障和锁则保障这些原则的实施。理解JMM和相关机制对于编写线程安全、高性能的Java并发程序至关重要。
|
3天前
|
设计模式 运维 安全
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第15天】在Java开发中,多线程编程是提升应用程序性能和响应能力的关键手段。然而,它伴随着诸多挑战,尤其是在保证线程安全的同时如何避免性能瓶颈。本文将探讨Java并发编程的核心概念,包括同步机制、锁优化、线程池使用以及并发集合等,旨在为开发者提供实用的线程安全策略和性能优化技巧。通过实例分析和最佳实践的分享,我们的目标是帮助读者构建既高效又可靠的多线程应用。