Quartzs的job,trriger监听器源码分析
2016-09-08 10:21
274 查看
在job存储之后,之后做了job添加通知,通知调度器下一刻执行时间,并唤醒正在等在执行的job,然后添加trriger通知。
Java代码
public class QuartzScheduler
implements RemotableQuartzScheduler
{
public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
throws SchedulerException
{
validateState();
if(jobDetail == null)
throw new SchedulerException("JobDetail cannot be null");
if(trigger == null)
throw new SchedulerException("Trigger cannot be null");
if(jobDetail.getKey() == null)
throw new SchedulerException("Job's key cannot be null");
if(jobDetail.getJobClass() == null)
throw new SchedulerException("Job's class cannot be null");
//包装触发器
OperableTrigger trig = (OperableTrigger)trigger;
if(trigger.getJobKey() == null)
//设置触发器jobKey
trig.setJobKey(jobDetail.getKey());
else
if(!trigger.getJobKey().equals(jobDetail.getKey()))
throw new SchedulerException("Trigger does not reference given job!");
trig.validate();
Calendar cal = null;
if(trigger.getCalendarName() != null)
cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
Date ft = trig.computeFirstFireTime(cal);
if(ft == null)
{
throw new SchedulerException((new StringBuilder()).append("Based on configured schedule, the given trigger '").append(trigger.getKey()).append("' will never fire.").toString());
} else
{
//存储job and trriger到jobStrore
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
//添加jobDetail到调度监听器
notifySchedulerListenersJobAdded(jobDetail);
//通知调度器下一刻调度时间
notifySchedulerThread(trigger.getNextFireTime().getTime());
//添加trigger到调度监听器
notifySchedulerListenersSchduled(trigger);
return ft;
}
//添加jobDetail
public void notifySchedulerListenersJobAdded(JobDetail jobDetail)
{
List schedListeners = buildSchedulerListenerList();
for(Iterator i$ = schedListeners.iterator(); i$.hasNext();)
{
SchedulerListener sl = (SchedulerListener)i$.next();
try
{
sl.jobAdded(jobDetail);
}
catch(Exception e)
{
getLog().error("Error while notifying SchedulerListener of JobAdded.", e);
}
}
}
//获取调度监听器
private List buildSchedulerListenerList()
{
List allListeners = new LinkedList();
allListeners.addAll(getListenerManager().getSchedulerListeners());
allListeners.addAll(getInternalSchedulerListeners());
return allListeners;
}
//获取内部调度器
public List getInternalSchedulerListeners()
{
ArrayList arraylist = internalSchedulerListeners;
JVM INSTR monitorenter ;
return Collections.unmodifiableList(new ArrayList(internalSchedulerListeners));
Exception exception;
exception;
throw exception;
}
//通知调度器下一刻调度时间
protected void notifySchedulerThread(long candidateNewNextFireTime)
{
if(isSignalOnSchedulingChange())
signaler.signalSchedulingChange(candidateNewNextFireTime);
}
//添加trigger到调度监听器
public void notifySchedulerListenersSchduled(Trigger trigger)
{
List schedListeners = buildSchedulerListenerList();
for(Iterator i$ = schedListeners.iterator(); i$.hasNext();)
{
SchedulerListener sl = (SchedulerListener)i$.next();
try
{
sl.jobScheduled(trigger);
}
catch(Exception e)
{
getLog().error((new StringBuilder()).append("Error while notifying SchedulerListener of scheduled job. Triger=").append(trigger.getKey()).toString(), e);
}
}
}
private static String VERSION_MAJOR;
private static String VERSION_MINOR;
private static String VERSION_ITERATION;
private QuartzSchedulerResources resources;
private QuartzSchedulerThread schedThread;
private ThreadGroup threadGroup;
private SchedulerContext context;
private ListenerManager listenerManager;//监听管理器
private HashMap internalJobListeners;
private HashMap internalTriggerListeners;
private ArrayList internalSchedulerListeners;
private JobFactory jobFactory;
ExecutingJobsManager jobMgr;
ErrorLogger errLogger;
private SchedulerSignaler signaler;
private Random random;
private ArrayList holdToPreventGC;
private boolean signalOnSchedulingChange;
private volatile boolean closed;
private volatile boolean shuttingDown;
private boolean boundRemotely;
private QuartzSchedulerMBean jmxBean;
private Date initialStart;
private final Timer updateTimer;
}
//调度监听器
下载
Java代码
public class ListenerManagerImpl
implements ListenerManager
{
public ListenerManagerImpl()
{
globalJobListeners = new LinkedHashMap(10);
globalTriggerListeners = new LinkedHashMap(10);
globalJobListenersMatchers = new LinkedHashMap(10);
globalTriggerListenersMatchers = new LinkedHashMap(10);
schedulerListeners = new ArrayList(10);
}
//添加调度监听器
public void addSchedulerListener(SchedulerListener schedulerListener)
{
synchronized(schedulerListeners)
{
schedulerListeners.add(schedulerListener);
}
}
//添加job监听器
public void addJobListener(JobListener jobListener, List matchers)
{
if(jobListener.getName() == null || jobListener.getName().length() == 0)
throw new IllegalArgumentException("JobListener name cannot be empty.");
synchronized(globalJobListeners)
{
globalJobListeners.put(jobListener.getName(), jobListener);
LinkedList matchersL = new LinkedList();
if(matchers != null && matchers.size() > 0)
matchersL.addAll(matchers);
else
matchersL.add(EverythingMatcher.allJobs());
globalJobListenersMatchers.put(jobListener.getName(), matchersL);
}
}
//添加Trigger监听器
public void addTriggerListener(TriggerListener triggerListener, List matchers)
{
if(triggerListener.getName() == null || triggerListener.getName().length() == 0)
throw new IllegalArgumentException("TriggerListener name cannot be empty.");
synchronized(globalTriggerListeners)
{
globalTriggerListeners.put(triggerListener.getName(), triggerListener);
LinkedList matchersL = new LinkedList();
if(matchers != null && matchers.size() > 0)
matchersL.addAll(matchers);
else
matchersL.add(EverythingMatcher.allTriggers());
globalTriggerListenersMatchers.put(triggerListener.getName(), matchersL);
}
}
public List getSchedulerListeners()
{
ArrayList arraylist = schedulerListeners;
JVM INSTR monitorenter ;
return Collections.unmodifiableList(new ArrayList(schedulerListeners));
Exception exception;
exception;
throw exception;
}
private Map globalJobListeners;//LinkedHashMap<String,JobListener>,key为JobListener.getName(),全局job监听器
private Map globalTriggerListeners;//LinkedHashMap<String,TriggerListener>,key为TriggerListener.getName(),全局Trriger监听器
private Map globalJobListenersMatchers;
private Map globalTriggerListenersMatchers;
private ArrayList schedulerListeners;//List<SchedulerListener>调度监听器
}
//QuartzSchedulerMBean 下载
Java代码
public class QuartzSchedulerMBeanImpl extends StandardMBean
implements NotificationEmitter, QuartzSchedulerMBean, JobListener, SchedulerListener
{
//添加jobDetail通知到JMX
public void jobAdded(JobDetail jobDetail)
{
sendNotification("jobAdded", JobDetailSupport.toCompositeData(jobDetail));
}
//添加trigger通知到JMX
public void jobScheduled(Trigger trigger)
{
sendNotification("jobScheduled", TriggerSupport.toCompositeData(trigger));
}
public void sendNotification(String eventType, Object data)
{
sendNotification(eventType, data, null);
}
public void sendNotification(String eventType, Object data, String msg)
{
Notification notif = new Notification(eventType, this, sequenceNumber.incrementAndGet(), System.currentTimeMillis(), msg);
if(data != null)
notif.setUserData(data);
emitter.sendNotification(notif);
}
}
//调度通知器 下载
Java代码
public class SchedulerSignalerImpl
implements SchedulerSignaler
{
public SchedulerSignalerImpl(QuartzScheduler sched, QuartzSchedulerThread schedThread)
{
log = LoggerFactory.getLogger(org/quartz/core/SchedulerSignalerImpl);
this.sched = sched;
this.schedThread = schedThread;
log.info((new StringBuilder()).append("Initialized Scheduler Signaller of type: ").append(getClass()).toString());
}
public void signalSchedulingChange(long candidateNewNextFireTime)
{
schedThread.signalSchedulingChange(candidateNewNextFireTime);
}
Logger log;
protected QuartzScheduler sched;
protected QuartzSchedulerThread schedThread;
}
//调度器线程 下载
Java代码
public class QuartzSchedulerThread extends Thread
{
QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs)
{
this(qs, qsRsrcs, qsRsrcs.getMakeSchedulerThreadDaemon(), 5);
}
//唤醒所有等待sigLock的调度器
public void signalSchedulingChange(long candidateNewNextFireTime)
{
synchronized(sigLock)
{
signaled = true;
signaledNextFireTime = candidateNewNextFireTime;
sigLock.notifyAll();
}
}
private QuartzScheduler qs;
private QuartzSchedulerResources qsRsrcs;
private final Object sigLock;
private boolean signaled;
private long signaledNextFireTime;
private boolean paused;
private AtomicBoolean halted;
private Random random;
private static long DEFAULT_IDLE_WAIT_TIME = 30000L;
private long idleWaitTime;
private int idleWaitVariablness;
private final Logger log;
}
//SchedulerListener
下载
Java代码
public interface SchedulerListener
{
public abstract void jobScheduled(Trigger trigger);
public abstract void jobUnscheduled(TriggerKey triggerkey);
public abstract void triggerFinalized(Trigger trigger);
public abstract void triggerPaused(TriggerKey triggerkey);
public abstract void triggersPaused(String s);
public abstract void triggerResumed(TriggerKey triggerkey);
public abstract void triggersResumed(String s);
public abstract void jobAdded(JobDetail jobdetail);
public abstract void jobDeleted(JobKey jobkey);
public abstract void jobPaused(JobKey jobkey);
public abstract void jobsPaused(String s);
public abstract void jobResumed(JobKey jobkey);
public abstract void jobsResumed(String s);
public abstract void schedulerError(String s, SchedulerException schedulerexception);
public abstract void schedulerInStandbyMode();
public abstract void schedulerStarted();
public abstract void schedulerStarting();
public abstract void schedulerShutdown();
public abstract void schedulerShuttingdown();
public abstract void schedulingDataCleared();
}
//job监听器 下载
Java代码
public interface JobListener
{
public abstract String getName();
public abstract void jobToBeExecuted(JobExecutionContext jobexecutioncontext);
public abstract void jobExecutionVetoed(JobExecutionContext jobexecutioncontext);
public abstract void jobWasExecuted(JobExecutionContext jobexecutioncontext, JobExecutionException jobexecutionexception);
}
//Trigger监听器
Java代码
public interface TriggerListener
{
public abstract String getName();
public abstract void triggerFired(Trigger trigger, JobExecutionContext jobexecutioncontext);
public abstract boolean vetoJobExecution(Trigger trigger, JobExecutionContext jobexecutioncontext);
public abstract void triggerMisfired(Trigger trigger);
public abstract void triggerComplete(Trigger trigger, JobExecutionContext jobexecutioncontext, Trigger.CompletedExecutionInstruction completedexecutioninstruction);
}
Java代码
public class QuartzScheduler
implements RemotableQuartzScheduler
{
public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
throws SchedulerException
{
validateState();
if(jobDetail == null)
throw new SchedulerException("JobDetail cannot be null");
if(trigger == null)
throw new SchedulerException("Trigger cannot be null");
if(jobDetail.getKey() == null)
throw new SchedulerException("Job's key cannot be null");
if(jobDetail.getJobClass() == null)
throw new SchedulerException("Job's class cannot be null");
//包装触发器
OperableTrigger trig = (OperableTrigger)trigger;
if(trigger.getJobKey() == null)
//设置触发器jobKey
trig.setJobKey(jobDetail.getKey());
else
if(!trigger.getJobKey().equals(jobDetail.getKey()))
throw new SchedulerException("Trigger does not reference given job!");
trig.validate();
Calendar cal = null;
if(trigger.getCalendarName() != null)
cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
Date ft = trig.computeFirstFireTime(cal);
if(ft == null)
{
throw new SchedulerException((new StringBuilder()).append("Based on configured schedule, the given trigger '").append(trigger.getKey()).append("' will never fire.").toString());
} else
{
//存储job and trriger到jobStrore
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
//添加jobDetail到调度监听器
notifySchedulerListenersJobAdded(jobDetail);
//通知调度器下一刻调度时间
notifySchedulerThread(trigger.getNextFireTime().getTime());
//添加trigger到调度监听器
notifySchedulerListenersSchduled(trigger);
return ft;
}
//添加jobDetail
public void notifySchedulerListenersJobAdded(JobDetail jobDetail)
{
List schedListeners = buildSchedulerListenerList();
for(Iterator i$ = schedListeners.iterator(); i$.hasNext();)
{
SchedulerListener sl = (SchedulerListener)i$.next();
try
{
sl.jobAdded(jobDetail);
}
catch(Exception e)
{
getLog().error("Error while notifying SchedulerListener of JobAdded.", e);
}
}
}
//获取调度监听器
private List buildSchedulerListenerList()
{
List allListeners = new LinkedList();
allListeners.addAll(getListenerManager().getSchedulerListeners());
allListeners.addAll(getInternalSchedulerListeners());
return allListeners;
}
//获取内部调度器
public List getInternalSchedulerListeners()
{
ArrayList arraylist = internalSchedulerListeners;
JVM INSTR monitorenter ;
return Collections.unmodifiableList(new ArrayList(internalSchedulerListeners));
Exception exception;
exception;
throw exception;
}
//通知调度器下一刻调度时间
protected void notifySchedulerThread(long candidateNewNextFireTime)
{
if(isSignalOnSchedulingChange())
signaler.signalSchedulingChange(candidateNewNextFireTime);
}
//添加trigger到调度监听器
public void notifySchedulerListenersSchduled(Trigger trigger)
{
List schedListeners = buildSchedulerListenerList();
for(Iterator i$ = schedListeners.iterator(); i$.hasNext();)
{
SchedulerListener sl = (SchedulerListener)i$.next();
try
{
sl.jobScheduled(trigger);
}
catch(Exception e)
{
getLog().error((new StringBuilder()).append("Error while notifying SchedulerListener of scheduled job. Triger=").append(trigger.getKey()).toString(), e);
}
}
}
private static String VERSION_MAJOR;
private static String VERSION_MINOR;
private static String VERSION_ITERATION;
private QuartzSchedulerResources resources;
private QuartzSchedulerThread schedThread;
private ThreadGroup threadGroup;
private SchedulerContext context;
private ListenerManager listenerManager;//监听管理器
private HashMap internalJobListeners;
private HashMap internalTriggerListeners;
private ArrayList internalSchedulerListeners;
private JobFactory jobFactory;
ExecutingJobsManager jobMgr;
ErrorLogger errLogger;
private SchedulerSignaler signaler;
private Random random;
private ArrayList holdToPreventGC;
private boolean signalOnSchedulingChange;
private volatile boolean closed;
private volatile boolean shuttingDown;
private boolean boundRemotely;
private QuartzSchedulerMBean jmxBean;
private Date initialStart;
private final Timer updateTimer;
}
//调度监听器
下载
Java代码
public class ListenerManagerImpl
implements ListenerManager
{
public ListenerManagerImpl()
{
globalJobListeners = new LinkedHashMap(10);
globalTriggerListeners = new LinkedHashMap(10);
globalJobListenersMatchers = new LinkedHashMap(10);
globalTriggerListenersMatchers = new LinkedHashMap(10);
schedulerListeners = new ArrayList(10);
}
//添加调度监听器
public void addSchedulerListener(SchedulerListener schedulerListener)
{
synchronized(schedulerListeners)
{
schedulerListeners.add(schedulerListener);
}
}
//添加job监听器
public void addJobListener(JobListener jobListener, List matchers)
{
if(jobListener.getName() == null || jobListener.getName().length() == 0)
throw new IllegalArgumentException("JobListener name cannot be empty.");
synchronized(globalJobListeners)
{
globalJobListeners.put(jobListener.getName(), jobListener);
LinkedList matchersL = new LinkedList();
if(matchers != null && matchers.size() > 0)
matchersL.addAll(matchers);
else
matchersL.add(EverythingMatcher.allJobs());
globalJobListenersMatchers.put(jobListener.getName(), matchersL);
}
}
//添加Trigger监听器
public void addTriggerListener(TriggerListener triggerListener, List matchers)
{
if(triggerListener.getName() == null || triggerListener.getName().length() == 0)
throw new IllegalArgumentException("TriggerListener name cannot be empty.");
synchronized(globalTriggerListeners)
{
globalTriggerListeners.put(triggerListener.getName(), triggerListener);
LinkedList matchersL = new LinkedList();
if(matchers != null && matchers.size() > 0)
matchersL.addAll(matchers);
else
matchersL.add(EverythingMatcher.allTriggers());
globalTriggerListenersMatchers.put(triggerListener.getName(), matchersL);
}
}
public List getSchedulerListeners()
{
ArrayList arraylist = schedulerListeners;
JVM INSTR monitorenter ;
return Collections.unmodifiableList(new ArrayList(schedulerListeners));
Exception exception;
exception;
throw exception;
}
private Map globalJobListeners;//LinkedHashMap<String,JobListener>,key为JobListener.getName(),全局job监听器
private Map globalTriggerListeners;//LinkedHashMap<String,TriggerListener>,key为TriggerListener.getName(),全局Trriger监听器
private Map globalJobListenersMatchers;
private Map globalTriggerListenersMatchers;
private ArrayList schedulerListeners;//List<SchedulerListener>调度监听器
}
//QuartzSchedulerMBean 下载
Java代码
public class QuartzSchedulerMBeanImpl extends StandardMBean
implements NotificationEmitter, QuartzSchedulerMBean, JobListener, SchedulerListener
{
//添加jobDetail通知到JMX
public void jobAdded(JobDetail jobDetail)
{
sendNotification("jobAdded", JobDetailSupport.toCompositeData(jobDetail));
}
//添加trigger通知到JMX
public void jobScheduled(Trigger trigger)
{
sendNotification("jobScheduled", TriggerSupport.toCompositeData(trigger));
}
public void sendNotification(String eventType, Object data)
{
sendNotification(eventType, data, null);
}
public void sendNotification(String eventType, Object data, String msg)
{
Notification notif = new Notification(eventType, this, sequenceNumber.incrementAndGet(), System.currentTimeMillis(), msg);
if(data != null)
notif.setUserData(data);
emitter.sendNotification(notif);
}
}
//调度通知器 下载
Java代码
public class SchedulerSignalerImpl
implements SchedulerSignaler
{
public SchedulerSignalerImpl(QuartzScheduler sched, QuartzSchedulerThread schedThread)
{
log = LoggerFactory.getLogger(org/quartz/core/SchedulerSignalerImpl);
this.sched = sched;
this.schedThread = schedThread;
log.info((new StringBuilder()).append("Initialized Scheduler Signaller of type: ").append(getClass()).toString());
}
public void signalSchedulingChange(long candidateNewNextFireTime)
{
schedThread.signalSchedulingChange(candidateNewNextFireTime);
}
Logger log;
protected QuartzScheduler sched;
protected QuartzSchedulerThread schedThread;
}
//调度器线程 下载
Java代码
public class QuartzSchedulerThread extends Thread
{
QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs)
{
this(qs, qsRsrcs, qsRsrcs.getMakeSchedulerThreadDaemon(), 5);
}
//唤醒所有等待sigLock的调度器
public void signalSchedulingChange(long candidateNewNextFireTime)
{
synchronized(sigLock)
{
signaled = true;
signaledNextFireTime = candidateNewNextFireTime;
sigLock.notifyAll();
}
}
private QuartzScheduler qs;
private QuartzSchedulerResources qsRsrcs;
private final Object sigLock;
private boolean signaled;
private long signaledNextFireTime;
private boolean paused;
private AtomicBoolean halted;
private Random random;
private static long DEFAULT_IDLE_WAIT_TIME = 30000L;
private long idleWaitTime;
private int idleWaitVariablness;
private final Logger log;
}
//SchedulerListener
下载
Java代码
public interface SchedulerListener
{
public abstract void jobScheduled(Trigger trigger);
public abstract void jobUnscheduled(TriggerKey triggerkey);
public abstract void triggerFinalized(Trigger trigger);
public abstract void triggerPaused(TriggerKey triggerkey);
public abstract void triggersPaused(String s);
public abstract void triggerResumed(TriggerKey triggerkey);
public abstract void triggersResumed(String s);
public abstract void jobAdded(JobDetail jobdetail);
public abstract void jobDeleted(JobKey jobkey);
public abstract void jobPaused(JobKey jobkey);
public abstract void jobsPaused(String s);
public abstract void jobResumed(JobKey jobkey);
public abstract void jobsResumed(String s);
public abstract void schedulerError(String s, SchedulerException schedulerexception);
public abstract void schedulerInStandbyMode();
public abstract void schedulerStarted();
public abstract void schedulerStarting();
public abstract void schedulerShutdown();
public abstract void schedulerShuttingdown();
public abstract void schedulingDataCleared();
}
//job监听器 下载
Java代码
public interface JobListener
{
public abstract String getName();
public abstract void jobToBeExecuted(JobExecutionContext jobexecutioncontext);
public abstract void jobExecutionVetoed(JobExecutionContext jobexecutioncontext);
public abstract void jobWasExecuted(JobExecutionContext jobexecutioncontext, JobExecutionException jobexecutionexception);
}
//Trigger监听器
Java代码
public interface TriggerListener
{
public abstract String getName();
public abstract void triggerFired(Trigger trigger, JobExecutionContext jobexecutioncontext);
public abstract boolean vetoJobExecution(Trigger trigger, JobExecutionContext jobexecutioncontext);
public abstract void triggerMisfired(Trigger trigger);
public abstract void triggerComplete(Trigger trigger, JobExecutionContext jobexecutioncontext, Trigger.CompletedExecutionInstruction completedexecutioninstruction);
}
相关文章推荐
- Quartz源码分析之Job
- quartz源码分析之深刻理解job,sheduler,calendar,trigger及listener之间的关系
- Quartzs的 job,trigger 存储、job,trigger 删除源码分析
- JobTracker启动流程源码级分析
- JobTracker之辅助线程和对象映射模型分析(源码分析第五篇)
- Mahout基于项目的协同过滤算法源码分析(1)--PreparePreferenceMatrixJob
- mahout算法源码分析之Itembased Collaborative Filtering(二)RowSimilarityJob
- Mahout 协同过滤 itemBase RecommenderJob源码分析
- Hadoop源码分析1: 客户端提交JOB
- hadoop中job提交的源码分析
- Hadoop源码分析25 JobInProgress 主要容器
- MapReduce执行过程源码分析(一)——Job任务的提交
- [源码分析]Community Server的Job & Timer
- MapReduce job在JobTracker初始化源码级分析
- Quartz recovery 及misfired机制的源码分析
- hadoop之MapReduce框架JobTracker端心跳机制分析(源码分析第七篇) 推荐
- Jenkins插件开发(6.1)—— 分析JenkinsJOB的CRUD源码
- Mahout 协同过滤 itemBase RecommenderJob源码分析
- mahout算法源码分析之Itembased Collaborative Filtering(一)PreparePreferenceMatrixJob
- Mahout 协同过滤 itemBase RecommenderJob源码分析