您的位置:首页 > 其它

非主流并发工具之 ForkJoinPool

2015-08-09 14:58 363 查看
原文:http://wankunde.iteye.com/blog/1521042

ForkJoinPool
是 Java SE 7 新功能“分叉/结合框架”的核心类,现在可能乏人问津,但我觉得它迟早会成为主流。分叉/结合框架是一个比较特殊的线程池框架,专用于需要将一个任务不断分解成子任务(分叉),再不断进行汇总得到最终结果(结合)的计算过程。比起传统的线程池类
ThreadPoolExecutor
ForkJoinPool
实现了工作窃取算法,使得空闲线程能够主动分担从别的线程分解出来的子任务,从而让所有的线程都尽可能处于饱满的工作状态,提高执行效率。

ForkJoinPool
提供了三类方法来调度子任务:

execute
系列
异步执行指定的任务。
invoke
invokeAll

执行指定的任务,等待完成,返回结果。
submit
系列
异步执行指定的任务并立即返回一个
Future
对象。

子任务由
ForkJoinTask
的实例来代表。它是一个抽象类,JDK 为我们提供了两个实现:
RecursiveTask
RecursiveAction
,分别用于需要和不需要返回计算结果的子任务。
ForkJoinTask
提供了三个静态的
invokeAll
方法来调度子任务,注意只能在
ForkJoinPool
执行计算的过程中调用它们。

ForkJoinPool
ForkJoinTask
还提供了很多让人眼花缭乱的公共方法,其实它们大多数都是其内部实现去调用的,对于应用开发人员来说意义不大。

下面以统计 D 盘文件个数为例。这实际上是对一个文件树的遍历,我们需要递归地统计每个目录下的文件数量,最后汇总,非常适合用分叉/结合框架来处理:

Java代码


// 处理单个目录的任务

public class CountingTask extends RecursiveTask<Integer> {

private Path dir;

public CountingTask(Path dir) {

this.dir = dir;

}

@Override

protected Integer compute() {

int count = 0;

List<CountingTask> subTasks = new ArrayList<>();

// 读取目录 dir 的子路径。

try (DirectoryStream<Path> ds = Files.newDirectoryStream(dir)) {

for (Path subPath : ds) {

if (Files.isDirectory(subPath, LinkOption.NOFOLLOW_LINKS)) {

// 对每个子目录都新建一个子任务。

subTasks.add(new CountingTask(subPath));

} else {

// 遇到文件,则计数器增加 1。

count++;

}

}

if (!subTasks.isEmpty()) {

// 在当前的 ForkJoinPool 上调度所有的子任务。

for (CountingTask subTask : invokeAll(subTasks)) {

count += subTask.join();

}

}

} catch (IOException ex) {

return 0;

}

return count;

}

}

// 用一个 ForkJoinPool 实例调度“总任务”,然后敬请期待结果……

Integer count = new ForkJoinPool().invoke(new CountingTask(Paths.get("D:/")));

在我的笔记本上,经多次运行这段代码,耗费的时间稳定在 600 豪秒左右。普通线程池(
Executors.newCachedThreadPool()
)耗时 1100 毫秒左右,足见工作窃取的优势。

结束本文前,我们来围观一个最神奇的结果:单线程算法(使用
Files.walkFileTree(...)
)比这两个都快,平均耗时 550 毫秒!这警告我们并非引入多线程就能优化性能,并须要先经过多次测试才能下结论。

转自:http://www.blogjava.net/shinzey/

实际测试

Java代码


package com.forkjoin.countdir;

import java.io.File;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.RecursiveTask;

import com.util.DateTime;

public class CountTaskRecursive extends RecursiveTask {

int sum =0;

File file ;

public CountTaskRecursive(File file) {

this.file = file;

}

@Override

protected Integer compute() {

Integer csum =0;

List<CountTaskRecursive> tasklist = new ArrayList<CountTaskRecursive>() ;

if(file.isDirectory())

{

for(File f:file.listFiles())

{

CountTaskRecursive t = new CountTaskRecursive(f);

tasklist.add(t);

}

}

else

csum ++;

if(!tasklist.isEmpty())

{

for(CountTaskRecursive t :invokeAll(tasklist))

csum += (Integer)t.join();

}

return csum;

}

public static void main(String[] args) {

DateTime dt = new DateTime() ;

System.out.println("系统日期:"+dt.getDate()) ;

System.out.println("中文日期:"+dt.getDateComplete()) ;

System.out.println("时间戳:"+dt.getTimeStamp()) ;

CountTaskRecursive task = new CountTaskRecursive(new File("F:\\私人资料"));

Integer sum = (Integer)new ForkJoinPool().invoke(task);

System.out.println(sum);

dt = new DateTime() ;

System.out.println("系统日期:"+dt.getDate()) ;

System.out.println("中文日期:"+dt.getDateComplete()) ;

System.out.println("时间戳:"+dt.getTimeStamp()) ;

}

}

Java代码


package com.forkjoin.countdir;

import java.io.File;

import java.security.Timestamp;

import java.util.Calendar;

import com.util.DateTime;

