您的位置:首页 > 编程语言 > Java开发

Java并发编程-Fork/Join框架同步与异常

2016-06-11 17:19 591 查看
Fork/Join框架介绍

前面学习了Executor框架,它将任务的创建和执行进行了分离,通过这个框架,只需要创建Runnable对象和Executor对象,把Runnable对象交给Executor执行器,Executor执行器在负者执行这些任务的线程创建,管理以及线程的结果。

Java 7 Concurrent包提供了一个新的Fork/join框架。是ExecutorService接口的另一种实现。Fork/join框架用来解决能够通过分治技术将问题划分为小任务的问题,能够充分的利用计算机的资源,类似于MapReduce。如果问题的大小超过了一个设定值,就把任务划分成小任务,通过框架执行这些小问题,如果问题的大小小于一个设定值,就立即执行任务。

工作窃取算法

Fork/join框架与Executor框架主要区别是工作窃取算法。与Executor框架不同,join操作会让主任务等待它创建的子任务执行完,执行这个主任务的线程叫做工作者线程。工作窃取算法 就是这个工作者线程在等待时去寻找未被执行的任务执行,进而提升性能。

Fork/Join的核心操作

Fork/Join框架核心由两个类组成:

ForkJoinPool:这个类实现ExecutorService和工作窃取算法,它对工作线程进行管理。

ForkJoinTask:这个类时可以在ForkJoinPool执行任务的基类,所有子类需要重写compute()方法,在该方法中进行任务的划分和执行。这个类包括两个实现类:RecursiveTask用于任务用返回值得场景,RecursiveAction用于没有返回值得场景。

同步与异步:

采用同步方式,调用同步方法(如:invokeAll()),任务会被挂起,直到在Fork/Join的任务执行完成。所以这种方式ForkJoinPool可以采用工作窃取算法。

异步方式,调用异步方法(如:fork()),任务将继续执行,所以这种方式ForkJoinPool不能采用工作窃取算法。

join()和get()方法的区别:

这两个方法都是等待任务结束,获取返回结果。

join(),不能被中断,如果中断,方法将会抛出InterruptException异常,

如果任务抛出任务运行时异常,get()方法将会抛出ExecutionException,而join会抛出RuntimeException异常。

使用

下面例子是计算1-10000相加的任务,设定任务的阀值为1000,任务会被分成10小任务执行。

package MyThread;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinTest{
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool pool = new ForkJoinPool();
CountTask task = new CountTask(1,10000);
ForkJoinTask<Integer> submit = pool.submit(task);
System.out.println("Final result:" + submit.get());
}
}
class CountTask extends RecursiveTask<Integer>{

private static final long serialVersionUID = 1L;
//阈值
private static final int THRESHOLD = 1000;
//起始值
private int start;
//结束值
private int end;

public CountTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
boolean compute = (end - start) <= THRESHOLD;
int res = 0;
if (compute){
for (int i = start; i <= end; i++){
res += i;
}
}else {
//如果长度大于阈值,则分割为小任务
int mid = (start + end) / 2;
CountTask task1 = new CountTask(start,mid);
CountTask task2 = new CountTask(mid + 1, end);
//计算小任务的值
task1.fork();
task2.fork();
//得到两个小任务的值
int task1Res = task1.join();
int task2Res = task2.join();
res = task1Res + task2Res;
}
return res;
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: