您的位置:首页 > 其它

线程池系列--ExecutorService接口:任务提交

2016-04-09 00:00 411 查看
我关注的任务提交方法为:invokeAll, submit, executor

1 单任务提交:submit , executor

AbstractExecutorService 类的方法实现

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
//1 生成一个 RunnableFuture 对象,并调用 Executor接口中的void execute(Runnable command) 函数
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
//2 生成一个 RunnableFuture 对象,并调用 Executor接口中的void execute(Runnable command) 函数
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
//3 生成一个 RunnableFuture 对象,并调用 Executor接口中的void execute(Runnable command) 函数
execute(ftask);
return ftask;
}

不论何种形式的submit函数,都是把task封装成一个RunnableFuture对象,并调用execute函数。

//4 newTaskFor 函数,把Runnable,Callable封装成统一的FutureTask对象
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

那么FutureTask和RunnableFuture的关系是什么?

可知,FutureTask 是 RunnableFuture 的实现类。FutureTask对象既有任务,又会有任务执行结果。

//5  RunnableFuture 接口继承了Runnable, Future
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

public class FutureTask<V> implements RunnableFuture<V> {
//6 构造函数,把Callable 封装成 FutureTask
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;       // ensure visibility of callable
}
//7构造函数,把Runnable封装成FutrueTask
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;       // ensure visibility of callable
}
}

2 批量提交 invokeAll

//总体思路: 一一提交任务, 等待返回结果
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
// 存放线程的执行结果
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
//不同的线程池,不同的实现方式
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
// 是否线程执行完毕, 如果没有执行完毕,则调用get()
if (!f.isDone()) {
try {
f.get();//阻塞至执行完毕
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
// 只有所有的线程都执行完, 才返回
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: