您的位置:首页 > 编程语言 > Java开发

JAVA通过concurrent包实现线程池策略

2013-05-03 22:02 751 查看
使用线程池的好处?

答:1、减少在创建和销毁线程上所花的时间以及系统资源的开销。2、如不使用线程池,有可能造成系统创建大量线程而导致消耗完系统内存以及”过度切换”。

为什么有任务拒绝的情况发生?

答:线程池有一个任务队列,用于缓存所有待处理的任务,正在处理的任务将从任务队列中移除。因此在任务队列长度有限的情况下就会出现新任务的拒绝处理问题,需要有一种策略来处理应该加入任务队列却因为队列已满无法加入的情况。另外在线程池关闭的时候也需要对任务加入队列操作进行额外的协调处理

RejectedExecutionHandler提供了四种方式来处理任务拒绝策略:
1、不用线程池线程执行CallerRunsPolicy
2、直接丢弃任务       DiscardPolicy
3、丢弃队列中最旧任务DiscardOldestPolicy
4、抛出异常           AbortPolicy

如果在action中调用ThreadPoolManager.batchExecute会不会线程还没跑完就已经return页面了呢?

答:不会,Callable接口下的实现,会等到所有任务结束之后返回结果,比如20个任务,会等到花费时间最长的一个任务执行结果之后才return

ThreadPoolManager一般在哪初始化?

答:一般一个系统只初始化一次,放在实现了ServletContextListener接口的类中。

ThreadPoolExecutor

ThreadPoolExecutor参数: int corePoolSize,
                        
 int maximumPoolSize,
                        
 long keepAliveTime,
                         
TimeUnit unit,
                         
BlockingQueue<Runnable> workQueue,
                       ThreadFactory threadFactory
                       RejectedExecutionHandler handler
corePoolSize - 池中所保存的线程数,包括空闲线程。
maximumPoolSize - 池中允许的最大线程数。
keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit - keepAliveTime 参数的时间单位。
workQueue - 执行前用于保持任务的缓冲队列,即上文所指的任务队列。此队列仅保持由。execute - 方法提交的 Runnable 任务。
threadFactory - 执行程序创建新线程时使用的工厂。
handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

package com.threadpool;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* @Description 线程池执行器实现,设计运行超时和线程拒绝的捕捉策略
* @author gshen
* @date 2013-3-30
*/
public class RetryThreadPoolExecutor extends ThreadPoolExecutor {
// 令牌锁
private final ReentrantLock tokenLock = new ReentrantLock();
private final Condition tocken = tokenLock.newCondition();
// 等待令牌超时时间
private long timeout ;
// 线程拒绝处理器
private RejectedExecutionHandler rejectHandler;

/**
* 重试策略线程执行器
* @param corePoolSize
* @param maximumPoolSize
* @param keepAliveTime
* @param queueSize
* @param timeout
* @param rejectHandler
*/
public RetryThreadPoolExecutor( int corePoolSize, int maximumPoolSize,long keepAliveTime, int queueSize, long timeout, RejectedExecutionHandler rejectHandler )
{
/**
*  采用AbortPolicy策略
*  指定缓冲队列大小,FIFO策略
*/
super(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueSize), new ThreadPoolExecutor.AbortPolicy());
this.timeout = timeout;
this.rejectHandler = rejectHandler;
}

@Override
public void execute( Runnable task )
{
try
{
super.execute(task);

}catch( java.util.concurrent.RejectedExecutionException rejectException )
{
tokenLock.lock();
try {
// 等待令牌,如果超时,则放弃执行
if( tocken.await( timeout, TimeUnit.MILLISECONDS ) )
{
execute( task );
}else
{
if( null != rejectHandler )
{
rejectHandler.rejectedExecution( task, this );
}
throw rejectException;
}

} catch (InterruptedException e)
{
throw rejectException;
} finally
{
tokenLock.unlock();
}
}
}

@Override
protected void afterExecute( Runnable task, Throwable t )
{
super.afterExecute(task, t);

// 如果有线程正在等待令牌
tokenLock.lock();
try {
//唤醒一个等待线程,交给令牌
if( tokenLock.hasWaiters( tocken ) )
{
tocken.signal();
}

} finally
{
tokenLock.unlock();
}
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
{
return super.invokeAll( tasks, timeout, TimeUnit.MILLISECONDS );
}

public long getTimeout() {
return timeout;
}

public void setTimeout(long timeout) {
this.timeout = timeout;
}

public RejectedExecutionHandler getRejectHandler() {
return rejectHandler;
}

public void setRejectHandler(RejectedExecutionHandler rejectHandler) {
this.rejectHandler = rejectHandler;
}
}


