您的位置:首页 > 编程语言 > Java开发

java线程(6)——线程池(下)

2016-04-17 15:00 429 查看
上篇博客java线程(5)——线程池(上)介绍了线程池的基本知识,这篇博客我们介绍一下常用的ThreadPoolExecutor。

定义

类图关系:



ThreadPoolExecutor继承了AbstractExecutorService抽象类,而AbstractExecutorService实现了ExecutorService接口。

下面来看下ThreadPoolExecutor的代码:

public class ThreadPoolExecutor extends AbstractExecutorService {

//核心线程池的大小。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
private volatile int corePoolSize;

//线程池中线程的最大数量
private volatile int maximumPoolSize;

//Timeout时间,没有任务执行时最多保持多久时间之后终止。
private volatile long keepAliveTime;

//线程工厂,所有的线程都是通过他创建的。
private volatile ThreadFactory threadFactory;
......

//给定初始化参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
......
}


Tips:

Volatile关键字,意为“不稳定的,易变的”。它经常用在多线程中的类型修饰符,他的作用是确保某条指令或代码不会被省略,而且每次都是直接读取值,而不是使用备份数据。

线程池状态

线程池的几种状态基本上是级别层层递进的,区分的因素有:

是否接受新任务,

是否允许运行队列中的任务,

是否继续执行正在处理的任务等等。

//线程池能接受任务新任务,并且可以运行队列中的任务
private static final int RUNNING    = -1 << COUNT_BITS;

//不再接受新任务,但可继续运行队列中的任务
private static final int SHUTDOWN   =  0 << COUNT_BITS;

//任务都不再执行,正在处理的任务要中断
private static final int STOP       =  1 << COUNT_BITS;
//终止
private static final int TIDYING    =  2 << COUNT_BITS;
//terminated()方法执行结束
private static final int TERMINATED =  3 << COUNT_BITS;


状态之间的转化关系图如下:



Worker类

说到线程池,我们经常会把线程池比作是一个工厂,而把线程比作工人。

在ThreadPoolExecutor类中,还真就有一个Worker类,先看一下他的定义。

private final class Worker  extends AbstractQueuedSynchronizer implements Runnable{
//正在运行
final Thread thread;
//初始运行的任务,可能为null
Runnable firstTask;
//前一个线程任务
volatile long completedTasks;

Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

//检查是否可以添加新的线程到当前的线程池中
private boolean addWorker(Runnable firstTask, boolean core) {

}
//创建线程时失败回滚
private void addWorkerFailed(Worker w) {
}

//移除任务
public boolean remove(Runnable task) {
}
......

}


分析:

Worker继承了AbstractQueuedSynchronizer,他是一个基于FIFO队列,也叫同步器。

Worker类实现了Runnable接口,说明他是一个线程类。此外,内部还提供了新增、移除任务等方法。那么什么时候可以新增什么时候又需要移除呢,就涉及到核心方法execute()了。

execute()

核心代码如下:

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
//如果当前活动的线程数<核心池大小
if (workerCountOf(c) < corePoolSize) {
//创建线程处理任务
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果当前线程池状态为running,且成功加入指定队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//二次检查是否加入队列
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}


分析:

上面的代码由三个大的if块组成,前两个比较容易理解,第三个有点绕。

1、当前活动线程<核心池大小,继续调用addWorker创建线程处理任务。

2、前面说到Running状态可以接受新任务,运行队列中的任务。判断当前线程的状态是否为Running,如果是,并且任务成功加入队列,还需要进行二次检查,防止出现shut down现象。

3、如果当前线程状态不是Running或任务加入队列失败,则跳转到else if中,执行reject()方法。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  线程池 线程