您的位置:首页 > 编程语言 > Java开发

Java线程池的几种实现方式

2017-09-13 15:42 267 查看
线程池的实现方式是通过Executors类创建几种不同类型的线程池,常用的有newFixedThreadPool(int nThreads),构造方法如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}


第一个参数代表corePoolSize,第二个代表maximumPoolSize,第三个代表keepAliveTime存活时间,最后一个是用队列保存超出的任务。

具体代码:

ExecutorService ec = Executors.newFixedThreadPool(2);
ec.submit(new Runnable(){
public void run() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
});


ec.submit方法调用的是抽象类AbstractExecutorService中的submit方法

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
FutureTask<Object> ftask = new FutureTask<Object>(task, null);
execute(ftask);
return ftask;
}


该方法创建一个未来任务FutureTask

public FutureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result));
}


Executors.callable方法如下:

public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable  task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}


未来任务FutureTask创建完毕之后, 执行execute(ftask)方法,调用ThreadPoolExecutor类。

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
for (;;) {
if (runState != RUNNING) {
reject(command);
return;
}
if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
return;
if (workQueue.offer(command))
return;
Runnable r = addIfUnderMaximumPoolSize(command);
if (r == command)
return;
if (r == null) {
reject(command);
return;
}
// else retry
}
}


会先执行if (poolSize < corePoolSize && addIfUnderCorePoolSize(command)),因为目前线程数量还小于设定的核心数量。addIfUnderCorePoolSize(command)代码如下:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
if (t != null) {
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}


Worker是ThreadPoolExecutor一个内部类,将要执行的任务包装进Worker中,之后执行t.start(),调用Worker中的run方法。

public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null; // unnecessary but can help GC
}
} catch(InterruptedException ie) {
// fall through
} finally {
workerDone(this);
}
}


执行一次后,进入while循环,getTask方法如下:

Runnable getTask() throws InterruptedException {
for (;;) {
switch(runState) {
case RUNNING: {
if (poolSize <= corePoolSize)   // untimed wait if core
return workQueue.take();

long timeout = keepAliveTime;
if (timeout <= 0) // die immediately for 0 timeout
return null;
Runnable r =  workQueue.poll(timeout, TimeUnit.NANOSECONDS);
if (r != null)
return r;
if (poolSize > corePoolSize) // timed out
return null;
// else, after timeout, pool shrank so shouldn't die, so retry
break;
}

case SHUTDOWN: {
// Help drain queue
Runnable r = workQueue.poll();
if (r != null)
return r;

// Check if can terminate
if (workQueue.isEmpty()) {
interruptIdleWorkers();
return null;
}

// There could still be delayed tasks in queue.
// Wait for one, re-checking state upon interruption
try {
return workQueue.take();
} catch(InterruptedException ignore) {}
break;
}

case STOP:
return null;
default:
assert false;
}
}
}


newFixedThreadPool创建的数量一直是poolSize 小于等于corePoolSize,所以从队列中获取下一条任务,如果没有则等待,workQueue.take();

下面开始讲newCachedThreadPool,该类的创建方式:Executors.newCachedThreadPool();

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}


内部生成一个corePoolSize为0,poolSize 为整形最大数,存活时间为60s的线程池。他的执行任务方式和newFixedThreadPool一样,也是ec.submit。我们关心的是如何做到60s后自动回收的。主要在Worker中的getTask方法里。

while (task != null || (task = getTask()) != null) {
runTask(task);
task = null; // unnecessary but can help GC
}
switch(runState) {
case RUNNING: {
if (poolSize <= corePoolSize)   // untimed wait if core
return workQueue.take();

long timeout = keepAliveTime;
if (timeout <= 0) // die immediately for 0 timeout
return null;
Runnable r =  workQueue.poll(timeout, TimeUnit.NANOSECONDS);
if (r != null)
return r;
if (poolSize > corePoolSize) // timed out
return null;
// else, after timeout, pool shrank so shouldn't die, so retry
break;
}


getTask方法中,由于corePoolSize为0,所以直接走下面,获取超时时间keepAliveTime,然后从队列中Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);根据超时时间获取任务,超过时限则break。之后外面run方法的循环结束,线程关闭。

