AsyncTask分析(二)---Future、Callable、FutureTask
2016-12-16 16:00
239 查看
AsyncTask分析(二)—Future、Callable、FutureTask
之前分析AsyncTask源码的时候,在其中提到了Future,Callable,FutureTask三个类或接口,今天就揭开他们的神秘面纱,探究他们的运用。先看一下Future接口源码:
//Future是一个接口,他提供给了我们方法来检测当前的任务是否已经结束 //,还可以等待任务结束并且拿到一个结果,说白了他可以用来进行线程同步 public interface Future<V> { //如果任务已经完成或者已经停止了或者这个任务无法停止,则会返回一个false boolean cancel(boolean mayInterruptIfRunning); //判断任务是否取消, boolean isCancelled(); //判断任务是否完成 boolean isDone(); //取值,任务未完成之前,阻塞当前线程,直到任务完成, //如果已经调用cancel(),在调用gete,会抛异常 V get() throws InterruptedException, ExecutionException; //设置最大等待时间(线程阻塞最长时间),其余同get() V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
干说,可能不太好理解,我们来个Demo,测试一下
private static void executor() throws Exception{ ExecutorService service = Executors.newSingleThreadExecutor(); CountRunnable count = new CountRunnable(); Future f = service.submit(count); long start = System.currentTimeMillis(); System.out.println("start:"+start); try{ System.out.println("result:"+f.get(10*1000, TimeUnit.MILLISECONDS)); }catch(Exception e){ System.out.println(e.toString()); }finally{ service.shutdown(); } long end = System.currentTimeMillis(); System.out.println("任务结束 end:"+end+",共耗时:"+(end-start)); } static class CountRunnable implements Runnable{ private int sum; @Override public void run() { for(int i=1 ; i<11 ; i++){ sum = sum+i; try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("run 结束 sum="+sum); } } public static void main(String[] args) { try { executor(); } catch (Exception e) { e.printStackTrace(); } }
看下打印结果:
Future.get()方法的的确确是阻塞了当前线程(因此如果在Android上使用Future不能再UIThread调用Future.get();),任务线程的完成时间是20s,但是我们的get()设置的阻塞时间是10s,在10s内无法返回结果,就会报TimeOut异常。为什么 任务线程最后输出还是sum=55?那是因为我们并没有取消任务,任务继续执行。
继续来看Callable:
//同样是一个泛型接口,只有一个call() public interface Callable<V> { V call() throws Exception; }
这个是用来做什么的呢?看过文档注释就会明白,在普通的Runnable执行过程中,也就是run()执行是没有返回值的,那我们也就无法判断任务的执行情况,也没法决定在什么时候停止任务,取消任务,但是Callable call()就给我们提供了这个判断。
看一个Demo
static void callTest(){ ExecutorService pool = Executors.newSingleThreadExecutor(); CountCallable count = new CountCallable(); Future<Number> f = pool.submit(count); long start = System.currentTimeMillis(); System.out.println("start:"+start); try{ System.out.println("result:"+f.get().number); }catch(Exception e){ System.out.println(e.toString()); }finally{ pool.shutdown(); } long end = System.currentTimeMillis(); System.out.println("任务结束 end:"+end+",共耗时:"+(end-start)); } static class CountCallable implements Callable<Number>{ @Override public Number call() throws Exception { Number number = new Number(); number.setNumber(100); System.out.println("call..."); TimeUnit.SECONDS.sleep(2); return number; } } static class Number{ public int number; public void setNumber(int number){ this.number = number; } }
看下结果:
当pool.submit()之后,就会立即回调call(),当任务完成之后,就会返回我们在call()中设置的Number值。
刚刚说了,Future还有个功能是取消任务,不妨来试一下:
static void cancelTest() throws Exception{ ExecutorService pool = Executors.newSingleThreadExecutor(); CountCallable count = new CountCallable(); Future<Number> f = pool.submit(count); System.out.println("任务开始于:"+System.currentTimeMillis()); TimeUnit.SECONDS.sleep(4); f.cancel(true); if(f.isCancelled()){ System.out.println("任务被取消于:"+System.currentTimeMillis()); }else{ Number b = f.get(); System.out.println("任务继续执行:"); if(f.isDone()){ System.out.println("任务执行完毕:"+System.currentTimeMillis()); pool.shutdown(); } } }
看一下打印结果:
,cancel()之后,是不能再调get(),不然会报异常。
到这里Future和Callable基本就介绍完了,下面再看一下FutureTask
FutureTask登场
Future是一个接口,他的唯一实现类就是FutureTask,其实FutureTask的一个很好地特点是他有一个回调函数done()方法,当一个任务执行结束后,会回调这个done()方法,我们可以在done()方法中调用FutureTask的get()方法来获得计算的结果。为什么我们要在done()方法中去调用get()方法呢? 这是有原因的,我在Android开发中,如果我在主线程去调用futureTask.get()方法时,会阻塞我的UI线程,如果在done()方法里调用get(),则不会阻塞我们的UI线程。
先看一下
public class FutureTask<V> implements RunnableFuture<V>{} public interface RunnableFuture<V> extends Runnable, Future<V> {}
这就知道了FutureTask,即可作为Runnable,也可作为Future;
那就看下FutureTask中的代码:
public void run() { if (state != NEW || !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } protected void setException(Throwable t) { if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) { outcome = t; U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state finishCompletion(); } }
当执行FutureTask中的run()时,真正起作用的其实是Callable接口在调用call(),这里最终也是调用了finishCompletion();
再来看一段:
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && U.compareAndSwapInt(this, STATE, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state U.putOrderedInt(this, STATE, INTERRUPTED); } } } finally { finishCompletion(); } return true; } private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (U.compareAndSwapObject(this, WAITERS, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
这里我们看到,当调用cancel(boolean)后,如果mayInterruptIfRunning = true,就会去执行t.interrupt();不论成功与否,最后执行finishCompletion(),执行done(),置空callable;
所以不论是cancel(返回true的时候),还是run最终都会调用done()
来个Demo验证下:
static class CountCallable implements Callable<Number>{ @Override public Number call() throws Exception { Number n = new Number(); n.number = 100; System.out.println("call...."); TimeUnit.SECONDS.sleep(10); return n; } } static class Number{ public int number; } FutureTask<Number> task ; ExecutorService es; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); CountCallable callable = new CountCallable(); task = new FutureTask<Number>(callable){ @Override protected void done() { try { Number result = task.get(); System.out.println("任务结束 number:"+result.number+",结束:"+System.currentTimeMillis()+",isDone:"+task.isCancelled()+",isCancel:"+task.isCancelled()+",theadName:"+Thread.currentThread().getName()); } catch (Exception e) { e.printStackTrace(); } } }; es = Executors.newFixedThreadPool(2); es.execute(task); System.out.println("任务开始:"+System.currentTimeMillis()); findViewById(R.id.btn).setOnClickListener(new OnClickListener() { @Override public void onClick(View v) { if(task.isDone()||task.isCancelled()){ return; } System.out.println(task.cancel(true)+",isCanceled:"+task.isCancelled()); } }); }
看下结果:
1,不点击cancel:
为毛要在done()里面调用get(),因为done()是运行在子线程的,不会阻塞UIThread,
2,点击cancel:
task.cancel()返回true;
那在回过头来看下AsyncTask中的运用:
public AsyncTask() { //Callable接口的实现类,也就是FutureTask run()中真正工作的部分 mWorker = new WorkerRunnable<Params, Result>() { public Result call() throws Exception { mTaskInvoked.set(true); Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); // 暴露到程序中的就是doInBackGround(); Result result = doInBackground(mParams); Binder.flushPendingCommands(); return postResult(result); } }; mFuture = new FutureTask<Result>(mWorker) { @Override protected void done() { try { //这里获取的就是call()中的result postResultIfNotInvoked(get()); } catch (InterruptedException e) { android.util.Log.w(LOG_TAG, e); } catch (ExecutionException e) { throw new RuntimeException("An error occurred while executing doInBackground()", e.getCause()); } catch (CancellationException e) { postResultIfNotInvoked(null); } } }; } public final boolean cancel(boolean mayInterruptIfRunning) { mCancelled.set(true); //同样交给了Future去处理 return mFuture.cancel(mayInterruptIfRunning); }
我们重点追踪execute()这个流程:
@MainThread public final AsyncTask<Params, Progress, Result> execute(Params... params) { return executeOnExecutor(sDefaultExecutor, params); } @MainThread public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params) { if (mStatus != Status.PENDING) { switch (mStatus) { case RUNNING: throw new IllegalStateException("Cannot execute task:" + " the task is already running."); case FINISHED: throw new IllegalStateException("Cannot execute task:" + " the task has already been executed " + "(a task can be executed only once)"); } } mStatus = Status.RUNNING; onPreExecute(); mWorker.mParams = params; //这里其实就是SerialExecutor的execute() exec.execute(mFuture); return this; } private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR; public static final Executor SERIAL_EXECUTOR = new SerialExecutor(); private static class SerialExecutor implements Executor { final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>(); Runnable mActive; public synchronized void execute(final Runnable r) { mTasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (mActive == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((mActive = mTasks.poll()) != null) { THREAD_POOL_EXECUTOR.execute(mActive); } } }
一步一步追踪下来,发现最终都是在SerialExecutor里面了,exec.execute(mFuture)其实就是SerialExecutor执行的execute(mFuture);
这里面我们看到直接调用了mFuture.run();
执行的流程图如下:
参考:
Android并发编程之白话文详解Future,FutureTask和Callable
相关文章推荐
- Android并发-Future,FutureTask和Callable及AsyncTask
- 多线程 | 并发包中的Future、Callable、FutureTask 源码分析
- Callable、Future、FutureTask 分析
- Android AsyncTask完全解析FutureTask 深度解析 -Java并发编程:Callable、Future和FutureTask一个使用DownloadManager下载文件的小例
- Java并发编程:Callable、Future和FutureTask
- Java并发编程:Callable、Future和FutureTask
- Java并发编程:Callable、Future和FutureTask
- Java并发编程:Callable、Future和FutureTask
- Java并发编程:Callable、Future和FutureTask
- java并发callable,runnable,future和futureTask
- Java并发编程举例Runnable, Callable, Future, FutureTask, CompletionService
- Java并发编程:Callable、Future和FutureTask
- 多线程下的其它组件之CyclicBarrier、Callable、Future、FutureTask
- Java并发编程:Callable、Future和FutureTask
- Java多线程21:多线程下的其他组件之CyclicBarrier、Callable、Future和FutureTask
- ThreadPool 之 Callable、Future 和 FutureTask
- (转)Java并发编程:Callable、Future和FutureTask
- Java并发编程:Callable、Future和FutureTask
- Java并发编程:Callable、Future和FutureTask
- Java中的Runnable、Callable、Future、FutureTask的区别与示例