JAVA通过concurrent包实现线程池策略
2013-05-03 22:02
751 查看
使用线程池的好处?
答:1、减少在创建和销毁线程上所花的时间以及系统资源的开销。2、如不使用线程池,有可能造成系统创建大量线程而导致消耗完系统内存以及”过度切换”。
为什么有任务拒绝的情况发生?
答:线程池有一个任务队列,用于缓存所有待处理的任务,正在处理的任务将从任务队列中移除。因此在任务队列长度有限的情况下就会出现新任务的拒绝处理问题,需要有一种策略来处理应该加入任务队列却因为队列已满无法加入的情况。另外在线程池关闭的时候也需要对任务加入队列操作进行额外的协调处理
RejectedExecutionHandler提供了四种方式来处理任务拒绝策略:
1、不用线程池线程执行CallerRunsPolicy
2、直接丢弃任务 DiscardPolicy
3、丢弃队列中最旧任务DiscardOldestPolicy
4、抛出异常 AbortPolicy
如果在action中调用ThreadPoolManager.batchExecute会不会线程还没跑完就已经return页面了呢?
答:不会,Callable接口下的实现,会等到所有任务结束之后返回结果,比如20个任务,会等到花费时间最长的一个任务执行结果之后才return。
ThreadPoolManager一般在哪初始化?
答:一般一个系统只初始化一次,放在实现了ServletContextListener接口的类中。
ThreadPoolExecutor
ThreadPoolExecutor参数: int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory
RejectedExecutionHandler handler
corePoolSize - 池中所保存的线程数,包括空闲线程。
maximumPoolSize - 池中允许的最大线程数。
keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit - keepAliveTime 参数的时间单位。
workQueue - 执行前用于保持任务的缓冲队列,即上文所指的任务队列。此队列仅保持由。execute - 方法提交的 Runnable 任务。
threadFactory - 执行程序创建新线程时使用的工厂。
handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
答:1、减少在创建和销毁线程上所花的时间以及系统资源的开销。2、如不使用线程池,有可能造成系统创建大量线程而导致消耗完系统内存以及”过度切换”。
为什么有任务拒绝的情况发生?
答:线程池有一个任务队列,用于缓存所有待处理的任务,正在处理的任务将从任务队列中移除。因此在任务队列长度有限的情况下就会出现新任务的拒绝处理问题,需要有一种策略来处理应该加入任务队列却因为队列已满无法加入的情况。另外在线程池关闭的时候也需要对任务加入队列操作进行额外的协调处理
RejectedExecutionHandler提供了四种方式来处理任务拒绝策略:
1、不用线程池线程执行CallerRunsPolicy
2、直接丢弃任务 DiscardPolicy
3、丢弃队列中最旧任务DiscardOldestPolicy
4、抛出异常 AbortPolicy
如果在action中调用ThreadPoolManager.batchExecute会不会线程还没跑完就已经return页面了呢?
答:不会,Callable接口下的实现,会等到所有任务结束之后返回结果,比如20个任务,会等到花费时间最长的一个任务执行结果之后才return。
ThreadPoolManager一般在哪初始化?
答:一般一个系统只初始化一次,放在实现了ServletContextListener接口的类中。
ThreadPoolExecutor
ThreadPoolExecutor参数: int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory
RejectedExecutionHandler handler
corePoolSize - 池中所保存的线程数,包括空闲线程。
maximumPoolSize - 池中允许的最大线程数。
keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit - keepAliveTime 参数的时间单位。
workQueue - 执行前用于保持任务的缓冲队列,即上文所指的任务队列。此队列仅保持由。execute - 方法提交的 Runnable 任务。
threadFactory - 执行程序创建新线程时使用的工厂。
handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
package com.threadpool; import java.util.Collection; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * @Description 线程池执行器实现,设计运行超时和线程拒绝的捕捉策略 * @author gshen * @date 2013-3-30 */ public class RetryThreadPoolExecutor extends ThreadPoolExecutor { // 令牌锁 private final ReentrantLock tokenLock = new ReentrantLock(); private final Condition tocken = tokenLock.newCondition(); // 等待令牌超时时间 private long timeout ; // 线程拒绝处理器 private RejectedExecutionHandler rejectHandler; /** * 重试策略线程执行器 * @param corePoolSize * @param maximumPoolSize * @param keepAliveTime * @param queueSize * @param timeout * @param rejectHandler */ public RetryThreadPoolExecutor( int corePoolSize, int maximumPoolSize,long keepAliveTime, int queueSize, long timeout, RejectedExecutionHandler rejectHandler ) { /** * 采用AbortPolicy策略 * 指定缓冲队列大小,FIFO策略 */ super(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueSize), new ThreadPoolExecutor.AbortPolicy()); this.timeout = timeout; this.rejectHandler = rejectHandler; } @Override public void execute( Runnable task ) { try { super.execute(task); }catch( java.util.concurrent.RejectedExecutionException rejectException ) { tokenLock.lock(); try { // 等待令牌,如果超时,则放弃执行 if( tocken.await( timeout, TimeUnit.MILLISECONDS ) ) { execute( task ); }else { if( null != rejectHandler ) { rejectHandler.rejectedExecution( task, this ); } throw rejectException; } } catch (InterruptedException e) { throw rejectException; } finally { tokenLock.unlock(); } } } @Override protected void afterExecute( Runnable task, Throwable t ) { super.afterExecute(task, t); // 如果有线程正在等待令牌 tokenLock.lock(); try { //唤醒一个等待线程,交给令牌 if( tokenLock.hasWaiters( tocken ) ) { tocken.signal(); } } finally { tokenLock.unlock(); } } @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return super.invokeAll( tasks, timeout, TimeUnit.MILLISECONDS ); } public long getTimeout() { return timeout; } public void setTimeout(long timeout) { this.timeout = timeout; } public RejectedExecutionHandler getRejectHandler() { return rejectHandler; } public void setRejectHandler(RejectedExecutionHandler rejectHandler) { this.rejectHandler = rejectHandler; } }
package com.threadpool; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; /** * @Description 线程池配置类 * @author gshen * @date 2013-3-30 */ public class ThreadPoolConfig { // 线程池保持的最小的线程数 private int minPoolSize; // 线程池中大于minmumPoolSize的线程保持活动的最长时间(单位:毫秒) private long keepAliveTime; // 线程池中正在执行的最大线程数 private int maxPoolSize; // 线程池缓冲队列中等待执行的线程数 private int workQueueSize; // 线程执行超时时间 private long timeout; // 线程执行器 private ThreadPoolExecutorexecutor; // 没有获得执行的线程捕捉器 private RejectedExecutionHandlerrejectedExecutionHandler; public ThreadPoolConfig() { } public int getMinPoolSize() { return minPoolSize; } public void setMinPoolSize(int _minPoolSize) { this.minPoolSize = _minPoolSize; } public int getMaxPoolSize() { return maxPoolSize; } public void setMaxPoolSize(int _maxPoolSize) { this.maxPoolSize = _maxPoolSize; } public long getKeepAliveTime() { return keepAliveTime; } public void setKeepAliveTime(long _keepAliveTime) { this.keepAliveTime = _keepAliveTime; } public int getWorkQueueSize() { return workQueueSize; } public void setWorkQueueSize(int _workQueueSize) { this.workQueueSize = _workQueueSize; } public long getTimeout() { return timeout; } public void setTimeout(long _timeout) { this.timeout = _timeout; } public ThreadPoolExecutor getExecutor() { return executor; } public void setExecutor(ThreadPoolExecutor _executor) { this.executor = _executor; } public RejectedExecutionHandler getRejectedExecutionHandler() { return rejectedExecutionHandler; } public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedHandler) { this.rejectedExecutionHandler = rejectedHandler; } }
package com.threadpool; import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** * @Description 线程池管理器 * @author gshen * @date 2013-3-30 */ public class ThreadPoolManager { private final static Log log = LogFactory.getLog( ThreadPoolManager.class ); private static ThreadPoolExecutor poolExecutor = null; private static final ReentrantLock serviceLock = new ReentrantLock(); /** * 启动服务 * */ public static void initialize( ThreadPoolConfig config ) { serviceLock.lock(); try { if( null == poolExecutor ) { ThreadPoolExecutor exector = config.getExecutor(); if( null == exector ) { exector = new RetryThreadPoolExecutor( config.getMinPoolSize(), config.getMaxPoolSize(), config.getKeepAliveTime(), config.getWorkQueueSize(), config.getTimeout(), config.getRejectedExecutionHandler() ); } poolExecutor = exector; log.info("----------->ThreadPoolManager start service success..." ); } }finally { serviceLock.unlock(); } } /** * 执行一个线程 * @param task 待执行的线程 * @return */ public static boolean execute( Runnable task ) { if( null == task ) { throw new NullPointerException(); } // 线程池服务未启动 if( null == poolExecutor ) { throw new NullPointerException( "ThreadPoolManager service is not started..." ); } try { poolExecutor.execute( task ); return true; }catch( java.util.concurrent.RejectedExecutionException rejectException ) { } return false ; } /** * 批量执行多个线程 * * @param <T> * @param tasks * @return * @throws InterruptedException */ public static <T> List<Future<T>> batchExecute(Collection<? extends Callable<T>> tasks) throws InterruptedException { if( null == tasks ) { throw new NullPointerException(); } // 线程池服务未启动 if( null == poolExecutor ) { throw new NullPointerException( "ThreadPoolManager service is not started..." ); } if( 0 < tasks.size() ) { return poolExecutor.invokeAll( tasks ); } return null; } /** * 终止服务 */ public static void shutdown() { serviceLock.lock(); try { if( null != poolExecutor ) { poolExecutor.shutdown(); poolExecutor = null; log.info("--------------->ThreadPoolManager stop service......"); } }finally { serviceLock.unlock(); } } }
相关文章推荐
- Java通过Executors实现四种线程池
- Java通过开启线程池实现多线程
- JAVA通过反映方法实现C#的委托
- java通过模拟表单实现post跳转
- 线程池的原理及java实现
- java实现分段读取文件并通过HTTP上传的方法
- 设计模式(Design Pattern) - 行为型模式(Behavioral Pattern) - 策略模式(Strategy) - Java实现
- JCIFS--java通过域登录实现单点登录
- Java中通过Executors调用静态方法来提供四种线程池介绍
- JAVA通过反映方法实现C#的委托
- 通过JNI实现Java和C++的相互调用
- 通过使用java.lang.reflect.Proxy实现动态代理
- (源码实例)通过层DIV实现,当鼠标放在链接上面,显示图片及文字 - 流星絮语 JAVA学习笔记 - CSDNBlog
- 【Zookeeper】JAVA通过ZK实现服务注册和服务发现
- Java 线程池的原理与实现 (转)
- java 通过 socket 实现 服务器和客户端的通信 TCP
- Java通过SMS短信平台实现发短信功能
- java线程池系列(1)-ThreadPoolExecutor实现原理
- Java 线程池的原理与实现
- Java:多线程,线程池,使用CompletionService通过Future来处理Callable的返回结果