您的位置:首页 > 其它

并发框架Executor相关类解析

2016-05-03 17:45 363 查看

一.概述

Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。他们之间的关系如下图所示



二.简单介绍

从现在开始我将结合源码,来为大家一一分析每个类的作用,以便我们在后面的使用中可以很清楚的知道每个类能够起到什么作用,到底是干什么的。

Executor

public interface Executor {
//在未来某个时间执行任务,根据实现类的不同有不同的执行情况
void execute(Runnable command);
}


Executor是一个接口,字面意思就是执行器,没错,它就是用来执行提交的任务的,Executor通常不是用来创建线程的,例如

new Thread(new RunnableTask()).start()


而是采用下面的方式,

executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());


可以看到,重点是执行,不是创建。

Executor没有严格要求任务的执行是同步还是异步,这个是由实现者自己决定的,请看下面的两种实现:

//同步
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}


//新开线程执行
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}


通常情况下我们的任务执行器会决定怎样和什么时候来执行任务,比如在AsyncTask中使用到的串行任务执行器,保证了一个任务执行完毕后才执行下一个任务,具体实现代码如下

class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<>();
final Executor executor;
Runnable active;

SerialExecutor(Executor executor) {
this.executor = executor;
}

public synchronized void execute(final Runnable r) {
tasks.add(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}

protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}}


既然Executor是一个接口,那么肯定有实现类,我们看看几个比较常用的实现类。

ExecutorService

public interface ExecutorService extends Executor {
void shutdown();//执行完之前提交的任务后终止
List<Runnable> shutdownNow();//立刻终止
boolean isShutdown();
boolean isTerminated();
......
}


提供了一些方法来管理任务,并且可以产生Future对象来跟踪任务的进度。

调用submit方法返回一个Future对象,可以用来取消任务的执行



Executors

为Executor提供了一些工厂和工具方法来创建不同的任务执行器,例如

ExecutorService,ScheduledExecutorService,ThreadFactory,Callable

public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
......


Future

Future代表了异步计算的结果,包含了一系列方法来对结果进行操作

public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
...


用法如下

ExecutorService executorService = Executors.newCachedThreadPool();
Future future = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "android";
}
});
try {
String result = (String) future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}


FutureTask

继承子Future,代表一个可取消的异步计算任务,提供了方法来开始,取消,并且判断任务是否执行完毕,获取任务执行结果等操作。注意:只有当计算结束后才能获得结果。通过get方法来获取结果,如果计算未结束,get方法会阻塞,一旦计算任务结束,任务就不能在被重启或者取消,除非再次调用runAndReset方法触发计算任务。

public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;       // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;       // ensure visibility of callable
}

public boolean isCancelled() {
return state >= CANCELLED;
}

public boolean isDone() {
return state != NEW;
}

public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
U.compareAndSwapInt(this, STATE, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {    // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
U.putOrderedInt(this, STATE, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
......


可以看到FutureTask的代码还是比较多的,有兴趣的大家自己去看,介于篇幅,这里就不全部罗列出来了。

最后说下FutureTask的继承结构,上面可以看到FutureTask继承了RunnableFuture,

public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}


RunnableFuture也很简单,就不多说了。

Callable

代表一个带有返回结果的任务,

public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}


Callable类似于Runnable,不同之处在于Callable带有返回结果,并且抛出了一个checked异常.Callable通常和FutureTask结合使用,示例如下:

//1.创建Callable对象
Callable callable = new Callable() {
@Override
public Object call() throws Exception {
return null;
}
};
//2.根据callable创建FutureTask对象
FutureTask futureTask = new FutureTask(callable);
//3.根据futuretask创建线程对象
Thread thread = new Thread(futureTask);
thread.start();//4.启动线程


CompletionService

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: