您的位置:首页 > 编程语言 > Java开发

Java ScheduledThreadPoolExecutor 一个优于Timer、TimerTask的定时任务

2014-05-07 23:51 513 查看
最近因为工作的需要,要用到Java的定时任务,JDK1.5之前的Timer和TimerTask类已经过时了,

特地从网上找了一些材料汇总了以下,方便以后的学习。

//删除其中一个任务

http://www.iteye.com/problems/78147

http://zhidao.baidu.com/link?url=ZMRiUrVtBUbXLZIc1kgpTms7QrJAn-122-plrZFmoywmzAGHrYkTkgfLqaZZ-x2WQfA86x6bWIA2rnsnDmhtqoUAOFYTCW_SfGItsgcKjg_

http://bbs.csdn.net/topics/380145101

/article/3775915.html

http://www.2cto.com/kf/201208/145943.html

以下是原文转载,转载自:http://hi.baidu.com/yanfei_nn/item/c73802014089319b03ce1b5f


Java 一个优于Timer的定时器——ScheduledThreadPoolExecutor

1. TimeUnit 时间单元

换算进制:

1) 可以快速实现时间格式转换

toNanos、toMicros、toMillis、toSeconds、toMinutes、toHours、toDays

2) 以指定格式快速实现线程停滞

sleep

TimeUnit.SECONDS.sleep(1); <=> Thread.sleep(1000) // 线程等待1秒

2. ScheduledThreadPoolExecutor

1) ScheduledThreadPoolExecutor是在JDK1.5以后提供可以实现定时、重复运行任务的功能,类似Timer,优于Timer

2) 构造方法

a) ScheduledThreadPoolExecutor(int corePoolSize)使用给定核心池大小创建一个新 ScheduledThreadPoolExecutor

b)ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactorythreadFactory)使用给定的初始参数创建一个新对象

b可以提供线程创建工厂
3) 调度方法

a) schedule(Callable callable, long delay, TimeUnit unit); 延迟delay时间后开始执行callable

b) scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); 延迟initialDelay时间后开始执行command,并且按照period时间周期性重复调用,如果command运行时间较长、可能会同时执行(周期时间包括command运行时间)

c) scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); 延迟initialDelay时间后开始执行command,并且按照period时间周期性重复调用,并且保证在上次运行完后才会执行下一次(周期时间不包括command运行时间)

4) 全局线程方法参数

a) setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) 如果value为true,则在主线程shutdown()方法后如果还没有执行(未达到delay的时间),则延迟任务仍然有机会执行,反之则不会执行,直接退出。(针对b、c方法)

b) setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) 如果value为true,则在主线程shutdown()方法后如果还没有执行(未达到delay的时间),则延迟任务仍然有机会执行,反之则不会执行,直接退出。(针对a方法)

3. ScheduledFuture

在执行调度之后返回该对象。它可以判断任务是否已经执行、是否已经取消、取消以后的操作

4. 操作实例
以下转载的原文出处:http://www.360doc.com/content/12/0916/20/820209_236455178.shtml

Timer计时器有管理任务延迟执行("如1000ms后执行任务")以及周期性执行("如每500ms执行一次该任务")。但是,Timer存在一些缺陷,因此你应该考虑使用ScheduledThreadPoolExecutor作为代替品,Timer对调度的支持是基于绝对时间,而不是相对时间的,由此任务对系统时钟的改变是敏感的;ScheduledThreadExecutor只支持相对时间。
Timer的另一个问题在于,如果TimerTask抛出未检查的异常,Timer将会产生无法预料的行为。Timer线程并不捕获异常,所以TimerTask抛出的未检查的异常会终止timer线程。这种情况下,Timer也不会再重新恢复线程的执行了;它错误的认为整个Timer都被取消了。此时,已经被安排但尚未执行的TimerTask永远不会再执行了,新的任务也不能被调度了。

例子:

package com.concurrent.basic;

import java.util.Timer;

import java.util.TimerTask;

public class TimerTest {

private Timer timer = new Timer();

// 启动计时器

public void lanuchTimer() {

timer.schedule(new TimerTask() {

public void run() {

throw new RuntimeException();

}

}, 1000 * 3, 500);

}

// 向计时器添加一个任务

public void addOneTask() {

timer.schedule(new TimerTask() {

public void run() {

System.out.println("hello world");

}

}, 1000 * 1, 1000 * 5);

}

public static void main(String[] args) throws Exception {

TimerTest test = new TimerTest();

test.lanuchTimer();

Thread.sleep(1000 * 5);// 5秒钟之后添加一个新任务

test.addOneTask();

}

}

