您的位置:首页 > 其它

quartz的schedulerFactory实现解析

2017-03-17 16:55 113 查看
由于需要实现一个scheduler,用于添加job,更改job等功能,但是没有调度功能。没错主要是用于监控和管理集群用的。所以需要仔细看scheduler的接口规范,于是又了这篇文章。

SchedulerFactory 是一个接口,用于Scheduler的创建和管理。接口很简单,这里主要是解析StdSchedulerFactory 的实现方式。

1、构造函数:

[java] view
plain copy

public StdSchedulerFactory(Properties props) throws SchedulerException {

[java] view
plain copy

//把初始化参数代理给<span style="font-family: Arial, Helvetica, sans-serif;">initialize,通过Properties来做初始化</span>

initialize(props);

}

public StdSchedulerFactory(String fileName) throws SchedulerException {

[java] view
plain copy

//通过配置文件初始化,这也会把文件解析为Properties

initialize(fileName);

}

2、初始化过程

初始化过程,首先会看是否传入了配置文件或Properties属性,没有的话,会根据系统变量--》工程classpath--》quartz的包的顺序查找quartz.properties文件

查找配置文件过程如下:

[java] view
plain copy

public void initialize() throws SchedulerException {

// short-circuit if already initialized

if (cfg != null) {//找到文件后会构建一个cfg属性,所以如果cfg!=null说明已经初始化过了

return;

}

if (initException != null) {

throw initException;

}

//通过查找系统变量查找配置文件

String requestedFile = System.getProperty(PROPERTIES_FILE);

[java] view
plain copy

//如果没有找到就用默认的文件名

String propFileName = requestedFile != null ? requestedFile

: "quartz.properties";

[java] view
plain copy

File propFile = new File(propFileName);

Properties props = new Properties();

InputStream in = null;

try {

[java] view
plain copy

//先查看指定的配置文件是否存在,存在就读取,注意这里的读取方式用文件系统的路径来读取的

if (propFile.exists()) {

try {

if (requestedFile != null) {

propSrc = "specified file: '" + requestedFile + "'";

} else {

propSrc = "default file in current working dir: 'quartz.properties'";

}

in = new BufferedInputStream(new FileInputStream(propFileName));

props.load(in);

} catch (IOException ioe) {

initException = new SchedulerException("Properties file: '"

+ propFileName + "' could not be read.", ioe);

throw initException;

}

[java] view
plain copy

//如果通过文件系统的路径读不到,那么就通过<span style="font-family: Arial, Helvetica, sans-serif;">ContextClassLoader的</span>路径来读取

} else if (requestedFile != null) {

in =

Thread.currentThread().getContextClassLoader().getResourceAsStream(requestedFile);

if(in == null) {

initException = new SchedulerException("Properties file: '"

+ requestedFile + "' could not be found.");

throw initException;

}

propSrc = "specified file: '" + requestedFile + "' in the class resource path.";

in = new BufferedInputStream(in);

try {

props.load(in);

} catch (IOException ioe) {

initException = new SchedulerException("Properties file: '"

+ requestedFile + "' could not be read.", ioe);

throw initException;

}

} else {

propSrc = "default resource file in Quartz package: 'quartz.properties'";

//通过当前的classLoader来读取,但是一般情况下这个和当前线程的classLoader基本是一个

ClassLoader cl = getClass().getClassLoader();

if(cl == null)

cl = findClassloader();

if(cl == null)

throw new SchedulerConfigException("Unable to find a class loader on the current thread or class.");

in = cl.getResourceAsStream(

"quartz.properties");

if (in == null) {

in = cl.getResourceAsStream(

"/quartz.properties");

}

[java] view
plain copy

<span style="font-family: Arial, Helvetica, sans-serif;"> //是在找不到,那么就从quartz的包找了</span>

[java] view
plain copy

if (in == null) {

in = cl.getResourceAsStream(

"org/quartz/quartz.properties");

}

if (in == null) {

initException = new SchedulerException(

"Default quartz.properties not found in class path");

throw initException;

}

try {

props.load(in);

} catch (IOException ioe) {

initException = new SchedulerException(

"Resource properties file: 'org/quartz/quartz.properties' "

+ "could not be read from the classpath.", ioe);

throw initException;

}

}

} finally {

if(in != null) {

try { in.close(); } catch(IOException ignore) { /* ignore */ }

}

}

//最后解析为一个Properties去配置schuduler

initialize(overrideWithSysProps(props));

上面都在准备配置信息,这些最后会生成一个Cfg对象,用它去获取属性,配置Scheduler,在调用

[java] view
plain copy

Scheduler getScheduler()

的时候根据会首先检查是否已经创建了,没有创建会根据这些属性创建新的Scheduler:

[java] view
plain copy

instantiate()

instantiate的关键代码:

关键属性:

[java] view
plain copy

JobStore js = null;

ThreadPool tp = null;

QuartzScheduler qs = null;

DBConnectionManager dbMgr = null;

String instanceIdGeneratorClass = null;

Properties tProps = null;

String userTXLocation = null;

boolean wrapJobInTx = false;

boolean autoId = false;

long idleWaitTime = -1;

long dbFailureRetry = 15000L; // 15 secs

String classLoadHelperClass;

String jobFactoryClass;

ThreadExecutor threadExecutor;

schedulerId的生成:

[java] view
plain copy

String schedInstId = cfg.getStringProperty(PROP_SCHED_INSTANCE_ID,

DEFAULT_INSTANCE_ID);

//如果配置成自动生成,那么就去获取自动生成器生成

if (schedInstId.equals(AUTO_GENERATE_INSTANCE_ID)) {

autoId = true;

instanceIdGeneratorClass = cfg.getStringProperty(

PROP_SCHED_INSTANCE_ID_GENERATOR_CLASS,

"org.quartz.simpl.SimpleInstanceIdGenerator");

}

[java] view
plain copy

//如果配置成系统属性Id,那么就用相应的class生成

else if (schedInstId.equals(SYSTEM_PROPERTY_AS_INSTANCE_ID)) {

autoId = true;

instanceIdGeneratorClass =

"org.quartz.simpl.SystemPropertyInstanceIdGenerator";

}

获取UserTansaction的Jndi url,在分布式事务中使用

[java] view
plain copy

userTXLocation = cfg.getStringProperty(PROP_SCHED_USER_TX_URL,

userTXLocation);

是否用UserTansaction来执行Job:

[java] view
plain copy

wrapJobInTx = cfg.getBooleanProperty(PROP_SCHED_WRAP_JOB_IN_USER_TX,

wrapJobInTx);

job实例化factory:

[java] view
plain copy

jobFactoryClass = cfg.getStringProperty(

PROP_SCHED_JOB_FACTORY_CLASS, null);

jmx设置相关:

[java] view
plain copy

boolean jmxExport = cfg.getBooleanProperty(PROP_SCHED_JMX_EXPORT);

String jmxObjectName = cfg.getStringProperty(PROP_SCHED_JMX_OBJECT_NAME);

boolean jmxProxy = cfg.getBooleanProperty(PROP_SCHED_JMX_PROXY);

String jmxProxyClass = cfg.getStringProperty(PROP_SCHED_JMX_PROXY_CLASS);

rmi相关:

[java] view
plain copy

boolean rmiExport = cfg.getBooleanProperty(PROP_SCHED_RMI_EXPORT, false);

boolean rmiProxy = cfg.getBooleanProperty(PROP_SCHED_RMI_PROXY, false);

String rmiHost = cfg.getStringProperty(PROP_SCHED_RMI_HOST, "localhost");

int rmiPort = cfg.getIntProperty(PROP_SCHED_RMI_PORT, 1099);

int rmiServerPort = cfg.getIntProperty(PROP_SCHED_RMI_SERVER_PORT, -1);

String rmiCreateRegistry = cfg.getStringProperty(

PROP_SCHED_RMI_CREATE_REGISTRY,

QuartzSchedulerResources.CREATE_REGISTRY_NEVER);

String rmiBindName = cfg.getStringProperty(PROP_SCHED_RMI_BIND_NAME);

不过Jmx和rmi是不能同时开启的:

[java] view
plain copy

if (jmxProxy && rmiProxy) {

throw new SchedulerConfigException("Cannot proxy both RMI and JMX.");

}

如果是rmi代理的就创建remoteScheduler:(不会有集群效果,毕竟所有操作都是远程那边在做)

[java] view
plain copy

if (rmiProxy) {

if (autoId) {

schedInstId = DEFAULT_INSTANCE_ID;

}

String uid = (rmiBindName == null) ? QuartzSchedulerResources.getUniqueIdentifier(

schedName, schedInstId) : rmiBindName;

RemoteScheduler remoteScheduler = new RemoteScheduler(uid, rmiHost, rmiPort);

schedRep.bind(remoteScheduler);

return remoteScheduler;

}

如果是jmx远程代理就用jmxscheduler:

[java] view
plain copy

if (jmxProxy) {

if (autoId) {

schedInstId = DEFAULT_INSTANCE_ID;

}

if (jmxProxyClass == null) {

throw new SchedulerConfigException("No JMX Proxy Scheduler class provided");

}

RemoteMBeanScheduler jmxScheduler = null;

try {

jmxScheduler = (RemoteMBeanScheduler)loadHelper.loadClass(jmxProxyClass)

.newInstance();

} catch (Exception e) {

throw new SchedulerConfigException(

"Unable to instantiate RemoteMBeanScheduler class.", e);

}

if (jmxObjectName == null) {

jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId);

}

jmxScheduler.setSchedulerObjectName(jmxObjectName);

tProps = cfg.getPropertyGroup(PROP_SCHED_JMX_PROXY, true);

try {

setBeanProps(jmxScheduler, tProps);

} catch (Exception e) {

initException = new SchedulerException("RemoteMBeanScheduler class '"

+ jmxProxyClass + "' props could not be configured.", e);

throw initException;

}

jmxScheduler.initialize();

schedRep.bind(jmxScheduler);

return jmxScheduler;

}

