您的位置:首页 > 其它

阿里面经之解答 by cmershen(4)——线程池

2016-07-06 16:49 615 查看
18.什么叫线程池?如何实现线程池?

因为新建一个线程的成本比较高,所以在大型系统中使用线程池可以很好的提高性能。线程池就是在系统启动的时候先创建大量的空闲线程,当该线程的run()或call()方法执行结束后,该线程不会立即死亡,而是在线程池中等待再次唤醒。这很适合大型系统中需要大量、重复、短暂地启动线程的操作。

除此之外,线程池技术可以有效的控制并发数量,以防线程开启过多时导致jvm崩溃。

介绍如何实现线程池之前,我们先了解一下如何使用线程池,线程池有哪些功能。

Java中有一个Executors工厂类,它提供了以下静态方法:

newFixedThreadPool(int nThreads): 创建一个固定的线程数的线程池。

newWorkStealingPool(int parallelism): 给定一个并行度,创建一个保持足够线程数以支持这个并行度的线程池,并使用多个队列来减少竞争。(jdk源码中原文:Creates a thread pool that maintains enough threads to support the given parallelism level, and may use multiple queues to reduce contention. 李刚《疯狂Java讲义》对这句话的翻译简直不知所云,不知道是不是谷歌翻译的。 )

newFixedThreadPool(int nThreads,ThreadFactory threadFactory): newFixedThreadPool(int nThreads)方法的重载,增加了ThreadFactory对象,代表从一个线程工厂里创建线程池。

newSingleThreadExecutor(),创建只有一个只有单线程的线程池。

newSingleThreadExecutor(ThreadFactory threadFactory):上述方法的重载。

newCachedThreadPool():创建一个具有缓存功能的线程池。

newSingleThreadScheduledExecutor():创建只有一个只有单线程的线程池,它可以在指定延迟过后执行线程。

newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,它可以在指定延迟过后执行线程。

创建了不指定延迟的线程池之后,返回的是一个ExecutorService类型的对象。

ExecutorService是个接口,它里面主要有如下抽象方法:

*
void shutdown();
关闭线程池,禁止新的线程提交至线程池。

*
Future<?> submit(Runnable task);
提交一个任务。也可以提交Callable对象。

如果创建了指定延迟的线程池之后,返回的是ScheduledExecutorService类型的对象。类似地,这个接口(继承ExecutorService)里面有如下抽象方法:

*
schedule(Runnable command, long delay, TimeUnit unit);
指定一个任务在unit的delay倍延迟后执行。也可以替换成Callable。

*
scheduleAtFixedRate(Runnable command,   long initialDelay, long period,  TimeUnit unit);
指定一个任务在unit的delay倍延迟后执行,而且以period为周期重复执行。

那么用法就已经很明确了:先创建线程池,再调用submit将线程对象提交进线程池中,不需要提交线程了就调用shutdown()关闭线程池。

接下来我们看看jdk 1.8中是如何实现这些方法的。

先看最简单的固定线程池,它返回的是ThreadPoolExecutor对象。这个对象的构造器如下:

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}


入口参数分别是:corePoolSize-线程池中要保持运行的线程数,maximumPoolSize-线程池中允许的最多线程个数(默认这两个值都是相等的),keepAliveTime,unit-当线程池已满的时候若有新线程提交,则新线程在队列里最多等待的时间(默认为0),workQueue-线程在执行前放在一个阻塞队列中等待。

接下来看submit():(submit的原型在父类AbstractExecutorService.java里面)

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}


RunnableFuture是Runnable和Future的封装,其实知乎里也有人问了为什么把他们两个封装起来,在这里本学妹也不想深究,反正2-3行代码的意思就是把任务封装进这么一个奇怪的类里面,然后调用execute方法执行这个任务。

再来看execute()方法,当然大家可能注意到了,它和构造器都是public的,我们亦可以直接构造ThreadPoolExecutor并调用execute()方法执行里面的线程。

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task.  The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread.  If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}


Doug Lea大仙已经解释了这段代码,分成三步,如果当前线程池中正在执行的线程数比corePoolSize少,就调用addWorker方法执行这个线程,否则将其加入阻塞队列等待。这里还有一个二次检查,因为Doug Lea也说到了,有可能这时恰好有一个线程结束了或者线程池已经shutdown了,在这种情况下就不应放入阻塞队列等待,应该roll back这次操作(
remove(command);
)第三步如果没放进队列中去,那么我们再提交一次这个任务,这次与maximumPoolSize比,如果还没提交上去,则reject这个任务。

那么重头戏来了,接下来就是封装在最底层的这个addWorker方法,一切都装在这个里面。

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();  // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}


这个方法有两个参数,第一个不用说了,第二个是一个标志位,代表比较线程该不该加入线程池是与corePoolSize比还是与maximumPoolSize比。

这个函数首先检查各种状态(具体细节真没看懂),如果满足可以加入的条件,则跳出retry循环,进入函数的第二部分。第二部分首先将要执行的线程封装到Worker类里面,并线程同步地把要执行的线程加入线程池的workers属性里面(恩,忘说了,已经在执行的线程是用一个HashSet<Worker>维护的),并置标志位workerAdded=true;,再开启此线程。

这里有的读者可能要问了,我们研究到这里,只知道了在HashSet<Worker>里面放入线程,那执行完了应该从HashSet里面取出来,怎么没看见呢?别急。ThreadPoolExecutor的第950行这个
t.start();
里面还有文章呢!

t.start();
真正调用的是这个runWorker()方法:

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted.  This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}


可以观察到,task.run执行之后,会进入finally块,在最后一个finally块里面有一个processWorkerExit(w, completedAbruptly).在这个方法里面有如下片段:

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}


这里程序线程同步地修改已经完成的任务数目,并把当前线程从workers集合里移除。

沿着submit()我们分析了这么多,接下来我们开始研究shutdown().

public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}


这个方法里面调用了四个方法,下面一一分析之。

checkShutdownAccess();
方法大概是检查操作系统是否允许JVM操作进程。

advanceRunState(SHUTDOWN);
方法将当前线程池的状态置为SHUTDOWN。

interruptIdleWorkers();
这个方法顾名思义,就是将线程池中等待调度的线程中断。

onShutdown();
该方法在这个类里面是空的,但在其子接口ScheduledThreadPoolExecutor中则用于取消处在延迟期的任务。

接下来就是ScheduledThreadPoolExecutor类了(其实我也没细看),这里面submit替换成了schedule,而schedule()的源码与submit()十分类似,只是execute()替换成了delayedExecute():

private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}


我怀疑这个包并不是Doug Lea开发的,这个delayedExecute变得迷之简单,大概意思就是将任务加入队列,如果不满足一定条件则回滚,否则并确保可以执行即可……

shutdown()不用说了,直接一句
super.shutdown();
了事………………

本期就写了一道题,因为内容展开了很多,所以本学妹脑子已经炸了。。。。。。。。。。。。。。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  线程池