Java线程池的几种实现方式
2017-09-13 15:42
267 查看
线程池的实现方式是通过Executors类创建几种不同类型的线程池,常用的有newFixedThreadPool(int nThreads),构造方法如下:
第一个参数代表corePoolSize,第二个代表maximumPoolSize,第三个代表keepAliveTime存活时间,最后一个是用队列保存超出的任务。
具体代码:
ec.submit方法调用的是抽象类AbstractExecutorService中的submit方法
该方法创建一个未来任务FutureTask
Executors.callable方法如下:
未来任务FutureTask创建完毕之后, 执行execute(ftask)方法,调用ThreadPoolExecutor类。
会先执行if (poolSize < corePoolSize && addIfUnderCorePoolSize(command)),因为目前线程数量还小于设定的核心数量。addIfUnderCorePoolSize(command)代码如下:
Worker是ThreadPoolExecutor一个内部类,将要执行的任务包装进Worker中,之后执行t.start(),调用Worker中的run方法。
执行一次后,进入while循环,getTask方法如下:
newFixedThreadPool创建的数量一直是poolSize 小于等于corePoolSize,所以从队列中获取下一条任务,如果没有则等待,workQueue.take();
下面开始讲newCachedThreadPool,该类的创建方式:Executors.newCachedThreadPool();
内部生成一个corePoolSize为0,poolSize 为整形最大数,存活时间为60s的线程池。他的执行任务方式和newFixedThreadPool一样,也是ec.submit。我们关心的是如何做到60s后自动回收的。主要在Worker中的getTask方法里。
getTask方法中,由于corePoolSize为0,所以直接走下面,获取超时时间keepAliveTime,然后从队列中Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);根据超时时间获取任务,超过时限则break。之后外面run方法的循环结束,线程关闭。
最后再讲一下定时及延时执行的线程newScheduledThreadPool,创建方式:ScheduledExecutorService ec = Executors.newScheduledThreadPool(int corePoolSize);
执行任务为:
10代表首次执行延时10秒执行,之后每2秒执行一次。接着看他们的实现原理。方法如下:
创建任务大体相同,将延时时间设置为time,将每次执行时间设置为period。之后执行delayedExecute(t);方法。
该方法比较目前的线程数量是否小于设置的核心数量,如果小于,则创建线程,否则加入队列。当我们第一次进入时,肯定是小于的。执行prestartCoreThread()方法。
该方法执行addIfUnderCorePoolSize()方法,和newFixedThreadPool相同,只不过传入的Runnable任务为null,表明只创建个线程被Worker包装,之后将该Runnable任务放入队列中。接着执行Worker的run方法,从getTask中获取任务。
这是workQueue.take()的实现类是DelayQueue队列,
先获取time,第一次设置的10秒,等待10s后返回开始执行任务。执行的run方法是ScheduledThreadPoolExecutor中的run方法。
runPeriodic()方法内容:
方法在runAndReset中执行完毕,之后获取每次间隔多少时间执行的参数period,将time设为time+p。这样当再次循环从队列中找任务的时候, long delay = first.getDelay(TimeUnit.NANOSECONDS);
就是等待2S后执行了。
以上就是newFixedThreadPool、newScheduledThreadPool、newCachedThreadPool的实现原理。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
第一个参数代表corePoolSize,第二个代表maximumPoolSize,第三个代表keepAliveTime存活时间,最后一个是用队列保存超出的任务。
具体代码:
ExecutorService ec = Executors.newFixedThreadPool(2); ec.submit(new Runnable(){ public void run() { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } });
ec.submit方法调用的是抽象类AbstractExecutorService中的submit方法
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); FutureTask<Object> ftask = new FutureTask<Object>(task, null); execute(ftask); return ftask; }
该方法创建一个未来任务FutureTask
public FutureTask(Runnable runnable, V result) { sync = new Sync(Executors.callable(runnable, result)); }
Executors.callable方法如下:
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; }
未来任务FutureTask创建完毕之后, 执行execute(ftask)方法,调用ThreadPoolExecutor类。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); for (;;) { if (runState != RUNNING) { reject(command); return; } if (poolSize < corePoolSize && addIfUnderCorePoolSize(command)) return; if (workQueue.offer(command)) return; Runnable r = addIfUnderMaximumPoolSize(command); if (r == command) return; if (r == null) { reject(command); return; } // else retry } }
会先执行if (poolSize < corePoolSize && addIfUnderCorePoolSize(command)),因为目前线程数量还小于设定的核心数量。addIfUnderCorePoolSize(command)代码如下:
private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; } private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); if (t != null) { w.thread = t; workers.add(w); int nt = ++poolSize; if (nt > largestPoolSize) largestPoolSize = nt; } return t; }
Worker是ThreadPoolExecutor一个内部类,将要执行的任务包装进Worker中,之后执行t.start(),调用Worker中的run方法。
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; // unnecessary but can help GC } } catch(InterruptedException ie) { // fall through } finally { workerDone(this); } }
执行一次后,进入while循环,getTask方法如下:
Runnable getTask() throws InterruptedException { for (;;) { switch(runState) { case RUNNING: { if (poolSize <= corePoolSize) // untimed wait if core return workQueue.take(); long timeout = keepAliveTime; if (timeout <= 0) // die immediately for 0 timeout return null; Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS); if (r != null) return r; if (poolSize > corePoolSize) // timed out return null; // else, after timeout, pool shrank so shouldn't die, so retry break; } case SHUTDOWN: { // Help drain queue Runnable r = workQueue.poll(); if (r != null) return r; // Check if can terminate if (workQueue.isEmpty()) { interruptIdleWorkers(); return null; } // There could still be delayed tasks in queue. // Wait for one, re-checking state upon interruption try { return workQueue.take(); } catch(InterruptedException ignore) {} break; } case STOP: return null; default: assert false; } } }
newFixedThreadPool创建的数量一直是poolSize 小于等于corePoolSize,所以从队列中获取下一条任务,如果没有则等待,workQueue.take();
下面开始讲newCachedThreadPool,该类的创建方式:Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
内部生成一个corePoolSize为0,poolSize 为整形最大数,存活时间为60s的线程池。他的执行任务方式和newFixedThreadPool一样,也是ec.submit。我们关心的是如何做到60s后自动回收的。主要在Worker中的getTask方法里。
while (task != null || (task = getTask()) != null) { runTask(task); task = null; // unnecessary but can help GC } switch(runState) { case RUNNING: { if (poolSize <= corePoolSize) // untimed wait if core return workQueue.take(); long timeout = keepAliveTime; if (timeout <= 0) // die immediately for 0 timeout return null; Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS); if (r != null) return r; if (poolSize > corePoolSize) // timed out return null; // else, after timeout, pool shrank so shouldn't die, so retry break; }
getTask方法中,由于corePoolSize为0,所以直接走下面,获取超时时间keepAliveTime,然后从队列中Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);根据超时时间获取任务,超过时限则break。之后外面run方法的循环结束,线程关闭。
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; // unnecessary but can help GC } } catch(InterruptedException ie) { // fall through } finally { workerDone(this); } }
最后再讲一下定时及延时执行的线程newScheduledThreadPool,创建方式:ScheduledExecutorService ec = Executors.newScheduledThreadPool(int corePoolSize);
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue()); }
执行任务为:
ec1.scheduleAtFixedRate(new Runnable(){ public void run() { System.out.println("i:"+j+" ********"); } },10, 2, TimeUnit.SECONDS);
10代表首次执行延时10秒执行,之后每2秒执行一次。接着看他们的实现原理。方法如下:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); if (initialDelay < 0) initialDelay = 0; long triggerTime = now() + unit.toNanos(initialDelay); ScheduledFutureTask<?> t = new ScheduledFutureTask<Object>(command, null, triggerTime, unit.toNanos(period)); delayedExecute(t); return t; } ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); }
创建任务大体相同,将延时时间设置为time,将每次执行时间设置为period。之后执行delayedExecute(t);方法。
private void delayedExecute(Runnable command) { if (isShutdown()) { reject(command); return; } // Prestart a thread if necessary. We cannot prestart it // running the task because the task (probably) shouldn't be // run yet, so thread will just idle until delay elapses. if (getPoolSize() < getCorePoolSize()) prestartCoreThread(); super.getQueue().add(command); }
该方法比较目前的线程数量是否小于设置的核心数量,如果小于,则创建线程,否则加入队列。当我们第一次进入时,肯定是小于的。执行prestartCoreThread()方法。
public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); }
该方法执行addIfUnderCorePoolSize()方法,和newFixedThreadPool相同,只不过传入的Runnable任务为null,表明只创建个线程被Worker包装,之后将该Runnable任务放入队列中。接着执行Worker的run方法,从getTask中获取任务。
Runnable getTask() throws InterruptedException { for (;;) { switch(runState) { case RUNNING: { if (poolSize <= corePoolSize) // untimed wait if core return workQueue.take(); long timeout = keepAliveTime; if (timeout <= 0) // die immediately for 0 timeout return null; Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS); if (r != null) return r; if (poolSize > corePoolSize) // timed out return null; // else, after timeout, pool shrank so shouldn't die, so retry break; }
这是workQueue.take()的实现类是DelayQueue队列,
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { available.await(); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay > 0) { long tl = available.awaitNanos(delay); } else { E x = q.poll(); assert x != null; if (q.size() != 0) available.signalAll(); // wake up other takers return x; } } } } finally { lock.unlock(); } }
先获取time,第一次设置的10秒,等待10s后返回开始执行任务。执行的run方法是ScheduledThreadPoolExecutor中的run方法。
public void run() { if (isPeriodic()) runPeriodic(); else ScheduledFutureTask.super.run(); }
runPeriodic()方法内容:
private void runPeriodic() { boolean ok = ScheduledFutureTask.super.runAndReset(); boolean down = isShutdown(); // Reschedule if not cancelled and not shutdown or policy allows if (ok && (!down || (getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isTerminating()))) { long p = period; if (p > 0) time += p; else time = now() - p; ScheduledThreadPoolExecutor.super.getQueue().add(this); } // This might have been the final executed delayed // task. Wake up threads to check. else if (down) interruptIdleWorkers(); }
方法在runAndReset中执行完毕,之后获取每次间隔多少时间执行的参数period,将time设为time+p。这样当再次循环从队列中找任务的时候, long delay = first.getDelay(TimeUnit.NANOSECONDS);
public long getDelay(TimeUnit unit) { long d = unit.convert(time - now(), TimeUnit.NANOSECONDS); return d; }
就是等待2S后执行了。
以上就是newFixedThreadPool、newScheduledThreadPool、newCachedThreadPool的实现原理。
相关文章推荐
- Android 文本滚动效果的几种实现方式(一)
- Undo/Redo几种实现方式介绍(三)——保存操作方式
- 详解Android 进程间通信的几种实现方式
- 文件自删除的几种实现方式
- jqMobi基础(3)--jqMobi中实现header定义的几种方式
- ExtJS Grid tooltip的几种实现方式
- js实现页面跳转的几种方式 (转)
- js实现页面跳转的几种方式
- 负载均衡的几种实现方式
- Javascript实现页面跳转的几种方式分享
- C++回调机制的几种实现方式
- 快速排序算法的思想和几种实现方式
- session可以存储的方式有几种,如果程序采取分布式,怎么样实现session共享
- ListView的实现步骤及几种实现方式
- Ext.LoadMask遮罩的效果几种实现方式
- 单例模式几种实现方式
- 负载均衡的几种实现方式
- js实现继承的几种方式
- 事务有哪些特性?spring的事务管理有几种方式实现,如何实现?spring 中常用的两种事务配置方式以及事务的传播性、隔离级别
- (十六)java并发编程--线程的死锁解决方案(生产者和消费者几种实现方式)