初始化jobFactory:

[java] view
plain copy

JobFactory jobFactory = null;

if(jobFactoryClass != null) {

try {

jobFactory = (JobFactory) loadHelper.loadClass(jobFactoryClass)

.newInstance();

} catch (Exception e) {

throw new SchedulerConfigException(

"Unable to instantiate JobFactory class: "

+ e.getMessage(), e);

}

//获取jobFactory需要的属性,这里的方法就是定义自定义属性名(固定前缀)的实现方式,比如说常用的datasource

tProps = cfg.getPropertyGroup(PROP_SCHED_JOB_FACTORY_PREFIX, true);

try {

setBeanProps(jobFactory, tProps);

} catch (Exception e) {

initException = new SchedulerException("JobFactory class '"

+ jobFactoryClass + "' props could not be configured.", e);

throw initException;

}

}

配置ThreadPool,属性值的获取方式和JobFactory一直:

[java] view
plain copy

String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());

if (tpClass == null) {

initException = new SchedulerException(

"ThreadPool class not specified. ");

throw initException;

}

try {

tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();

} catch (Exception e) {

initException = new SchedulerException("ThreadPool class '"

+ tpClass + "' could not be instantiated.", e);

throw initException;

}

tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);

try {

setBeanProps(tp, tProps);

} catch (Exception e) {

initException = new SchedulerException("ThreadPool class '"

+ tpClass + "' props could not be configured.", e);

throw initException;

}

