您的位置:首页 > 移动开发 > Android开发

android线程池详解之ThreadPoolExecutor剖析(二)

2016-07-04 18:56 447 查看
在前一篇文章中,我们简单了解了线程的的使用,以及线程池工厂如何创建几种线程池的,接下来我们一起了解一下ThreadPoolExecutor的工作原理

1.首先我们不用工厂类来创建一个ThreadPoolExecutor

LinkedBlockingDeque<Runnable> bd=new LinkedBlockingDeque<Runnable>();
ThreadPoolExecutor  pool=new ThreadPoolExecutor(5, 10, 2, TimeUnit.SECONDS, bd, new ThreadFactory() {
volatile int i=0;
@Override
public Thread newThread(Runnable r) {
//创建线程工厂
Thread t=new Thread(r);
t.setName("ThreadName"+i);
i++;
return t;
}
}, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//线程池对拒绝任务的处理策略
throw RejectedExecutionException("Task " + r.toString() + " rejected from " + <span style="font-family: Arial, Helvetica, sans-serif;">executor</span><span style="font-family: Arial, Helvetica, sans-serif;">.toString());</span>
}
});
以上程序中我们它的完整构造方法为

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue workQueue,RejectedExecutionHandler handler)

corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

keepAliveTime: 线程池维护线程所允许的空闲时间

unit: 线程池维护线程所允许的空闲时间的单位

workQueue: 线程池所使用的缓冲队列(已知的缓冲队列有

handler:
线程池对拒绝任务的处理策略

先借张图,看看ThreadPoolExecutor的工作原理



先看看execute()源代码

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
// <span style="color: rgb(51, 51, 51); font-family: Arial, sans-serif; font-size: 14px; line-height: 21px;">判断当前线程数量是否超过了核心线程数,若未超过,则直接添加一个核心线程,并用此线程完成当前提交的任务。</span>
if (workerCountOf(c) < corePoolSize) {
//添加任务到工作线程线程中
if (addWorker(command, true))
return;
c = ctl.get();
}
//线程池处于工作状态并且把任务存放到缓冲队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//没有空闲线程执行新任务尝试移除任务,抛出异常
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
//检查到工作线程有空闲的,马上执行新任务
addWorker(null, false);
}
//没有空闲的线程,队列也添加不进去抛出异常
else if (!addWorker(command, false))
reject(command);
}


有几点需要注意的

1、线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。

2、当调用 execute() 方法添加一个任务时,线程池会做如下判断:

a. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;

b. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。

c. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;

d. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。

3、当一个线程完成任务时,它会从队列中取下一个任务来执行。

4、当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到
corePoolSize 的大小。

就下了我们一起来看看addWorker()这个方法

先贴源码吧

<span style="font-family:Lantinghei SC, Open Sans, Arial, Hiragino Sans GB, Microsoft YaHei, STHeiti, WenQuanYi Micro Hei, SimSun, sans-serif;color:#3d464d;"><span style="font-size: 16px; line-height: 30px;">  private boolean addWorker(Runnable firstTask, boolean core) {
retry://标示符,表示开始执行的位置</span></span><span style="font-family:Lantinghei SC, Open Sans, Arial, Hiragino Sans GB, Microsoft YaHei, STHeiti, WenQuanYi Micro Hei, SimSun, sans-serif;color:#3d464d;"><span style="font-size: 16px; line-height: 30px;">

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

</span></span>
<span style="font-family:Lantinghei SC, Open Sans, Arial, Hiragino Sans GB, Microsoft YaHei, STHeiti, WenQuanYi Micro Hei, SimSun, sans-serif;color:#3d464d;"><span style="font-size: 16px; line-height: 30px;">//<span style="color: rgb(51, 51, 51); font-family: Arial, sans-serif; font-size: 14px; line-height: 20px;">表示ctl状态为RUNNING状态或者为SHUTDOWN状态且此时任务队列仍有任务未执行完时,可以继续调用addWorker添加工作线程,但不能新建任务,即firstTask参数必须为null.否则这里将返回false,即新建工作线程失败。</span></span></span>
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();  // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
//<span style="color: rgb(51, 51, 51); font-family: Arial, sans-serif; font-size: 14px; line-height: 20px;"> 这部分对当前线程数量做了判断,依据core参数,若core参数为true,即添加的为核心线程,则当前工作的线程数量不应当超过corePoolSize,否则返回false。若core参数为false,即添加的为普通线程,则当前工作的线程数量不应当超过maximumPoolSize,否则返回false。通过上述校验,则调用compareAndIncrementWorkerCount(c)尝试增加当前ctl计数,若成功则跳出外曾循环。若失败则重复检查当前线程池ctl状态(之所以检查ctl是因为造成失败的原因为ctl发生变化,这有两种可能,一种是作为下29位的工作线程计数发生变化,一种是作为上3位的状态标志位发生变化,这里检查的就是上3位的变化),若是ctl状态发生变化,则重新尝试外层循环(这是有可能外层循环会由于ctl状态的变化而直接return false)。若是ctl计数发生变化,则重新尝试内层循环。</span>
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
<span style="color: rgb(61, 70, 77); font-family: Arial, sans-serif; font-size: 14px; line-height: 20px;">//这部分获取了线程池的主锁mainLock。从而保证对共享资源workers的排他访问。这里通过new Worker(firstTask) 新建了一个worker对象,该对象持有他的第一个任务firstTask。Woker类的构造方法,将通过ThreadFactory获取一个thread对象,这里可能失败,返回null,或者抛出异常。所以在上面的代码段中对这种情况作了处理,若返回null,则workerStarted将为false。将执行addWorkerFailed方法处理失败情况;若抛出异常同样workerStarted将为false。将同样执行addWorkerFailed方法。若成功通过ThreadFactory新建了一个持有当前worker对象的thread对象,则继续往下recheck ctl状态(规则与1没有区别),通过校验则将当前worker放入workers数组中,然后重新校正队则池大小(largestPoolSize),置workerAdded标志位为true。最后通过wokerAdded标志位校验,置workerStarted标志位为true,启动线程,该线程持有当前worker对象,会执行worker对象的run方法,而woker对象的run方法又调用了自身的runWorker(this)方法。至此为止一个worker正式被添加进入workers数组并且正式开始运转。</span>
<span style="color: rgb(61, 70, 77); font-family: Arial, sans-serif; font-size: 14px; line-height: 20px;">
</span>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息