您的位置:首页 > 其它

AsyncTask分析(二)---Future、Callable、FutureTask

2016-12-16 16:00 239 查看

AsyncTask分析(二)—Future、Callable、FutureTask

之前分析AsyncTask源码的时候,在其中提到了Future,Callable,FutureTask三个类或接口,今天就揭开他们的神秘面纱,探究他们的运用。

先看一下Future接口源码:

//Future是一个接口,他提供给了我们方法来检测当前的任务是否已经结束
//,还可以等待任务结束并且拿到一个结果,说白了他可以用来进行线程同步
public interface Future<V> {
//如果任务已经完成或者已经停止了或者这个任务无法停止,则会返回一个false
boolean cancel(boolean mayInterruptIfRunning);
//判断任务是否取消,
boolean isCancelled();
//判断任务是否完成
boolean isDone();
//取值,任务未完成之前,阻塞当前线程,直到任务完成,
//如果已经调用cancel(),在调用gete,会抛异常
V get() throws InterruptedException, ExecutionException;
//设置最大等待时间(线程阻塞最长时间),其余同get()
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}


干说,可能不太好理解,我们来个Demo,测试一下

private static void executor() throws Exception{
ExecutorService service = Executors.newSingleThreadExecutor();
CountRunnable count = new CountRunnable();
Future f = service.submit(count);
long start = System.currentTimeMillis();
System.out.println("start:"+start);
try{
System.out.println("result:"+f.get(10*1000, TimeUnit.MILLISECONDS));
}catch(Exception e){
System.out.println(e.toString());
}finally{
service.shutdown();
}
long end = System.currentTimeMillis();
System.out.println("任务结束 end:"+end+",共耗时:"+(end-start));
}

static class CountRunnable implements Runnable{

private int sum;
@Override
public void run() {
for(int i=1 ; i<11 ; i++){
sum = sum+i;
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("run 结束  sum="+sum);
}

}

public static void main(String[] args) {
try {
executor();
} catch (Exception e) {
e.printStackTrace();
}
}


看下打印结果:



Future.get()方法的的确确是阻塞了当前线程(因此如果在Android上使用Future不能再UIThread调用Future.get();),任务线程的完成时间是20s,但是我们的get()设置的阻塞时间是10s,在10s内无法返回结果,就会报TimeOut异常。为什么 任务线程最后输出还是sum=55?那是因为我们并没有取消任务,任务继续执行。

继续来看Callable:

//同样是一个泛型接口,只有一个call()
public interface Callable<V> {
V call() throws Exception;
}


这个是用来做什么的呢?看过文档注释就会明白,在普通的Runnable执行过程中,也就是run()执行是没有返回值的,那我们也就无法判断任务的执行情况,也没法决定在什么时候停止任务,取消任务,但是Callable call()就给我们提供了这个判断。

看一个Demo

static void callTest(){
ExecutorService pool = Executors.newSingleThreadExecutor();
CountCallable count = new CountCallable();
Future<Number> f = pool.submit(count);
long start = System.currentTimeMillis();
System.out.println("start:"+start);
try{
System.out.println("result:"+f.get().number);
}catch(Exception e){
System.out.println(e.toString());
}finally{
pool.shutdown();
}
long end = System.currentTimeMillis();
System.out.println("任务结束 end:"+end+",共耗时:"+(end-start));
}

static class CountCallable implements Callable<Number>{

@Override
public Number call() throws Exception {
Number number = new Number();
number.setNumber(100);
System.out.println("call...");
TimeUnit.SECONDS.sleep(2);
return number;
}

}

static class Number{
public int number;

public void setNumber(int number){
this.number = number;
}
}


看下结果:



当pool.submit()之后,就会立即回调call(),当任务完成之后,就会返回我们在call()中设置的Number值。

刚刚说了,Future还有个功能是取消任务,不妨来试一下:

static void cancelTest() throws Exception{
ExecutorService pool = Executors.newSingleThreadExecutor();
CountCallable count = new CountCallable();
Future<Number> f = pool.submit(count);
System.out.println("任务开始于:"+System.currentTimeMillis());
TimeUnit.SECONDS.sleep(4);
f.cancel(true);
if(f.isCancelled()){
System.out.println("任务被取消于:"+System.currentTimeMillis());
}else{
Number b = f.get();
System.out.println("任务继续执行:");
if(f.isDone()){
System.out.println("任务执行完毕:"+System.currentTimeMillis());
pool.shutdown();
}
}

}


看一下打印结果:



,cancel()之后,是不能再调get(),不然会报异常。

到这里Future和Callable基本就介绍完了,下面再看一下FutureTask

FutureTask登场

Future是一个接口,他的唯一实现类就是FutureTask,其实FutureTask的一个很好地特点是他有一个回调函数done()方法,当一个任务执行结束后,会回调这个done()方法,我们可以在done()方法中调用FutureTask的get()方法来获得计算的结果。为什么我们要在done()方法中去调用get()方法呢? 这是有原因的,我在Android开发中,如果我在主线程去调用futureTask.get()方法时,会阻塞我的UI线程,如果在done()方法里调用get(),则不会阻塞我们的UI线程。

先看一下

public class FutureTask<V> implements RunnableFuture<V>{}

public interface RunnableFuture<V> extends Runnable, Future<V> {}


这就知道了FutureTask,即可作为Runnable,也可作为Future;

那就看下FutureTask中的代码:

public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

protected void setException(Throwable t) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = t;
U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
finishCompletion();
}
}


