Java线程池源码分析(基于JDK1.8)
2017-09-03 10:23
656 查看
前言
线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配、调优和监控,有以下好处:1、降低资源消耗;
2、提高响应速度;
3、提高线程的可管理性。
源码分析
构造方法
首先来看ThreadPoolExecutor类的四种构造方法:public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6) { this(var1, var2, var3, var5, var6, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, ThreadFactory var7) { this(var1, var2, var3, var5, var6, var7, defaultHandler); } public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, RejectedExecutionHandler var7) { this(var1, var2, var3, var5, var6, Executors.defaultThreadFactory(), var7); } public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, ThreadFactory var7, RejectedExecutionHandler var8) { this.ctl = new AtomicInteger(ctlOf(-536870912, 0)); this.mainLock = new ReentrantLock(); this.workers = new HashSet(); this.termination = this.mainLock.newCondition(); if(var1 >= 0 && var2 > 0 && var2 >= var1 && var3 >= 0L) { if(var6 != null && var7 != null && var8 != null) { this.corePoolSize = var1; this.maximumPoolSize = var2; this.workQueue = var6; this.keepAliveTime = var5.toNanos(var3); this.threadFactory = var7; this.handler = var8; } else { throw new NullPointerException(); } } else { throw new IllegalArgumentException(); } }
从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。
下面解释下一下构造器中各个参数的含义:
corePoolSize:核心池的大小,默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。
maximumPoolSize:线程池最大线程数,一般你用不到,当大于了这个值就会将任务由一个丢弃处理机制来处理,但是当你调用newFixedThreadPool()的时候,corePoolSize和maximumPoolSize是一样的,而corePoolSize是先执行的,所以他会先被放入等待队列,而不会执行到下面的丢弃处理中。
workQueue:等待队列,当达到corePoolSize的时候,就向该等待队列放入线程信息(默认为一个LinkedBlockingQueue),运行中的线程属性为:workers,为一个HashSet;我们的Runnable内部被包装了一层,后面会看到这部分代码;这个队列默认是一个无界队列(你也可以设定一个有界队列),所以在生产者疯狂生产的时候,考虑如何控制的问题。
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0。
threadFactory:是构造Thread的方法,你可以自己去包装和传递,主要实现newThread()方法即可。
handler:也就是参数maximumPoolSize达到后丢弃处理的方法,Java提供了4种丢弃处理的方法:
1.CallerRunsPolicy:如果发现线程池还在运行,就直接运行这个线程
2.DiscardOldestPolicy:在线程池的等待队列中,将头取出一个抛弃,然后将当前线程放进去。
3.DiscardPolicy:什么也不做
4.AbortPolicy:默认使用的,抛出一个异常RejectedExecutionException。
当然你也可以自己根据实际情况去重写,主要是要实现接口:RejectedExecutionHandler中的方法public void rejectedExecution(Runnabler, ThreadPoolExecutor e) 。
ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor之间的关系
Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的。ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等。
抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法。
然后ThreadPoolExecutor继承了类AbstractExecutorService。
主要流程
通常我们得到线程池后,会调用其中的submit()或execute()去操作。execute()实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
而submit()是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()不同,它能够返回任务执行的结果,去看submit()的实现,会发现它实际上还是调用的execute(),只不过它利用了Future来获取任务执行结果。
下面就来看一下execute():
public void execute(Runnable var1) { if(var1 == null) { throw new NullPointerException(); } else { int var2 = this.ctl.get(); if(workerCountOf(var2) < this.corePoolSize) { if(this.addWorker(var1, true)) { return; } var2 = this.ctl.get(); } if(isRunning(var2) && this.workQueue.offer(var1)) { int var3 = this.ctl.get(); if(!isRunning(var3) && this.remove(var1)) { this.reject(var1); } else if(workerCountOf(var3) == 0) { this.addWorker((Runnable)null, false); } } else if(!this.addWorker(var1, false)) { this.reject(var1); } } }
这里大概的意思其实就是,先判断传递进来的Runnable 是否为空。
然后比较目前的线程池的大小和corePoolSize,如果比corePoolSize小调用addWorker(),反之,把传递进来的Runnable装进workQueue中。
当然有的步骤我没分析,我们了解主要的流程即可。
接下来我们看到addWorker():
private boolean addWorker(Runnable var1, boolean var2) { while(true) { int var3 = this.ctl.get(); int var4 = runStateOf(var3); if(var4 >= 0 && (var4 != 0 || var1 != null || this.workQueue.isEmpty())) { return false; } while(true) { int var5 = workerCountOf(var3); if(var5 >= 536870911 || var5 >= (var2?this.corePoolSize:this.maximumPoolSize)) { return false; } if(this.compareAndIncrementWorkerCount(var3)) { boolean var18 = false; boolean var19 = false; ThreadPoolExecutor.Worker var20 = null; try { var20 = new ThreadPoolExecutor.Worker(var1); Thread var6 = var20.thread; if(var6 != null) { ReentrantLock var7 = this.mainLock; var7.lock(); try { int var8 = runStateOf(this.ctl.get()); if(var8 < 0 || var8 == 0 && var1 == null) { if(var6.isAlive()) { throw new IllegalThreadStateException(); } this.workers.add(var20); int var9 = this.workers.size(); if(var9 > this.largestPoolSize) { this.largestPoolSize = var9; } var19 = true; } } finally { var7.unlock(); } if(var19) { var6.start(); var18 = true; } } } finally { if(!var18) { this.addWorkerFailed(var20); } } return var18; } var3 = this.ctl.get(); if(runStateOf(var3) != var4) { break; } } } }
方法很长,我们仍然关注重点,其中有一段是这样的:
var20 = new ThreadPoolExecutor.Worker(var1); Thread var6 = var20.thread; …… var6.start();
我们看到new了一个Worker,并且把传递进来的Runnable对象var1作为自己构造函数的参数,Worker是ThreadPoolExecutor的一个内部类:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { …… final Thread thread; Runnable firstTask; …… Worker(Runnable var2) { this.setState(-1); this.firstTask = var2; this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this); } …… public void run() { ThreadPoolExecutor.this.runWorker(this); } …… }
我们发现Worker其实就是一个Runnable,它的构造函数做了什么操作呢?
把传递进来的Runnable对象(就是我们最初调用execute方法传递进来的参数)用成员变量firstTask保存了。
然后通过threadFactory创建了一个新的线程,而我们此时的Worker对象,成了Thread构造函数中的Runnable对象了,并且把这个新建的线程用成员变量thread保存了。
那回到上面的代码:
var20 = new ThreadPoolExecutor.Worker(var1); Thread var6 = var20.thread; …… var6.start();
以上的var6其实就是新建的线程,而调用start(),其实调用的就是var20 的run(),我们看到Worker中的run()就是调用了ThreadPoolExecutor的runWorker()。饶了一圈,原来是这样。
我们继续看runWorker():
final void runWorker(ThreadPoolExecutor.Worker var1) { Thread var2 = Thread.currentThread(); Runnable var3 = var1.firstTask; var1.firstTask = null; var1.unlock(); boolean var4 = true; try { while(var3 != null || (var3 = this.getTask()) != null) { var1.lock(); if((runStateAtLeast(this.ctl.get(), 536870912) || Thread.interrupted() && runStateAtLeast(this.ctl.get(), 536870912)) && !var2.isInterrupted()) { var2.interrupt(); } try { this.beforeExecute(var2, var3); Object var5 = null; try { var3.run(); } catch (RuntimeException var28) { var5 = var28; throw var28; } catch (Error var29) { var5 = var29; throw var29; } catch (Throwable var30) { var5 = var30; throw new Error(var30); } finally { this.afterExecute(var3, (Throwable)var5); } } finally { var3 = null; ++var1.completedTasks; var1.unlock(); } } var4 = false; } finally { this.processWorkerExit(var1, var4); } }
这里的重点就是先执行了最早在execute()传递进来的Runnable任务,即变量var3,保存在Worker的成员变量firstTask中的。
然后在While循环中,继续通过getTask()取任务来执行,如果getTask()返回为空就跳出循环。
我们来看getTask():
private Runnable getTask() { boolean var1 = false; while(true) { int var2 = this.ctl.get(); int var3 = runStateOf(var2); if(var3 >= 0 && (var3 >= 536870912 || this.workQueue.isEmpty())) { this.decrementWorkerCount(); return null; } int var4 = workerCountOf(var2); boolean var5 = this.allowCoreThreadTimeOut || var4 > this.corePoolSize; if(var4 <= this.maximumPoolSize && (!var5 || !var1) || var4 <= 1 && !this.workQueue.isEmpty()) { try { Runnable var6 = var5?(Runnable)this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS):(Runnable)this.workQueue.take(); if(var6 != null) { return var6; } var1 = true; } catch (InterruptedException var7) { var1 = false; } } else if(this.compareAndDecrementWorkerCount(var2)) { return null; } } }
我们发现就是从workQueue队列中,也就是等待队列中获取一个元素出来并返回,整个getTask()在自旋下完成。
其中
workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS)表示的是如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null。
workQueue.take()表示的是如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行。
以上分析可以参考BlockingQueue接口的定义:
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element becomes available. * * @return the head of this queue * @throws InterruptedException if interrupted while waiting */ E take() throws InterruptedException; /** * Retrieves and removes the head of this queue, waiting up to the * specified wait time if necessary for an element to become available. * * @param timeout how long to wait before giving up, in units of * {@code unit} * @param unit a {@code TimeUnit} determining how to interpret the * {@code timeout} parameter * @return the head of this queue, or {@code null} if the * specified waiting time elapses before an element is available * @throws InterruptedException if interrupted while waiting */ E poll(long timeout, TimeUnit unit) throws InterruptedException;
然后回到runWorker(),我们发现跳出while循环之后,最终会调用processWorkerExit():
private void processWorkerExit(ThreadPoolExecutor.Worker var1, boolean var2) { if(var2) { this.decrementWorkerCount(); } ReentrantLock var3 = this.mainLock; var3.lock(); try { this.completedTaskCount += var1.completedTasks; this.workers.remove(var1); } finally { var3.unlock(); } this.tryTerminate(); int var4 = this.ctl.get(); if(runStateLessThan(var4, 536870912)) { if(!var2) { int var5 = this.allowCoreThreadTimeOut?0:this.corePoolSize; if(var5 == 0 && !this.workQueue.isEmpty()) { var5 = 1; } if(workerCountOf(var4) >= var5) { return; } } this.addWorker((Runnable)null, false); } }
做的操作主要就是将废弃的Worker移出workers,workers之前介绍过,就是存储Worker的一个HashSet。然后通过tryTerminate()进行一些回收操作。
创建线程池的五类方法
Executors类中有几个创建线程池的静态方法:1.newFixedThreadPool():创建一个可重用的、具有固定线程数的线程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作为阻塞队列。不过当线程池没有可执行任务时,也不会释放线程。
2.newSingleThreadExecutor():
初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行,内部使用LinkedBlockingQueue作为阻塞队列。
3.newCachedThreadPool():
初始化一个可以缓存线程的线程池,默认缓存60s,线程池的线程数可达到Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列。
和newFixedThreadPool()创建的线程池不同,newCachedThreadPool()在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销。
4.newSingleThreadExecutor():
初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据。
5.newWorkStealingPool():创建持有足够的线程的线程池来支持给定的并行级别,该方法还会使用多个队列来减少竞争。
参考:
1.Java并发编程:线程池的使用
2.Java线程池架构(一)原理和源码解析
3.深入分析java线程池的实现原理
相关文章推荐
- Java -- 基于JDK1.8的ArrayList源码分析
- ArrayBlockingQueue源码分析(基于JDK1.8)
- ConcurrentHashMap源码分析(基于JDK1.8)
- ArrayList源码分析(基于JDK1.8)
- Java -- 基于JDK1.8的LinkedList源码分析
- HashMap源码分析-基于JDK1.8
- LinkedList源码分析(基于jdk1.8)
- ArrayList源码分析(基于JDK1.8)
- Java集合框架成员之ArrayList类的源码分析(基于JDK1.8版本)
- Java集合框架成员之LinkedList类的源码分析(基于JDK1.8版本)
- ConcurrentHashMap源码分析(基于JDK1.8)
- 长文慎入 HashMap 源码分析 基于 JDK 1.8
- Java集合框架成员之HashTable类的源码分析(基于JDK1.8版本)
- HashMap源码分析(基于JDK1.8)
- 【集合框架】JDK1.8源码分析之HashMap & LinkedHashMap迭代器(三)
- Stack源码探讨(基于JDK1.8)
- HashMap源码分析(基于jdk1.6)
- ConcurrentLinkedQueue源码分析(基于JDK1.8)
- HashMap源码分析——JDK1.8
- JDK1.8源码分析之HashMap