您的位置:首页 > 编程语言 > Java开发

Callable、Future、FutureTask 分析

2016-08-26 16:33 281 查看
jdk 自1.5开始提供了Callable 接口,用来满足一个带有返回值的线程调用。

@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}


该接口只有一个call 泛型 方法,返回一个传入的泛型,并且是可抛异常的,Callable通常配合Future一起使用。

public interface Future<V> {

boolean cancel(boolean mayInterruptIfRunning);

boolean isCancelled();

boolean isDone();

V get() throws InterruptedException, ExecutionException;

V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}


cancel : 尝试取消执行的任务。返回true,表示任务取消成功,在下列情况下将返回 false:任务已经完成、任务已经取消、因为其他原因不能取消。如果当前任务已经启动运行(NEW),那么参数 mayInterruptIfRunning 将决定是否线程可以被中断。

isCancelled:返回线程在正常结束之前是否被取消。

isDone:返回线程是否完成。完成可能是因为:正常结束,异常发生或者是被取消。

get: 这个方法将会阻塞,直到计算结束并返回结果(泛型,通常和Callable 一起使用,返回Callable的泛型)。

V get(long timeout, TimeUnit unit):等待相应的时间。

Future是一个接口,得有相应的实现类去处理这些方法,目前这个接口的实现类有:FutureTask,ForkJoinTask,今天讲解一下FutureTask。

public class FutureTask<V> implements RunnableFuture<V>


FutureTask 实现了RunnableFuture接口

public interface RunnableFuture<V> extends Runnable, Future<V>{
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}


RunnableFuture 继承了Runnable和Future,也就是说RunnableFuture既能处理Runnable,也能处理Future,所以在FutureTask有两个构造器分别处理了Callable和Runnable的实现。

public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;       // ensure visibility of callable
}


public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;       // ensure visibility of callable
}


这两个构造函数最终都是对callable,state 这两个变量作赋值。

/** The underlying callable; nulled out after running */
private Callable<V> callable;
/**
* The run state of this task, initially NEW.  The run state
* transitions to a terminal state only in methods set,
* setException, and cancel.  During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;


callable:底层调用的Callable实例。

state:任务运行的状态,值为0-6,初始为NEW。运行状态state在方法
set(V v)
setException(Throwable t)
cancel(boolean mayInterruptIfRunning)
中将被设置成一些终端的状态值。在线程任务执行到结束期间,状态值可能会被设置成临时的COMPLETING状态(当outcome 属性正在被设置时)或者是INTERRUPTING(只有当在调用canel(true) 方法使线程任务被中断)。这些中间改变的状态值将被设置成final类型,因为这些中间状态值都是唯一的,不可改变。状态改变的可能性变化:

NEW -> COMPLETING -> NORMAL

NEW -> COMPLETING -> EXCEPTIONAL

NEW -> CANCELLED

NEW -> INTERRUPTING -> INTERRUPTED

一般我们使用比较多的是Callable作为参数的构造器,举个栗子来说明上述状态的改变:

/**
* @author peanut
*/
public class ThreadDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask(new CallableTest());
new Thread(futureTask).start();
//futureTask.cancel(false);
// futureTask.cancel(true);
Object o = futureTask.get();
System.out.println(o);
}
}

class CallableTest implements Callable {
@Override
public Object call() throws Exception {
System.out.println("in。。。");
return 123456;
}
}


最终程序打印:

in。。。
123456


其中get()方法将阻塞至call()方法返回为止,接下来我们来分析一下里面各状态的变化。

首先:构建一个FutureTask对象,入参为CallableTest的一个实例,调用FutureTask的构造函数完成赋值动作,初始状态为NEW,值为0。

public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;       // ensure visibility of callable
}


然后启动一个线程 start(),中间调用Thread类的一些方法,然后执行FutureTask类的run()方法

public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, unnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
//初始状态NEW
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//调用Callable 实例的call方法,本例中result为123456
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//发生异常
setException(ex);
}
if (ran)
//调用set方法修改state状态值
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}


基本步骤为:

判断当前状态为初始状态NEW

调用Callable实例的call方法取得结果值

调用set方法,修改state状态值,赋值outcome

调用finishCompletion 处理等待队列

修改state状态这边
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING))
cas操作更新,将state状态从NEW更新至COMPLETING,至于
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);  这步操作是修改成最终状态NORMAL?这里有点看不懂,
putOrderedInt 方法的作用是啥?排序吗?

移除且唤醒等待的线程

static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}

done();

callable = null;        // to reduce footprint
}


这条正常的线路state的状态变化为:NEW -> COMPLETING -> NORMAL。

如果在run方法中调用Callable实例的call方法时发生了异常,则会调用
setException(ex)
方法

protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}


这和正常的流程差不多,只是最终状态不是NORMAL而是EXCEPTIONAL,那么状态的流程为:NEW -> COMPLETING -> EXCEPTIONAL。

cancel方法

public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUP
bb93
TING : CANCELLED)))
return false;
try {    // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}


FutureTask futureTask = new FutureTask(new CallableTest());
new Thread(futureTask).start();
//TimeUnit.SECONDS.sleep(2);
futureTask.cancel(false);


调用了futureTask.cancel,如果当前状态不在NEW状态,例如上面的sleep注释打开,2s后状态正常流转的话状态为NORMAL,直接返回false。如果
futureTask.cancel(false)
状态流转流程为:NEW -> CANCELLED。如果
futureTask.cancel(true)
状态流转流程为:NEW -> INTERRUPTING -> INTERRUPTED。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 线程