Jetty源码分析之线程池:QueuedThreadPool
2017-12-24 22:10
961 查看
前面分析Jetty整体架构的时候介绍过Jetty的三大组件:Acceptor、Handler和ThreadPool;前两者工作的时候都是需要线程的,而所需的线程正是从ThreadPool中获取的。这篇文件就是来分析ThreadPool的一个具体实现:QueuedThreadPool。下面是它的类图:
继承了父类AbstractLifeCycle之后,QueuedThreadPool就可以当成一个LifeCycle类型的组件管理,这个父类在前面介绍生命周期的时候已经介绍过了,这里就不重复介绍了。
ThreadPool是一个接口,里面定义了一些操作和获取线程信息的方法,它的完整定义如下:
上面的几个方法定义都很简单,没有什么很难理解的方法。
SizedThreadPool是ThreadPool的一个内部接口,它在ThreadPool的基础上增加了数量限制,可以认为是一个数量有限的线程池,可以指定这个线程池中的最小和最大线程数。下面是其完整定义,可以看到也很简单。
Executor是java.util.concurrent包下的一个接口,接口中只有一个方法如下:
execute()方法接受一个Runnable类型的对象,然后负责在一个线程中执行这个task。至于这个线程就是当前调用execute()方法的线程还是另外分配的一个线程,都是由具体实现的子类决定的。这种方式的好处在于调用者不用再自己创建线程,线程的管理完全都有Executor的子类负责。显然QueuedThreadPool是很适合这种场景的。
上面分析完继承的父类之后就发现定义都比较简单,没有什么特别难理解的方法或者调用关系。所以下面来分析QueuedThreadPool中是如何实现这几个父类中的方法的。
1) doStart()方法
doStart()方法时在容器启动的时候就会被调用的一个方法,QueuedThreadPool和其它组件一样在这个方法中进行初始化。
上面这段代码逻辑很简单,需要注意的几个点如下:
QueuedThreadPool中有两个属性来记录当前已启动和空闲的线程数,它们都是线程安全的AtomicInteger类型变量,如下:
ArrayBlockingQueue是java.util.concurrent包下用数组实现的阻塞队列,性能不是很好。而BlockingArrayQueue是jetty内部基于循环数据实现的一个阻塞队列,性能会好一点。
下面来看下startThread(int threads)方法中的逻辑:
整个方法就是新建一个线程并启动然后将其加入到线程池_threads中。_threads是QueuedThreadPool中用来存储线程对象的容器,是一个无限容量的队列。
下面来重点关注下创建线程的时候,传递给线程的Runnable对象_runnable中的逻辑。这个对象的定义如下:
首先可以看到线程池和外部没有直接进行交互(并不是从线程池中取出Thread对象给外部使用);而是通过_jobs这么个阻塞队列和外部进行交互,具体来说,外部有任务需要安排一个线程执行的时候就将任务加入到队列中(如果队列已满则会添加失败),而线程池中的线程会每隔一段时间就检查一次任务队列,如果有任务需要执行则会取出进行执行(job循环)。对于队列中没有任务需要处理的情况,可以通过设置_maxIdleTimeMs的值来控制线程的表现:如果_maxIdleTimeMs的值小于0,则线程会一直阻塞在_jobs.take()方法上;如果_maxIdleTimeMs的值大于0,则会先检查是否可以收缩线程池(检查的标准就是上次收缩的时间到目前要大于_maxIdleTimeMs并且当前启动的线程数目大于_minThreads),如果可以收缩则当前线程会被从线程池中移除,如果不可以则当前线程会在_jobs.poll()方法上阻塞_maxIdleTimeMs时间,如果在这段时间里这个方法返回一个job,则进行入job循环,否则继续上面的循环。
2) dispatch()和execute()方法
这两个方法都是线程池对外提供的执行方法,接受的参数都是一个Runnable对象。实际上execute()方法是通过dispatch()方法实现的:
可以看到是直接调用的dispatch()方法。dispatch()方法的源码如下:
3) setMinThreads()方法
最后看一下setMinThreads()方法:
这个方法就几行代码,值得注意的是在启动之后可以通过扩大_minThreads的值来实现线程池的动态扩大。
上面几个方法分析完,QueuedThreadPool也就算分析完了,说实话QueuedThreadPool的逻辑比前面那些Handler简单多了,所以分析源码也轻松很多。不过虽然QueuedThreadPool的逻辑很简单,但是并发性能可是很不错的。
继承了父类AbstractLifeCycle之后,QueuedThreadPool就可以当成一个LifeCycle类型的组件管理,这个父类在前面介绍生命周期的时候已经介绍过了,这里就不重复介绍了。
ThreadPool是一个接口,里面定义了一些操作和获取线程信息的方法,它的完整定义如下:
public interface ThreadPool { //将传入的任务进行分派 public abstract boolean dispatch(Runnable job); /** * Blocks until the thread pool is {@link LifeCycle#stop stopped}. */ public void join() throws InterruptedException; //返回当前线程池中的总线程数目 public int getThreads(); //返回线程池中空闲的线程数量 public int getIdleThreads(); /** * @return True if the pool is low on threads */ public boolean isLowOnThreads();
上面的几个方法定义都很简单,没有什么很难理解的方法。
SizedThreadPool是ThreadPool的一个内部接口,它在ThreadPool的基础上增加了数量限制,可以认为是一个数量有限的线程池,可以指定这个线程池中的最小和最大线程数。下面是其完整定义,可以看到也很简单。
public interface SizedThreadPool extends ThreadPool { public int getMinThreads();//获取限制的最小线程数量 public int getMaxThreads(); //获取限制的最大线程数量 public void setMinThreads(int threads);//设置最小线程数量 public void setMaxThreads(int threads);//设置最大线程数量 }
Executor是java.util.concurrent包下的一个接口,接口中只有一个方法如下:
public interface Executor { void execute(Runnable command); }
execute()方法接受一个Runnable类型的对象,然后负责在一个线程中执行这个task。至于这个线程就是当前调用execute()方法的线程还是另外分配的一个线程,都是由具体实现的子类决定的。这种方式的好处在于调用者不用再自己创建线程,线程的管理完全都有Executor的子类负责。显然QueuedThreadPool是很适合这种场景的。
上面分析完继承的父类之后就发现定义都比较简单,没有什么特别难理解的方法或者调用关系。所以下面来分析QueuedThreadPool中是如何实现这几个父类中的方法的。
1) doStart()方法
doStart()方法时在容器启动的时候就会被调用的一个方法,QueuedThreadPool和其它组件一样在这个方法中进行初始化。
@Override protected void doStart() throws Exception { //super.doStart()会调用AbstractLifeCycle的doStart()方法, //而那个方法中是没有做任何事情的。 super.doStart(); //将启动的线程数设置为0 _threadsStarted.set(0); //_jobs是QueuedThreadPool用来存储传入Runnable对象的数据结构,可以在jetty的xml配置文件中指定具体类型,如果没有指定则会根据_maxQueued属性的值来选择,具体如下。 if (_jobs==null) { _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued) :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads); } //如果当前启动的线程数小于设定的最小值,则不断启动新的线程, //直到线程池中有_minThreads个线程 int threads=_threadsStarted.get(); while (isRunning() && threads<_minThreads) { startThread(threads);//startThread方法负责启动新线程 threads=_threadsStarted.get(); } }
上面这段代码逻辑很简单,需要注意的几个点如下:
QueuedThreadPool中有两个属性来记录当前已启动和空闲的线程数,它们都是线程安全的AtomicInteger类型变量,如下:
private final AtomicInteger _threadsStarted = new AtomicInteger(); //记录已启动的线程数量 private final AtomicInteger _threadsIdle = new AtomicInteger(); //记录空闲的线程数量
ArrayBlockingQueue是java.util.concurrent包下用数组实现的阻塞队列,性能不是很好。而BlockingArrayQueue是jetty内部基于循环数据实现的一个阻塞队列,性能会好一点。
下面来看下startThread(int threads)方法中的逻辑:
private boolean startThread(int threads) { final int next=threads+1; //原子变量的先比较后更新操作,如果比较失败,说明有其它线程在并发操作线程池,这种情况下如果不返还则会导致_threadsStarted记录的启动线程数目出错。如果将整个startThread()方法都进行加锁是可以避免这种情况的,但是那样的话会极大的降低并发性。 if (!_threadsStarted.compareAndSet(threads,next)) return false; boolean started=false; try { Thread thread=newThread(_runnable); thread.setDaemon(_daemon); thread.setPriority(_priority); thread.setName(_name+"-"+thread.getId()); _threads.add(thread); thread.start(); started=true; } finally { if (!started) _threadsStarted.decrementAndGet(); } return started; }
整个方法就是新建一个线程并启动然后将其加入到线程池_threads中。_threads是QueuedThreadPool中用来存储线程对象的容器,是一个无限容量的队列。
private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
下面来重点关注下创建线程的时候,传递给线程的Runnable对象_runnable中的逻辑。这个对象的定义如下:
private Runnable _runnable = new Runnable() { public void run() { boolean shrink=false; try { //从任务队列中取出一个任务来,poll()是非阻塞的操作,没有元素时返回null Runnable job=_jobs.poll(); while (isRunning()) { // 如果任务队列中一直有任务,则不断取出其中的任务执行 while (job!=null && isRunning()) { runJob(job);//其实就是调用job.run()方法 job=_jobs.poll(); } // 任务队列中没有任务时的空闲循环 try { //增加空闲线程的数量 _threadsIdle.incrementAndGet(); while (isRunning() && job==null) { //将空闲等待时间的设为不大于0的情况下(默认是1分钟),则当前线程会一直阻塞在任务队列的take()操作上. if (_maxIdleTimeMs<=0) job=_jobs.take(); else { // 下面是是否要收缩线程池的判断 final int size=_threadsStarted.get();//目前以穷的线程数 if (size>_minThreads) { long last=_lastShrink.get(); //上次进行收缩线程池操作的时间,未进行过则为0 long now=System.currentTimeMillis(); if (last==0 || (now-last)>_maxIdleTimeMs) { //最后设置_lastShrink和_threadsStarted的数目,并且使用的都是原子变量的compareAndSet类型操作,防止并发修改的问题。 shrink=_lastShrink.compareAndSet(last,now) && _threadsStarted.compareAndSet(size,size-1); //如果确实可以收缩 if (shrink) return; } } //如果不满足收缩的条件,则通过_jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS)方法阻塞_maxIdleTimeMs,如果再次期间拿到job则返回job循环,否则再进行一次空闲循环的检查。 job=idleJobPoll(); } } } finally //移除成功,减少空闲线程计数 _threadsIdle.decrementAndGet(); } } } catch(InterruptedException e) { LOG.ignore(e); } catch(Exception e) { LOG.warn(e); } finally {//进入这个finally有两种情况,第一种是收缩线程池了(shrink为true),另一种是异常退出了或改变服务器状态退出了,这种情况下需要减少已启动线程计数。 if (!shrink) _threadsStarted.decrementAndGet(); //直接将当前线程对象从线程池中移除,随后会被垃圾回收。 _threads.remove(Thread.currentThread()); } } };
首先可以看到线程池和外部没有直接进行交互(并不是从线程池中取出Thread对象给外部使用);而是通过_jobs这么个阻塞队列和外部进行交互,具体来说,外部有任务需要安排一个线程执行的时候就将任务加入到队列中(如果队列已满则会添加失败),而线程池中的线程会每隔一段时间就检查一次任务队列,如果有任务需要执行则会取出进行执行(job循环)。对于队列中没有任务需要处理的情况,可以通过设置_maxIdleTimeMs的值来控制线程的表现:如果_maxIdleTimeMs的值小于0,则线程会一直阻塞在_jobs.take()方法上;如果_maxIdleTimeMs的值大于0,则会先检查是否可以收缩线程池(检查的标准就是上次收缩的时间到目前要大于_maxIdleTimeMs并且当前启动的线程数目大于_minThreads),如果可以收缩则当前线程会被从线程池中移除,如果不可以则当前线程会在_jobs.poll()方法上阻塞_maxIdleTimeMs时间,如果在这段时间里这个方法返回一个job,则进行入job循环,否则继续上面的循环。
2) dispatch()和execute()方法
这两个方法都是线程池对外提供的执行方法,接受的参数都是一个Runnable对象。实际上execute()方法是通过dispatch()方法实现的:
public void execute(Runnable job) { if (!dispatch(job)) throw new RejectedExecutionException(); }
可以看到是直接调用的dispatch()方法。dispatch()方法的源码如下:
public boolean dispatch(Runnable job) { if (isRunning()) { final int jobQ = _jobs.size(); final int idle = getIdleThreads(); if(_jobs.offer(job)) //offer是非阻塞操作,如果底层队列空间不够,则立即返回false { //如果没有空闲线程或当前队列中等待的任务数大于空闲的线程数,并且线程池容量还没达到_maxThreads的时候会新增一个处理线程。 if (idle==0 || jobQ>idle) { int threads=_threadsStarted.get(); if (threads<_maxThreads) startThread(threads); } return true; } } LOG.debug("Dispatched {} to stopped {}",job,this); return false; }
3) setMinThreads()方法
最后看一下setMinThreads()方法:
public void setMinThreads(int minThreads) { _minThreads=minThreads; if (_minThreads>_maxThreads) _maxThreads=_minThreads; int threads=_threadsStarted.get(); while (isStarted() && threads<_minThreads) { startThread(threads); threads=_threadsStarted.get(); } }
这个方法就几行代码,值得注意的是在启动之后可以通过扩大_minThreads的值来实现线程池的动态扩大。
上面几个方法分析完,QueuedThreadPool也就算分析完了,说实话QueuedThreadPool的逻辑比前面那些Handler简单多了,所以分析源码也轻松很多。不过虽然QueuedThreadPool的逻辑很简单,但是并发性能可是很不错的。
相关文章推荐
- 线程池--jetty中QueuedThreadPool分析(一)
- jetty源码分析:QueuedThreadPool
- Jetty的线程池实现QueuedThreadPool
- jetty的线程池实现QueuedThreadPool
- python threadpool 源码分析以及自己封装的简易版线程池
- java threadPool 线程池简单分析
- Dubbo线程池耗尽异常原理分析RejectedExecutionException:Thread pool is EXHAUSTED
- [图解tensorflow源码] 线程池模块分析 (CPU thread pool device)
- Quartz SimpleThreadPool的源码,一个简单的线程池的实现原理
- Android 应用开发 之通过AsyncTask与ThreadPool(线程池)两种方式异步加载大量数据的分析与对比
- Android 应用开发 之通过AsyncTask与ThreadPool(线程池)两种方式异步加载大量数据的分析与对比
- Android 应用开发 之通过AsyncTask与ThreadPool(线程池)两种方式异步加载大量数据的分析与对比
- 线程池ThreadPool及Task调度死锁分析
- Nodejs事件引擎libuv源码剖析之:高效线程池(threadpool)的实现
- cartographer源码分析(10)-common-thread_pool.h
- 【源码剖析】threadpool —— 基于 pthread 实现的简单线程池
- 【源码剖析】threadpool —— 基于 pthread 实现的简单线程池
- Android通过AsyncTask与ThreadPool(线程池)两种方式异步加载大量数据的分析与对比
- Android 应用开发 之通过AsyncTask与ThreadPool(线程池)两种方式异步加载大量数据的分析与对比--转载
- mariadb 5.5 threadpool 源码分析