执行结果:





你可能希望第二个没有异常的线程会一直运行下去,然而实际情况如程序所示5秒钟后就中止了,还伴随着一个异常,异常的消息是"Timer already cancelled"。ScheduledThreadPoolExector妥善地处理了这个异常的任务,所以说在java5.0或更高的JDK中,几乎没有理由再使用Timer了。

java5.0后提供

public interface ScheduledExecutorService

extends ExecutorService

一个
ExecutorService
,可安排在给定的延迟后运行或定期执行的命令。
schedule 方法使用各种延迟创建任务,并返回一个可用于取消或检查执行的任务对象。scheduleAtFixedRate 和 scheduleWithFixedDelay 方法创建并执行某些在取消前一直定期运行的任务。
而且不受时钟限制。
例子:
package com.concurrent.basic;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.TimeUnit;

public class ScheduledExecutorTest {

//线程池能按时间计划来执行任务,允许用户设定计划执行任务的时间,int类型的参数是设定

//线程池中线程的最小数目。当任务较多时,线程池可能会自动创建更多的工作线程来执行任务

public ScheduledExecutorService scheduExec = Executors.newScheduledThreadPool(1);

//启动计时器

public void lanuchTimer(){

Runnable task = new Runnable() {

public void run() {

throw new RuntimeException();

}

};

scheduExec.scheduleWithFixedDelay(task, 1000*5, 1000*10, TimeUnit.MILLISECONDS);

}

//添加新任务

public void addOneTask(){

Runnable task = new Runnable() {

public void run() {

System.out.println("welcome to china");

}

};

scheduExec.scheduleWithFixedDelay(task, 1000*1, 1000, TimeUnit.MILLISECONDS);

}

public static void main(String[] args) throws Exception {

ScheduledExecutorTest test = new ScheduledExecutorTest();

test.lanuchTimer();

Thread.sleep(1000*5);//5秒钟之后添加新任务

test.addOneTask();

}

}
但是ScheduledThreadPoolExecutor也有不利的地方,就是只能按相对的时间的,而不能设置具体某个时刻之后执行,如每天晚上12点定时执行任务之类的要求使用Timer更合适,如果是周期性的重复工作可以考虑使用ScheduledThreadPoolExecutor。
Java定时任务ScheduledThreadPoolExecutor

2012-08-05 13:15:00

以前定时任务一直用Timer这个类,后来发现ScheduledThreadPoolExecutor功能更强大,我们先看个简单的示例,然后再看看API中的描述:

这个定时任务是我的项目中,每隔5分钟去检查设备的在线状态的。

[java]

