在之前的文章中介绍了一下Future 功能,然而实际开发中,我们经常需要达成以下目的:
-
将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
-
等待 Future 集合中的所有任务都完成。
-
仅等待 Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。
-
通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。
-
应对 Future 的完成事件(即当 Future 的完成事件发生时会收到通知,并能使用 Future 计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)
这个时候Future 就体现出了它的局限性,它很难直接表述多个Future 结果之间的依赖性,同时在之前的示例中我们也看到,要想取出Future 的结果值,我们一般需要轮询isDone,确认完成后,调用get()获取值,或者调用get()设置一个超时时间。但是这两种方式都会阻塞住调用线程,这种阻塞的方式显然和我们的异步编程的初衷相违背。为了解决这些问题,Java 8 增加了一个包含50个方法左右的类:CompletableFuture. 提供了非常强大的Future的扩展功能,可以帮助简化异步编程的复杂性,提供了函数式编程能力,可以通过回调的方式计算处理结果,并且提供了转换和组织CompletableFuture的方法。
CompletableFuture 介绍
CompletableFuture 类,实现了Future<T>, CompletionStage<T>两个接口。
当一个Future可能需要显示地完成时,使用CompletionStage接口去支持完成时触发的函数和操作。当两个及以上线程同时尝试完成、异常完成、取消一个CompletableFuture时,只有一个能成功。
CompletableFuture 实现了 CompletionStage 接口的如下策略:
-
为了完成当前的CompletableFuture接口或者其他完成方法的回调函数的线程,提供了非异步的完成操作。
-
没有显式入参Executor的所有async方法都使用ForkJoinPool.commonPool()为了简化监视、调试和跟踪,所有生成的异步任务都是标记接口AsynchronousCompletionTask的实例。
-
所有的CompletionStage方法都是独立于其他共有方法实现的,因此一个方法的行为不会受到子类中其他方法的覆盖。
CompletableFuture 实现了 Futurre 接口的如下策略:
-
CompletableFuture 无法直接控制完成,所以cancel 操作被视为是另一种异常完成形式。方法isCompletedExceptionally 可以用来确定一个CompletableFuture 是否以任何异常的方式完成。
-
以一个CompletionException 为例,方法get() 和get(long,TimeUnit) 抛出一个ExecutionException,对应CompletionException。为了在大多数上下文中简化用法,这个类还定义了方法join() 和getNow ,而不是直接在这些情况中直接抛出CompletionException。
以下四个静态方法用来为一段异步执行的代码创建CompletableFuture对象:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
其中supplyAsync用于有返回值的任务,runAsync 则用于没有返回值的任务。Executor 参数可以手动指定线程池,否则默认 ForkJoinPool.commonPool() 系统级公共线程池,
注意:这些线程都是Daemon线程,主线程结束Daemon线程不结束,只有JVM关闭时,生命周期终止。
supplyAsync 方法以Supplier<U> 函数式接口类型为参数,CompletableFuture 的计算结果类型为U。方法的参数类型都是函数式接口,所以可以使用lambda 表达式实现异步任务。例如:
CompletableFuture<String> cfs = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000); // 模拟耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello world";
});
获取计算结果:
因为CompletableFuture 实现了Future 接口,所以也可以像Future 那样取结果,即通过get 方法获取,在CompletableFuture 中新增了getNow(T valueIfAbsent) 方法,接收的参数是该方法返回的默认值,即如果任务未完成 或者 内部抛出异常时返回的值。
CompletableFuture 还提供了join 方法来获取计算结果,join 方法会获得任务结束时的结果或者抛出一个异常,如果任务内发生了异常,join 方法将抛出一个CompletionException 异常。例如对上例取值:
String a = cfs.get(); // 会等待直到任务结束并拿到返回值
String b = cfs.get(2, TimeUnit.SECONDS); // 带超时时间的get
String c = cfs.join(); // 效果与get 类似
String d = cfs.getNow("hhh"); // 调用这个方法时,如果任务未完成会立即返回 hhh,但是如果在之前调用join 会等待任务完成,如果内部抛出异常也返回 hhh
当CompletableFuture 的计算结果完成或抛出异常时,可以执行特定的Action,主要有以下方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
可以看到Action的类型是BiConsumer<? super T, ? super Throwable> 它可以处理正常的计算结果,或者异常情况。
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)。例如:
CompletableFuture<String> cfs = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000); // 模拟耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
// int a = 10/0; // 会抛出异常
return "hello world";
}).whenComplete((res, exc) -> {
if (exc != null) {
System.out.println(exc.getMessage()); // 若内部抛出异常,会执行这句
} else {
System.out.println(res); // 若无异常,会得到计算结果
}
});
注意:以上代码如果直接执行,并不会执行到whenComplete,也不会抛出异常,因此需要调用get、join 等取结果的方法,才会执行whenComplete。
转换:
CompletableFuture 既可以作为monad(单子),也可以作为 functor(因子) 由于回调风格的实现,我们不必因为等待一个计算完成而阻塞着调用线程,而是告诉CompletableFuture当计算完成的时候请执行某个Function。
转换相关的方法,包括
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
关键的入参只有一个Function,它是函数式接口,所以使用Lambda表示起来会更加优雅。它的入参是上一个阶段计算后的结果,返回值是经过转化后结果。例如:
CompletableFuture<String> cfs = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello world";
}, es);
CompletableFuture<Integer> cfi = cfs.thenApply(String::length);
System.out.println(cfi.get()); // 输出 hello world 长度 11
除了转换之外还有一些其他的方法,可以参考这篇文章 https://www.jianshu.com/p/6f3ee90ab7d3。
在上面的这篇文章中漏写了一个方法,即结合(链接)这两个Futures —— thenCompose(),有时你想运行一些future的值(当它准备好了),但这个函数也返回了future。CompletableFuture足够灵活地明白我们的函数结果现在应该作为顶级的future,对比CompletableFuture<CompletableFuture>。方法 thenCompose()相当于Scala的flatMap。
函数定义如下:
<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);
使用示例:
ExecutorService es = Executors.newCachedThreadPool();
CompletableFuture<String> cfs = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello world";
}, es);
CompletableFuture<Integer> cfi = cfs.thenCompose(str -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return str.length();
}, es));
System.out.println(cfi.get()); // 输出 hello world 长度 11
可能看起来跟thenApply 方法类似,但实际是不同的,下面这个例子可能看的更清楚一些
CompletableFuture<Document> docFuture = //...
CompletableFuture<CompletableFuture<Double>> f = docFuture.thenApply(this::calculateRelevance);
CompletableFuture<Double> relevanceFuture = docFuture.thenCompose(this::calculateRelevance);
private CompletableFuture<Double> calculateRelevance(Document doc) //...
多个CompletableFuture 任务:
JDK 提供了多任务组合方法allOf和anyOf,
- allOf 是等待所有任务完成,构造后CompletableFuture完成;
- anyOf 是只要有一个任务完成,构造后CompletableFuture就完成;下面看一下方法定义:
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
1)anyOf 使用示例:
CompletableFuture<String> task0 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "I am task0";
}, es);
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "I am task1";
}, es);
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "I am task2";
}, es);
CompletableFuture<Object> co = CompletableFuture.anyOf(task0, task1, task2);
System.out.println(co.get()); // 输出 I am task1,因为task1 最先完成
anyOf 的返回值就是一组CompletableFuture 中最快完成那一个,注意,虽然anyOf 返回的是最快的一个CompletableFuture ,但是其他任务是仍然会执行完的。
2)allOf 使用示例
List<String> res = new ArrayList<>(); // 保存三个task 的执行结果
CompletableFuture<String> task0 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "I am task0";
}, es).whenComplete((s, e) -> res.add(s)); // 将执行结果添加到列表
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "I am task1";
}, es).whenComplete((s, e) -> res.add(s));
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "I am task2";
}, es).whenComplete((s, e) -> res.add(s));
CompletableFuture.allOf(task0, task1, task2).join();
res.forEach(System.out::println); // 输出结果集
/* output:
I am task1
I am task0
I am task2
*/
allOf 方法会等待所有任务执行完毕并返回,所以他的返回值是CompletableFuture<Void> ,这个是没办法取到结果的,或者说取到的结果是null,所以在上例中,我们对每个任务完成后进行结果保存操作。在实际应用中可以根据不同情况灵活应用。
网友评论