使用SPRING中的线程池ThreadPoolTaskExecutor实现JAVA并发
2016-10-11 11:22
711 查看
ThreadPoolTaskExecutor是一个spring的线程池技术,它是使用jdk中的java.util.concurrent.ThreadPoolExecutor进行实现。
ThreadPoolTaskExecutor的参数:
int corePoolSize:线程池维护线程的最小数量.
int maximumPoolSize:线程池维护线程的最大数量.
long keepAliveTime:空闲线程的存活时间.
TimeUnit unit: 时间单位,现有纳秒,微秒,毫秒,秒枚举值.
BlockingQueue<Runnable> workQueue:持有等待执行的任务队列.
RejectedExecutionHandler handler: 用来拒绝一个任务的执行,有两种情况会发生这种情况。
一是在execute方法中若addIfUnderMaximumPoolSize(command)为false,即线程池已经饱和;
二是在execute方法中, 发现runState!=RUNNING || poolSize == 0,即已经shutdown,就调用ensureQueuedTaskHandled(Runnable command),在该方法中有可能调用reject。
ThredPoolTaskExcutor的处理流程:
当池子大小小于corePoolSize,就新建线程,并处理请求
当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
当workQueue放不下任务时,就新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理
当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁
其会优先创建 CorePoolSiz 线程, 当继续增加线程时,先放入Queue中,当 CorePoolSiz 和 Queue 都满的时候,就增加创建新线程,当线程达到MaxPoolSize的时候,就会抛出错 误 org.springframework.core.task.TaskRejectedException
另外MaxPoolSize的设定如果比系统支持的线程数还要大时,会抛出java.lang.OutOfMemoryError: unable to create new native thread 异常。
在spring中使用ThreadPoolTaskExecutor的配置:
![](https://oscdn.geek-share.com/Uploads/Images/Content/201603/69c5a8ac3fa60e0848d784a6dd461da6.gif)
![](https://oscdn.geek-share.com/Uploads/Images/Content/201603/69c5a8ac3fa60e0848d784a6dd461da6.gif)
Reject策略预定义有四种:
(1)ThreadPoolExecutor.AbortPolicy策略,是默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
(2)ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃.
(3)ThreadPoolExecutor.DiscardPolicy策略,不能执行的任务将被丢弃.
(4)ThreadPoolExecutor.DiscardOldestPolicy策略,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程).
使用
一:不需要返回值的情况
1,初始化线程池
Java代码
![](https://oscdn.geek-share.com/Uploads/Images/Content/201610/c47dc20d4fe3267bb3e7ca93c3491065.png)
ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
poolTaskExecutor.setQueueCapacity(10000);
poolTaskExecutor.setCorePoolSize(5);
poolTaskExecutor.setMaxPoolSize(10);
poolTaskExecutor.setKeepAliveSeconds(5000);
poolTaskExecutor.initialize();
2,在线程池中执行某个线程
Java代码
![](https://oscdn.geek-share.com/Uploads/Images/Content/201610/c47dc20d4fe3267bb3e7ca93c3491065.png)
poolTaskExecutor.execute(new Thread(Objct...){...});
二:需要返回值的情况
1,初始化线程池poolTaskExecutor,同上
2,新建一个类,实现Callable接口
Java代码
![](https://oscdn.geek-share.com/Uploads/Images/Content/201610/c47dc20d4fe3267bb3e7ca93c3491065.png)
class GetFromDB implements Callable<User> {
private UserDao userDao;
private Long userId;
public GetFromDB(UserDao userDao, Long userId) {
this.userDao = userDao;
this.userId = userId;
}
public User call() throws DaoException {
User user = userDao.getUserById(userId);
return user;
}
}
3,用之前的GetFromDB类构造一个FutureTask类
Java代码
![](https://oscdn.geek-share.com/Uploads/Images/Content/201610/c47dc20d4fe3267bb3e7ca93c3491065.png)
FutureTask<User> dbtask = new FutureTask<User>(GetFromDB);
4,提交并执行
Java代码
![](https://oscdn.geek-share.com/Uploads/Images/Content/201610/c47dc20d4fe3267bb3e7ca93c3491065.png)
threadpool.submit(dbtask);
5,得到返回值
Java代码
![](https://oscdn.geek-share.com/Uploads/Images/Content/201610/c47dc20d4fe3267bb3e7ca93c3491065.png)
try {
User user = dbtask.get();
} catch (Exception e) {
if (e instanceof ExecutionException
&& ((ExecutionException) e).getCause() instanceof DaoException) {
throw (DaoException) ((ExecutionException) e).getCause();
} else {
其他处理方式
}
}
注:一旦调用了get()方法,如果线程还未产生返回值,则将阻塞get()方法,直到得到返回值。基于此,如果你想确保线程执行完后才执行下一步操作,即使你不想得到返回值也可以调用一下此方法。当然这与多线程的初衷不符。
ThreadPoolTaskExecutor的参数:
int corePoolSize:线程池维护线程的最小数量.
int maximumPoolSize:线程池维护线程的最大数量.
long keepAliveTime:空闲线程的存活时间.
TimeUnit unit: 时间单位,现有纳秒,微秒,毫秒,秒枚举值.
BlockingQueue<Runnable> workQueue:持有等待执行的任务队列.
RejectedExecutionHandler handler: 用来拒绝一个任务的执行,有两种情况会发生这种情况。
一是在execute方法中若addIfUnderMaximumPoolSize(command)为false,即线程池已经饱和;
二是在execute方法中, 发现runState!=RUNNING || poolSize == 0,即已经shutdown,就调用ensureQueuedTaskHandled(Runnable command),在该方法中有可能调用reject。
ThredPoolTaskExcutor的处理流程:
当池子大小小于corePoolSize,就新建线程,并处理请求
当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
当workQueue放不下任务时,就新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理
当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁
其会优先创建 CorePoolSiz 线程, 当继续增加线程时,先放入Queue中,当 CorePoolSiz 和 Queue 都满的时候,就增加创建新线程,当线程达到MaxPoolSize的时候,就会抛出错 误 org.springframework.core.task.TaskRejectedException
另外MaxPoolSize的设定如果比系统支持的线程数还要大时,会抛出java.lang.OutOfMemoryError: unable to create new native thread 异常。
在spring中使用ThreadPoolTaskExecutor的配置:
![](https://oscdn.geek-share.com/Uploads/Images/Content/201603/69c5a8ac3fa60e0848d784a6dd461da6.gif)
<!-- 异步线程池 --> <bean id="threadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 核心线程数 --> <property name="corePoolSize" value="3" /> <!-- 最大线程数 --> <property name="maxPoolSize" value="10" /> <!-- 队列最大长度 >=mainExecutor.maxSize --> <property name="queueCapacity" value="25" /> <!-- 线程池维护线程所允许的空闲时间 --> <property name="keepAliveSeconds" value="300" /> <!-- 线程池对拒绝任务(无线程可用)的处理策略 ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃. --> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /> </property> </bean>
![](https://oscdn.geek-share.com/Uploads/Images/Content/201603/69c5a8ac3fa60e0848d784a6dd461da6.gif)
Reject策略预定义有四种:
(1)ThreadPoolExecutor.AbortPolicy策略,是默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
(2)ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃.
(3)ThreadPoolExecutor.DiscardPolicy策略,不能执行的任务将被丢弃.
(4)ThreadPoolExecutor.DiscardOldestPolicy策略,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程).
使用
一:不需要返回值的情况
1,初始化线程池
Java代码
![](https://oscdn.geek-share.com/Uploads/Images/Content/201610/c47dc20d4fe3267bb3e7ca93c3491065.png)
ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();
poolTaskExecutor.setQueueCapacity(10000);
poolTaskExecutor.setCorePoolSize(5);
poolTaskExecutor.setMaxPoolSize(10);
poolTaskExecutor.setKeepAliveSeconds(5000);
poolTaskExecutor.initialize();
2,在线程池中执行某个线程
Java代码
![](https://oscdn.geek-share.com/Uploads/Images/Content/201610/c47dc20d4fe3267bb3e7ca93c3491065.png)
poolTaskExecutor.execute(new Thread(Objct...){...});
二:需要返回值的情况
1,初始化线程池poolTaskExecutor,同上
2,新建一个类,实现Callable接口
Java代码
![](https://oscdn.geek-share.com/Uploads/Images/Content/201610/c47dc20d4fe3267bb3e7ca93c3491065.png)
class GetFromDB implements Callable<User> {
private UserDao userDao;
private Long userId;
public GetFromDB(UserDao userDao, Long userId) {
this.userDao = userDao;
this.userId = userId;
}
public User call() throws DaoException {
User user = userDao.getUserById(userId);
return user;
}
}
3,用之前的GetFromDB类构造一个FutureTask类
Java代码
![](https://oscdn.geek-share.com/Uploads/Images/Content/201610/c47dc20d4fe3267bb3e7ca93c3491065.png)
FutureTask<User> dbtask = new FutureTask<User>(GetFromDB);
4,提交并执行
Java代码
![](https://oscdn.geek-share.com/Uploads/Images/Content/201610/c47dc20d4fe3267bb3e7ca93c3491065.png)
threadpool.submit(dbtask);
5,得到返回值
Java代码
![](https://oscdn.geek-share.com/Uploads/Images/Content/201610/c47dc20d4fe3267bb3e7ca93c3491065.png)
try {
User user = dbtask.get();
} catch (Exception e) {
if (e instanceof ExecutionException
&& ((ExecutionException) e).getCause() instanceof DaoException) {
throw (DaoException) ((ExecutionException) e).getCause();
} else {
其他处理方式
}
}
注:一旦调用了get()方法,如果线程还未产生返回值,则将阻塞get()方法,直到得到返回值。基于此,如果你想确保线程执行完后才执行下一步操作,即使你不想得到返回值也可以调用一下此方法。当然这与多线程的初衷不符。
相关文章推荐
- 使用SPRING中的线程池ThreadPoolTaskExecutor实现JAVA并发
- 使用Spring中的线程池ThreadPoolTaskExecutor实现JAVA并发
- 使用SPRING中的线程池ThreadPoolTaskExecutor实现并发
- Spring的线程池ThreadPoolTaskExecutor使用案例
- 使用Spring ThreadPoolTaskExecutor实现多线程任务
- quartz 和 spring的 线程池 ThreadPoolTaskExecutor 使用
- Spring线程池配置使用---ThreadPoolTaskExecutor 配置
- 使用Spring ThreadPoolTaskExecutor实现多线程任务
- 通过线程池使用多线程并发:ThreadPoolTaskExecutor 的应用例子
- spring ThreadPoolTaskExecutor的线程池类实现多线程
- Spring教程____Spring线程池_ThreadPoolTaskExecutor的配置和使用
- 实现Spring整合线程池ThreadPoolTaskExecutor
- 使用SPRING中的线程池ThreadPoolTaskExecutor并且得到任务执行的结果
- java 线程池(ExecutorService与Spring配置threadPoolTaskExecutor)
- Java Executor并发框架(十四)Executor框架线程池使用原始方式实现生产者消费者模式
- spring线程池ThreadPoolTaskExecutor与阻塞队列BlockingQueue
- SPRING中的线程池ThreadPoolTaskExecutor
- 利用spring的线程池ThreadPoolTaskExecutor对多个库进行数据归档
- SPRING中的线程池ThreadPoolTaskExecutor
- SPRING中的线程池ThreadPoolTaskExecutor