jobStore的配置:

[java] view
plain copy

String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,

RAMJobStore.class.getName());

if (jsClass == null) {

initException = new SchedulerException(

"JobStore class not specified. ");

throw initException;

}

try {

js = (JobStore) loadHelper.loadClass(jsClass).newInstance();

} catch (Exception e) {

initException = new SchedulerException("JobStore class '" + jsClass

+ "' could not be instantiated.", e);

throw initException;

}

//设置JobStore的scheduleName和scheduleId两个值,就是简单的反射设置而已

SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);

tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});

try {

setBeanProps(js, tProps);

} catch (Exception e) {

initException = new SchedulerException("JobStore class '" + jsClass

+ "' props could not be configured.", e);

throw initException;

}

//如果是JobStoreSupport的话,那么就需要设置锁控制器,用于集群的调度同步

if (js instanceof JobStoreSupport) {

// Install custom lock handler (Semaphore)

String lockHandlerClass = cfg.getStringProperty(PROP_JOB_STORE_LOCK_HANDLER_CLASS);

if (lockHandlerClass != null) {

try {

Semaphore lockHandler = (Semaphore)loadHelper.loadClass(lockHandlerClass).newInstance();

tProps = cfg.getPropertyGroup(PROP_JOB_STORE_LOCK_HANDLER_PREFIX, true);

// If this lock handler requires the table prefix, add it to its properties.

if (lockHandler instanceof TablePrefixAware) {

tProps.setProperty(

PROP_TABLE_PREFIX, ((JobStoreSupport)js).getTablePrefix());

tProps.setProperty(

PROP_SCHED_NAME, schedName);

}

try {

setBeanProps(lockHandler, tProps);

} catch (Exception e) {

initException = new SchedulerException("JobStore LockHandler class '" + lockHandlerClass

+ "' props could not be configured.", e);

throw initException;

}

((JobStoreSupport)js).setLockHandler(lockHandler);

getLog().info("Using custom data access locking (synchronization): " + lockHandlerClass);

} catch (Exception e) {

initException = new SchedulerException("JobStore LockHandler class '" + lockHandlerClass

+ "' could not be instantiated.", e);

throw initException;

}

}

}