package com.threadpool;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Description 线程池配置类
* @author gshen
* @date 2013-3-30
*/
public class ThreadPoolConfig
{
// 线程池保持的最小的线程数
private int minPoolSize;

// 线程池中大于minmumPoolSize的线程保持活动的最长时间(单位:毫秒)
private long keepAliveTime;

// 线程池中正在执行的最大线程数
private int maxPoolSize;

// 线程池缓冲队列中等待执行的线程数
private int workQueueSize;

// 线程执行超时时间
private long timeout;

// 线程执行器
private ThreadPoolExecutorexecutor;

// 没有获得执行的线程捕捉器
private RejectedExecutionHandlerrejectedExecutionHandler;

public ThreadPoolConfig()
{

}

public int getMinPoolSize()
{
return minPoolSize;
}

public void setMinPoolSize(int _minPoolSize)
{
this.minPoolSize = _minPoolSize;
}

public int getMaxPoolSize()
{
return maxPoolSize;
}

public void setMaxPoolSize(int _maxPoolSize)
{
this.maxPoolSize = _maxPoolSize;
}

public long getKeepAliveTime()
{
return keepAliveTime;
}

public void setKeepAliveTime(long _keepAliveTime)
{
this.keepAliveTime = _keepAliveTime;
}

public int getWorkQueueSize()
{
return workQueueSize;
}

public void setWorkQueueSize(int _workQueueSize)
{
this.workQueueSize = _workQueueSize;
}

public long getTimeout()
{
return timeout;
}

public void setTimeout(long _timeout)
{
this.timeout = _timeout;
}

public ThreadPoolExecutor getExecutor()
{
return executor;
}

public void setExecutor(ThreadPoolExecutor _executor)
{
this.executor = _executor;
}

public RejectedExecutionHandler getRejectedExecutionHandler()
{
return rejectedExecutionHandler;
}

public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedHandler)
{
this.rejectedExecutionHandler = rejectedHandler;
}

}


package com.threadpool;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
* @Description 线程池管理器
* @author gshen
* @date 2013-3-30
*/
public class ThreadPoolManager {

private final static Log log = LogFactory.getLog( ThreadPoolManager.class );

private static ThreadPoolExecutor poolExecutor = null;

private static final ReentrantLock serviceLock = new ReentrantLock();

/**
* 启动服务
*
*/
public static void initialize( ThreadPoolConfig config )
{
serviceLock.lock();
try
{
if( null == poolExecutor )
{
ThreadPoolExecutor exector = config.getExecutor();
if( null == exector )
{
exector = new RetryThreadPoolExecutor( config.getMinPoolSize(), config.getMaxPoolSize(), config.getKeepAliveTime(),
config.getWorkQueueSize(), config.getTimeout(), config.getRejectedExecutionHandler() );
}

poolExecutor = exector;
log.info("----------->ThreadPoolManager start service success..." );
}

}finally
{
serviceLock.unlock();
}
}

/**
* 执行一个线程
* @param task 待执行的线程
* @return
*/
public static boolean execute( Runnable task )
{
if( null == task )
{
throw new NullPointerException();
}

// 线程池服务未启动
if( null == poolExecutor )
{
throw new NullPointerException( "ThreadPoolManager service is not started..." );
}

try
{
poolExecutor.execute( task );

return true;
}catch( java.util.concurrent.RejectedExecutionException rejectException )
{

}

return false ;
}

/**
* 批量执行多个线程
*
* @param <T>
* @param tasks
* @return
* @throws InterruptedException
*/
public static <T> List<Future<T>> batchExecute(Collection<? extends Callable<T>> tasks) throws InterruptedException
{
if( null == tasks )
{
throw new NullPointerException();
}

// 线程池服务未启动
if( null == poolExecutor )
{
throw new NullPointerException( "ThreadPoolManager service is not started..." );
}

if( 0 < tasks.size() )
{
return poolExecutor.invokeAll( tasks );
}

return null;
}

/**
* 终止服务
*/
public static void shutdown()
{
serviceLock.lock();
try
{
if( null != poolExecutor )
{
poolExecutor.shutdown();
poolExecutor = null;

log.info("--------------->ThreadPoolManager stop service......");
}
}finally
{
serviceLock.unlock();
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Java 线程池 多线程