前言
在开发过程中,有些应用使用了大量线程,但其中大多数线程都是空闲的,比如:一个Web服务器可能会为每个连接分别使用一个线程。然而,另一些应用可能对每个处理器内核分别使用一个线程,来完成计算密集型任务,如图像或者视频处理。Java SE7新引入了fork-join框架(ps:fork是分叉的意思,在C语言中可以使用fork()函数通过系统调用创建一个与原来进程几乎完全相同的进程),专门用来支持第二类应用,拿图像处理来说,要增强一个图像,可以变换上半部分和下半部分。如果有足够多空闲的处理器,这些操作就可以并行。顾名思义,Fork-Join框架运用了Fork/Join原理,使用“分而治之”的思想,我们可以使用ForkJoinPool自主地将大任务分拆成小任务分配给多个线程执行,最后再合并起来得到最终结果,加快运算,用伪代码描述下就是:
if(任务很小){
直接计算得到结果
}else{
分拆成N个子任务
调用子任务的fork()进行计算
调用子任务的join()合并计算结果
}
简单使用
package Concurrent;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.DoublePredicate;
public class ForkThreadPoolTest {
public static void main(String[] args) {
final int SIZE = 10000000;
double [] numbers = new double[SIZE];
for (int i=0;i<SIZE;i++){
numbers[i] = Math.random();
}
Counter counter = new Counter(numbers, 0, SIZE, x -> x > 0.5);
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.invoke(counter);
Integer result = counter.join();//获取的结果,该方法会阻塞直到获取到结果
System.out.println(result);
}
static class Counter extends RecursiveTask<Integer>{
private static final int THRESHOLD = 1000;
private double [] values;
private int from;
private int to;
private DoublePredicate filter;
public Counter(double[] values, int from, int to, DoublePredicate filter) {
this.values = values;
this.from = from;
this.to = to;
this.filter = filter;
}
@Override
protected Integer compute() {
if ((to-from)<THRESHOLD){
//小于阈值
int count =0;
for (int i=from;i<to;i++){
if (filter.test(values[i])){
count++;
}
}
return count;
}else {
//二分法
int mid = (to+from)/2;
Counter first = new Counter(values, from, mid, filter);
Counter second = new Counter(values, mid, to, filter);
invokeAll(first,second);
return first.join()+second.join();
}
}
}
}
网友评论