Quartz源码——JobStore保存JonDetail和Trigger源码分析(一)
2017-04-26 15:45
357 查看
我都是分析的jobStore 方式为jdbc的SimpleTrigger!RAM的方式类似分析方式!
{0} :表的前缀 ,如表qrtz_trigger ,{0}== qrtz_
{1}:quartz.properties 中配置的 org.quartz.scheduler.instanceName: myInstanceName ,{1} ==myInstanceName
要使用定时器,并讲任务持久到数据库,我们一定明白JobDetail和Trigger是如何操作进入数据库,如何注册到Scheduler中!前面我分析了Scheduler的start()和QuartzSchedulerThread的run()!这些都是要在下面讲的这个流程的基础上!
那么下面就开始分析源码,分析源码前首先加一个小的demo,方面解释!
真正的源码分析开始了!!!!
主要分析下面这段,从这里进去,看看源码里面到底是怎么进行处理的!
scheduler.scheduleJob(jb, t);//源码分析
ched.scheduleJob(jobDetail, trigger);//保存JobDetail和Trigger
//存储给定的org.quartz.JobDetail和org.quartz.Trigger。
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
//(1)保存JobDetail
storeJob(conn, newJob, false);
//(2)保存Trigger
storeTrigger(conn, newTrigger, newJob, false,
Constants.STATE_WAITING, false, false);
到这里,保存就结束了!
欢迎 一起讨论,学习!互相成长!
附学习博文地址:quartz2.2源码分析4-JobStore
欢迎访问我的csdn博客,我们一同成长!
"不管做什么,只要坚持下去就会看到不一样!在路上,不卑不亢!"
博客首页:http://blog.csdn.net/u010648555
{0} :表的前缀 ,如表qrtz_trigger ,{0}== qrtz_
{1}:quartz.properties 中配置的 org.quartz.scheduler.instanceName: myInstanceName ,{1} ==myInstanceName
要使用定时器,并讲任务持久到数据库,我们一定明白JobDetail和Trigger是如何操作进入数据库,如何注册到Scheduler中!前面我分析了Scheduler的start()和QuartzSchedulerThread的run()!这些都是要在下面讲的这个流程的基础上!
那么下面就开始分析源码,分析源码前首先加一个小的demo,方面解释!
//1.创建Scheduler的工厂 SchedulerFactory sf = new StdSchedulerFactory(); //2.从工厂中获取调度器实例 Scheduler scheduler = sf.getScheduler(); //3.创建JobDetail JobDetail jb = JobBuilder.newJob(RAMJob.class) .withDescription("this is a ram job") //job的描述 .withIdentity("ramJob", "ramGroup") //job 的name和group .build(); //任务运行的时间,SimpleSchedle类型触发器有效 long time= System.currentTimeMillis() + 3*1000L; //3秒后启动任务 Date statTime = new Date(time); //4.创建Trigger //使用SimpleScheduleBuilder或者CronScheduleBuilder Trigger t = TriggerBuilder.newTrigger() .withDescription("") .withIdentity("ramTrigger", "ramTriggerGroup") //.withSchedule(SimpleScheduleBuilder.simpleSchedule()) .startAt(statTime) //默认当前时间启动 .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?")) //两秒执行一次 .build(); //5.注册任务和定时器 scheduler.scheduleJob(jb, t);//源码分析 //6.启动 调度器 scheduler.start();
真正的源码分析开始了!!!!
主要分析下面这段,从这里进去,看看源码里面到底是怎么进行处理的!
scheduler.scheduleJob(jb, t);//源码分析
/** * <p> * Calls the equivalent method on the 'proxied' <code>QuartzScheduler</code>. * 调用“代理”<QuartzScheduler>上的等效方法。 * * 实现 在 StdScheduler.scheduleJob(JobDetail jobDetail, Trigger trigger) * </p> */ public Date scheduleJob(JobDetail jobDetail, Trigger trigger) throws SchedulerException { //这里的sched 是 QuartzScheduler 对象,Quartz和核心类,Quartz调度器 return sched.scheduleJob(jobDetail, trigger);//保存JobDetail和Trigger }
ched.scheduleJob(jobDetail, trigger);//保存JobDetail和Trigger
/** * <p> * Add the <code>{@link org.quartz.Job}</code> identified by the given * <code>{@link org.quartz.JobDetail}</code> to the Scheduler, and * associate the given <code>{@link org.quartz.Trigger}</code> with it. * </p> * * <p> * If the given Trigger does not reference any <code>Job</code>, then it * will be set to reference the Job passed with it into this method. * </p> * * @throws SchedulerException * if the Job or Trigger cannot be added to the Scheduler, or * there is an internal Scheduler error. * 将给定org.quartz.JobDetail标识的org.quartz.Job添加到Scheduler, * 并将给定的org.quartz.Trigger与其关联。 * 如果给定的触发器不引用任何作业,则它将被设置为引用与其一起传递的作业到此方法中。 * * 实现在 QuartzScheduler.scheduleJob(JobDetail jobDetail, * Trigger trigger) */ public Date scheduleJob(JobDetail jobDetail, Trigger trigger) throws SchedulerException { validateState();//验证调度器是否关闭,关闭抛出异常 //检查 jobDetail和trigger if (jobDetail == null) { throw new SchedulerException("JobDetail cannot be null"); } if (trigger == null) { throw new SchedulerException("Trigger cannot be null"); } if (jobDetail.getKey() == null) { throw new SchedulerException("Job's key cannot be null"); } if (jobDetail.getJobClass() == null) { throw new SchedulerException("Job's class cannot be null"); } OperableTrigger trig = (OperableTrigger)trigger; //getJobKey 获取 getJobName(), getJobGroup() if (trigger.getJobKey() == null) { trig.setJobKey(jobDetail.getKey()); } else if (!trigger.getJobKey().equals(jobDetail.getKey())) { throw new SchedulerException( "Trigger does not reference given job!"); } //验证trigger trig.validate(); Calendar cal = null; if (trigger.getCalendarName() != null) { cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());//检索Calendar "SELECT * FROM {0}CALENDARS WHERE SCHED_NAME = {1} AND CALENDAR_NAME = ?" } //在触发器首次添加到调度程序时由调度程序调用,以便让触发器基于任何关联的日历计算 //其第一次触发时间。调用此方法后,getNextFireTime()应返回有效的答案。 Date ft = trig.computeFirstFireTime(cal); if (ft == null) { throw new SchedulerException( "Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire."); } //存储给定的org.quartz.JobDetail和org.quartz.Trigger。 resources.getJobStore().storeJobAndTrigger(jobDetail, trig); notifySchedulerListenersJobAdded(jobDetail); notifySchedulerThread(trigger.getNextFireTime().getTime()); notifySchedulerListenersSchduled(trigger); return ft; }
//存储给定的org.quartz.JobDetail和org.quartz.Trigger。
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
public void storeJobAndTrigger(final JobDetail newJob, final OperableTrigger newTrigger) throws JobPersistenceException { executeInLock( (isLockOnInsert()) ? LOCK_TRIGGER_ACCESS : null, new VoidTransactionCallback() { public void executeVoid(Connection conn) throws JobPersistenceException { //(1)保存JobDetail storeJob(conn, newJob, false); //(2)保存Trigger storeTrigger(conn, newTrigger, newJob, false, Constants.STATE_WAITING, false, false); } }); }
//(1)保存JobDetail
storeJob(conn, newJob, false);
protected void storeJob(Connection conn, JobDetail newJob, boolean replaceExisting) throws JobPersistenceException { //判断JobDetail是否已经存在,根据name和group boolean existingJob = jobExists(conn, newJob.getKey()); try { if (existingJob) { if (!replaceExisting) { throw new ObjectAlreadyExistsException(newJob); } //更新JobDetail getDelegate().updateJobDetail(conn, newJob); "UPDATE {0}JOB_DETAILS SET DESCRIPTION = ?, JOB_CLASS_NAME = ?, IS_DURABLE = ?, IS_NONCONCURRENT = ?, IS_UPDATE_DATA = ?, REQUESTS_RECOVERY = ?, JOB_DATA = ? WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?" } else { //插入JobDetail getDelegate().insertJobDetail(conn, newJob); "INSERT INTO {0}JOB_DETAILS (SCHED_NAME, JOB_NAME, JOB_GROUP, DESCRIPTION, JOB_CLASS_NAME, IS_DURABLE, IS_NONCONCURRENT, IS_UPDATE_DATA, REQUESTS_RECOVERY, JOB_DATA) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?)" } } catch (IOException e) { throw new JobPersistenceException("Couldn't store job: " + e.getMessage(), e); } catch (SQLException e) { throw new JobPersistenceException("Couldn't store job: " + e.getMessage(), e); } }
//(2)保存Trigger
storeTrigger(conn, newTrigger, newJob, false,
Constants.STATE_WAITING, false, false);
protected void storeTrigger(Connection conn, OperableTrigger newTrigger, JobDetail job, boolean replaceExisting, String state, boolean forceState, boolean recovering) throws JobPersistenceException { //判断Trigger是否已经存在,根据name和group boolean existingTrigger = triggerExists(conn, newTrigger.getKey()); if ((existingTrigger) && (!replaceExisting)) { throw new ObjectAlreadyExistsException(newTrigger); } try { boolean shouldBepaused; //进行一些状态的判断 if (!forceState) { shouldBepaused = getDelegate().isTriggerGroupPaused( conn, newTrigger.getKey().getGroup()); if(!shouldBepaused) { shouldBepaused = getDelegate().isTriggerGroupPaused(conn, ALL_GROUPS_PAUSED); if (shouldBepaused) { getDelegate().insertPausedTriggerGroup(conn, newTrigger.getKey().getGroup()); } } if (shouldBepaused && (state.equals(STATE_WAITING) || state.equals(STATE_ACQUIRED))) { state = STATE_PAUSED; } } //若job为null,重新获取! if(job == null) { job = getDelegate().selectJobDetail(conn, newTrigger.getJobKey(), getClassLoadHelper()); "SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?" } if (job == null) { throw new JobPersistenceException("The job (" + newTrigger.getJobKey() + ") referenced by the trigger does not exist."); } //判断是否有DisallowConcurrentExecution注解,recovering恢复 if (job.isConcurrentExectionDisallowed() && !recovering) { state = checkBlockedState(conn, job.getKey(), state); "if (jobName != null) { ps = conn.prepareStatement(rtp(SELECT_FIRED_TRIGGERS_OF_JOB)); "SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?" ps.setString(1, jobName); ps.setString(2, groupName); } else { ps = conn .prepareStatement(rtp(SELECT_FIRED_TRIGGERS_OF_JOB_GROUP)); "SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND JOB_GROUP = ?" ps.setString(1, groupName); }" } if (existingTrigger) { //更新trigger getDelegate().updateTrigger(conn, newTrigger, state, job); " 大概贴出一些,具体想看完整的进入源码看 if(updateJobData) { ps = conn.prepareStatement(rtp(UPDATE_TRIGGER)); "UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ?, JOB_DATA = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?" } else { ps = conn.prepareStatement(rtp(UPDATE_TRIGGER_SKIP_DATA)); "UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?" } " } else { //插入trigger getDelegate().insertTrigger(conn, newTrigger, state, job); "INSERT INTO {0}TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, JOB_NAME, JOB_GROUP, DESCRIPTION, NEXT_FIRE_TIME, PREV_FIRE_TIME, TRIGGER_STATE, TRIGGER_TYPE, START_TIME, END_TIME, CALENDAR_NAME, MISFIRE_INSTR, JOB_DATA, PRIORITY) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" } } catch (Exception e) { throw new JobPersistenceException("Couldn't store trigger '" + newTrigger.getKey() + "' for '" + newTrigger.getJobKey() + "' job:" + e.getMessage(), e); } }
到这里,保存就结束了!
欢迎 一起讨论,学习!互相成长!
附学习博文地址:quartz2.2源码分析4-JobStore
欢迎访问我的csdn博客,我们一同成长!
"不管做什么,只要坚持下去就会看到不一样!在路上,不卑不亢!"
博客首页:http://blog.csdn.net/u010648555
相关文章推荐
- Quartz源码——JobStore保存JonDetail和Trigger源码分析(一)
- Quartz源码——JobStore保存JonDetail和Trigger源码分析
- Quartzs的 job,trigger 存储、job,trigger 删除源码分析
- Quartz源码分析之Trigger
- quartz源码分析之深刻理解job,sheduler,calendar,trigger及listener之间的关系
- quartz2.2源码分析4-JobStore
- Quartz源码分析(一)------ 以线程等待的方式实现按时间调度
- 第二人生的源码分析(八十四)保存UI的界面布局
- [置顶] 自己动手写CSDN博客提取器源码分析之三:处理网页保存为pdf文件
- 第二人生的源码分析(八十四)保存UI的界面布局
- quartz源码分析(四)
- 自己动手写CSDN博客提取器源码分析之二:处理网页保存为doc文件
- 【Cocos2d-x源码分析】 UserDefault如何保存本地数据
- quartz集群调度机制调研及源码分析---转载
- Quartz源码分析(二)
- memcached源码分析(五):数据保存及内存管理
- [置顶] 自己动手写CSDN博客提取器源码分析之一:处理网页保存为txt文件
- 蔡军生先生第二人生的源码分析(十六)保存人物角色的XML文件
- 自己动手写CSDN博客提取器源码分析之二:处理网页保存为doc文件
- spark1.2.0源码分析之spark streaming保存数据