获取datasource,可能多个:

[java] view
plain copy

String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX);//获取多个dataSource,所以需要JTX分布式事务的支持,因为有多数据源的job存在的

for (int i = 0; i < dsNames.length; i++) {

PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup(

PROP_DATASOURCE_PREFIX + "." + dsNames[i], true));

//先检测connectionProvider这种配置方式,本质上所有的datasource都会被封装为某种Provider,Provider是quartz对datasource的一中封装而已

String cpClass = pp.getStringProperty(PROP_CONNECTION_PROVIDER_CLASS, null);

// custom connectionProvider...

if(cpClass != null) {

ConnectionProvider cp = null;

try {

cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance();

} catch (Exception e) {

initException = new SchedulerException("ConnectionProvider class '" + cpClass

+ "' could not be instantiated.", e);

throw initException;

}

try {//移除Provider这个配置后,把其他配置关联到provider

// remove the class name, so it isn't attempted to be set

pp.getUnderlyingProperties().remove(

PROP_CONNECTION_PROVIDER_CLASS);

setBeanProps(cp, pp.getUnderlyingProperties());

cp.initialize();

} catch (Exception e) {

initException = new SchedulerException("ConnectionProvider class '" + cpClass

+ "' props could not be configured.", e);

throw initException;

}

//把所有的Datasource加到manager

dbMgr = DBConnectionManager.getInstance();

dbMgr.addConnectionProvider(dsNames[i], cp);

} else {

[java] view
plain copy

//查找Jndi这种配置

String dsJndi = pp.getStringProperty(PROP_DATASOURCE_JNDI_URL, null);

if (dsJndi != null) {

boolean dsAlwaysLookup = pp.getBooleanProperty(

PROP_DATASOURCE_JNDI_ALWAYS_LOOKUP);

String dsJndiInitial = pp.getStringProperty(

PROP_DATASOURCE_JNDI_INITIAL);

String dsJndiProvider = pp.getStringProperty(

PROP_DATASOURCE_JNDI_PROVDER);

String dsJndiPrincipal = pp.getStringProperty(

PROP_DATASOURCE_JNDI_PRINCIPAL);

String dsJndiCredentials = pp.getStringProperty(

PROP_DATASOURCE_JNDI_CREDENTIALS);

Properties props = null;

if (null != dsJndiInitial || null != dsJndiProvider

|| null != dsJndiPrincipal || null != dsJndiCredentials) {

props = new Properties();

if (dsJndiInitial != null) {

props.put(PROP_DATASOURCE_JNDI_INITIAL,

dsJndiInitial);

}

if (dsJndiProvider != null) {

props.put(PROP_DATASOURCE_JNDI_PROVDER,

dsJndiProvider);

}

if (dsJndiPrincipal != null) {

props.put(PROP_DATASOURCE_JNDI_PRINCIPAL,

dsJndiPrincipal);

}

if (dsJndiCredentials != null) {

props.put(PROP_DATASOURCE_JNDI_CREDENTIALS,

dsJndiCredentials);

}

}

JNDIConnectionProvider cp = new JNDIConnectionProvider(dsJndi,

props, dsAlwaysLookup);

dbMgr = DBConnectionManager.getInstance();

dbMgr.addConnectionProvider(dsNames[i], cp);

} else {

[java] view
plain copy

//检查local driver这种配置

String dsDriver = pp.getStringProperty(PoolingConnectionProvider.DB_DRIVER);

String dsURL = pp.getStringProperty(PoolingConnectionProvider.DB_URL);

if (dsDriver == null) {

initException = new SchedulerException(

"Driver not specified for DataSource: "

+ dsNames[i]);

throw initException;

}

if (dsURL == null) {

initException = new SchedulerException(

"DB URL not specified for DataSource: "

+ dsNames[i]);

throw initException;

}

try {

PoolingConnectionProvider cp = new PoolingConnectionProvider(pp.getUnderlyingProperties());

dbMgr = DBConnectionManager.getInstance();

dbMgr.addConnectionProvider(dsNames[i], cp);

} catch (SQLException sqle) {

initException = new SchedulerException(

"Could not initialize DataSource: " + dsNames[i],

sqle);

throw initException;

}

}

}

接下来还有SchedulerPlugins、JobListeners、TriggerListeners、ThreadExecutor等的配置。基本都大同小异,毕竟都是配置文件解析而已。

其中ThreadExecutor是执行池,主要是执行schedule 和refire任务。

然后初始化JobRunShellFactory,这个是一个封装执行Job的factory,主要是为了一些listener的执行和异常处理:

[java] view
plain copy

JobRunShellFactory jrsf = null; // Create correct run-shell factory...

if (userTXLocation != null) {

UserTransactionHelper.setUserTxLocation(userTXLocation);

}

//更具JTA的配置情况初始化特定的RunShellFactory

if (wrapJobInTx) {

jrsf = new JTAJobRunShellFactory();

} else {

jrsf = new JTAAnnotationAwareJobRunShellFactory();

}

对于db JobStore的话是有一些处理的:

[java] view
plain copy

if (js instanceof JobStoreSupport) {

JobStoreSupport jjs = (JobStoreSupport)js;

jjs.setDbRetryInterval(dbFailureRetry);

if(threadsInheritInitalizersClassLoader)

jjs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);

jjs.setThreadExecutor(threadExecutor);//所以 ThreadExecutor也用来处理数据库的链接的(比如说定时重试)

}

初始化以上对象和资源后,quartz会把他保存到QuartzSchedulerResources里面,所以你会看到代码的很多地方都有用到这个对象:

[java] view
plain copy

QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();

rsrcs.setName(schedName);

rsrcs.setThreadName(threadName);

rsrcs.setInstanceId(schedInstId);

rsrcs.setJobRunShellFactory(jrsf);

rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);

rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);

rsrcs.setRunUpdateCheck(!skipUpdateCheck);

rsrcs.setBatchTimeWindow(batchTimeWindow);

rsrcs.setMaxBatchSize(maxBatchSize);

rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown);

rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait);

rsrcs.setJMXExport(jmxExport);

rsrcs.setJMXObjectName(jmxObjectName);

Sheduler的初始化和设置在下面:

[java] view
plain copy

qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);//实例化Scheduler

qsInited = true;

// Create Scheduler ref...

Scheduler scheduler = instantiate(rsrcs, qs);//封装QuartzScheduler为StdQuartzScheduler,其实<span style="font-family: Arial, Helvetica, sans-serif;">StdQuartzScheduler就是一个装饰模式而已,QuartzSch//eduler是没有实现Sheduler接口的</span>

// set job factory if specified

if(jobFactory != null) {

qs.setJobFactory(jobFactory);

}

// Initialize plugins now that we have a Scheduler instance.

for (int i = 0; i < plugins.length; i++) {

plugins[i].initialize(pluginNames[i], scheduler, loadHelper);

}

// add listeners

for (int i = 0; i < jobListeners.length; i++) {

qs.getListenerManager().addJobListener(jobListeners[i], EverythingMatcher.allJobs());

}

for (int i = 0; i < triggerListeners.length; i++) {

qs.getListenerManager().addTriggerListener(triggerListeners[i], EverythingMatcher.allTriggers());

}

// set scheduler context data...

for(Object key: schedCtxtProps.keySet()) {

String val = schedCtxtProps.getProperty((String) key);

scheduler.getContext().put((String)key, val);

}

// fire up job store, and runshell factory

//启动job

js.setInstanceId(schedInstId);

js.setInstanceName(schedName);

js.setThreadPoolSize(tp.getPoolSize());

js.initialize(loadHelper, qs.getSchedulerSignaler());//是一个回掉Job Scheduler的接口<span style="font-family: Arial, Helvetica, sans-serif;">SchedulerSignaler</span>

//启动jobRunShellFactory,<span style="font-family: Arial, Helvetica, sans-serif;">initialize方法就是标准的初始化方法,是quartz的代码习惯</span>

jrsf.initialize(scheduler);

//启动sheduler

qs.initialize();

getLog().info(

"Quartz scheduler '" + scheduler.getSchedulerName()

+ "' initialized from " + propSrc);

getLog().info("Quartz scheduler version: " + qs.getVersion());

//添加资源引用防止被垃圾回收

// prevents the repository from being garbage collected

qs.addNoGCObject(schedRep);

// prevents the db manager from being garbage collected

if (dbMgr != null) {

qs.addNoGCObject(dbMgr);

}

总结下来,其实挺简单的:读取配置文件,安装前缀的风格初始化各种类和属性,然后包装到QuartzSchedulerResources 给Sheduler用,然后初始化:

jobStore、ThreadPool、JobRunShellFactory、ThreadExecutor、jobFactory、开启各种mananger、初始化Scheduler并为其添加pluger和listener,
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  web调度框架