public class CheckDeviceStateExcuter {

private static final Log log = LogFactory.getLog(CheckDeviceStateExcuter.class);

private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

private static DeviceDao deviceDao = new DeviceDaoImpl();

private static List<DeviceDTO> devices = new ArrayList<DeviceDTO>();

// invoke DLL method to get the details of device

static JoymindCommDLLLib instance = JoymindCommDLLLib.INSTANCE;

// check states

public static void checkStart() {

final Runnable checker = new Runnable() {

public void run() {

System.out.println("check");

devices = deviceDao.queryDevices();

for(DeviceDTO device : devices){

String ip = device.getIp();

String id = "auto_refresh_" + ip;

String iniPath = XmlOperationTool.PLAYLIST_TEMPFILE_FOLDER + id

+ ".ini";

int flag = instance.GetSysInfo(ip, iniPath);

if(flag == 1){

// get ini file

SystemInfoDTO info = null;

try {

info = FileOperationTool.parseIni(iniPath);

device.setMacAdd(info.getMacAddress());

device.setIp(info.getIp());

device.setGateway(info.getGateway());

device.setOnlineState("在线");

device.setBootTime(info.getBootTime());

device.setDeviceVersion(info.getVersion());

device.setAvailableSpace(info.getFreedisk());

device.setpNo(info.getpNo());

device.setWidth(info.getWidth());

device.setHeight(info.getHeight());

device.setStorage(info.getStorage());

device.setTime(info.getTime());

device.setPrgTotal(info.getPrgTotal());

device.setPrgIndex(info.getPrgIndex());

device.setStatusNo(info.getStatus());

if (info.getStorage().equals("1")) {

device.setStorageName("FLASH存储");

}

if (info.getStorage().equals("2")) {

device.setStorageName("RAM存储");

}

if (info.getStorage().equals("3")) {

device.setStorageName("SD卡存储");

}

device.setCurrentPlaylist("");

device.setCurrentTemplate("");

device.setLastCommunicateTime("");

device.setCurrentTransferFileName("");

device.setCurrentTransferSpeed("");

device.setCurrentPercentage("");

device.setVolume("");

device.setAutoBootTime("");

device.setAutoShutdownTime("");

device.setPlayingVideo("");

device.setProgramUpdateTime("");

device.setProgramUpdateState("");

} catch (IOException e1) {

if (log.isErrorEnabled()) {

log.error(e1.getMessage());

}

e1.printStackTrace();

}

boolean addFlag = deviceDao.updateDevice(device);

if (addFlag) {

if (log.isDebugEnabled()) {

log.debug("auto update device "+device.getName()+" successfully");

}

} else {

if (log.isErrorEnabled()) {

log.error("auto update device failed !!!");

}

}

}else{

deviceDao.updateDevice(ip, "离线");

if (log.isDebugEnabled()) {

log.debug("auto update device "+device.getName()+" statue offline");

}

}

}

}

};

// 此处的checker是一个线程,1表示启动延迟1个单位开始执行这个线程,然后每隔5分钟执行一次,单位是分钟

final ScheduledFuture<?> checkerHandle = scheduler.scheduleAtFixedRate(checker, 1, 5, TimeUnit.MINUTES);

// 这里注释的地方是取消这个定时任务的,是在3600天后停止,因为我这里是检查设备的,所以除非当程序退出才自动停止这个定时任务

/*

scheduler.schedule(new Runnable() {

public void run() {

checkerHandle.cancel(true);

}

}, 60 * 60, TimeUnit.DAYS);

*/

}

/**

* @param args

*/

public static void main(String[] args) {

// TODO Auto-generated method stub

}

再看看API的具体信息,总之这是个非常实用的类

public class ScheduledThreadPoolExecutor

extends ThreadPoolExecutor

implements ScheduledExecutorService

ThreadPoolExecutor,它可另行安排在给定的延迟后运行命令,或者定期执行命令。需要多个辅助线程时,或者要求 ThreadPoolExecutor 具有额外的灵活性或功能时,此类要优于 Timer。

一旦启用已延迟的任务就执行它,但是有关何时启用,启用后何时执行则没有任何实时保证。按照提交的先进先出 (FIFO) 顺序来启用那些被安排在同一执行时间的任务。

虽然此类继承自 ThreadPoolExecutor,但是几个继承的调整方法对此类并无作用。特别是,因为它作为一个使用 corePoolSize 线程和一个无界队列的固定大小的池,所以调整 maximumPoolSize 没有什么效果。

扩展注意事项:此类重写 AbstractExecutorService 的 submit 方法,以生成内部对象控制每个任务的延迟和调度。若要保留功能性,子类中任何进一步重写的这些方法都必须调用超类版本,超类版本有效地禁用附加任务的定制。但是,此类提供替代受保护的扩展方法 decorateTask(为 Runnable 和 Callable 各提供一种版本),可定制用于通过 execute、submit、schedule、scheduleAtFixedRate 和 scheduleWithFixedDelay
进入的执行命令的具体任务类型。默认情况下,ScheduledThreadPoolExecutor 使用一个扩展 FutureTask 的任务类型。但是,可以使用下列形式的子类修改或替换该类型。

public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {

static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }

protected <V> RunnableScheduledFuture<V> decorateTask(

Runnable r, RunnableScheduledFuture<V> task) {

return new CustomTask<V>(r, task);

}

protected <V> RunnableScheduledFuture<V> decorateTask(

Callable<V> c, RunnableScheduledFuture<V> task) {

return new CustomTask<V>(c, task);

}

// ... add constructors, etc.

}

以下原文转载的出处:http://www.ticmy.com/?p=329

自jdk1.5开始,Java开始提供ScheduledThreadPoolExecutor类来支持周期性任务的调度,在这之前,这些工作需要依靠Timer/TimerTask或者其它第三方工具来完成。但Timer有着不少缺陷,如Timer是单线程模式,调度多个周期性任务时,如果某个任务耗时较久就会影响其它任务的调度;如果某个任务出现异常而没有被catch则可能导致唯一的线程死掉而所有任务都不会再被调度。ScheduledThreadPoolExecutor解决了很多Timer存在的缺陷。

先来看看ScheduledThreadPoolExecutor的实现模型,它通过继承ThreadPoolExecutor来重用线程池的功能,里面做了几件事情:

