您的位置:首页 > 编程语言 > Java开发

《Java并发编程实践》笔记8——Fork/Join框架

2015-07-02 10:57 519 查看
《java并发编程实践》书中并没有介绍Fork/Join框架,该框架和并发编程关系密切,也是由本书作者之一Doug Lea在JDK1.7引入,因此作为最后一篇笔记简单介绍。

Fork/Join框架是JDK1.7提供的一个用于并行执行任务的框架,它把一个大任务分割为若干个小任务执行,最后将小任务的执行结果汇总得到大任务的结果。当看到这个定义时,我想很多人会和我一样立刻想到现在非常热门的Map-Reduce思想。二者的对比如下:

共同点:

都是用于执行并行任务的。基本思想都是把复杂问题分解为一个个简单的子问题分别计算,再合并结果。应该说并行计算都是这种思想,彼此独立的或可分解的。从名字上看Fork和Map都有切分的意思,Join和Reduce都有合并的意思,比较类似。

区别:

(1).环境差异:

分布式 vs 单机多核:Fork/Join设计初衷针对单机多核(处理器数量很多的情况)。Map-Reduce一开始就明确是针对很多机器组成的集群环境的。也就是说一个是想充分利用多处理器,而另一个是想充分利用很多机器做分布式计算。这是两种不同的的应用场景,有很多差异,因此在细的编程模式方面有很多不同。

(2).编程差异:

Map-Reduce一般是:做较大粒度的切分,一开始就先切分好任务然后再执行,并且彼此间在最后合并之前不需要通信。这样可伸缩性更好,适合解决巨大的问题,但限制也更多。Fork/Join可以是较小粒度的切分,任务自己知道该如何切分自己,递归地切分到一组合适大小的子任务来执行,因为是同一个JVM内,所以彼此间通信是很容易的,更像是传统编程方式。

综上所述,我们可以认为Fork/Join是同一个JVM内的Map-Reduce的一种实现方式。

Fork/Join的基本工作流程如下图:



Fork-Join框架的实现原理为:

首先,分割任务,只要任务的粒度超过阀值,就不停地将任务分拆为小任务;

然后,将分割的任务任务添加到双端队列中,启动线程从双端队列获取任务执行,将执行结果统一放到一个队列中;

最后,再启动一个线程合并结果队列的值。

Fork-Join框架涉及的主要类如下:



RecursiveAction:用于没有返回值的任务。

RecursiveTask:用于需要返回值的任务,通过泛型参数设置计算的返回值类型。

ForkJoinPool:提供了一系列的submit方法,计算ForkJoinTask(需要实现computer方法)。

我们以计算1+2+3+4结果为例,来演示Fork/Join的使用,我们设置阀值为2,则由于是4个数字相加,因此Fork/Join框架会分拆为两个子任务,一个计算1+2,另一个计算3+4,最后在把两个子任务的结果合并,例子代码如下:

public class CounterTask extends RecursiveTask<Integer>{
private static final int THRESHOLD = 2;
private int start;
private int end;

public CounterTask(int start, int end){
this.start = start;
this.end = end;
}

protected Integer computer(){
int sum = 0;
boolean canComputer = (end - start) <= THRESHOLD;
if(canComputer){
for(int i = start; i <= end; i++){
sum += i;
}
}else{
int middle = (start + end) / 2;
CounterTask leftTask = new CounterTask(start, middle);
CounterTask rightTask = new CounterTask(middle + 1, end);
leftTask.fork();
rightTask.fork();
int leftResult = leftTask.join();
int rightResult = rightTask.join();
sum = leftResult + rightResult;
}
return sum;
}

public static void main(String[] args){
ForkJoinPool pool = new ForkJoinPool();
CounterTask task = new CounterTask(1, 4);
Future<Integer> result = pool.submit(task);
try{
System.out.println("result is:" + result.get());
}catch(InterruptedException e){
}catch(ExecutionException e){
}finally{
pool.shutdown();
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: