您的位置:首页 > 编程语言 > Java开发

ThreadPoolExecutor源码阅读

2017-05-31 16:07 288 查看

1.线程池解决的问题

线程池解决两个问题
一是复用线程,减少创建销毁线程带来系统开销;
二是限定系统资源使用边界,避免大量线程消耗尽系统内存

适用于互不依赖,运行时间短,不需要对线程控制操作的线程

2.ThreadPoolExecutor主流程

添加任务时,
1.若线程数量小于corePoolSize,则新增线程执行任务
2.若线程数量大于等于corePoolSize且queue队列非满,任务添加到queue
3.若线程数量大于等于corePoolSize且queue队列满,则新增线程执行任务直到线程数量到达maximumPoolSize
4.若线程池满,任务无法被执行,执行用户定义或者默认的RejectedExecutionHandler

执行任务
执行线程持续从queue队列中获取任务并执行

3.主要方法

execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//线程少于corePoolSize添加线程
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//如果发现线程池关闭,移除当前任务
reject(command);
else if (workerCountOf(recheck) == 0)//如果线程池无线程执行,添加一个线程
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);//若队列满或者线程池关闭,执行rejecthandler
}


addWork
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&	//线程池非运行状态
! (rs == SHUTDOWN &&	//线程池关闭时任务队列非空,可以添加线程,shutdown状态不接受新任务,所以firstTask为null
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//增加计数成功
break retry;
c = ctl.get();  // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//开始执行线程,调用的runWorker方法
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}


runWorker
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
   //执行第一个任务或者从queue队列获取新任务,循环执行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java