为线程池设置了一个DelayedWorkQueue,该queue同时具有PriorityQueue(优先级大的元素会放到队首)和DelayQueue(如果队列里第一个元素的getDelay返回值大于0,则take调用会阻塞)的功能
将传入的任务封装成ScheduledFutureTask,这个类有两个特点,实现了java.lang.Comparable和java.util.concurrent.Delayed接口,也就是说里面有两个重要的方法:compareTo和getDelay。ScheduledFutureTask里面存储了该任务距离下次调度还需要的时间(使用的是基于System#nanoTime实现的相对时间,不会因为系统时间改变而改变,如距离下次执行还有10秒,不会因为将系统时间调前6秒而变成4秒后执行)。getDelay方法就是返回当前时间(运行getDelay的这个时刻)距离下次调用之间的时间差;compareTo用于比较两个任务的优先关系,距离下次调度间隔较短的优先级高。那么,当有任务丢进上面说到的DelayedWorkQueue时,因为它有DelayQueue(DelayQueue的内部使用PriorityQueue来实现的)的功能,所以新的任务会与队列中已经存在的任务进行排序,距离下次调度间隔短的任务排在前面,也就是说这个队列并不是先进先出的;另外,在调用DelayedWorkQueue的take方法的时候,如果没有元素,会阻塞,如果有元素而第一个元素的getDelay返回值大于0(前面说过已经排好序了,第一个元素的getDelay不会大于后面元素的getDelay返回值),也会一直阻塞。
ScheduledFutureTask提供了一个run的实现,线程池执行的就是这个run方法。看看run的源码(本文的代码取自hotspot1.5.0_22,jdk后续版本的代码可能已经不一样了,如jdk1.7中使用了自己实现的DelayedWorkQueue,而不再使用PriorityQueue作为存储,不过从外面看它们的行为还是一样的,所以并不影响对ScheduledThreadPoolExecutor调度机制的理解):

如果不是周期性任务就直接执行任务(也就是else部分),这个主要是用于实现ScheduledThreadPoolExecutor#schedule(Callable callable, long delay, TimeUnit unit)和ScheduledThreadPoolExecutor#schedule(Runnable command, long delay, TimeUnit unit),后面会讲到它们的实现,这里先关注周期任务的执行方式。周期性任务执行的是runPeriodic(),看下它的实现:

这里可以看到,先执行了任务本身(ScheduledFutureTask.super.runAndReset),这个调用有一个返回值,来看下它的实现:

跟进去看下innerRunAndReset():

可以发现,这里需要关注的是第三个return,也就是如果任务执行出现了异常,会被catch且返回false.

继续看runPeriodic()方法,if里面,如果刚才任务执行的返回值是true且线程池还在运行就在if块中的操作,如果线程池被关闭了就做else if里的操作。也就是说,如果之前的任务执行出现的异常返回了false,那么if里以及else if里的代码都不会执行了,那有什么影响?接下来看看if里做了什么。

if里的代码很简单,分为两部分,一是计算这个任务下次调度的间隔,二是将任务重新放回队列中。回到出现异常的情况,如果刚才的任务执行出现了异常,就不会将任务再放回队列中,换而言之,也就是这个任务再也得不到调度了!但是,这并不影响其它周期任务的调度。

综上,可以看到,ScheduledThreadPoolExecutor执行周期性任务的模型就是:调度一次任务,计算并设置该任务下次间隔,将任务放回队列中供线程池执行。这里的队列起了很大的作用,且有一些特点:距离下次调度间隔短的任务总是在队首,队首的任务若距离下次调度的间隔时间大于0就无法从该队列的take()方法中拿到任务。

接下来看看ScheduledThreadPoolExecutor#schedule(Callable callable, long delay, TimeUnit unit)和ScheduledThreadPoolExecutor#schedule(Runnable command, long delay, TimeUnit unit)这两个非周期性任务的实现方式,先看看它们的源码:

实现方式也很简单,在创建ScheduledThreadPoolExecutor内部任务(即ScheduledFutureTask)的时候就将调度间隔计算并设置好,如果当前线程数小于设置的核心线程数,就启动一个线程(可能是线程池刚启动里面还没有线程,也可能是里面的线程执行任务时挂掉了。如果线程池中的线程挂掉了而又没有调用这些schedule方法谁去补充挂掉的线程?不用担心,线程池自己会处理的)去监听队列里的任务,然后将任务放到队列里,在任务执行间隔不大于0的时候,线程就可以拿到这个任务并执行。

周期性任务的入口(ScheduledThreadPoolExecutor#scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)和ScheduledThreadPoolExecutor#scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit))与非周期性任务是类似的,它们处理方式不同的地方在于前文说到的ScheduledFutureTask#run()中。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: