您的位置:首页 > 其它

线程池实现技术分析(concurrent)

2012-12-05 13:43 232 查看
一、线程池算法分析

        我使用的线程池可叫做线程执行器,它结合了线程池和通道技术,基于concurrent包实现,对其进行了一定的封装。其中采用了两种通道:一种是同步通道,另一种是有限链接队列。

        下面分析目前使用的线程池算法。

    1)线程池基本算法



    如上图所示,线程池有几个基本参数:默认线程数、最大线程数、最小线程数、通道大小(可选)。线程池中的所有线程使用同一个通道。当线程池启动时,会按照默认线程数创建线程。当请求到达时,请求线程会先判断线程池当前线程数是否小于最小线程数,如果是,则创建新线程执行,如果否,则放入通道中。如果通道放入失败,再判断当前线程数是否大于最大线程数,如果是,则执行阻塞行为,如果否,则创建新线程执行。

    2)同步通道线程池算法

    同步通道内部包含两个队列,即生产者队列和消费者队列。

    基本算法如下:当通讯接入请求到达时,会先判断线程池当前线程数是否小于最小线程数,如果是,则创建新线程执行,如果否,则放入通道中(即offer)。由于同步通道放入始终返回失败,因此,再判断当前线程数是否大于最大线程数,如果是,则执行阻塞行为,如果否,则创建新线程执行。

    线程池中所有线程都空闲时的算法如下:



如上图所示,当空闲线程取不到生产者任务时,算法如下:

(1)       创建一个空任务放入消费者队列中,同时自己持有该任务;

(2)       空闲线程等待该任务被取走(即在该任务对象上wait);

(3)       新的请求到达时,请求线程从消费者队列中将空任务取出,并将请求任务设置到空任务中;

(4)       请求线程通知任务对象(即对该任务对象notify);

(5)       空闲线程执行请求任务。

 

同步通道线程池在阻塞情况下的WaitWhenBlocked算法如下:



如上图所示,同步通道线程池的WaitWhenBlocked算法是:

(1)       将新任务加入生产者队列;

(2)       请求线程等待该任务被取走(即在该任务对象上wait);

(3)       当线程池中有空闲线程时,它会从生产者队列中取走该任务;

(4)       该线程通知任务对象(即对该任务对象notify);

(5)       请求线程则继续执行后续处理。

    3)有限链接队列线程池算法

基本算法如下:当通讯接入请求到达时,会先判断线程池当前线程数是否小于最小线程数,如果是,则创建新线程执行,如果否,则放入队列中。如果队列放满了,再判断当前线程数是否大于最大线程数,如果是,则执行阻塞行为,如果否,则创建新线程执行。

线程池中所有线程都空闲时的算法如下:



如上图所示,当空闲线程取不到生产者任务时,算法如下:

(1)       空闲线程等待Take锁(即在Take锁对象上wait);

(2)       当新的请求到达时,请求线程将请求任务放入队列中;

(3)       请求线程通知Take锁(即对Take锁对象notify);

(4)       空闲线程从队列中取走任务执行。

 

2)有限链接队列线程池算法

有限链接队列线程池在阻塞情况下的WaitWhenBlocked算法如下:



如上图所示,有限链接队列线程池的WaitWhenBlocked算法是:

(1)       请求线程等待队列能够接收新的任务(即在队列对象上wait);

(2)       当线程池有空闲线程时,空闲线程从队列中取走一个任务;

(3)       空闲线程通知队列对象;

(4)       请求线程将新任务放入队列中;

(5)       请求线程继续后续处理。

二、concurrent包线程池的代码分析

concurrent包线程池的代码实现可以分为两大关键部分:线程执行器和通道。

1)线程执行器
(1)主流程

线程执行器的核心实现为PooledExecutor。其主要执行流程如下:



(2)阻塞行为

      从以上流程可以看出,当线程池中的线程够用的时候都会从线程池中获取线程,但是当线程池中的线程耗尽的时候则不再从池中获取线程,而是执行BlockAction。线程池提供5种BlockAction:1)“run”,即立即执行;2)“wait”,即同步等待;3)“abort”,即退出;4)“discard”,即抛弃;5)“discardOldest”,即抛弃等待最久的。目前EAIB生产环境中BlockAction配置为默认值,即“run”。

RunWhenBlocked:BlockAction=“run”,会执行RunWhenBlocked的blockedAction。其实现为使用当前线程来执行后续处理过程。

WaitWhenBlocked:BlockAction=“wait”,会执行WaitWhenBlocked的blockedAction。其实现为往通道中放入一个元素,即调用通道的put方法。

DiscardWhenBlocked:BlockAction=“discard”,会执行DiscardWhenBlocked的blockedAction。其实现为不做任何处理,直接返回true。

AbortWhenBlocked:BlockAction=“abort”,会执行AbortWhenBlocked的blockedAction。其实现为抛出RuntimeException。

DiscardOldestWhenBlocked:BlockAction=“discardOldest”,会执行DiscardOldestWhenBlocked的blockedAction。其实现为从通道中删除最老的元素,然后放入一个新元素;如果放入失败,则使用当前线程执行后续处理过程。

      从以上几种BlockAction的分析可以看出,通道的实现不同,会引起WaitWhenBlocked和DiscardOldestWhenBlocked的处理也会不同。

2)通道

    线程池的通道采用接口设计。通道的接口如下:

public void put(Object item) throws InterruptedException;

put方法的功能是往通道中添加一个元素,如果通道不能接收,它会等待,直到通道能够接收为止。

public boolean offer(Object item, long msecs) throwsInt
4000
erruptedException;

  offer方法的功能是往通道中添加一个元素,如果通道不能接收,它会等待一段时间,直到通道能够接收或者达到超时时间。

public Object take() throws InterruptedException;

  take方法的功能是从通道中取出一个元素,如果通道中没有元素,它会等待,直到通道中有元素为止。

public Object poll(long msecs) throws InterruptedException;

  poll方法的功能是从通道中取出一个元素,如果通道中没有元素,它会等待一段时间,直到通道中有元素或者达到超时时间。

public Object peek();

  peek方法的功能是返回通道头部元素,但并不从通道中删除。

 

以上功能针对不同算法可有不同的实现。下面主要描述EAIB系统使用的两种通道。

(1)同步通道
同步通道的实现流程如下:

put



offer



take



poll



peek:返回null。

(2)有限链接队列

有限链接队列的实现流程如下:

put



offer



take



poll



peek:返回队列头部节点。如果头部节点为空,则返回null。

extract



insert

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: