您的位置:首页 > 编程语言 > Java开发

java ThreadPoolExecutor 线程池源码分析

2018-03-05 19:25 465 查看
当遇到多线程的时候可能都会用到ThreadPoolExecutor类,那么它究竟干了些什么呢?
1.首先看下构造方法:public ThreadPoolExecutor(
int corePoolSize, // 线程池的核心线程数,会在空闲期也最少持有该线程数
int maximumPoolSize, // 最大线程数。当线程池的线程实在不够用的时候(当queue放不下的时候)会增加pool持有的线程资源数量。当总thread数量超过orePoolSize时thread空闲时间超过就会被释放掉。
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // ThreadFactory在线程池创建线程时调用threadFactory.newThread生成新的线程
RejectedExecutionHandler handler // 当队列满了并且maximumPoolSize达到了最大数时,对新增任务的拒绝策略。
)RejectedExecutionHandler 有四种实现(源码都很简单我只说作用不在重复源码了):
        AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
        DiscardPolicy:直接丢弃任务
        DiscardOldestPolicy:丢弃任务队列顶端的任务,然后吧新任务加入
        CallerRunsPolicy:主线程(调用execute方法的线程)自己执行
2.那么接下来看看线程池如何运作的:
    我们可以看到构造方法几乎就是设置了以下字段值,没什么卵用:public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}  3.然后我们再来看一下罪魁祸首execute(Runnable command)方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果 当前持有thread资源小于corePoolSize数尝试新建一个线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果可以加入队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果不能加入到队列中
else if (!addWorker(command, false))
reject(command); // look at this 这就是我们之前设置的拒绝策略
}reject源码如下:
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
4.那么重点还是没有出现那么继续深入addWorker()方法:private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
// 如果thread资源超过corePoolSize或者超过maximumPoolSize添加失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 增加 workerCount 计数器,然后跳出循环,新增worker
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}一目了然就是用worker取完成我们提交的任务。这个worker就是threadPool的处理中心了。Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);// 这里就是我们配置的threadFactory
}并且这个Worker实现了Runnable接口public void run() {
runWorker(this);
}
final void runWorker(ThreadPoolExecutor.Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted.  This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);// 糊弄人的空方法。。。
Throwable thrown = null;
try {
task.run(); // 运行我们提交的runnable任务。
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);// 糊弄人的空方法。。。
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
5.综上所述就是worker对应一个thread注册在threadPool中,然后这些个worker去queue中取得需要处理的runnable任务去runnable.run();达到线程复用的效果
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: