您的位置:首页 > 编程语言 > Java开发

Java线程池源码分析(基于JDK1.8)

2017-09-03 10:23 656 查看

前言

线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配、调优和监控,有以下好处:

1、降低资源消耗;

2、提高响应速度;

3、提高线程的可管理性。

源码分析

构造方法

首先来看ThreadPoolExecutor类的四种构造方法

public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6) {
this(var1, var2, var3, var5, var6, Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, ThreadFactory var7) {
this(var1, var2, var3, var5, var6, var7, defaultHandler);
}

public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, RejectedExecutionHandler var7) {
this(var1, var2, var3, var5, var6, Executors.defaultThreadFactory(), var7);
}

public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, ThreadFactory var7, RejectedExecutionHandler var8) {
this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
this.mainLock = new ReentrantLock();
this.workers = new HashSet();
this.termination = this.mainLock.newCondition();
if(var1 >= 0 && var2 > 0 && var2 >= var1 && var3 >= 0L) {
if(var6 != null && var7 != null && var8 != null) {
this.corePoolSize = var1;
this.maximumPoolSize = var2;
this.workQueue = var6;
this.keepAliveTime = var5.toNanos(var3);
this.threadFactory = var7;
this.handler = var8;
} else {
throw new NullPointerException();
}
} else {
throw new IllegalArgumentException();
}
}


从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

下面解释下一下构造器中各个参数的含义:

corePoolSize:核心池的大小,默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。

maximumPoolSize:线程池最大线程数,一般你用不到,当大于了这个值就会将任务由一个丢弃处理机制来处理,但是当你调用newFixedThreadPool()的时候,corePoolSizemaximumPoolSize是一样的,而corePoolSize是先执行的,所以他会先被放入等待队列,而不会执行到下面的丢弃处理中。

workQueue等待队列,当达到corePoolSize的时候,就向该等待队列放入线程信息(默认为一个LinkedBlockingQueue),运行中的线程属性为:workers,为一个HashSet;我们的Runnable内部被包装了一层,后面会看到这部分代码;这个队列默认是一个无界队列(你也可以设定一个有界队列),所以在生产者疯狂生产的时候,考虑如何控制的问题。

keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0。

threadFactory:是构造Thread的方法,你可以自己去包装和传递,主要实现newThread()方法即可。

handler:也就是参数maximumPoolSize达到后丢弃处理的方法,Java提供了4种丢弃处理的方法:

1.CallerRunsPolicy:如果发现线程池还在运行,就直接运行这个线程

2.DiscardOldestPolicy:在线程池的等待队列中,将头取出一个抛弃,然后将当前线程放进去。

3.DiscardPolicy:什么也不做

4.AbortPolicy默认使用的,抛出一个异常RejectedExecutionException。

当然你也可以自己根据实际情况去重写,主要是要实现接口:RejectedExecutionHandler中的方法public void rejectedExecution(Runnabler, ThreadPoolExecutor e)

ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor之间的关系

Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的。

ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等。

抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法。

然后ThreadPoolExecutor继承了类AbstractExecutorService。

主要流程

通常我们得到线程池后,会调用其中的submit()execute()去操作。

execute()实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

submit()是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()不同,它能够返回任务执行的结果,去看submit()的实现,会发现它实际上还是调用的execute(),只不过它利用了Future来获取任务执行结果。

下面就来看一下execute()

public void execute(Runnable var1) {
if(var1 == null) {
throw new NullPointerException();
} else {
int var2 = this.ctl.get();
if(workerCountOf(var2) < this.corePoolSize) {
if(this.addWorker(var1, true)) {
return;
}

var2 = this.ctl.get();
}

if(isRunning(var2) && this.workQueue.offer(var1)) {
int var3 = this.ctl.get();
if(!isRunning(var3) && this.remove(var1)) {
this.reject(var1);
} else if(workerCountOf(var3) == 0) {
this.addWorker((Runnable)null, false);
}
} else if(!this.addWorker(var1, false)) {
this.reject(var1);
}

}
}


