Quartz源码——scheduler.start()启动源码分析(二)
2016-12-08 16:40
441 查看
scheduler.start()是Quartz的启动方式!下面进行分析,方便自己查看!
我都是分析的jobStore 方式为jdbc的SimpleTrigger!RAM的方式类似分析方式!
Quartz学习——QuartzSchedulerThread.run 源码分析:http://blog.csdn.net/u010648555/article/details/53525041
解释:
{0} : 表的前缀 ,如表qrtz_trigger ,{0}== qrtz_
{1} :quartz.properties 中配置的 org.quartz.scheduler.instanceName: myInstanceName ,{1} ==myInstanceName
scheduler.start() 调用 .QuartzScheduler.start();
Quartz 的启动要调用start()方法进行线程的启动,并执行需要出发的Trigger,start方法里面进行的操作:
启动的初始化
判断是否集群,对应不同的操作
若是非集群,首先有恢复机制,恢复任何失败或misfire的作业,并根据需要清理数据存储。
初始化线程管理,唤醒所有等待的线程!
下面就是简单的源码分析:
1.QuartzScheduler.start();
1.1 this.resources.getJobStore().schedulerStarted();//主要分析的地方
1.2如果已经初始化过,则恢复调度器运行 resources.getJobStore().schedulerResumed();
2.1 恢复job recoverJobs();
2.1.1 清理misfire的jobs recoverMisfiredJobs(conn, true);
2.1.2 恢复未完全执行的标记为恢复的作业
2.1.3 移除state == complete
2.1.4 清理任何已触发的触发器条目
2.2 获取ThreadExecutor 线程管理 misfireHandler.initialize();
3.1 唤醒所有等待的线程 schedThread.togglePause(false);
附1:true 是如何转换为 1 的:
2.Quartz学习——Quartz简单入门Demo(二)
3.Quartz学习——Spring和Quartz集成详解(三)
4.Quartz学习——SSMM(Spring+SpringMVC+Mybatis+Mysql)和Quartz集成详解(四)
5.Quartz源码——JobStore保存JonDetail和Trigger源码分析(一)
6.Quartz源码——scheduler.start()启动源码分析(二)
7.Quartz源码——QuartzSchedulerThread.run() 源码分析(三)
8.Quartz源码——Quartz调度器的Misfire处理规则(四)
欢迎访问我的csdn博客,我们一同成长!
“不管做什么,只要坚持下去就会看到不一样!在路上,不卑不亢!”
博客首页:http://blog.csdn.net/u010648555
我都是分析的jobStore 方式为jdbc的SimpleTrigger!RAM的方式类似分析方式!
Quartz学习——QuartzSchedulerThread.run 源码分析:http://blog.csdn.net/u010648555/article/details/53525041
解释:
{0} : 表的前缀 ,如表qrtz_trigger ,{0}== qrtz_
{1} :quartz.properties 中配置的 org.quartz.scheduler.instanceName: myInstanceName ,{1} ==myInstanceName
scheduler.start() 调用 .QuartzScheduler.start();
Quartz 的启动要调用start()方法进行线程的启动,并执行需要出发的Trigger,start方法里面进行的操作:
启动的初始化
判断是否集群,对应不同的操作
若是非集群,首先有恢复机制,恢复任何失败或misfire的作业,并根据需要清理数据存储。
初始化线程管理,唤醒所有等待的线程!
下面就是简单的源码分析:
1.QuartzScheduler.start();
public void start() throws SchedulerException { if (shuttingDown|| closed) { throw new SchedulerException( "The Scheduler cannot be restarted after shutdown() has been called."); } // QTZ-212 : calling new schedulerStarting() method on the listeners // right after entering start() notifySchedulerListenersStarting(); if (initialStart == null) {//初始化标识为null,进行初始化操作 initialStart = new Date(); this.resources.getJobStore().schedulerStarted();//1.1 主要分析的地方 startPlugins(); } else { resources.getJobStore().schedulerResumed();//1.2如果已经初始化过,则恢复jobStore } schedThread.togglePause(false);//3.1 唤醒所有等待的线程 getLog().info( "Scheduler " + resources.getUniqueIdentifier() + " started."); notifySchedulerListenersStarted(); }
1.1 this.resources.getJobStore().schedulerStarted();//主要分析的地方
public void schedulerStarted() throws SchedulerException { //是集群 if (isClustered()) { clusterManagementThread = new ClusterManager(); if(initializersLoader != null) clusterManagementThread.setContextClassLoader(initializersLoader); clusterManagementThread.initialize(); } else {//不是集群 try { recoverJobs();//2.1 恢复job } catch (SchedulerException se) { throw new SchedulerConfigException( "Failure occured during job recovery.", se); } } misfireHandler = new MisfireHandler(); if(initializersLoader != null) misfireHandler.setContextClassLoader(initializersLoader); misfireHandler.initialize();//2.2 获取ThreadExecutor 线程管理 schedulerRunning = true; getLog().debug("JobStore background threads started (as scheduler was started)."); }
1.2如果已经初始化过,则恢复调度器运行 resources.getJobStore().schedulerResumed();
private volatile boolean schedulerRunning = false;//默认schedulerRunning = false public void schedulerResumed() { schedulerRunning = true; }
2.1 恢复job recoverJobs();
//启动的时候 有一个恢复机制: //recoverJobs ----- 将恢复任何失败或misfire的作业,并根据需要清理数据存储。 protected void recoverJobs() throws JobPersistenceException { executeInNonManagedTXLock( LOCK_TRIGGER_ACCESS, new VoidTransactionCallback() { public void executeVoid(Connection conn) throws JobPersistenceException { recoverJobs(conn);//恢复job } }, null); } protected void recoverJobs(Connection conn) throws JobPersistenceException { try { (1)//更新不一致的作业状态 先修改状态,将 ACQUIRED 和 BLOCKED ---> WAITING int rows = getDelegate().updateTriggerStatesFromOtherStates(conn, STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED); rows += getDelegate().updateTriggerStatesFromOtherStates(conn, STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED); //----更新sql--- //"UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND (TRIGGER_STATE = ? OR TRIGGER_STATE = ?)" getLog().info( "Freed " + rows + " triggers from 'acquired' / 'blocked' state."); // clean up misfired jobs //2.1.1 清理misfire的jobs recoverMisfiredJobs(conn, true); // recover jobs marked for recovery that were not fully executed //2.1.2 恢复未完全执行的标记为恢复的作业 --查询 qrtz_fire_trigger List<OperableTrigger> recoveringJobTriggers = getDelegate() .selectTriggersForRecoveringJobs(conn); getLog() .info( "Recovering " + recoveringJobTriggers.size() + " jobs that were in-progress at the time of the last shut-down."); for (OperableTrigger recoveringJobTrigger: recoveringJobTriggers) { if (jobExists(conn, recoveringJobTrigger.getJobKey())) { recoveringJobTrigger.computeFirstFireTime(null); storeTrigger(conn, recoveringJobTrigger, null, false, STATE_WAITING, false, true); } } getLog().info("Recovery complete."); // remove lingering 'complete' triggers... //2.1.3 移除state == complete List<TriggerKey> cts = getDelegate().selectTriggersInState(conn, STATE_COMPLETE); for(TriggerKey ct: cts) { removeTrigger(conn, ct); } getLog().info( "Removed " + cts.size() + " 'complete' triggers."); // clean up any fired trigger entries //2.1.4 清理任何已触发的触发器条目 int n = getDelegate().deleteFiredTriggers(conn); getLog().info("Removed " + n + " stale fired job entries."); } catch (JobPersistenceException e) { throw e; } catch (Exception e) { throw new JobPersistenceException("Couldn't recover jobs: " + e.getMessage(), e); } }
2.1.1 清理misfire的jobs recoverMisfiredJobs(conn, true);
//是否有misfire的Trigger //我们必须仍然寻找MISFIRED状态,以防触发器被遗忘 //在此状态下升级到此版本不支持 (a1)hasMisfiredTriggersInState(conn, STATE_WAITING, getMisfireTime(), maxMisfiresToHandleAtATime, misfiredTriggers); ////getMisfireTime() 当前时间 -(减去) 一分钟 ,maxMisfiresToHandleAtATime == -1 ,misfiredTriggers== null "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < ? AND TRIGGER_STATE = ? ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC" 上面sql查询出来结果是个list (aa1)若resultList.size() == count 返回 TRUE!! 否则 返回false! (aa2)不等于 count ,封装数据,到resultList中,triggername TriggerGroup //查询出来有misfire 的 Trigger (b2) misfiredTriggers.size() > 0 (bb1)输出日志信息 :getLog().info( "Handling " + misfiredTriggers.size() + " trigger(s) that missed their scheduled fire-time."); (bb2)循环 misfiredTriggers List集合 for (TriggerKey triggerKey: misfiredTriggers) { //retrieveTrigger ,检索Trigger,检索到进行数据封装 OperableTrigger trig = retrieveTrigger(conn, triggerKey); //retrieveTrigger 执行的操作 (1)"SELECT * FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?" (2)关联Trigger对应的类型,如simpleTrigger等 if (trig == null) { continue; } //do 更新misfire的触发器 doUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering); //recovering===TRUE (1)cal = retrieveCalendar(conn, trig.getCalendarName()); 搞这个表,qrtz_calendar (2)trig.updateAfterMisfire(cal); //simpleTrigger默认的misfire 机制 setNextFireTime(new Date()); //设置下次执行的时间(next_fire_time)为当前时间!这里比较重要!!! (3) getNextFireTime != null if (trig.getNextFireTime() == null) { storeTrigger(conn, trig, null, true, STATE_COMPLETE, forceState, recovering); schedSignaler.notifySchedulerListenersFinalized(trig); } else { storeTrigger(conn, trig, null, true, newStateIfNotComplete, forceState, false); // job == null replaceExisting ==true state==waitting forceState==false recovering==false storeTrigger(Connection conn, OperableTrigger newTrigger, JobDetail job, boolean replaceExisting, String state, boolean forceState, boolean recovering) //Insert or update a trigger. boolean existingTrigger = triggerExists(conn, newTrigger.getKey()); if (existingTrigger) { //state == waitting getDelegate().updateTrigger(conn, newTrigger, state, job); //更新sql /* "UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"*/ } else { getDelegate().insertTrigger(conn, newTrigger, state, job); //插入sql /*"INSERT INTO {0}TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, JOB_NAME, JOB_GROUP, DESCRIPTION, NEXT_FIRE_TIME, PREV_FIRE_TIME, TRIGGER_STATE, TRIGGER_TYPE, START_TIME, END_TIME, CALENDAR_NAME, MISFIRE_INSTR, JOB_DATA, PRIORITY) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"*/ } } (c3) long earliestNewTime = Long.MAX_VALUE; // long earliestNewTime = Long.MAX_VALUE; === 9223372036854775807 if(trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime){ earliestNewTime = trig.getNextFireTime().getTime(); }
2.1.2 恢复未完全执行的标记为恢复的作业
List<OperableTrigger> recoveringJobTriggers = getDelegate() .selectTriggersForRecoveringJobs(conn); // INSTANCE_NAME == dufy_test REQUESTS_RECOVERY == true 实际封装到数据库查询是 REQUESTS_RECOVERY== 1 "SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND INSTANCE_NAME = ? AND REQUESTS_RECOVERY = ?" //具体怎么是 true是怎么转换成为 1的见附1图片! Recovery complete.恢复完成!!
2.1.3 移除state == complete
List<TriggerKey> cts = getDelegate().selectTriggersInState(conn, STATE_COMPLETE); ----------------------------------------------------------------------------- "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ?" ----------------------------------------------------------------------------- for(TriggerKey ct: cts) { removeTrigger(conn, ct); --------------------------------------------------------------------- (a)删除前,先查询jobDetail JobDetail job = getDelegate().selectJobForTrigger(conn,getClassLoadHelper(), key, false); "SELECT J.JOB_NAME, J.JOB_GROUP, J.IS_DURABLE, J.JOB_CLASS_NAME, J.REQUESTS_RECOVERY FROM {0}TRIGGERS T, {0}JOB_DETAILS J WHERE T.SCHED_NAME = {1} AND J.SCHED_NAME = {1} AND T.TRIGGER_NAME = ? AND T.TRIGGER_GROUP = ? AND T.JOB_NAME = J.JOB_NAME AND T.JOB_GROUP = J.JOB_GROUP" (b)删除触发器,其侦听器及其Simple / Cron / BLOB子表条目。 boolean removedTrigger = deleteTriggerAndChildren(conn, key); deleteTrigger(Connection conn, TriggerKey triggerKey) (b1)deleteTriggerExtension "DELETE FROM {0}SIMPLE_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?" "DELETE FROM {0}BLOB_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?" (b2)"DELETE FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?" (c)是否删除jobdetail ,判断 isDurable 默认 为false。 if (null != job && !job.isDurable()) { int numTriggers = getDelegate().selectNumTriggersForJob(conn, job.getKey()); --------------------------------------------------------- "SELECT COUNT(TRIGGER_NAME) FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?" --------------------------------------------------------- if (numTriggers == 0) { // Don't call removeJob() because we don't want to check for // triggers again. //不要调用removeJob(),因为我们不想再次检查触发器。 deleteJobAndChildren(conn, job.getKey()); //删除作业及其侦听器。 ----------------------------------------------------- //deleteJobDetail(Connection conn, JobKey jobKey) 删除给定作业的作业明细记录。 "DELETE FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?" ----------------------------------------------------- } } }
2.1.4 清理任何已触发的触发器条目
int n = getDelegate().deleteFiredTriggers(conn); ---------------------------------------------------------------------------- "DELETE FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1}" ----------------------------------------------------------------------------
2.2 获取ThreadExecutor 线程管理 misfireHandler.initialize();
public void initialize() { ThreadExecutor executor = getThreadExecutor(); //getThreadExecutor == private ThreadExecutor threadExecutor = new DefaultThreadExecutor(); executor.execute(MisfireHandler.this); //启动线程执行 对应job的 execute方法 //MisfireHandler == class MisfireHandler extends Thread 继承了Thread }
3.1 唤醒所有等待的线程 schedThread.togglePause(false);
schedThread.togglePause(false); //指示主处理循环在下一个可能的点暂停。 void togglePause(boolean pause) { synchronized (sigLock) { paused = pause; if (paused) { signalSchedulingChange(0); ------------------------------------------ //发信号通知主要处理循环,已经进行了调度的改变 - 以便中断在等待misfire时间到达时可能发生的任何睡眠。 public void signalSchedulingChange(long candidateNewNextFireTime) { synchronized(sigLock) { signaled = true; signaledNextFireTime = candidateNewNextFireTime; sigLock.notifyAll(); // private final Object sigLock = new Object(); } } ------------------------------------------ } else { sigLock.notifyAll();//唤醒所有等待的线程 } } }
附1:true 是如何转换为 1 的:
Quartz专栏系列
1.Quartz学习——Quartz大致介绍(一)2.Quartz学习——Quartz简单入门Demo(二)
3.Quartz学习——Spring和Quartz集成详解(三)
4.Quartz学习——SSMM(Spring+SpringMVC+Mybatis+Mysql)和Quartz集成详解(四)
5.Quartz源码——JobStore保存JonDetail和Trigger源码分析(一)
6.Quartz源码——scheduler.start()启动源码分析(二)
7.Quartz源码——QuartzSchedulerThread.run() 源码分析(三)
8.Quartz源码——Quartz调度器的Misfire处理规则(四)
欢迎访问我的csdn博客,我们一同成长!
“不管做什么,只要坚持下去就会看到不一样!在路上,不卑不亢!”
博客首页:http://blog.csdn.net/u010648555
相关文章推荐
- Quartz源码——scheduler.start()启动源码分析(二)
- ucos 之系统启动 OSStart() 源码分析 3
- Quartz源码分析之Scheduler
- Android源码解析之应用程序内部启动Activity过程(startActivity)的源代码分析
- Android笔记-service启动过程分析:bindService源码分析、startService和bindService区别
- 开源中国 OsChina Android 客户端源码分析(1)启动界面 app_start
- quartz2.x源码分析——启动过程
- Android源码解析之新进程中启动自定义服务过程(startService)的原理分析
- linux 3.6 启动源码分析(二) start_kernel
- Android服务启动之StartService源码分析
- Activity启动流程源码分析之startActivity启动(三)
- Android 8.0系统源码分析--startService启动过程源码分析
- linux 3.6 启动源码分析(二) start_kernel
- quartz2.2源码分析2-scheduler初始化及使用
- Mesos源码分析(10): MesosSchedulerDriver的启动及运行一个Task
- glibc源码分析之进程启动(start.S)
- Android笔记-service启动过程分析:startService源码分析
- Service启动流程源码分析(一):startService
- Quartz源码——QuartzSchedulerThread.run() 源码分析(三)
- Mesos源码分析(10): MesosSchedulerDriver的启动及运行一个Task