Callable、Future、FutureTask 分析
2016-08-26 16:33
281 查看
jdk 自1.5开始提供了Callable 接口,用来满足一个带有返回值的线程调用。
该接口只有一个call 泛型 方法,返回一个传入的泛型,并且是可抛异常的,Callable通常配合Future一起使用。
cancel : 尝试取消执行的任务。返回true,表示任务取消成功,在下列情况下将返回 false:任务已经完成、任务已经取消、因为其他原因不能取消。如果当前任务已经启动运行(NEW),那么参数 mayInterruptIfRunning 将决定是否线程可以被中断。
isCancelled:返回线程在正常结束之前是否被取消。
isDone:返回线程是否完成。完成可能是因为:正常结束,异常发生或者是被取消。
get: 这个方法将会阻塞,直到计算结束并返回结果(泛型,通常和Callable 一起使用,返回Callable的泛型)。
V get(long timeout, TimeUnit unit):等待相应的时间。
Future是一个接口,得有相应的实现类去处理这些方法,目前这个接口的实现类有:FutureTask,ForkJoinTask,今天讲解一下FutureTask。
FutureTask 实现了RunnableFuture接口
RunnableFuture 继承了Runnable和Future,也就是说RunnableFuture既能处理Runnable,也能处理Future,所以在FutureTask有两个构造器分别处理了Callable和Runnable的实现。
这两个构造函数最终都是对callable,state 这两个变量作赋值。
callable:底层调用的Callable实例。
state:任务运行的状态,值为0-6,初始为NEW。运行状态state在方法
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
一般我们使用比较多的是Callable作为参数的构造器,举个栗子来说明上述状态的改变:
最终程序打印:
其中get()方法将阻塞至call()方法返回为止,接下来我们来分析一下里面各状态的变化。
首先:构建一个FutureTask对象,入参为CallableTest的一个实例,调用FutureTask的构造函数完成赋值动作,初始状态为NEW,值为0。
然后启动一个线程 start(),中间调用Thread类的一些方法,然后执行FutureTask类的run()方法
基本步骤为:
判断当前状态为初始状态NEW
调用Callable实例的call方法取得结果值
调用set方法,修改state状态值,赋值outcome
调用finishCompletion 处理等待队列
修改state状态这边
移除且唤醒等待的线程
这条正常的线路state的状态变化为:NEW -> COMPLETING -> NORMAL。
如果在run方法中调用Callable实例的call方法时发生了异常,则会调用
这和正常的流程差不多,只是最终状态不是NORMAL而是EXCEPTIONAL,那么状态的流程为:NEW -> COMPLETING -> EXCEPTIONAL。
cancel方法
调用了futureTask.cancel,如果当前状态不在NEW状态,例如上面的sleep注释打开,2s后状态正常流转的话状态为NORMAL,直接返回false。如果
@FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
该接口只有一个call 泛型 方法,返回一个传入的泛型,并且是可抛异常的,Callable通常配合Future一起使用。
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
cancel : 尝试取消执行的任务。返回true,表示任务取消成功,在下列情况下将返回 false:任务已经完成、任务已经取消、因为其他原因不能取消。如果当前任务已经启动运行(NEW),那么参数 mayInterruptIfRunning 将决定是否线程可以被中断。
isCancelled:返回线程在正常结束之前是否被取消。
isDone:返回线程是否完成。完成可能是因为:正常结束,异常发生或者是被取消。
get: 这个方法将会阻塞,直到计算结束并返回结果(泛型,通常和Callable 一起使用,返回Callable的泛型)。
V get(long timeout, TimeUnit unit):等待相应的时间。
Future是一个接口,得有相应的实现类去处理这些方法,目前这个接口的实现类有:FutureTask,ForkJoinTask,今天讲解一下FutureTask。
public class FutureTask<V> implements RunnableFuture<V>
FutureTask 实现了RunnableFuture接口
public interface RunnableFuture<V> extends Runnable, Future<V>{ /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
RunnableFuture 继承了Runnable和Future,也就是说RunnableFuture既能处理Runnable,也能处理Future,所以在FutureTask有两个构造器分别处理了Callable和Runnable的实现。
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
这两个构造函数最终都是对callable,state 这两个变量作赋值。
/** The underlying callable; nulled out after running */ private Callable<V> callable; /** * The run state of this task, initially NEW. The run state * transitions to a terminal state only in methods set, * setException, and cancel. During completion, state may take on * transient values of COMPLETING (while outcome is being set) or * INTERRUPTING (only while interrupting the runner to satisfy a * cancel(true)). Transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
callable:底层调用的Callable实例。
state:任务运行的状态,值为0-6,初始为NEW。运行状态state在方法
set(V v),
setException(Throwable t)和
cancel(boolean mayInterruptIfRunning)中将被设置成一些终端的状态值。在线程任务执行到结束期间,状态值可能会被设置成临时的COMPLETING状态(当outcome 属性正在被设置时)或者是INTERRUPTING(只有当在调用canel(true) 方法使线程任务被中断)。这些中间改变的状态值将被设置成final类型,因为这些中间状态值都是唯一的,不可改变。状态改变的可能性变化:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
一般我们使用比较多的是Callable作为参数的构造器,举个栗子来说明上述状态的改变:
/** * @author peanut */ public class ThreadDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask futureTask = new FutureTask(new CallableTest()); new Thread(futureTask).start(); //futureTask.cancel(false); // futureTask.cancel(true); Object o = futureTask.get(); System.out.println(o); } } class CallableTest implements Callable { @Override public Object call() throws Exception { System.out.println("in。。。"); return 123456; } }
最终程序打印:
in。。。 123456
其中get()方法将阻塞至call()方法返回为止,接下来我们来分析一下里面各状态的变化。
首先:构建一个FutureTask对象,入参为CallableTest的一个实例,调用FutureTask的构造函数完成赋值动作,初始状态为NEW,值为0。
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
然后启动一个线程 start(),中间调用Thread类的一些方法,然后执行FutureTask类的run()方法
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, unnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; //初始状态NEW if (c != null && state == NEW) { V result; boolean ran; try { //调用Callable 实例的call方法,本例中result为123456 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; //发生异常 setException(ex); } if (ran) //调用set方法修改state状态值 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
基本步骤为:
判断当前状态为初始状态NEW
调用Callable实例的call方法取得结果值
调用set方法,修改state状态值,赋值outcome
调用finishCompletion 处理等待队列
修改state状态这边
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING))cas操作更新,将state状态从NEW更新至COMPLETING,至于
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); 这步操作是修改成最终状态NORMAL?这里有点看不懂,putOrderedInt 方法的作用是啥?排序吗?
移除且唤醒等待的线程
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } } /** * Removes and signals all waiting threads, invokes done(), and * nulls out callable. */ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
这条正常的线路state的状态变化为:NEW -> COMPLETING -> NORMAL。
如果在run方法中调用Callable实例的call方法时发生了异常,则会调用
setException(ex)方法
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
这和正常的流程差不多,只是最终状态不是NORMAL而是EXCEPTIONAL,那么状态的流程为:NEW -> COMPLETING -> EXCEPTIONAL。
cancel方法
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUP bb93 TING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }
FutureTask futureTask = new FutureTask(new CallableTest()); new Thread(futureTask).start(); //TimeUnit.SECONDS.sleep(2); futureTask.cancel(false);
调用了futureTask.cancel,如果当前状态不在NEW状态,例如上面的sleep注释打开,2s后状态正常流转的话状态为NORMAL,直接返回false。如果
futureTask.cancel(false)状态流转流程为:NEW -> CANCELLED。如果
futureTask.cancel(true)状态流转流程为:NEW -> INTERRUPTING -> INTERRUPTED。
相关文章推荐
- AsyncTask分析(二)---Future、Callable、FutureTask
- 多线程 | 并发包中的Future、Callable、FutureTask 源码分析
- Java并发编程:Callable、Future和FutureTask
- Java_并发线程_Future、FutureTask、Callable
- Java并发编程:Callable、Future和FutureTask
- Java并发编程:Callable、Future和FutureTask
- Java并发编程:Callable、Future和FutureTask
- Java多线程之 Callable、Future和FutureTask
- Java并发编程:Callable、Future和FutureTask
- Java并发编程:Callable、Future和FutureTask
- Java并发编程:Callable、Future和FutureTask(原文链接:http://www.cnblogs.com/dolphin0520/p/3949310.html)
- Java并发编程:Callable、Future和FutureTask
- Java并发编程:Callable、Future和FutureTask
- 多线程之Runnable,Callable,Future,FutureTask
- 疯狂Java学习笔记(66)-----------Callable、Future和FutureTask
- Java中的Runnable、Callable、Future、FutureTask的区别与示例
- java并发编程:Callable、Future和FutureTask
- Callable、Future和FutureTask
- Java并发编程:Callable、Future和FutureTask
- Java并发编程之Callable,Future,FutureTask