这里大概的意思其实就是,先判断传递进来的Runnable 是否为空。

然后比较目前的线程池的大小corePoolSize,如果比corePoolSize小调用addWorker(),反之,把传递进来的Runnable装进workQueue中。

当然有的步骤我没分析,我们了解主要的流程即可。

接下来我们看到addWorker()

private boolean addWorker(Runnable var1, boolean var2) {
while(true) {
int var3 = this.ctl.get();
int var4 = runStateOf(var3);
if(var4 >= 0 && (var4 != 0 || var1 != null || this.workQueue.isEmpty())) {
return false;
}

while(true) {
int var5 = workerCountOf(var3);
if(var5 >= 536870911 || var5 >= (var2?this.corePoolSize:this.maximumPoolSize)) {
return false;
}

if(this.compareAndIncrementWorkerCount(var3)) {
boolean var18 = false;
boolean var19 = false;
ThreadPoolExecutor.Worker var20 = null;

try {
var20 = new ThreadPoolExecutor.Worker(var1);
Thread var6 = var20.thread;
if(var6 != null) {
ReentrantLock var7 = this.mainLock;
var7.lock();

try {
int var8 = runStateOf(this.ctl.get());
if(var8 < 0 || var8 == 0 && var1 == null) {
if(var6.isAlive()) {
throw new IllegalThreadStateException();
}

this.workers.add(var20);
int var9 = this.workers.size();
if(var9 > this.largestPoolSize) {
this.largestPoolSize = var9;
}

var19 = true;
}
} finally {
var7.unlock();
}

if(var19) {
var6.start();
var18 = true;
}
}
} finally {
if(!var18) {
this.addWorkerFailed(var20);
}

}

return var18;
}

var3 = this.ctl.get();
if(runStateOf(var3) != var4) {
break;
}
}
}
}


方法很长,我们仍然关注重点,其中有一段是这样的:

var20 = new ThreadPoolExecutor.Worker(var1);
Thread var6 = var20.thread;
……
var6.start();


我们看到new了一个Worker,并且把传递进来的Runnable对象var1作为自己构造函数的参数,WorkerThreadPoolExecutor的一个内部类:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
……
final Thread thread;
Runnable firstTask;
……
Worker(Runnable var2) {
this.setState(-1);
this.firstTask = var2;
this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this);
}
……
public void run() {
ThreadPoolExecutor.this.runWorker(this);
}
……
}


我们发现Worker其实就是一个Runnable,它的构造函数做了什么操作呢?

把传递进来的Runnable对象(就是我们最初调用execute方法传递进来的参数)用成员变量firstTask保存了。

然后通过threadFactory创建了一个新的线程,而我们此时的Worker对象,成了Thread构造函数中的Runnable对象了,并且把这个新建的线程用成员变量thread保存了。

那回到上面的代码:

var20 = new ThreadPoolExecutor.Worker(var1);
Thread var6 = var20.thread;
……
var6.start();


以上的var6其实就是新建的线程,而调用start(),其实调用的就是var20 run(),我们看到Worker中的run()就是调用了ThreadPoolExecutorrunWorker()。饶了一圈,原来是这样。

我们继续看runWorker()

final void runWorker(ThreadPoolExecutor.Worker var1) {
Thread var2 = Thread.currentThread();
Runnable var3 = var1.firstTask;
var1.firstTask = null;
var1.unlock();
boolean var4 = true;

try {
while(var3 != null || (var3 = this.getTask()) != null) {
var1.lock();
if((runStateAtLeast(this.ctl.get(), 536870912) || Thread.interrupted() && runStateAtLeast(this.ctl.get(), 536870912)) && !var2.isInterrupted()) {
var2.interrupt();
}

try {
this.beforeExecute(var2, var3);
Object var5 = null;

try {
var3.run();
} catch (RuntimeException var28) {
var5 = var28;
throw var28;
} catch (Error var29) {
var5 = var29;
throw var29;
} catch (Throwable var30) {
var5 = var30;
throw new Error(var30);
} finally {
this.afterExecute(var3, (Throwable)var5);
}
} finally {
var3 = null;
++var1.completedTasks;
var1.unlock();
}
}

var4 = false;
} finally {
this.processWorkerExit(var1, var4);
}

}


