线程池&任务
2016-06-02 14:28
726 查看
1.线程池
1.无限制创建线程的缺点
#1:线程生命周期的开销。线程的创建与关闭不是“免费”的。实际的开销依据不同的平台不一样,而且创建线程是需要时间的,会带来请求的延迟。如果没处理一个请求创建一个线程就会大量消耗计算资源。#2:资源消耗量。活动线程会消耗系统资源,尤其是内存,当可运行的线程多于可用的处理器数,线程则会空闲。大量的空闲线程占用更多的内存,给垃圾回收器带来压力,而且在竞争CPU资源时,会产生额外的开销。
#3:稳定性。限制可创建线程的数目。限制的数目依不同平台耳钉,同样受到JVM的启动参数、Thread的构造函数中请求的栈大小等因素的影响,以及底层操作系统线程的限制。
1.1.Executor框架
public interface Executor { void execute(Runnable command); }
Executor接口虽然简单,但它可以用于异步任务执行,而且支持很多不同类型的任务执行策略。
1.2.执行策略
将任务的提交与任务的执行体进行解耦,价值在于给定任务执行执行策略。1.3.创建线程池
类库提供了一个灵活的线程池实现和一些有用的预设配置。可以通过调用Executors的某个静态工厂方法来创建线程池:newFixThreadPool:创建给定长的线程池。每提交一个任务就创建一个线程,直到达到池的最大长度,这是线程池会保持长度不再变化,如果一个线程由于非预期Exception结束,线程池会补充一个新的线程。队列使用无限的LinkedBlockingQueue。
newCachedThreadPool:创建一个可缓存的线程池。如果当前线程池的长度超过处理的需求,则会灵活回收线程;如果线程不足以满足需求,则会灵活增添新的线程,而并不会对池的长度作任何限制。队列使用SynchronousQueue。
newSingleThreadExecutor:创建一个单线程的Executor,如果该线程异常结束,会有另一个取代它。队列使用无限的LinkedBlockingQueue。
newScheduledThreadPool:创建一个定长的线程池,而且支持定时以及周期性的任务执行。
1.4.任务与执行策略的隐性耦合
#1:依赖性任务。任务依赖时序、其他任务的结果与边界效应,隐性为执行策略带来了约束。#2:采用线程限制的任务。任务线程不安全时,Executor从一个单线程改为一个多线程线程池,就会失去线程安全性。
#3:对响应时间敏感的任务。将一些执行时间较长的任务提交到只有少量线程的线程池,线程池管理的服务响应性会被削弱。
#4:使用ThreadLocal。thread-local值的生命周期被线程的生命周期所限制,在池的某线程中使用才有意义,线程池的线程会发生回收、添加,这时Thread-loca值会一起回收。
NOTE:在线程池中任务依赖于其他任务的执行,此时俩个任务恰好在单线程的线程池中执行,有可能线程会发生无限期的阻塞,等待依赖任务的完成,依赖任务阻塞在等待队列里,这时就发生了线程饥饿死锁。
1.5.定制线程池的大小
线程池合理的长度取决于未来提交的任务类型和所部署系统的特征。线程池过大,那么线程对稀缺的CPU和内存资源竞争,会导致内存的高使用量,还可能会耗尽资源。
线程池过小,对于可用的资源没有很好利用,会对吞吐量造成损失。:
定义:
Ncpu = CPU的数量
Ucpu = 目标CPU的使用率,[0,1]
W / C = 等待时间 / 计算时间
为了保持处理器达到期望的使用率,最优的池的大小等于:
N = Ncpu * Ucpu * ( 1 + W / C)
Note:
#1:Ncpu = Runtime.getRuntime().availableProcessors();
#2:计算密集型任务,一般分配Ncpu + 1 个线程的线程池来获得最优的利用率(防止某线程发生报错而暂停,刚好有“额外”的线程,来保证CPU周期不会中断工作)。
#3:对于I/O和其他阻塞操作的任务,不是所有的线程都会在所有的时间被调度,因此需要一个更大的池。
1.6.配置ThreadPoolExecutor
1.61.线程的创建与销毁/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {...}#1:核心池大小是目标的大小,所有核心线程并非立即开始,而已等到有任务提交的时刻,慢慢补充,除非调用{@Code prestartAllCoreThreads()}。
#2:池的大小等于核心池的大小,并且直到工作队列充满前,不会创建更多的线程,但不会超过{@Code maxinumPoolSize}。
#3:如果一个非核心池线程已经闲置时间超过存活时间,它将成为回收的候选名单。
#4:切忌勿设置核心池大小为0为达到所有工作线程都最终会销毁的目的。如果工作队列不是使用SynchronousQueue,会引起任务不会开始,因为工作队列没有充满,线程池不会创建新的线程去处理任务,可以设置{@Code allowCoreThreadTimeOut()}来使核心池的线程也加入超时销毁的范围内。
1.62.队列任务的管理
ThreadPoolExecutor允许提供BlockingQueue来持有等待执行的任务。任务队列有3种基本方法:无限队列、有限队列和同步移交。
#1:newFixThreadPool和newSingleThreadExecutor都是默认使用一个无限的LinkedBlockingQueue。任务提交速度超过线程池的处理速度,队列会无限制增加。
#2:稳妥的资源管理可使用有限队列,如ArrayBlockingQueue和有限的LinkedBlockingQueue以及PriorityBlockingQueue。对于一个有界队列,队列的长度和池的大小要一起调节。
#3:庞大或者无限的池,可使用SynchronousQueue,完全绕开队列,直接提交到工作者线程。newCachedThreadPool就使用了SynchronousQueue。
#4:LinkedBlockingQueue和ArrayBlockingQueue都是FIFO队列,任务执行顺序跟达到顺序保持一致,对执行顺序进行控制可以使用PriorityBlockingQueue。
#5:如果任务彼此独立,有限的线程池比较合理。如果任务存在相互依赖,为防止线程饥饿死锁,可使用无限的池进行避免。
1.63.饱和策略
#1:AbortPolicy:直接抛出RejectedExecutionException异常。默认使用。
#2:CallerRunsPolicy:只用调用者所在线程来运行任务。
#3:DiscardOldestPolicy:丢弃队列里最老的一个任务,并执行当前任务。
#4:DiscardPolicy:不处理,直接丢弃新提交的任务。
1.64.线程池监听
可以继承ThreadPoolExecutor并重写beforeExecute、afterExecute、terminated方法。
线程出现异常,面临死亡时,会告之JVM自己会挂掉,设置异常捕获,可以帮助查找原因:
Thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
logger.error(t.getThreadGroup().getName() + "-" + t.getName() + "出错:", e);
}
});
2.任务
2.1.延迟的、并具有周期性的任务
Timer工具管理任务的延迟执行以及周期执行。但是Timer存在缺陷,应该考虑使用ScheduledThreadPoolExecutor作为代替。Timer的缺陷:
#1:只创建唯一的线程执行所有提交的TimerTask。如果一个TimerTask的执行比较耗时,会导致其他TimerTask的失效准确性的问题。例如TimerTask a每10ms执行一次,TimerTask b每40ms执行一次,b比较耗时,b完成后,a要么会连续被调用4次,要么完全丢失4次调用(取决于他是否按照固定频率或延迟进行调度)。
#2:如果TimerTask抛出未检出异常,而Timer线程并不捕获异常,这会终止timer线程,而且Timer也不会自我重新恢复线程的执行,误认为整个Timer取消了。因此已经提交的TimerTask不会被执行,新的TimerTask也不会被执行。
2.2.可携带结果的任务:Callable和Future
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }ExecutorService中所有submit方法都返回一个Future,因此可以将一个Runnable或Callable提交给executor,然后得到一个Future,用它可以重新获得任务执行的结果或者取消任务。
FutureTask实现了RunnableFuture,而RunnableFuture继承了Runnable和Callable接口。
2.21.Future取消任务
Future<Object> f = executor.submit(task); try { Object result = f.get(1000L, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { // 下面任务会取消 } catch (InterruptedException e) { throw new Error(e.getCause()); } catch (ExecutionException e) { throw new Error(e.getCause()); } finally { // 如果已经结束了,无害的 f.cancel(true); }
2.3.异类并行任务的局限性
public class Image { private final ExecutorService executor = Executors.newFixedThreadPool(10); void readPage(final List<String> urls) { Callable<List<String>> task = new Callable<List<String>>() { @Override public List<String> call() throws Exception { List<String> ret = new ArrayList<>(); for (String url : urls) { String result = url; // result = download from url } return ret; } }; Future<List<String>> f = executor.submit(task); List<String> result; try { result = f.get(); for (String re : result) { // do something } } catch (InterruptedException e) { Thread.currentThread().interrupt();// 声明线程中断状态 f.cancel(true);// 取消任务 } catch (ExecutionException e) { throw new Error(e); } } }#1:代码的复杂度提高了,但是性能并没有有多大的提高,大量相互独立且同类的任务进行并发处理,会将程序的任务量分配到不同的任务中,才能有真正的提升。
#2:就算将每个url下载资源作为一个独立的task提交到线程池去执行,也并不能很好地提升性能。因为不同task的处理时间是不一样的,for-each去get时有可能前面的没有处理完,后面的处理完了,后面的task被阻塞了。
ExecutorCompletionService可以处理以上的问题:
public class Image { private final ExecutorCompletionService<Object> executor = new ExecutorCompletionService<>(Executors.newFixedThreadPool(10)); void readPage(final List<String> urls) { for (final String url : urls) { Callable<Object> task = new Callable<Object>() { @Override public String call() throws Exception { String result = url; // result = download from url return result; } }; executor.submit(task); } Future<Object> f; try { for (int i = 0; i < urls.size(); i++) { f = executor.take(); // do something } } catch (InterruptedException e) { Thread.currentThread().interrupt();// 声明线程中断状态 } } }#1:多个ExecutorCompletionService可以共享同一个Executor。
#2:记录下提交给CompletionService的任务的个数,然后计算出获得了多少个已完成的结果,即使使用的是共享的Executor,也能知晓什么时候批处理任务的所有结果全部获得。
#3:批处理任务时,获取结果时,优先返回已经处理完的结果。
相关文章推荐
- java-模拟tomcat服务器
- C#多线程学习之(四)使用线程池进行多线程的自动管理
- C#多线程之Thread中Thread.IsAlive属性用法分析
- c++线程池实现方法
- C语言实现支持动态拓展和销毁的线程池
- c++实现简单的线程池
- Java线程池的几种实现方法和区别介绍
- Android开发笔记之:如何安全中止一个自定义线程Thread的方法
- 深入java线程池的使用详解
- java thread start()和run()方法简析
- Java中Runnable和Thread的区别分析
- java中通用的线程池实例代码
- Android开发笔记之:Handler Runnable与Thread的区别详解
- Java多线程实现同时输出
- Java编程中线程池的基本概念和使用
- java多线程编程制作电子时钟
- java多线程入门知识及示例程序
- C#多线程之Thread中Thread.Join()函数用法分析
- C#线程处理系列之线程池中的I/O线程
- C#线程池操作方法