java ThreadPoolExecutor 线程池源码分析
2018-03-05 19:25
465 查看
当遇到多线程的时候可能都会用到ThreadPoolExecutor类,那么它究竟干了些什么呢?
1.首先看下构造方法:public ThreadPoolExecutor(
int corePoolSize, // 线程池的核心线程数,会在空闲期也最少持有该线程数
int maximumPoolSize, // 最大线程数。当线程池的线程实在不够用的时候(当queue放不下的时候)会增加pool持有的线程资源数量。当总thread数量超过orePoolSize时thread空闲时间超过就会被释放掉。
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // ThreadFactory在线程池创建线程时调用threadFactory.newThread生成新的线程
RejectedExecutionHandler handler // 当队列满了并且maximumPoolSize达到了最大数时,对新增任务的拒绝策略。
)RejectedExecutionHandler 有四种实现(源码都很简单我只说作用不在重复源码了):
AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
DiscardPolicy:直接丢弃任务
DiscardOldestPolicy:丢弃任务队列顶端的任务,然后吧新任务加入
CallerRunsPolicy:主线程(调用execute方法的线程)自己执行
2.那么接下来看看线程池如何运作的:
我们可以看到构造方法几乎就是设置了以下字段值,没什么卵用:public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
} 3.然后我们再来看一下罪魁祸首execute(Runnable command)方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果 当前持有thread资源小于corePoolSize数尝试新建一个线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果可以加入队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果不能加入到队列中
else if (!addWorker(command, false))
reject(command); // look at this 这就是我们之前设置的拒绝策略
}reject源码如下:
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
4.那么重点还是没有出现那么继续深入addWorker()方法:private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 如果thread资源超过corePoolSize或者超过maximumPoolSize添加失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 增加 workerCount 计数器,然后跳出循环,新增worker
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}一目了然就是用worker取完成我们提交的任务。这个worker就是threadPool的处理中心了。Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);// 这里就是我们配置的threadFactory
}并且这个Worker实现了Runnable接口public void run() {
runWorker(this);
}
1.首先看下构造方法:public ThreadPoolExecutor(
int corePoolSize, // 线程池的核心线程数,会在空闲期也最少持有该线程数
int maximumPoolSize, // 最大线程数。当线程池的线程实在不够用的时候(当queue放不下的时候)会增加pool持有的线程资源数量。当总thread数量超过orePoolSize时thread空闲时间超过就会被释放掉。
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // ThreadFactory在线程池创建线程时调用threadFactory.newThread生成新的线程
RejectedExecutionHandler handler // 当队列满了并且maximumPoolSize达到了最大数时,对新增任务的拒绝策略。
)RejectedExecutionHandler 有四种实现(源码都很简单我只说作用不在重复源码了):
AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
DiscardPolicy:直接丢弃任务
DiscardOldestPolicy:丢弃任务队列顶端的任务,然后吧新任务加入
CallerRunsPolicy:主线程(调用execute方法的线程)自己执行
2.那么接下来看看线程池如何运作的:
我们可以看到构造方法几乎就是设置了以下字段值,没什么卵用:public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
} 3.然后我们再来看一下罪魁祸首execute(Runnable command)方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果 当前持有thread资源小于corePoolSize数尝试新建一个线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果可以加入队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果不能加入到队列中
else if (!addWorker(command, false))
reject(command); // look at this 这就是我们之前设置的拒绝策略
}reject源码如下:
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
4.那么重点还是没有出现那么继续深入addWorker()方法:private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 如果thread资源超过corePoolSize或者超过maximumPoolSize添加失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 增加 workerCount 计数器,然后跳出循环,新增worker
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}一目了然就是用worker取完成我们提交的任务。这个worker就是threadPool的处理中心了。Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);// 这里就是我们配置的threadFactory
}并且这个Worker实现了Runnable接口public void run() {
runWorker(this);
}
final void runWorker(ThreadPoolExecutor.Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task);// 糊弄人的空方法。。。 Throwable thrown = null; try { task.run(); // 运行我们提交的runnable任务。 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);// 糊弄人的空方法。。。 } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }5.综上所述就是worker对应一个thread注册在threadPool中,然后这些个worker去queue中取得需要处理的runnable任务去runnable.run();达到线程复用的效果
相关文章推荐
- JAVA线程池(ThreadPoolExecutor)源码分析
- Java并发包源码学习之线程池(一)ThreadPoolExecutor源码分析
- java线程池框架源码分析
- Java并发编程:线程池创建及源码分析
- java.util.concurrent 包源码分析之线程池
- 【Java8源码分析】线程池-Executor与ExecutorService的全面剖析
- Java 线程池 ThreadPoolExecutor 源码分析
- Java 线程池 ThreadPoolExecutor 源码分析
- java源码分析系列一 线程池Executors
- Java并发包源码学习之线程池(一)ThreadPoolExecutor源码分析
- Java 线程池 ThreadPoolExecutor 源码分析
- Java ThreadPoolExecutor线程池原理及源码分析
- Java并发之线程池ThreadPoolExecutor源码分析学习
- Java并发之线程池ThreadPoolExecutor源码分析学习
- Java线程池--原理及源码分析
- Java多线程-五中线程池分析以及AnsyncTask源码分析
- [Java并发]-04-ThreadPoolExecutor类创建线程池对象和源码分析
- Java 线程池 --- ThreadPoolExecutor源码分析
- Java并发包源码学习之线程池(一)ThreadPoolExecutor源码分析
- [转载] Java线程池框架源码分析