public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null; // unnecessary but can help GC
}
} catch(InterruptedException ie) {
// fall through
} finally {
workerDone(this);
}
}


最后再讲一下定时及延时执行的线程newScheduledThreadPool,创建方式:ScheduledExecutorService ec = Executors.newScheduledThreadPool(int corePoolSize);

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue());
}


执行任务为:

ec1.scheduleAtFixedRate(new Runnable(){
public void run() {
System.out.println("i:"+j+" ********");
}
},10, 2, TimeUnit.SECONDS);


10代表首次执行延时10秒执行,之后每2秒执行一次。接着看他们的实现原理。方法如下:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
if (initialDelay < 0) initialDelay = 0;
long triggerTime = now() + unit.toNanos(initialDelay);
ScheduledFutureTask<?> t =
new ScheduledFutureTask<Object>(command,
null,
triggerTime,
unit.toNanos(period));
delayedExecute(t);
return t;
}

ScheduledFutureTask(Runnable r, V result, long ns,  long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}


创建任务大体相同,将延时时间设置为time,将每次执行时间设置为period。之后执行delayedExecute(t);方法。

private void delayedExecute(Runnable command) {
if (isShutdown()) {
reject(command);
return;
}
// Prestart a thread if necessary. We cannot prestart it
// running the task because the task (probably) shouldn't be
// run yet, so thread will just idle until delay elapses.
if (getPoolSize() < getCorePoolSize())
prestartCoreThread();

super.getQueue().add(command);
}


该方法比较目前的线程数量是否小于设置的核心数量,如果小于,则创建线程,否则加入队列。当我们第一次进入时,肯定是小于的。执行prestartCoreThread()方法。

public boolean prestartCoreThread() {
return addIfUnderCorePoolSize(null);
}


该方法执行addIfUnderCorePoolSize()方法,和newFixedThreadPool相同,只不过传入的Runnable任务为null,表明只创建个线程被Worker包装,之后将该Runnable任务放入队列中。接着执行Worker的run方法,从getTask中获取任务。

Runnable getTask() throws InterruptedException {
for (;;) {
switch(runState) {
case RUNNING: {
if (poolSize <= corePoolSize)   // untimed wait if core
return workQueue.take();

long timeout = keepAliveTime;
if (timeout <= 0) // die immediately for 0 timeout
return null;
Runnable r =  workQueue.poll(timeout, TimeUnit.NANOSECONDS);
if (r != null)
return r;
if (poolSize > corePoolSize) // timed out
return null;
// else, after timeout, pool shrank so shouldn't die, so retry
break;
}


这是workQueue.take()的实现类是DelayQueue队列,

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
available.await();
} else {
long delay =  first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;

}
}
}
} finally {
lock.unlock();
}
}


先获取time,第一次设置的10秒,等待10s后返回开始执行任务。执行的run方法是ScheduledThreadPoolExecutor中的run方法。

public void run() {
if (isPeriodic())
runPeriodic();
else
ScheduledFutureTask.super.run();
}


runPeriodic()方法内容:

private void runPeriodic() {
boolean ok = ScheduledFutureTask.super.runAndReset();
boolean down = isShutdown();
// Reschedule if not cancelled and not shutdown or policy allows
if (ok && (!down ||
(getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
!isTerminating()))) {
long p = period;
if (p > 0)
time += p;
else
time = now() - p;
ScheduledThreadPoolExecutor.super.getQueue().add(this);
}
// This might have been the final executed delayed
// task.  Wake up threads to check.
else if (down)
interruptIdleWorkers();
}


方法在runAndReset中执行完毕,之后获取每次间隔多少时间执行的参数period,将time设为time+p。这样当再次循环从队列中找任务的时候, long delay = first.getDelay(TimeUnit.NANOSECONDS);

public long getDelay(TimeUnit unit) {
long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
return d;
}


就是等待2S后执行了。

以上就是newFixedThreadPool、newScheduledThreadPool、newCachedThreadPool的实现原理。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: