美文网首页
Java CompletableFuture 介绍

Java CompletableFuture 介绍

作者: wind_sky | 来源:发表于2019-08-08 10:11 被阅读0次

在之前的文章中介绍了一下Future 功能,然而实际开发中,我们经常需要达成以下目的:

  • 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。

  • 等待 Future 集合中的所有任务都完成。

  • 仅等待 Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。

  • 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。

  • 应对 Future 的完成事件(即当 Future 的完成事件发生时会收到通知,并能使用 Future 计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)

这个时候Future 就体现出了它的局限性,它很难直接表述多个Future 结果之间的依赖性,同时在之前的示例中我们也看到,要想取出Future 的结果值,我们一般需要轮询isDone,确认完成后,调用get()获取值,或者调用get()设置一个超时时间。但是这两种方式都会阻塞住调用线程,这种阻塞的方式显然和我们的异步编程的初衷相违背。为了解决这些问题,Java 8 增加了一个包含50个方法左右的类:CompletableFuture. 提供了非常强大的Future的扩展功能,可以帮助简化异步编程的复杂性,提供了函数式编程能力,可以通过回调的方式计算处理结果,并且提供了转换和组织CompletableFuture的方法。

CompletableFuture 介绍

CompletableFuture 类,实现了Future<T>, CompletionStage<T>两个接口。

当一个Future可能需要显示地完成时,使用CompletionStage接口去支持完成时触发的函数和操作。当两个及以上线程同时尝试完成、异常完成、取消一个CompletableFuture时,只有一个能成功。

CompletableFuture 实现了 CompletionStage 接口的如下策略:

  • 为了完成当前的CompletableFuture接口或者其他完成方法的回调函数的线程,提供了非异步的完成操作。

  • 没有显式入参Executor的所有async方法都使用ForkJoinPool.commonPool()为了简化监视、调试和跟踪,所有生成的异步任务都是标记接口AsynchronousCompletionTask的实例。

  • 所有的CompletionStage方法都是独立于其他共有方法实现的,因此一个方法的行为不会受到子类中其他方法的覆盖。

CompletableFuture 实现了 Futurre 接口的如下策略:

  • CompletableFuture 无法直接控制完成,所以cancel 操作被视为是另一种异常完成形式。方法isCompletedExceptionally 可以用来确定一个CompletableFuture 是否以任何异常的方式完成。

  • 以一个CompletionException 为例,方法get() 和get(long,TimeUnit) 抛出一个ExecutionException,对应CompletionException。为了在大多数上下文中简化用法,这个类还定义了方法join() 和getNow ,而不是直接在这些情况中直接抛出CompletionException。

以下四个静态方法用来为一段异步执行的代码创建CompletableFuture对象:

public static CompletableFuture<Void>   runAsync(Runnable runnable)  
public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)  
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)  
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)  

其中supplyAsync用于有返回值的任务,runAsync 则用于没有返回值的任务。Executor 参数可以手动指定线程池,否则默认 ForkJoinPool.commonPool() 系统级公共线程池,
注意:这些线程都是Daemon线程,主线程结束Daemon线程不结束,只有JVM关闭时,生命周期终止

supplyAsync 方法以Supplier<U> 函数式接口类型为参数,CompletableFuture 的计算结果类型为U。方法的参数类型都是函数式接口,所以可以使用lambda 表达式实现异步任务。例如:

CompletableFuture<String> cfs = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);         // 模拟耗时
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello world";
        });
获取计算结果:

因为CompletableFuture 实现了Future 接口,所以也可以像Future 那样取结果,即通过get 方法获取,在CompletableFuture 中新增了getNow(T valueIfAbsent) 方法,接收的参数是该方法返回的默认值,即如果任务未完成 或者 内部抛出异常时返回的值。

CompletableFuture 还提供了join 方法来获取计算结果,join 方法会获得任务结束时的结果或者抛出一个异常,如果任务内发生了异常,join 方法将抛出一个CompletionException 异常。例如对上例取值:

String a = cfs.get();       // 会等待直到任务结束并拿到返回值
String b = cfs.get(2, TimeUnit.SECONDS);    // 带超时时间的get
String c = cfs.join();      // 效果与get 类似
String d = cfs.getNow("hhh");   // 调用这个方法时,如果任务未完成会立即返回 hhh,但是如果在之前调用join 会等待任务完成,如果内部抛出异常也返回 hhh

当CompletableFuture 的计算结果完成或抛出异常时,可以执行特定的Action,主要有以下方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)  
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)  
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)  
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