public class CountTaskSingle {

static int sum = 0;

public void countDir(File file)

{

if(file.isDirectory())

{

for(File f:file.listFiles())

countDir(f);

}

else

sum++;

}

public static void main(String[] args) {

CountTaskSingle ins = new CountTaskSingle();

DateTime dt = new DateTime() ;

System.out.println("系统日期:"+dt.getDate()) ;

System.out.println("中文日期:"+dt.getDateComplete()) ;

System.out.println("时间戳:"+dt.getTimeStamp()) ;

ins.countDir(new File("F:\\私人资料"));

System.out.println(sum);

dt = new DateTime() ;

System.out.println("系统日期:"+dt.getDate()) ;

System.out.println("中文日期:"+dt.getDateComplete()) ;

System.out.println("时间戳:"+dt.getTimeStamp()) ;

}

}

Java代码


package com.util;

import java.util.*; // 导入需要的工具包

public class DateTime { // 以后直接通过此类就可以取得日期时间

private Calendar calendar = null; // 声明一个Calendar对象,取得时间

public DateTime() { // 构造方法中直接实例化对象

this.calendar = new GregorianCalendar();

}

public String getDate() { // 得到的是一个日期:格式为:yyyy-MM-dd HH:mm:ss.SSS

// 考虑到程序要频繁修改字符串,所以使用StringBuffer提升性能

StringBuffer buf = new StringBuffer();

buf.append(calendar.get(Calendar.YEAR)).append("-"); // 增加年

buf.append(this.addZero(calendar.get(Calendar.MONTH) + 1, 2)).append("-"); // 增加月

buf.append(this.addZero(calendar.get(Calendar.DAY_OF_MONTH), 2)).append(" "); // 取得日

buf.append(this.addZero(calendar.get(Calendar.HOUR_OF_DAY), 2)).append(":"); // 取得时

buf.append(this.addZero(calendar.get(Calendar.MINUTE), 2)).append(":");

buf.append(this.addZero(calendar.get(Calendar.SECOND), 2)).append(".");

buf.append(this.addZero(calendar.get(Calendar.MILLISECOND), 3));

return buf.toString();

}

public String getDateComplete() { // 得到的是一个日期:格式为:yyyy年MM月dd日 HH时mm分ss秒SSS毫秒

// 考虑到程序要频繁修改字符串,所以使用StringBuffer提升性能

StringBuffer buf = new StringBuffer();

buf.append(calendar.get(Calendar.YEAR)).append("年"); // 增加年

buf.append(this.addZero(calendar.get(Calendar.MONTH) + 1, 2)).append("月"); // 增加月

buf.append(this.addZero(calendar.get(Calendar.DAY_OF_MONTH), 2)).append("日"); // 取得日

buf.append(this.addZero(calendar.get(Calendar.HOUR_OF_DAY), 2)).append("时"); // 取得时

buf.append(this.addZero(calendar.get(Calendar.MINUTE), 2)).append("分"); // 取得分

buf.append(this.addZero(calendar.get(Calendar.SECOND), 2)).append("秒"); // 取得秒

buf.append(this.addZero(calendar.get(Calendar.MILLISECOND), 3)).append("毫秒"); // 取得毫秒

return buf.toString();

}

public String getTimeStamp() { // 得到的是一个时间戳

// 考虑到程序要频繁修改字符串,所以使用StringBuffer提升性能

StringBuffer buf = new StringBuffer();

buf.append(calendar.get(Calendar.YEAR)); // 增加年

buf.append(this.addZero(calendar.get(Calendar.MONTH) + 1, 2)); // 增加月

buf.append(this.addZero(calendar.get(Calendar.DAY_OF_MONTH), 2)); // 取得日

buf.append(this.addZero(calendar.get(Calendar.HOUR_OF_DAY), 2)); // 取得时

buf.append(this.addZero(calendar.get(Calendar.MINUTE), 2)); // 取得分

buf.append(this.addZero(calendar.get(Calendar.SECOND), 2)); // 取得秒

buf.append(this.addZero(calendar.get(Calendar.MILLISECOND), 3)); // 取得毫秒

return buf.toString();

}

// 考虑到日期中存在前导0,所以在此处加上补零的方法

private String addZero(int num, int len) {

StringBuffer s = new StringBuffer();

s.append(num);

while (s.length() < len) { // 如果长度不足,则继续补0

s.insert(0, "0"); // 在第一个位置处补0

}

return s.toString();

}

};

执行结果:

单线程执行结果

系统日期:2012-05-09 23:59:07.468

中文日期:2012年05月09日23时59分07秒468毫秒

时间戳:20120509235907468

389578

系统日期:2012-05-09 23:59:56.421

中文日期:2012年05月09日23时59分56秒421毫秒

时间戳:20120509235956421

多线程执行结果

系统日期:2012-05-10 00:01:57.500

中文日期:2012年05月10日00时01分57秒500毫秒

时间戳:20120510000157500

389578

系统日期:2012-05-10 00:09:00.281

中文日期:2012年05月10日00时09分00秒281毫秒

时间戳:20120510000900281
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: