您的位置:首页 > 产品设计 > UI/UE

Jetty源码分析之线程池:QueuedThreadPool

2017-12-24 22:10 961 查看
前面分析Jetty整体架构的时候介绍过Jetty的三大组件:Acceptor、Handler和ThreadPool;前两者工作的时候都是需要线程的,而所需的线程正是从ThreadPool中获取的。这篇文件就是来分析ThreadPool的一个具体实现:QueuedThreadPool。下面是它的类图:



继承了父类AbstractLifeCycle之后,QueuedThreadPool就可以当成一个LifeCycle类型的组件管理,这个父类在前面介绍生命周期的时候已经介绍过了,这里就不重复介绍了。

ThreadPool是一个接口,里面定义了一些操作和获取线程信息的方法,它的完整定义如下:

public interface ThreadPool
{
//将传入的任务进行分派
public abstract boolean dispatch(Runnable job);

/**
* Blocks until the thread pool is {@link LifeCycle#stop stopped}.
*/
public void join() throws InterruptedException;

//返回当前线程池中的总线程数目
public int getThreads();

//返回线程池中空闲的线程数量
public int getIdleThreads();

/**
* @return True if the pool is low on threads
*/
public boolean isLowOnThreads();


上面的几个方法定义都很简单,没有什么很难理解的方法。

SizedThreadPool是ThreadPool的一个内部接口,它在ThreadPool的基础上增加了数量限制,可以认为是一个数量有限的线程池,可以指定这个线程池中的最小和最大线程数。下面是其完整定义,可以看到也很简单。

public interface SizedThreadPool extends ThreadPool
{
public int getMinThreads();//获取限制的最小线程数量
public int getMaxThreads(); //获取限制的最大线程数量
public void setMinThreads(int threads);//设置最小线程数量
public void setMaxThreads(int threads);//设置最大线程数量
}


Executor是java.util.concurrent包下的一个接口,接口中只有一个方法如下:

public interface Executor {

void execute(Runnable command);
}


execute()方法接受一个Runnable类型的对象,然后负责在一个线程中执行这个task。至于这个线程就是当前调用execute()方法的线程还是另外分配的一个线程,都是由具体实现的子类决定的。这种方式的好处在于调用者不用再自己创建线程,线程的管理完全都有Executor的子类负责。显然QueuedThreadPool是很适合这种场景的。

上面分析完继承的父类之后就发现定义都比较简单,没有什么特别难理解的方法或者调用关系。所以下面来分析QueuedThreadPool中是如何实现这几个父类中的方法的。

1) doStart()方法

doStart()方法时在容器启动的时候就会被调用的一个方法,QueuedThreadPool和其它组件一样在这个方法中进行初始化。

@Override
protected void doStart() throws Exception
{
//super.doStart()会调用AbstractLifeCycle的doStart()方法,
//而那个方法中是没有做任何事情的。
super.doStart();
//将启动的线程数设置为0
_threadsStarted.set(0);
//_jobs是QueuedThreadPool用来存储传入Runnable对象的数据结构,可以在jetty的xml配置文件中指定具体类型,如果没有指定则会根据_maxQueued属性的值来选择,具体如下。
if (_jobs==null)
{
_jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
:new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
}

//如果当前启动的线程数小于设定的最小值,则不断启动新的线程,
//直到线程池中有_minThreads个线程
int threads=_threadsStarted.get();
while (isRunning() && threads<_minThreads)
{
startThread(threads);//startThread方法负责启动新线程
threads=_threadsStarted.get();
}
}


上面这段代码逻辑很简单,需要注意的几个点如下:

QueuedThreadPool中有两个属性来记录当前已启动和空闲的线程数,它们都是线程安全的AtomicInteger类型变量,如下:

private final AtomicInteger _threadsStarted = new       AtomicInteger(); //记录已启动的线程数量
private final AtomicInteger _threadsIdle = new AtomicInteger(); //记录空闲的线程数量


ArrayBlockingQueue是java.util.concurrent包下用数组实现的阻塞队列,性能不是很好。而BlockingArrayQueue是jetty内部基于循环数据实现的一个阻塞队列,性能会好一点。

下面来看下startThread(int threads)方法中的逻辑:

private boolean startThread(int threads)
{
final int next=threads+1;
//原子变量的先比较后更新操作,如果比较失败,说明有其它线程在并发操作线程池,这种情况下如果不返还则会导致_threadsStarted记录的启动线程数目出错。如果将整个startThread()方法都进行加锁是可以避免这种情况的,但是那样的话会极大的降低并发性。
if (!_threadsStarted.compareAndSet(threads,next))
return false;
boolean started=false;
try
{
Thread thread=newThread(_runnable);
thread.setDaemon(_daemon);
thread.setPriority(_priority);
thread.setName(_name+"-"+thread.getId());
_threads.add(thread);

thread.start();
started=true;
}
finally
{
if (!started)
_threadsStarted.decrementAndGet();
}
return started;
}


整个方法就是新建一个线程并启动然后将其加入到线程池_threads中。_threads是QueuedThreadPool中用来存储线程对象的容器,是一个无限容量的队列。

private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();


下面来重点关注下创建线程的时候,传递给线程的Runnable对象_runnable中的逻辑。这个对象的定义如下:

