quartz的schedulerFactory实现解析
2017-03-17 16:55
113 查看
由于需要实现一个scheduler,用于添加job,更改job等功能,但是没有调度功能。没错主要是用于监控和管理集群用的。所以需要仔细看scheduler的接口规范,于是又了这篇文章。
SchedulerFactory 是一个接口,用于Scheduler的创建和管理。接口很简单,这里主要是解析StdSchedulerFactory 的实现方式。
1、构造函数:
[java] view
plain copy
public StdSchedulerFactory(Properties props) throws SchedulerException {
[java] view
plain copy
//把初始化参数代理给<span style="font-family: Arial, Helvetica, sans-serif;">initialize,通过Properties来做初始化</span>
initialize(props);
}
public StdSchedulerFactory(String fileName) throws SchedulerException {
[java] view
plain copy
//通过配置文件初始化,这也会把文件解析为Properties
initialize(fileName);
}
2、初始化过程
初始化过程,首先会看是否传入了配置文件或Properties属性,没有的话,会根据系统变量--》工程classpath--》quartz的包的顺序查找quartz.properties文件
查找配置文件过程如下:
[java] view
plain copy
public void initialize() throws SchedulerException {
// short-circuit if already initialized
if (cfg != null) {//找到文件后会构建一个cfg属性,所以如果cfg!=null说明已经初始化过了
return;
}
if (initException != null) {
throw initException;
}
//通过查找系统变量查找配置文件
String requestedFile = System.getProperty(PROPERTIES_FILE);
[java] view
plain copy
//如果没有找到就用默认的文件名
String propFileName = requestedFile != null ? requestedFile
: "quartz.properties";
[java] view
plain copy
File propFile = new File(propFileName);
Properties props = new Properties();
InputStream in = null;
try {
[java] view
plain copy
//先查看指定的配置文件是否存在,存在就读取,注意这里的读取方式用文件系统的路径来读取的
if (propFile.exists()) {
try {
if (requestedFile != null) {
propSrc = "specified file: '" + requestedFile + "'";
} else {
propSrc = "default file in current working dir: 'quartz.properties'";
}
in = new BufferedInputStream(new FileInputStream(propFileName));
props.load(in);
} catch (IOException ioe) {
initException = new SchedulerException("Properties file: '"
+ propFileName + "' could not be read.", ioe);
throw initException;
}
[java] view
plain copy
//如果通过文件系统的路径读不到,那么就通过<span style="font-family: Arial, Helvetica, sans-serif;">ContextClassLoader的</span>路径来读取
} else if (requestedFile != null) {
in =
Thread.currentThread().getContextClassLoader().getResourceAsStream(requestedFile);
if(in == null) {
initException = new SchedulerException("Properties file: '"
+ requestedFile + "' could not be found.");
throw initException;
}
propSrc = "specified file: '" + requestedFile + "' in the class resource path.";
in = new BufferedInputStream(in);
try {
props.load(in);
} catch (IOException ioe) {
initException = new SchedulerException("Properties file: '"
+ requestedFile + "' could not be read.", ioe);
throw initException;
}
} else {
propSrc = "default resource file in Quartz package: 'quartz.properties'";
//通过当前的classLoader来读取,但是一般情况下这个和当前线程的classLoader基本是一个
ClassLoader cl = getClass().getClassLoader();
if(cl == null)
cl = findClassloader();
if(cl == null)
throw new SchedulerConfigException("Unable to find a class loader on the current thread or class.");
in = cl.getResourceAsStream(
"quartz.properties");
if (in == null) {
in = cl.getResourceAsStream(
"/quartz.properties");
}
[java] view
plain copy
<span style="font-family: Arial, Helvetica, sans-serif;"> //是在找不到,那么就从quartz的包找了</span>
[java] view
plain copy
if (in == null) {
in = cl.getResourceAsStream(
"org/quartz/quartz.properties");
}
if (in == null) {
initException = new SchedulerException(
"Default quartz.properties not found in class path");
throw initException;
}
try {
props.load(in);
} catch (IOException ioe) {
initException = new SchedulerException(
"Resource properties file: 'org/quartz/quartz.properties' "
+ "could not be read from the classpath.", ioe);
throw initException;
}
}
} finally {
if(in != null) {
try { in.close(); } catch(IOException ignore) { /* ignore */ }
}
}
//最后解析为一个Properties去配置schuduler
initialize(overrideWithSysProps(props));
上面都在准备配置信息,这些最后会生成一个Cfg对象,用它去获取属性,配置Scheduler,在调用
[java] view
plain copy
Scheduler getScheduler()
的时候根据会首先检查是否已经创建了,没有创建会根据这些属性创建新的Scheduler:
[java] view
plain copy
instantiate()
instantiate的关键代码:
关键属性:
[java] view
plain copy
JobStore js = null;
ThreadPool tp = null;
QuartzScheduler qs = null;
DBConnectionManager dbMgr = null;
String instanceIdGeneratorClass = null;
Properties tProps = null;
String userTXLocation = null;
boolean wrapJobInTx = false;
boolean autoId = false;
long idleWaitTime = -1;
long dbFailureRetry = 15000L; // 15 secs
String classLoadHelperClass;
String jobFactoryClass;
ThreadExecutor threadExecutor;
schedulerId的生成:
[java] view
plain copy
String schedInstId = cfg.getStringProperty(PROP_SCHED_INSTANCE_ID,
DEFAULT_INSTANCE_ID);
//如果配置成自动生成,那么就去获取自动生成器生成
if (schedInstId.equals(AUTO_GENERATE_INSTANCE_ID)) {
autoId = true;
instanceIdGeneratorClass = cfg.getStringProperty(
PROP_SCHED_INSTANCE_ID_GENERATOR_CLASS,
"org.quartz.simpl.SimpleInstanceIdGenerator");
}
[java] view
plain copy
//如果配置成系统属性Id,那么就用相应的class生成
else if (schedInstId.equals(SYSTEM_PROPERTY_AS_INSTANCE_ID)) {
autoId = true;
instanceIdGeneratorClass =
"org.quartz.simpl.SystemPropertyInstanceIdGenerator";
}
获取UserTansaction的Jndi url,在分布式事务中使用
[java] view
plain copy
userTXLocation = cfg.getStringProperty(PROP_SCHED_USER_TX_URL,
userTXLocation);
是否用UserTansaction来执行Job:
[java] view
plain copy
wrapJobInTx = cfg.getBooleanProperty(PROP_SCHED_WRAP_JOB_IN_USER_TX,
wrapJobInTx);
job实例化factory:
[java] view
plain copy
jobFactoryClass = cfg.getStringProperty(
PROP_SCHED_JOB_FACTORY_CLASS, null);
jmx设置相关:
[java] view
plain copy
boolean jmxExport = cfg.getBooleanProperty(PROP_SCHED_JMX_EXPORT);
String jmxObjectName = cfg.getStringProperty(PROP_SCHED_JMX_OBJECT_NAME);
boolean jmxProxy = cfg.getBooleanProperty(PROP_SCHED_JMX_PROXY);
String jmxProxyClass = cfg.getStringProperty(PROP_SCHED_JMX_PROXY_CLASS);
rmi相关:
[java] view
plain copy
boolean rmiExport = cfg.getBooleanProperty(PROP_SCHED_RMI_EXPORT, false);
boolean rmiProxy = cfg.getBooleanProperty(PROP_SCHED_RMI_PROXY, false);
String rmiHost = cfg.getStringProperty(PROP_SCHED_RMI_HOST, "localhost");
int rmiPort = cfg.getIntProperty(PROP_SCHED_RMI_PORT, 1099);
int rmiServerPort = cfg.getIntProperty(PROP_SCHED_RMI_SERVER_PORT, -1);
String rmiCreateRegistry = cfg.getStringProperty(
PROP_SCHED_RMI_CREATE_REGISTRY,
QuartzSchedulerResources.CREATE_REGISTRY_NEVER);
String rmiBindName = cfg.getStringProperty(PROP_SCHED_RMI_BIND_NAME);
不过Jmx和rmi是不能同时开启的:
[java] view
plain copy
if (jmxProxy && rmiProxy) {
throw new SchedulerConfigException("Cannot proxy both RMI and JMX.");
}
如果是rmi代理的就创建remoteScheduler:(不会有集群效果,毕竟所有操作都是远程那边在做)
[java] view
plain copy
if (rmiProxy) {
if (autoId) {
schedInstId = DEFAULT_INSTANCE_ID;
}
String uid = (rmiBindName == null) ? QuartzSchedulerResources.getUniqueIdentifier(
schedName, schedInstId) : rmiBindName;
RemoteScheduler remoteScheduler = new RemoteScheduler(uid, rmiHost, rmiPort);
schedRep.bind(remoteScheduler);
return remoteScheduler;
}
如果是jmx远程代理就用jmxscheduler:
[java] view
plain copy
if (jmxProxy) {
if (autoId) {
schedInstId = DEFAULT_INSTANCE_ID;
}
if (jmxProxyClass == null) {
throw new SchedulerConfigException("No JMX Proxy Scheduler class provided");
}
RemoteMBeanScheduler jmxScheduler = null;
try {
jmxScheduler = (RemoteMBeanScheduler)loadHelper.loadClass(jmxProxyClass)
.newInstance();
} catch (Exception e) {
throw new SchedulerConfigException(
"Unable to instantiate RemoteMBeanScheduler class.", e);
}
if (jmxObjectName == null) {
jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId);
}
jmxScheduler.setSchedulerObjectName(jmxObjectName);
tProps = cfg.getPropertyGroup(PROP_SCHED_JMX_PROXY, true);
try {
setBeanProps(jmxScheduler, tProps);
} catch (Exception e) {
initException = new SchedulerException("RemoteMBeanScheduler class '"
+ jmxProxyClass + "' props could not be configured.", e);
throw initException;
}
jmxScheduler.initialize();
schedRep.bind(jmxScheduler);
return jmxScheduler;
}
初始化jobFactory:
[java] view
plain copy
JobFactory jobFactory = null;
if(jobFactoryClass != null) {
try {
jobFactory = (JobFactory) loadHelper.loadClass(jobFactoryClass)
.newInstance();
} catch (Exception e) {
throw new SchedulerConfigException(
"Unable to instantiate JobFactory class: "
+ e.getMessage(), e);
}
//获取jobFactory需要的属性,这里的方法就是定义自定义属性名(固定前缀)的实现方式,比如说常用的datasource
tProps = cfg.getPropertyGroup(PROP_SCHED_JOB_FACTORY_PREFIX, true);
try {
setBeanProps(jobFactory, tProps);
} catch (Exception e) {
initException = new SchedulerException("JobFactory class '"
+ jobFactoryClass + "' props could not be configured.", e);
throw initException;
}
}
配置ThreadPool,属性值的获取方式和JobFactory一直:
[java] view
plain copy
String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
if (tpClass == null) {
initException = new SchedulerException(
"ThreadPool class not specified. ");
throw initException;
}
try {
tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
} catch (Exception e) {
initException = new SchedulerException("ThreadPool class '"
+ tpClass + "' could not be instantiated.", e);
throw initException;
}
tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);
try {
setBeanProps(tp, tProps);
} catch (Exception e) {
initException = new SchedulerException("ThreadPool class '"
+ tpClass + "' props could not be configured.", e);
throw initException;
}
jobStore的配置:
[java] view
plain copy
String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
RAMJobStore.class.getName());
if (jsClass == null) {
initException = new SchedulerException(
"JobStore class not specified. ");
throw initException;
}
try {
js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
} catch (Exception e) {
initException = new SchedulerException("JobStore class '" + jsClass
+ "' could not be instantiated.", e);
throw initException;
}
//设置JobStore的scheduleName和scheduleId两个值,就是简单的反射设置而已
SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);
tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
try {
setBeanProps(js, tProps);
} catch (Exception e) {
initException = new SchedulerException("JobStore class '" + jsClass
+ "' props could not be configured.", e);
throw initException;
}
//如果是JobStoreSupport的话,那么就需要设置锁控制器,用于集群的调度同步
if (js instanceof JobStoreSupport) {
// Install custom lock handler (Semaphore)
String lockHandlerClass = cfg.getStringProperty(PROP_JOB_STORE_LOCK_HANDLER_CLASS);
if (lockHandlerClass != null) {
try {
Semaphore lockHandler = (Semaphore)loadHelper.loadClass(lockHandlerClass).newInstance();
tProps = cfg.getPropertyGroup(PROP_JOB_STORE_LOCK_HANDLER_PREFIX, true);
// If this lock handler requires the table prefix, add it to its properties.
if (lockHandler instanceof TablePrefixAware) {
tProps.setProperty(
PROP_TABLE_PREFIX, ((JobStoreSupport)js).getTablePrefix());
tProps.setProperty(
PROP_SCHED_NAME, schedName);
}
try {
setBeanProps(lockHandler, tProps);
} catch (Exception e) {
initException = new SchedulerException("JobStore LockHandler class '" + lockHandlerClass
+ "' props could not be configured.", e);
throw initException;
}
((JobStoreSupport)js).setLockHandler(lockHandler);
getLog().info("Using custom data access locking (synchronization): " + lockHandlerClass);
} catch (Exception e) {
initException = new SchedulerException("JobStore LockHandler class '" + lockHandlerClass
+ "' could not be instantiated.", e);
throw initException;
}
}
}
获取datasource,可能多个:
[java] view
plain copy
String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX);//获取多个dataSource,所以需要JTX分布式事务的支持,因为有多数据源的job存在的
for (int i = 0; i < dsNames.length; i++) {
PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup(
PROP_DATASOURCE_PREFIX + "." + dsNames[i], true));
//先检测connectionProvider这种配置方式,本质上所有的datasource都会被封装为某种Provider,Provider是quartz对datasource的一中封装而已
String cpClass = pp.getStringProperty(PROP_CONNECTION_PROVIDER_CLASS, null);
// custom connectionProvider...
if(cpClass != null) {
ConnectionProvider cp = null;
try {
cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance();
} catch (Exception e) {
initException = new SchedulerException("ConnectionProvider class '" + cpClass
+ "' could not be instantiated.", e);
throw initException;
}
try {//移除Provider这个配置后,把其他配置关联到provider
// remove the class name, so it isn't attempted to be set
pp.getUnderlyingProperties().remove(
PROP_CONNECTION_PROVIDER_CLASS);
setBeanProps(cp, pp.getUnderlyingProperties());
cp.initialize();
} catch (Exception e) {
initException = new SchedulerException("ConnectionProvider class '" + cpClass
+ "' props could not be configured.", e);
throw initException;
}
//把所有的Datasource加到manager
dbMgr = DBConnectionManager.getInstance();
dbMgr.addConnectionProvider(dsNames[i], cp);
} else {
[java] view
plain copy
//查找Jndi这种配置
String dsJndi = pp.getStringProperty(PROP_DATASOURCE_JNDI_URL, null);
if (dsJndi != null) {
boolean dsAlwaysLookup = pp.getBooleanProperty(
PROP_DATASOURCE_JNDI_ALWAYS_LOOKUP);
String dsJndiInitial = pp.getStringProperty(
PROP_DATASOURCE_JNDI_INITIAL);
String dsJndiProvider = pp.getStringProperty(
PROP_DATASOURCE_JNDI_PROVDER);
String dsJndiPrincipal = pp.getStringProperty(
PROP_DATASOURCE_JNDI_PRINCIPAL);
String dsJndiCredentials = pp.getStringProperty(
PROP_DATASOURCE_JNDI_CREDENTIALS);
Properties props = null;
if (null != dsJndiInitial || null != dsJndiProvider
|| null != dsJndiPrincipal || null != dsJndiCredentials) {
props = new Properties();
if (dsJndiInitial != null) {
props.put(PROP_DATASOURCE_JNDI_INITIAL,
dsJndiInitial);
}
if (dsJndiProvider != null) {
props.put(PROP_DATASOURCE_JNDI_PROVDER,
dsJndiProvider);
}
if (dsJndiPrincipal != null) {
props.put(PROP_DATASOURCE_JNDI_PRINCIPAL,
dsJndiPrincipal);
}
if (dsJndiCredentials != null) {
props.put(PROP_DATASOURCE_JNDI_CREDENTIALS,
dsJndiCredentials);
}
}
JNDIConnectionProvider cp = new JNDIConnectionProvider(dsJndi,
props, dsAlwaysLookup);
dbMgr = DBConnectionManager.getInstance();
dbMgr.addConnectionProvider(dsNames[i], cp);
} else {
[java] view
plain copy
//检查local driver这种配置
String dsDriver = pp.getStringProperty(PoolingConnectionProvider.DB_DRIVER);
String dsURL = pp.getStringProperty(PoolingConnectionProvider.DB_URL);
if (dsDriver == null) {
initException = new SchedulerException(
"Driver not specified for DataSource: "
+ dsNames[i]);
throw initException;
}
if (dsURL == null) {
initException = new SchedulerException(
"DB URL not specified for DataSource: "
+ dsNames[i]);
throw initException;
}
try {
PoolingConnectionProvider cp = new PoolingConnectionProvider(pp.getUnderlyingProperties());
dbMgr = DBConnectionManager.getInstance();
dbMgr.addConnectionProvider(dsNames[i], cp);
} catch (SQLException sqle) {
initException = new SchedulerException(
"Could not initialize DataSource: " + dsNames[i],
sqle);
throw initException;
}
}
}
接下来还有SchedulerPlugins、JobListeners、TriggerListeners、ThreadExecutor等的配置。基本都大同小异,毕竟都是配置文件解析而已。
其中ThreadExecutor是执行池,主要是执行schedule 和refire任务。
然后初始化JobRunShellFactory,这个是一个封装执行Job的factory,主要是为了一些listener的执行和异常处理:
[java] view
plain copy
JobRunShellFactory jrsf = null; // Create correct run-shell factory...
if (userTXLocation != null) {
UserTransactionHelper.setUserTxLocation(userTXLocation);
}
//更具JTA的配置情况初始化特定的RunShellFactory
if (wrapJobInTx) {
jrsf = new JTAJobRunShellFactory();
} else {
jrsf = new JTAAnnotationAwareJobRunShellFactory();
}
对于db JobStore的话是有一些处理的:
[java] view
plain copy
if (js instanceof JobStoreSupport) {
JobStoreSupport jjs = (JobStoreSupport)js;
jjs.setDbRetryInterval(dbFailureRetry);
if(threadsInheritInitalizersClassLoader)
jjs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
jjs.setThreadExecutor(threadExecutor);//所以 ThreadExecutor也用来处理数据库的链接的(比如说定时重试)
}
初始化以上对象和资源后,quartz会把他保存到QuartzSchedulerResources里面,所以你会看到代码的很多地方都有用到这个对象:
[java] view
plain copy
QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
rsrcs.setName(schedName);
rsrcs.setThreadName(threadName);
rsrcs.setInstanceId(schedInstId);
rsrcs.setJobRunShellFactory(jrsf);
rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
rsrcs.setRunUpdateCheck(!skipUpdateCheck);
rsrcs.setBatchTimeWindow(batchTimeWindow);
rsrcs.setMaxBatchSize(maxBatchSize);
rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown);
rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait);
rsrcs.setJMXExport(jmxExport);
rsrcs.setJMXObjectName(jmxObjectName);
Sheduler的初始化和设置在下面:
[java] view
plain copy
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);//实例化Scheduler
qsInited = true;
// Create Scheduler ref...
Scheduler scheduler = instantiate(rsrcs, qs);//封装QuartzScheduler为StdQuartzScheduler,其实<span style="font-family: Arial, Helvetica, sans-serif;">StdQuartzScheduler就是一个装饰模式而已,QuartzSch//eduler是没有实现Sheduler接口的</span>
// set job factory if specified
if(jobFactory != null) {
qs.setJobFactory(jobFactory);
}
// Initialize plugins now that we have a Scheduler instance.
for (int i = 0; i < plugins.length; i++) {
plugins[i].initialize(pluginNames[i], scheduler, loadHelper);
}
// add listeners
for (int i = 0; i < jobListeners.length; i++) {
qs.getListenerManager().addJobListener(jobListeners[i], EverythingMatcher.allJobs());
}
for (int i = 0; i < triggerListeners.length; i++) {
qs.getListenerManager().addTriggerListener(triggerListeners[i], EverythingMatcher.allTriggers());
}
// set scheduler context data...
for(Object key: schedCtxtProps.keySet()) {
String val = schedCtxtProps.getProperty((String) key);
scheduler.getContext().put((String)key, val);
}
// fire up job store, and runshell factory
//启动job
js.setInstanceId(schedInstId);
js.setInstanceName(schedName);
js.setThreadPoolSize(tp.getPoolSize());
js.initialize(loadHelper, qs.getSchedulerSignaler());//是一个回掉Job Scheduler的接口<span style="font-family: Arial, Helvetica, sans-serif;">SchedulerSignaler</span>
//启动jobRunShellFactory,<span style="font-family: Arial, Helvetica, sans-serif;">initialize方法就是标准的初始化方法,是quartz的代码习惯</span>
jrsf.initialize(scheduler);
//启动sheduler
qs.initialize();
getLog().info(
"Quartz scheduler '" + scheduler.getSchedulerName()
+ "' initialized from " + propSrc);
getLog().info("Quartz scheduler version: " + qs.getVersion());
//添加资源引用防止被垃圾回收
// prevents the repository from being garbage collected
qs.addNoGCObject(schedRep);
// prevents the db manager from being garbage collected
if (dbMgr != null) {
qs.addNoGCObject(dbMgr);
}
总结下来,其实挺简单的:读取配置文件,安装前缀的风格初始化各种类和属性,然后包装到QuartzSchedulerResources 给Sheduler用,然后初始化:
jobStore、ThreadPool、JobRunShellFactory、ThreadExecutor、jobFactory、开启各种mananger、初始化Scheduler并为其添加pluger和listener,
SchedulerFactory 是一个接口,用于Scheduler的创建和管理。接口很简单,这里主要是解析StdSchedulerFactory 的实现方式。
1、构造函数:
[java] view
plain copy
public StdSchedulerFactory(Properties props) throws SchedulerException {
[java] view
plain copy
//把初始化参数代理给<span style="font-family: Arial, Helvetica, sans-serif;">initialize,通过Properties来做初始化</span>
initialize(props);
}
public StdSchedulerFactory(String fileName) throws SchedulerException {
[java] view
plain copy
//通过配置文件初始化,这也会把文件解析为Properties
initialize(fileName);
}
2、初始化过程
初始化过程,首先会看是否传入了配置文件或Properties属性,没有的话,会根据系统变量--》工程classpath--》quartz的包的顺序查找quartz.properties文件
查找配置文件过程如下:
[java] view
plain copy
public void initialize() throws SchedulerException {
// short-circuit if already initialized
if (cfg != null) {//找到文件后会构建一个cfg属性,所以如果cfg!=null说明已经初始化过了
return;
}
if (initException != null) {
throw initException;
}
//通过查找系统变量查找配置文件
String requestedFile = System.getProperty(PROPERTIES_FILE);
[java] view
plain copy
//如果没有找到就用默认的文件名
String propFileName = requestedFile != null ? requestedFile
: "quartz.properties";
[java] view
plain copy
File propFile = new File(propFileName);
Properties props = new Properties();
InputStream in = null;
try {
[java] view
plain copy
//先查看指定的配置文件是否存在,存在就读取,注意这里的读取方式用文件系统的路径来读取的
if (propFile.exists()) {
try {
if (requestedFile != null) {
propSrc = "specified file: '" + requestedFile + "'";
} else {
propSrc = "default file in current working dir: 'quartz.properties'";
}
in = new BufferedInputStream(new FileInputStream(propFileName));
props.load(in);
} catch (IOException ioe) {
initException = new SchedulerException("Properties file: '"
+ propFileName + "' could not be read.", ioe);
throw initException;
}
[java] view
plain copy
//如果通过文件系统的路径读不到,那么就通过<span style="font-family: Arial, Helvetica, sans-serif;">ContextClassLoader的</span>路径来读取
} else if (requestedFile != null) {
in =
Thread.currentThread().getContextClassLoader().getResourceAsStream(requestedFile);
if(in == null) {
initException = new SchedulerException("Properties file: '"
+ requestedFile + "' could not be found.");
throw initException;
}
propSrc = "specified file: '" + requestedFile + "' in the class resource path.";
in = new BufferedInputStream(in);
try {
props.load(in);
} catch (IOException ioe) {
initException = new SchedulerException("Properties file: '"
+ requestedFile + "' could not be read.", ioe);
throw initException;
}
} else {
propSrc = "default resource file in Quartz package: 'quartz.properties'";
//通过当前的classLoader来读取,但是一般情况下这个和当前线程的classLoader基本是一个
ClassLoader cl = getClass().getClassLoader();
if(cl == null)
cl = findClassloader();
if(cl == null)
throw new SchedulerConfigException("Unable to find a class loader on the current thread or class.");
in = cl.getResourceAsStream(
"quartz.properties");
if (in == null) {
in = cl.getResourceAsStream(
"/quartz.properties");
}
[java] view
plain copy
<span style="font-family: Arial, Helvetica, sans-serif;"> //是在找不到,那么就从quartz的包找了</span>
[java] view
plain copy
if (in == null) {
in = cl.getResourceAsStream(
"org/quartz/quartz.properties");
}
if (in == null) {
initException = new SchedulerException(
"Default quartz.properties not found in class path");
throw initException;
}
try {
props.load(in);
} catch (IOException ioe) {
initException = new SchedulerException(
"Resource properties file: 'org/quartz/quartz.properties' "
+ "could not be read from the classpath.", ioe);
throw initException;
}
}
} finally {
if(in != null) {
try { in.close(); } catch(IOException ignore) { /* ignore */ }
}
}
//最后解析为一个Properties去配置schuduler
initialize(overrideWithSysProps(props));
上面都在准备配置信息,这些最后会生成一个Cfg对象,用它去获取属性,配置Scheduler,在调用
[java] view
plain copy
Scheduler getScheduler()
的时候根据会首先检查是否已经创建了,没有创建会根据这些属性创建新的Scheduler:
[java] view
plain copy
instantiate()
instantiate的关键代码:
关键属性:
[java] view
plain copy
JobStore js = null;
ThreadPool tp = null;
QuartzScheduler qs = null;
DBConnectionManager dbMgr = null;
String instanceIdGeneratorClass = null;
Properties tProps = null;
String userTXLocation = null;
boolean wrapJobInTx = false;
boolean autoId = false;
long idleWaitTime = -1;
long dbFailureRetry = 15000L; // 15 secs
String classLoadHelperClass;
String jobFactoryClass;
ThreadExecutor threadExecutor;
schedulerId的生成:
[java] view
plain copy
String schedInstId = cfg.getStringProperty(PROP_SCHED_INSTANCE_ID,
DEFAULT_INSTANCE_ID);
//如果配置成自动生成,那么就去获取自动生成器生成
if (schedInstId.equals(AUTO_GENERATE_INSTANCE_ID)) {
autoId = true;
instanceIdGeneratorClass = cfg.getStringProperty(
PROP_SCHED_INSTANCE_ID_GENERATOR_CLASS,
"org.quartz.simpl.SimpleInstanceIdGenerator");
}
[java] view
plain copy
//如果配置成系统属性Id,那么就用相应的class生成
else if (schedInstId.equals(SYSTEM_PROPERTY_AS_INSTANCE_ID)) {
autoId = true;
instanceIdGeneratorClass =
"org.quartz.simpl.SystemPropertyInstanceIdGenerator";
}
获取UserTansaction的Jndi url,在分布式事务中使用
[java] view
plain copy
userTXLocation = cfg.getStringProperty(PROP_SCHED_USER_TX_URL,
userTXLocation);
是否用UserTansaction来执行Job:
[java] view
plain copy
wrapJobInTx = cfg.getBooleanProperty(PROP_SCHED_WRAP_JOB_IN_USER_TX,
wrapJobInTx);
job实例化factory:
[java] view
plain copy
jobFactoryClass = cfg.getStringProperty(
PROP_SCHED_JOB_FACTORY_CLASS, null);
jmx设置相关:
[java] view
plain copy
boolean jmxExport = cfg.getBooleanProperty(PROP_SCHED_JMX_EXPORT);
String jmxObjectName = cfg.getStringProperty(PROP_SCHED_JMX_OBJECT_NAME);
boolean jmxProxy = cfg.getBooleanProperty(PROP_SCHED_JMX_PROXY);
String jmxProxyClass = cfg.getStringProperty(PROP_SCHED_JMX_PROXY_CLASS);
rmi相关:
[java] view
plain copy
boolean rmiExport = cfg.getBooleanProperty(PROP_SCHED_RMI_EXPORT, false);
boolean rmiProxy = cfg.getBooleanProperty(PROP_SCHED_RMI_PROXY, false);
String rmiHost = cfg.getStringProperty(PROP_SCHED_RMI_HOST, "localhost");
int rmiPort = cfg.getIntProperty(PROP_SCHED_RMI_PORT, 1099);
int rmiServerPort = cfg.getIntProperty(PROP_SCHED_RMI_SERVER_PORT, -1);
String rmiCreateRegistry = cfg.getStringProperty(
PROP_SCHED_RMI_CREATE_REGISTRY,
QuartzSchedulerResources.CREATE_REGISTRY_NEVER);
String rmiBindName = cfg.getStringProperty(PROP_SCHED_RMI_BIND_NAME);
不过Jmx和rmi是不能同时开启的:
[java] view
plain copy
if (jmxProxy && rmiProxy) {
throw new SchedulerConfigException("Cannot proxy both RMI and JMX.");
}
如果是rmi代理的就创建remoteScheduler:(不会有集群效果,毕竟所有操作都是远程那边在做)
[java] view
plain copy
if (rmiProxy) {
if (autoId) {
schedInstId = DEFAULT_INSTANCE_ID;
}
String uid = (rmiBindName == null) ? QuartzSchedulerResources.getUniqueIdentifier(
schedName, schedInstId) : rmiBindName;
RemoteScheduler remoteScheduler = new RemoteScheduler(uid, rmiHost, rmiPort);
schedRep.bind(remoteScheduler);
return remoteScheduler;
}
如果是jmx远程代理就用jmxscheduler:
[java] view
plain copy
if (jmxProxy) {
if (autoId) {
schedInstId = DEFAULT_INSTANCE_ID;
}
if (jmxProxyClass == null) {
throw new SchedulerConfigException("No JMX Proxy Scheduler class provided");
}
RemoteMBeanScheduler jmxScheduler = null;
try {
jmxScheduler = (RemoteMBeanScheduler)loadHelper.loadClass(jmxProxyClass)
.newInstance();
} catch (Exception e) {
throw new SchedulerConfigException(
"Unable to instantiate RemoteMBeanScheduler class.", e);
}
if (jmxObjectName == null) {
jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId);
}
jmxScheduler.setSchedulerObjectName(jmxObjectName);
tProps = cfg.getPropertyGroup(PROP_SCHED_JMX_PROXY, true);
try {
setBeanProps(jmxScheduler, tProps);
} catch (Exception e) {
initException = new SchedulerException("RemoteMBeanScheduler class '"
+ jmxProxyClass + "' props could not be configured.", e);
throw initException;
}
jmxScheduler.initialize();
schedRep.bind(jmxScheduler);
return jmxScheduler;
}
初始化jobFactory:
[java] view
plain copy
JobFactory jobFactory = null;
if(jobFactoryClass != null) {
try {
jobFactory = (JobFactory) loadHelper.loadClass(jobFactoryClass)
.newInstance();
} catch (Exception e) {
throw new SchedulerConfigException(
"Unable to instantiate JobFactory class: "
+ e.getMessage(), e);
}
//获取jobFactory需要的属性,这里的方法就是定义自定义属性名(固定前缀)的实现方式,比如说常用的datasource
tProps = cfg.getPropertyGroup(PROP_SCHED_JOB_FACTORY_PREFIX, true);
try {
setBeanProps(jobFactory, tProps);
} catch (Exception e) {
initException = new SchedulerException("JobFactory class '"
+ jobFactoryClass + "' props could not be configured.", e);
throw initException;
}
}
配置ThreadPool,属性值的获取方式和JobFactory一直:
[java] view
plain copy
String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
if (tpClass == null) {
initException = new SchedulerException(
"ThreadPool class not specified. ");
throw initException;
}
try {
tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
} catch (Exception e) {
initException = new SchedulerException("ThreadPool class '"
+ tpClass + "' could not be instantiated.", e);
throw initException;
}
tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);
try {
setBeanProps(tp, tProps);
} catch (Exception e) {
initException = new SchedulerException("ThreadPool class '"
+ tpClass + "' props could not be configured.", e);
throw initException;
}
jobStore的配置:
[java] view
plain copy
String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
RAMJobStore.class.getName());
if (jsClass == null) {
initException = new SchedulerException(
"JobStore class not specified. ");
throw initException;
}
try {
js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
} catch (Exception e) {
initException = new SchedulerException("JobStore class '" + jsClass
+ "' could not be instantiated.", e);
throw initException;
}
//设置JobStore的scheduleName和scheduleId两个值,就是简单的反射设置而已
SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);
tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
try {
setBeanProps(js, tProps);
} catch (Exception e) {
initException = new SchedulerException("JobStore class '" + jsClass
+ "' props could not be configured.", e);
throw initException;
}
//如果是JobStoreSupport的话,那么就需要设置锁控制器,用于集群的调度同步
if (js instanceof JobStoreSupport) {
// Install custom lock handler (Semaphore)
String lockHandlerClass = cfg.getStringProperty(PROP_JOB_STORE_LOCK_HANDLER_CLASS);
if (lockHandlerClass != null) {
try {
Semaphore lockHandler = (Semaphore)loadHelper.loadClass(lockHandlerClass).newInstance();
tProps = cfg.getPropertyGroup(PROP_JOB_STORE_LOCK_HANDLER_PREFIX, true);
// If this lock handler requires the table prefix, add it to its properties.
if (lockHandler instanceof TablePrefixAware) {
tProps.setProperty(
PROP_TABLE_PREFIX, ((JobStoreSupport)js).getTablePrefix());
tProps.setProperty(
PROP_SCHED_NAME, schedName);
}
try {
setBeanProps(lockHandler, tProps);
} catch (Exception e) {
initException = new SchedulerException("JobStore LockHandler class '" + lockHandlerClass
+ "' props could not be configured.", e);
throw initException;
}
((JobStoreSupport)js).setLockHandler(lockHandler);
getLog().info("Using custom data access locking (synchronization): " + lockHandlerClass);
} catch (Exception e) {
initException = new SchedulerException("JobStore LockHandler class '" + lockHandlerClass
+ "' could not be instantiated.", e);
throw initException;
}
}
}
获取datasource,可能多个:
[java] view
plain copy
String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX);//获取多个dataSource,所以需要JTX分布式事务的支持,因为有多数据源的job存在的
for (int i = 0; i < dsNames.length; i++) {
PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup(
PROP_DATASOURCE_PREFIX + "." + dsNames[i], true));
//先检测connectionProvider这种配置方式,本质上所有的datasource都会被封装为某种Provider,Provider是quartz对datasource的一中封装而已
String cpClass = pp.getStringProperty(PROP_CONNECTION_PROVIDER_CLASS, null);
// custom connectionProvider...
if(cpClass != null) {
ConnectionProvider cp = null;
try {
cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance();
} catch (Exception e) {
initException = new SchedulerException("ConnectionProvider class '" + cpClass
+ "' could not be instantiated.", e);
throw initException;
}
try {//移除Provider这个配置后,把其他配置关联到provider
// remove the class name, so it isn't attempted to be set
pp.getUnderlyingProperties().remove(
PROP_CONNECTION_PROVIDER_CLASS);
setBeanProps(cp, pp.getUnderlyingProperties());
cp.initialize();
} catch (Exception e) {
initException = new SchedulerException("ConnectionProvider class '" + cpClass
+ "' props could not be configured.", e);
throw initException;
}
//把所有的Datasource加到manager
dbMgr = DBConnectionManager.getInstance();
dbMgr.addConnectionProvider(dsNames[i], cp);
} else {
[java] view
plain copy
//查找Jndi这种配置
String dsJndi = pp.getStringProperty(PROP_DATASOURCE_JNDI_URL, null);
if (dsJndi != null) {
boolean dsAlwaysLookup = pp.getBooleanProperty(
PROP_DATASOURCE_JNDI_ALWAYS_LOOKUP);
String dsJndiInitial = pp.getStringProperty(
PROP_DATASOURCE_JNDI_INITIAL);
String dsJndiProvider = pp.getStringProperty(
PROP_DATASOURCE_JNDI_PROVDER);
String dsJndiPrincipal = pp.getStringProperty(
PROP_DATASOURCE_JNDI_PRINCIPAL);
String dsJndiCredentials = pp.getStringProperty(
PROP_DATASOURCE_JNDI_CREDENTIALS);
Properties props = null;
if (null != dsJndiInitial || null != dsJndiProvider
|| null != dsJndiPrincipal || null != dsJndiCredentials) {
props = new Properties();
if (dsJndiInitial != null) {
props.put(PROP_DATASOURCE_JNDI_INITIAL,
dsJndiInitial);
}
if (dsJndiProvider != null) {
props.put(PROP_DATASOURCE_JNDI_PROVDER,
dsJndiProvider);
}
if (dsJndiPrincipal != null) {
props.put(PROP_DATASOURCE_JNDI_PRINCIPAL,
dsJndiPrincipal);
}
if (dsJndiCredentials != null) {
props.put(PROP_DATASOURCE_JNDI_CREDENTIALS,
dsJndiCredentials);
}
}
JNDIConnectionProvider cp = new JNDIConnectionProvider(dsJndi,
props, dsAlwaysLookup);
dbMgr = DBConnectionManager.getInstance();
dbMgr.addConnectionProvider(dsNames[i], cp);
} else {
[java] view
plain copy
//检查local driver这种配置
String dsDriver = pp.getStringProperty(PoolingConnectionProvider.DB_DRIVER);
String dsURL = pp.getStringProperty(PoolingConnectionProvider.DB_URL);
if (dsDriver == null) {
initException = new SchedulerException(
"Driver not specified for DataSource: "
+ dsNames[i]);
throw initException;
}
if (dsURL == null) {
initException = new SchedulerException(
"DB URL not specified for DataSource: "
+ dsNames[i]);
throw initException;
}
try {
PoolingConnectionProvider cp = new PoolingConnectionProvider(pp.getUnderlyingProperties());
dbMgr = DBConnectionManager.getInstance();
dbMgr.addConnectionProvider(dsNames[i], cp);
} catch (SQLException sqle) {
initException = new SchedulerException(
"Could not initialize DataSource: " + dsNames[i],
sqle);
throw initException;
}
}
}
接下来还有SchedulerPlugins、JobListeners、TriggerListeners、ThreadExecutor等的配置。基本都大同小异,毕竟都是配置文件解析而已。
其中ThreadExecutor是执行池,主要是执行schedule 和refire任务。
然后初始化JobRunShellFactory,这个是一个封装执行Job的factory,主要是为了一些listener的执行和异常处理:
[java] view
plain copy
JobRunShellFactory jrsf = null; // Create correct run-shell factory...
if (userTXLocation != null) {
UserTransactionHelper.setUserTxLocation(userTXLocation);
}
//更具JTA的配置情况初始化特定的RunShellFactory
if (wrapJobInTx) {
jrsf = new JTAJobRunShellFactory();
} else {
jrsf = new JTAAnnotationAwareJobRunShellFactory();
}
对于db JobStore的话是有一些处理的:
[java] view
plain copy
if (js instanceof JobStoreSupport) {
JobStoreSupport jjs = (JobStoreSupport)js;
jjs.setDbRetryInterval(dbFailureRetry);
if(threadsInheritInitalizersClassLoader)
jjs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
jjs.setThreadExecutor(threadExecutor);//所以 ThreadExecutor也用来处理数据库的链接的(比如说定时重试)
}
初始化以上对象和资源后,quartz会把他保存到QuartzSchedulerResources里面,所以你会看到代码的很多地方都有用到这个对象:
[java] view
plain copy
QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
rsrcs.setName(schedName);
rsrcs.setThreadName(threadName);
rsrcs.setInstanceId(schedInstId);
rsrcs.setJobRunShellFactory(jrsf);
rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
rsrcs.setRunUpdateCheck(!skipUpdateCheck);
rsrcs.setBatchTimeWindow(batchTimeWindow);
rsrcs.setMaxBatchSize(maxBatchSize);
rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown);
rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait);
rsrcs.setJMXExport(jmxExport);
rsrcs.setJMXObjectName(jmxObjectName);
Sheduler的初始化和设置在下面:
[java] view
plain copy
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);//实例化Scheduler
qsInited = true;
// Create Scheduler ref...
Scheduler scheduler = instantiate(rsrcs, qs);//封装QuartzScheduler为StdQuartzScheduler,其实<span style="font-family: Arial, Helvetica, sans-serif;">StdQuartzScheduler就是一个装饰模式而已,QuartzSch//eduler是没有实现Sheduler接口的</span>
// set job factory if specified
if(jobFactory != null) {
qs.setJobFactory(jobFactory);
}
// Initialize plugins now that we have a Scheduler instance.
for (int i = 0; i < plugins.length; i++) {
plugins[i].initialize(pluginNames[i], scheduler, loadHelper);
}
// add listeners
for (int i = 0; i < jobListeners.length; i++) {
qs.getListenerManager().addJobListener(jobListeners[i], EverythingMatcher.allJobs());
}
for (int i = 0; i < triggerListeners.length; i++) {
qs.getListenerManager().addTriggerListener(triggerListeners[i], EverythingMatcher.allTriggers());
}
// set scheduler context data...
for(Object key: schedCtxtProps.keySet()) {
String val = schedCtxtProps.getProperty((String) key);
scheduler.getContext().put((String)key, val);
}
// fire up job store, and runshell factory
//启动job
js.setInstanceId(schedInstId);
js.setInstanceName(schedName);
js.setThreadPoolSize(tp.getPoolSize());
js.initialize(loadHelper, qs.getSchedulerSignaler());//是一个回掉Job Scheduler的接口<span style="font-family: Arial, Helvetica, sans-serif;">SchedulerSignaler</span>
//启动jobRunShellFactory,<span style="font-family: Arial, Helvetica, sans-serif;">initialize方法就是标准的初始化方法,是quartz的代码习惯</span>
jrsf.initialize(scheduler);
//启动sheduler
qs.initialize();
getLog().info(
"Quartz scheduler '" + scheduler.getSchedulerName()
+ "' initialized from " + propSrc);
getLog().info("Quartz scheduler version: " + qs.getVersion());
//添加资源引用防止被垃圾回收
// prevents the repository from being garbage collected
qs.addNoGCObject(schedRep);
// prevents the db manager from being garbage collected
if (dbMgr != null) {
qs.addNoGCObject(dbMgr);
}
总结下来,其实挺简单的:读取配置文件,安装前缀的风格初始化各种类和属性,然后包装到QuartzSchedulerResources 给Sheduler用,然后初始化:
jobStore、ThreadPool、JobRunShellFactory、ThreadExecutor、jobFactory、开启各种mananger、初始化Scheduler并为其添加pluger和listener,
相关文章推荐
- quartz的schedulerFactory实现解析
- Quartz.net官方开发指南 第十课: 配置、资源使用以及SchedulerFactory
- spring quartz.SchedulerFactoryBean 定时启动检测排期是否过期
- Spring整合quartz配置【一】基于MethodInvokingJobDetailFactoryBean实现任务管理
- Spring通过SchedulerFactoryBean实现调度任务的配置
- 关于quartz cron表达式饭解析验证输出实现部分截图代码
- 常见设计模式的解析和实现(C++)之一-Factory模式
- Spring通过SchedulerFactoryBean实现调度任务的配置
- quartz多个scheduler实现
- Quartz学习之Lesson10-Configuration, Resource Usage and SchedulerFactory
- 设计模式解析与实现(C++)之Factory模式
- Spring通过SchedulerFactoryBean实现调度任务的配置
- Quartz SchedulerFactoryBean异常--Spring自动装配惹的祸
- Quartz+Spring实例应用【一】基于Spring的MethodInvokingJobDetailFactoryBean实现
- 关于Failed to convert property value of type [org.quartz.impl.StdScheduler] to required type [org.springframework.scheduling.quartz.SchedulerFactoryBean
- Quartz中扩展MethodInvokingJobDetailFactoryBean实现对任务调度的拦截
- Quartz中扩展MethodInvokingJobDetailFactoryBean实现对任务调度的拦截
- Quartz 框架 教程(中文版)2.2.x 之第十课:Configuration,Resource Usage和SchedulerFactory
- Spring源码解析 - AbstractBeanFactory 实现接口与父类分析
- Quartz 之 scheduler 类的方法 【实现一个 quartz 管理类】