可以看到Action的类型是BiConsumer<? super T, ? super Throwable> 它可以处理正常的计算结果,或者异常情况。

方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)。例如:

CompletableFuture<String> cfs = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);         // 模拟耗时
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

//          int a = 10/0;                   // 会抛出异常
            return "hello world";
        }).whenComplete((res, exc) -> {
            if (exc != null) {
                System.out.println(exc.getMessage());   // 若内部抛出异常,会执行这句
            } else {
                System.out.println(res);                // 若无异常,会得到计算结果
            }
        });

注意:以上代码如果直接执行,并不会执行到whenComplete,也不会抛出异常,因此需要调用get、join 等取结果的方法,才会执行whenComplete。

转换:

CompletableFuture 既可以作为monad(单子),也可以作为 functor(因子) 由于回调风格的实现,我们不必因为等待一个计算完成而阻塞着调用线程,而是告诉CompletableFuture当计算完成的时候请执行某个Function。

转换相关的方法,包括

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);

关键的入参只有一个Function,它是函数式接口,所以使用Lambda表示起来会更加优雅。它的入参是上一个阶段计算后的结果,返回值是经过转化后结果。例如:

CompletableFuture<String> cfs = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "hello world";
        }, es);

        CompletableFuture<Integer> cfi = cfs.thenApply(String::length);

        System.out.println(cfi.get());          // 输出 hello world 长度 11

除了转换之外还有一些其他的方法,可以参考这篇文章 https://www.jianshu.com/p/6f3ee90ab7d3

在上面的这篇文章中漏写了一个方法,即结合(链接)这两个Futures —— thenCompose(),有时你想运行一些future的值(当它准备好了),但这个函数也返回了future。CompletableFuture足够灵活地明白我们的函数结果现在应该作为顶级的future,对比CompletableFuture<CompletableFuture>。方法 thenCompose()相当于Scala的flatMap。

函数定义如下:

<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);

使用示例:

        ExecutorService es = Executors.newCachedThreadPool();

        CompletableFuture<String> cfs = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello world";
        }, es);

        CompletableFuture<Integer> cfi = cfs.thenCompose(str -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return str.length();
        }, es));

        System.out.println(cfi.get());          // 输出 hello world 长度 11

可能看起来跟thenApply 方法类似,但实际是不同的,下面这个例子可能看的更清楚一些

CompletableFuture<Document> docFuture = //...

CompletableFuture<CompletableFuture<Double>> f = docFuture.thenApply(this::calculateRelevance);

CompletableFuture<Double> relevanceFuture = docFuture.thenCompose(this::calculateRelevance);

private CompletableFuture<Double> calculateRelevance(Document doc)  //...
多个CompletableFuture 任务:

JDK 提供了多任务组合方法allOf和anyOf,

  • allOf 是等待所有任务完成,构造后CompletableFuture完成;
  • anyOf 是只要有一个任务完成,构造后CompletableFuture就完成;下面看一下方法定义:
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

1)anyOf 使用示例:

CompletableFuture<String> task0 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "I am task0";
        }, es);

        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "I am task1";
        }, es);

        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "I am task2";
        }, es);

        CompletableFuture<Object> co = CompletableFuture.anyOf(task0, task1, task2);

        System.out.println(co.get());       // 输出 I am task1,因为task1 最先完成

anyOf 的返回值就是一组CompletableFuture 中最快完成那一个,注意,虽然anyOf 返回的是最快的一个CompletableFuture ,但是其他任务是仍然会执行完的。

2)allOf 使用示例

        List<String> res = new ArrayList<>();               // 保存三个task 的执行结果

        CompletableFuture<String> task0 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "I am task0";
        }, es).whenComplete((s, e) -> res.add(s));      // 将执行结果添加到列表

        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "I am task1";
        }, es).whenComplete((s, e) -> res.add(s));

        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return "I am task2";
        }, es).whenComplete((s, e) -> res.add(s));

        CompletableFuture.allOf(task0, task1, task2).join();

        res.forEach(System.out::println);               // 输出结果集
            /* output:
      I am task1
            I am task0
        I am task2
        */

allOf 方法会等待所有任务执行完毕并返回,所以他的返回值是CompletableFuture<Void> ,这个是没办法取到结果的,或者说取到的结果是null,所以在上例中,我们对每个任务完成后进行结果保存操作。在实际应用中可以根据不同情况灵活应用。

相关文章

网友评论

      本文标题:Java CompletableFuture 介绍

      本文链接:https://www.haomeiwen.com/subject/efetjctx.html