这里的重点就是先执行了最早在execute()传递进来的Runnable任务,即变量var3,保存在Worker的成员变量firstTask中的。

然后在While循环中,继续通过getTask()取任务来执行,如果getTask()返回为空就跳出循环。

我们来看getTask()

private Runnable getTask() {
boolean var1 = false;

while(true) {
int var2 = this.ctl.get();
int var3 = runStateOf(var2);
if(var3 >= 0 && (var3 >= 536870912 || this.workQueue.isEmpty())) {
this.decrementWorkerCount();
return null;
}

int var4 = workerCountOf(var2);
boolean var5 = this.allowCoreThreadTimeOut || var4 > this.corePoolSize;
if(var4 <= this.maximumPoolSize && (!var5 || !var1) || var4 <= 1 && !this.workQueue.isEmpty()) {
try {
Runnable var6 = var5?(Runnable)this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS):(Runnable)this.workQueue.take();
if(var6 != null) {
return var6;
}

var1 = true;
} catch (InterruptedException var7) {
var1 = false;
}
} else if(this.compareAndDecrementWorkerCount(var2)) {
return null;
}
}
}


我们发现就是从workQueue队列中,也就是等待队列中获取一个元素出来并返回,整个getTask()在自旋下完成。

其中
workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS)
表示的是如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null

workQueue.take()
表示的是如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行。

以上分析可以参考BlockingQueue接口的定义:

/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
* @return the head of this queue
* @throws InterruptedException if interrupted while waiting
*/
E take() throws InterruptedException;

/**
* Retrieves and removes the head of this queue, waiting up to the
* specified wait time if necessary for an element to become available.
*
* @param timeout how long to wait before giving up, in units of
*        {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
*        {@code timeout} parameter
* @return the head of this queue, or {@code null} if the
*         specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
E poll(long timeout, TimeUnit unit)
throws InterruptedException;


然后回到runWorker(),我们发现跳出while循环之后,最终会调用processWorkerExit()

private void processWorkerExit(ThreadPoolExecutor.Worker var1, boolean var2) {
if(var2) {
this.decrementWorkerCount();
}

ReentrantLock var3 = this.mainLock;
var3.lock();

try {
this.completedTaskCount += var1.completedTasks;
this.workers.remove(var1);
} finally {
var3.unlock();
}

this.tryTerminate();
int var4 = this.ctl.get();
if(runStateLessThan(var4, 536870912)) {
if(!var2) {
int var5 = this.allowCoreThreadTimeOut?0:this.corePoolSize;
if(var5 == 0 && !this.workQueue.isEmpty()) {
var5 = 1;
}

if(workerCountOf(var4) >= var5) {
return;
}
}

this.addWorker((Runnable)null, false);
}

}


做的操作主要就是将废弃的Worker移出workersworkers之前介绍过,就是存储Worker的一个HashSet。然后通过tryTerminate()进行一些回收操作。

创建线程池的五类方法

Executors类中有几个创建线程池的静态方法:

1.newFixedThreadPool():创建一个可重用的、具有固定线程数的线程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作为阻塞队列。不过当线程池没有可执行任务时,也不会释放线程。

2.newSingleThreadExecutor()

初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行,内部使用LinkedBlockingQueue作为阻塞队列。

3.newCachedThreadPool()

初始化一个可以缓存线程的线程池,默认缓存60s,线程池的线程数可达到Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列。

和newFixedThreadPool()创建的线程池不同,newCachedThreadPool()在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销。

4.newSingleThreadExecutor()

初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据。

5.newWorkStealingPool():创建持有足够的线程的线程池来支持给定的并行级别,该方法还会使用多个队列来减少竞争。

参考:

1.Java并发编程:线程池的使用

2.Java线程池架构(一)原理和源码解析

3.深入分析java线程池的实现原理
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Java 线程池 源码 JDK