private Runnable _runnable = new Runnable()
{
public void run()
{
boolean shrink=false;
try
{
//从任务队列中取出一个任务来,poll()是非阻塞的操作,没有元素时返回null
Runnable job=_jobs.poll();
while (isRunning())
{
// 如果任务队列中一直有任务,则不断取出其中的任务执行
while (job!=null && isRunning())
{
runJob(job);//其实就是调用job.run()方法
job=_jobs.poll();
}

// 任务队列中没有任务时的空闲循环
try
{
//增加空闲线程的数量
_threadsIdle.incrementAndGet();

while (isRunning() && job==null)
{
//将空闲等待时间的设为不大于0的情况下(默认是1分钟),则当前线程会一直阻塞在任务队列的take()操作上.
if (_maxIdleTimeMs<=0)
job=_jobs.take();
else
{
// 下面是是否要收缩线程池的判断
final int size=_threadsStarted.get();//目前以穷的线程数
if (size>_minThreads)
{
long last=_lastShrink.get(); //上次进行收缩线程池操作的时间,未进行过则为0
long now=System.currentTimeMillis();
if (last==0 || (now-last)>_maxIdleTimeMs)
{

//最后设置_lastShrink和_threadsStarted的数目,并且使用的都是原子变量的compareAndSet类型操作,防止并发修改的问题。                         shrink=_lastShrink.compareAndSet(last,now) &&
_threadsStarted.compareAndSet(size,size-1);
//如果确实可以收缩
if (shrink)
return;
}
}
//如果不满足收缩的条件,则通过_jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS)方法阻塞_maxIdleTimeMs,如果再次期间拿到job则返回job循环,否则再进行一次空闲循环的检查。
job=idleJobPoll();
}
}
}
finally
//移除成功,减少空闲线程计数
_threadsIdle.decrementAndGet();
}
}
}
catch(InterruptedException e)
{
LOG.ignore(e);
}
catch(Exception e)
{
LOG.warn(e);
}
finally
{//进入这个finally有两种情况,第一种是收缩线程池了(shrink为true),另一种是异常退出了或改变服务器状态退出了,这种情况下需要减少已启动线程计数。
if (!shrink)
_threadsStarted.decrementAndGet();
//直接将当前线程对象从线程池中移除,随后会被垃圾回收。
_threads.remove(Thread.currentThread());
}
}
};


首先可以看到线程池和外部没有直接进行交互(并不是从线程池中取出Thread对象给外部使用);而是通过_jobs这么个阻塞队列和外部进行交互,具体来说,外部有任务需要安排一个线程执行的时候就将任务加入到队列中(如果队列已满则会添加失败),而线程池中的线程会每隔一段时间就检查一次任务队列,如果有任务需要执行则会取出进行执行(job循环)。对于队列中没有任务需要处理的情况,可以通过设置_maxIdleTimeMs的值来控制线程的表现:如果_maxIdleTimeMs的值小于0,则线程会一直阻塞在_jobs.take()方法上;如果_maxIdleTimeMs的值大于0,则会先检查是否可以收缩线程池(检查的标准就是上次收缩的时间到目前要大于_maxIdleTimeMs并且当前启动的线程数目大于_minThreads),如果可以收缩则当前线程会被从线程池中移除,如果不可以则当前线程会在_jobs.poll()方法上阻塞_maxIdleTimeMs时间,如果在这段时间里这个方法返回一个job,则进行入job循环,否则继续上面的循环。

2) dispatch()和execute()方法

这两个方法都是线程池对外提供的执行方法,接受的参数都是一个Runnable对象。实际上execute()方法是通过dispatch()方法实现的:

public void execute(Runnable job)
{
if (!dispatch(job))
throw new RejectedExecutionException();
}


可以看到是直接调用的dispatch()方法。dispatch()方法的源码如下:

public boolean dispatch(Runnable job)
{
if (isRunning())
{
final int jobQ = _jobs.size();
final int idle = getIdleThreads();
if(_jobs.offer(job)) //offer是非阻塞操作,如果底层队列空间不够,则立即返回false
{
//如果没有空闲线程或当前队列中等待的任务数大于空闲的线程数,并且线程池容量还没达到_maxThreads的时候会新增一个处理线程。
if (idle==0 || jobQ>idle)
{
int threads=_threadsStarted.get();
if (threads<_maxThreads)
startThread(threads);
}
return true;
}
}
LOG.debug("Dispatched {} to stopped {}",job,this);
return false;
}


3) setMinThreads()方法

最后看一下setMinThreads()方法:

public void setMinThreads(int minThreads)
{
_minThreads=minThreads;

if (_minThreads>_maxThreads)
_maxThreads=_minThreads;

int threads=_threadsStarted.get();
while (isStarted() && threads<_minThreads)
{
startThread(threads);
threads=_threadsStarted.get();
}
}


这个方法就几行代码,值得注意的是在启动之后可以通过扩大_minThreads的值来实现线程池的动态扩大。

上面几个方法分析完,QueuedThreadPool也就算分析完了,说实话QueuedThreadPool的逻辑比前面那些Handler简单多了,所以分析源码也轻松很多。不过虽然QueuedThreadPool的逻辑很简单,但是并发性能可是很不错的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  线程池 源码 jetty