当执行FutureTask中的run()时,真正起作用的其实是Callable接口在调用call(),这里最终也是调用了finishCompletion();

再来看一段:

public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
U.compareAndSwapInt(this, STATE, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {    // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
U.putOrderedInt(this, STATE, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}

private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}

done();

callable = null;        // to reduce footprint
}


这里我们看到,当调用cancel(boolean)后,如果mayInterruptIfRunning = true,就会去执行t.interrupt();不论成功与否,最后执行finishCompletion(),执行done(),置空callable;

所以不论是cancel(返回true的时候),还是run最终都会调用done()

来个Demo验证下:

static class CountCallable implements Callable<Number>{

@Override
public Number call() throws Exception {
Number n = new Number();
n.number = 100;
System.out.println("call....");
TimeUnit.SECONDS.sleep(10);
return n;
}

}

static class Number{
public int number;
}

FutureTask<Number> task ;
ExecutorService es;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
CountCallable callable = new CountCallable();
task = new FutureTask<Number>(callable){

@Override
protected void done() {
try {
Number result = task.get();
System.out.println("任务结束  number:"+result.number+",结束:"+System.currentTimeMillis()+",isDone:"+task.isCancelled()+",isCancel:"+task.isCancelled()+",theadName:"+Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
}

};

es = Executors.newFixedThreadPool(2);
es.execute(task);
System.out.println("任务开始:"+System.currentTimeMillis());

findViewById(R.id.btn).setOnClickListener(new OnClickListener() {

@Override
public void onClick(View v) {
if(task.isDone()||task.isCancelled()){
return;
}
System.out.println(task.cancel(true)+",isCanceled:"+task.isCancelled());

}
});
}


看下结果:

1,不点击cancel:



为毛要在done()里面调用get(),因为done()是运行在子线程的,不会阻塞UIThread,

2,点击cancel:



task.cancel()返回true;

那在回过头来看下AsyncTask中的运用:

public AsyncTask() {
//Callable接口的实现类,也就是FutureTask run()中真正工作的部分
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);

Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
// 暴露到程序中的就是doInBackGround();
Result result = doInBackground(mParams);
Binder.flushPendingCommands();
return postResult(result);
}
};

mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
//这里获取的就是call()中的result
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}

public final boolean cancel(boolean mayInterruptIfRunning) {
mCancelled.set(true);
//同样交给了Future去处理
return mFuture.cancel(mayInterruptIfRunning);
}


我们重点追踪execute()这个流程:

@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}

@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}

mStatus = Status.RUNNING;

onPreExecute();

mWorker.mParams = params;
//这里其实就是SerialExecutor的execute()
exec.execute(mFuture);

return this;
}

private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;

public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}

protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}


一步一步追踪下来,发现最终都是在SerialExecutor里面了,exec.execute(mFuture)其实就是SerialExecutor执行的execute(mFuture);

这里面我们看到直接调用了mFuture.run();

执行的流程图如下



参考:

Android并发编程之白话文详解Future,FutureTask和Callable
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  asynctask