
An Analysis of Quartz's SchedulerFactory Implementation

2014-10-17 16:00
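I needed to implement a scheduler that can add jobs, modify jobs and so on, but that does no scheduling of its own; it is mainly meant for monitoring and managing a cluster. That required a careful reading of the Scheduler interface contract, which is how this article came about.

SchedulerFactory is an interface for creating and managing Scheduler instances. The interface itself is simple, so this article concentrates on how StdSchedulerFactory implements it.

1. Constructors: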


public StdSchedulerFactory(Properties props) throws SchedulerException {
    // Delegates to initialize(), which configures the factory from the given Properties
    initialize(props);
}

public StdSchedulerFactory(String fileName) throws SchedulerException {
    // Initializes from a configuration file, which is also parsed into Properties
    initialize(fileName);
}

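2. Initialization

Initialization first checks whether a configuration file or a Properties object was passed in. If neither was, it searches for a quartz.properties file in this order: the file named by the system property, then the project classpath, and finally the Quartz package itself.

The lookup proceeds as follows: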

public void initialize() throws SchedulerException {
    // short-circuit if already initialized
    // (a cfg object is built once the file has been found, so cfg != null means we are done)
    if (cfg != null) {
        return;
    }
    if (initException != null) {
        throw initException;
    }

    // Look up the configuration file name in the system properties;
    // fall back to the default file name if it is not set
    String requestedFile = System.getProperty(PROPERTIES_FILE);
    String propFileName = requestedFile != null ? requestedFile
            : "quartz.properties";
    File propFile = new File(propFileName);

    Properties props = new Properties();

    InputStream in = null;

    try {
        // First check whether the file exists and, if so, read it.
        // Note that this lookup uses a file-system path.
        if (propFile.exists()) {
            try {
                if (requestedFile != null) {
                    propSrc = "specified file: '" + requestedFile + "'";
                } else {
                    propSrc = "default file in current working dir: 'quartz.properties'";
                }

                in = new BufferedInputStream(new FileInputStream(propFileName));
                props.load(in);

            } catch (IOException ioe) {
                initException = new SchedulerException("Properties file: '"
                        + propFileName + "' could not be read.", ioe);
                throw initException;
            }
        } else if (requestedFile != null) {
            // Not found on the file system: load it as a resource
            // through the thread's context ClassLoader instead
            in =
                Thread.currentThread().getContextClassLoader().getResourceAsStream(requestedFile);

            if (in == null) {
                initException = new SchedulerException("Properties file: '"
                        + requestedFile + "' could not be found.");
                throw initException;
            }

            propSrc = "specified file: '" + requestedFile + "' in the class resource path.";

            in = new BufferedInputStream(in);
            try {
                props.load(in);
            } catch (IOException ioe) {
                initException = new SchedulerException("Properties file: '"
                        + requestedFile + "' could not be read.", ioe);
                throw initException;
            }

        } else {
            propSrc = "default resource file in Quartz package: 'quartz.properties'";

            // Read through this class's ClassLoader; in most cases it is
            // the same as the current thread's ClassLoader
            ClassLoader cl = getClass().getClassLoader();
            if (cl == null)
                cl = findClassloader();
            if (cl == null)
                throw new SchedulerConfigException("Unable to find a class loader on the current thread or class.");

            in = cl.getResourceAsStream(
                    "quartz.properties");

            if (in == null) {
                in = cl.getResourceAsStream(
                        "/quartz.properties");
            }
            // If it still cannot be found, fall back to the copy inside the Quartz package
            if (in == null) {
                in = cl.getResourceAsStream(
                        "org/quartz/quartz.properties");
            }
            if (in == null) {
                initException = new SchedulerException(
                        "Default quartz.properties not found in class path");
                throw initException;
            }
            try {
                props.load(in);
            } catch (IOException ioe) {
                initException = new SchedulerException(
                        "Resource properties file: 'org/quartz/quartz.properties' "
                                + "could not be read from the classpath.", ioe);
                throw initException;
            }
        }
    } finally {
        if (in != null) {
            try { in.close(); } catch (IOException ignore) { /* ignore */ }
        }
    }

    // Finally, the resulting Properties are used to configure the scheduler
    initialize(overrideWithSysProps(props));
}
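
The last call above passes the loaded properties through overrideWithSysProps before they are used. As a rough illustration of that "system properties win over file values" idea, here is a minimal sketch. The class and method names (SysPropsOverrideSketch, overlay) are made up for this example, and the overrides are passed in explicitly so the behavior is easy to see; it is not the Quartz source.

```java
import java.util.Properties;

final class SysPropsOverrideSketch {

    /** Returns a new Properties in which every entry of overrides replaces the file value. */
    static Properties overlay(Properties fileProps, Properties overrides) {
        Properties merged = new Properties();
        merged.putAll(fileProps);   // values loaded from the properties file
        merged.putAll(overrides);   // entries added later replace earlier ones
        return merged;
    }

    public static void main(String[] args) {
        Properties fileProps = new Properties();
        fileProps.setProperty("org.quartz.threadPool.threadCount", "10");

        // Assumption for the demo: in a real factory the overrides would
        // come from System.getProperties() rather than a hand-built object.
        Properties overrides = new Properties();
        overrides.setProperty("org.quartz.threadPool.threadCount", "25");

        Properties merged = overlay(fileProps, overrides);
        System.out.println(merged.getProperty("org.quartz.threadPool.threadCount"));
    }
}
```

In practice this means a value given with -D on the command line would take precedence over the same key in quartz.properties.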

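Everything above only prepares the configuration. It ends up in a cfg object, which is then used to read properties and configure the Scheduler. When Scheduler getScheduler() is called, it first checks whether a scheduler has already been created; if not, it creates a new one from these properties by calling instantiate().

The key parts of instantiate() are shown below.

Key local variables: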

JobStore js = null;
ThreadPool tp = null;
QuartzScheduler qs = null;
DBConnectionManager dbMgr = null;
String instanceIdGeneratorClass = null;
Properties tProps = null;
String userTXLocation = null;
boolean wrapJobInTx = false;
boolean autoId = false;
long idleWaitTime = -1;
long dbFailureRetry = 15000L; // 15 secs
String classLoadHelperClass;
String jobFactoryClass;
ThreadExecutor threadExecutor;

Generating the scheduler instance id:

String schedInstId = cfg.getStringProperty(PROP_SCHED_INSTANCE_ID,
        DEFAULT_INSTANCE_ID);

// If the id is configured to be auto-generated, look up the generator class to use
if (schedInstId.equals(AUTO_GENERATE_INSTANCE_ID)) {
    autoId = true;
    instanceIdGeneratorClass = cfg.getStringProperty(
            PROP_SCHED_INSTANCE_ID_GENERATOR_CLASS,
            "org.quartz.simpl.SimpleInstanceIdGenerator");
}
// If the id is configured to come from a system property, use the matching generator class
else if (schedInstId.equals(SYSTEM_PROPERTY_AS_INSTANCE_ID)) {
    autoId = true;
    instanceIdGeneratorClass =
            "org.quartz.simpl.SystemPropertyInstanceIdGenerator";
}

Reading the JNDI URL of the UserTransaction, which is used for distributed transactions:

userTXLocation = cfg.getStringProperty(PROP_SCHED_USER_TX_URL,
        userTXLocation);

Whether to run each Job inside a UserTransaction:

wrapJobInTx = cfg.getBooleanProperty(PROP_SCHED_WRAP_JOB_IN_USER_TX,
        wrapJobInTx);
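
Going back to the instance id step for a moment, the three-way choice (fixed id, auto-generated id, id taken from a system property) can be sketched in isolation. This is only an illustration: the class InstanceIdSketch, the resolve method and the two Supplier parameters are hypothetical, and the literal values "NON_CLUSTERED", "AUTO" and "SYS_PROP" are what I believe the corresponding constants hold, so treat them as assumptions. The real factory also defers generation to a generator class instead of producing the id on the spot.

```java
import java.util.function.Supplier;

final class InstanceIdSketch {
    // Assumed values of DEFAULT_INSTANCE_ID, AUTO_GENERATE_INSTANCE_ID
    // and SYSTEM_PROPERTY_AS_INSTANCE_ID.
    static final String DEFAULT_ID = "NON_CLUSTERED";
    static final String AUTO = "AUTO";
    static final String SYS_PROP = "SYS_PROP";

    /**
     * Resolves the effective instance id.
     * autoGenerator and sysPropReader are hypothetical stand-ins for the
     * instance id generator classes named in the configuration.
     */
    static String resolve(String configured,
                          Supplier<String> autoGenerator,
                          Supplier<String> sysPropReader) {
        String id = (configured == null) ? DEFAULT_ID : configured;
        if (AUTO.equals(id)) {
            return autoGenerator.get();
        }
        if (SYS_PROP.equals(id)) {
            return sysPropReader.get();
        }
        return id; // an explicitly configured id is used unchanged
    }

    public static void main(String[] args) {
        Supplier<String> auto = () -> "host-" + System.currentTimeMillis();
        Supplier<String> sys = () -> System.getProperty("demo.instanceId", "unset");
        System.out.println(resolve(null, auto, sys));
        System.out.println(resolve("node-1", auto, sys));
        System.out.println(resolve("AUTO", auto, sys));
    }
}
```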

The factory that instantiates jobs:

jobFactoryClass = cfg.getStringProperty(
        PROP_SCHED_JOB_FACTORY_CLASS, null);

JMX-related settings:

boolean jmxExport = cfg.getBooleanProperty(PROP_SCHED_JMX_EXPORT);
String jmxObjectName = cfg.getStringProperty(PROP_SCHED_JMX_OBJECT_NAME);

boolean jmxProxy = cfg.getBooleanProperty(PROP_SCHED_JMX_PROXY);
String jmxProxyClass = cfg.getStringProperty(PROP_SCHED_JMX_PROXY_CLASS);

RMI-related settings:

boolean rmiExport = cfg.getBooleanProperty(PROP_SCHED_RMI_EXPORT, false);
boolean rmiProxy = cfg.getBooleanProperty(PROP_SCHED_RMI_PROXY, false);
String rmiHost = cfg.getStringProperty(PROP_SCHED_RMI_HOST, "localhost");
int rmiPort = cfg.getIntProperty(PROP_SCHED_RMI_PORT, 1099);
int rmiServerPort = cfg.getIntProperty(PROP_SCHED_RMI_SERVER_PORT, -1);
String rmiCreateRegistry = cfg.getStringProperty(
        PROP_SCHED_RMI_CREATE_REGISTRY,
        QuartzSchedulerResources.CREATE_REGISTRY_NEVER);
String rmiBindName = cfg.getStringProperty(PROP_SCHED_RMI_BIND_NAME);
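
All of the settings above are read through typed getters that take a default value. The following is a minimal sketch of how such getters might sit on top of java.util.Properties. The class TypedPropsSketch and the demo.* keys are hypothetical, and details such as trimming and treating blank values as missing are assumptions of the sketch, not a statement about the cfg object's exact behavior.

```java
import java.util.Properties;

final class TypedPropsSketch {
    private final Properties props;

    TypedPropsSketch(Properties props) {
        this.props = props;
    }

    /** Returns the trimmed value, or def when the key is missing or blank. */
    String getString(String name, String def) {
        String value = props.getProperty(name);
        if (value == null) {
            return def;
        }
        value = value.trim();
        return value.isEmpty() ? def : value;
    }

    /** Parses an int; a malformed value raises NumberFormatException. */
    int getInt(String name, int def) {
        String value = getString(name, null);
        return value == null ? def : Integer.parseInt(value);
    }

    /** Only the string "true" (case-insensitive) counts as true. */
    boolean getBoolean(String name, boolean def) {
        String value = getString(name, null);
        return value == null ? def : Boolean.parseBoolean(value);
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("demo.rmi.port", " 2099 "); // hypothetical key
        TypedPropsSketch cfg = new TypedPropsSketch(p);
        System.out.println(cfg.getString("demo.rmi.host", "localhost"));
        System.out.println(cfg.getInt("demo.rmi.port", 1099));
        System.out.println(cfg.getBoolean("demo.rmi.export", false));
    }
}
```

Supplying the default at the call site keeps each setting's fallback visible right next to the property name, which is what makes the block above easy to scan.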

However, the JMX proxy and the RMI proxy cannot both be enabled at the same time:

if (jmxProxy && rmiProxy) {
    throw new SchedulerConfigException("Cannot proxy both RMI and JMX.");
}

If the RMI proxy is enabled, a RemoteScheduler is created. (There is no clustering effect here, since every operation is actually carried out on the remote side.)

if (rmiProxy) {

    if (autoId) {
        schedInstId = DEFAULT_INSTANCE_ID;
    }

    String uid = (rmiBindName == null) ? QuartzSchedulerResources.getUniqueIdentifier(
            schedName, schedInstId) : rmiBindName;

    RemoteScheduler remoteScheduler = new RemoteScheduler(uid, rmiHost, rmiPort);

    schedRep.bind(remoteScheduler);

    return remoteScheduler;
}

If the JMX remote proxy is enabled, a JMX scheduler is used instead:

if (jmxProxy) {
    if (autoId) {
        schedInstId = DEFAULT_INSTANCE_ID;
    }

    if (jmxProxyClass == null) {
        throw new SchedulerConfigException("No JMX Proxy Scheduler class provided");
    }

    RemoteMBeanScheduler jmxScheduler = null;
    try {
        jmxScheduler = (RemoteMBeanScheduler) loadHelper.loadClass(jmxProxyClass)
                .newInstance();
    } catch (Exception e) {
        throw new SchedulerConfigException(
                "Unable to instantiate RemoteMBeanScheduler class.", e);
    }

    if (jmxObjectName == null) {
        jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId);
    }

    jmxScheduler.setSchedulerObjectName(jmxObjectName);

    tProps = cfg.getPropertyGroup(PROP_SCHED_JMX_PROXY, true);
    try {
        setBeanProps(jmxScheduler, tProps);
    } catch (Exception e) {
        initException = new SchedulerException("RemoteMBeanScheduler class '"
                + jmxProxyClass + "' props could not be configured.", e);
        throw initException;
    }

    jmxScheduler.initialize();

    schedRep.bind(jmxScheduler);

    return jmxScheduler;
}

Initializing the jobFactory:

JobFactory jobFactory = null;
if (jobFactoryClass != null) {
    try {
        jobFactory = (JobFactory) loadHelper.loadClass(jobFactoryClass)
                .newInstance();
    } catch (Exception e) {
        throw new SchedulerConfigException(
                "Unable to instantiate JobFactory class: "
                        + e.getMessage(), e);
    }
    // Fetch the properties the jobFactory needs. This fixed-prefix lookup is how
    // custom property names are supported, as with the commonly used datasource settings.
    tProps = cfg.getPropertyGroup(PROP_SCHED_JOB_FACTORY_PREFIX, true);
    try {
        setBeanProps(jobFactory, tProps);
    } catch (Exception e) {
        initException = new SchedulerException("JobFactory class '"
                + jobFactoryClass + "' props could not be configured.", e);
        throw initException;
    }
}

Configuring the ThreadPool; its property values are read the same way as for the JobFactory:

String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());

if (tpClass == null) {
    initException = new SchedulerException(
            "ThreadPool class not specified.");
    throw initException;
}

try {
    tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
} catch (Exception e) {
    initException = new SchedulerException("ThreadPool class '"
            + tpClass + "' could not be instantiated.", e);
    throw initException;
}
tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);
try {
    setBeanProps(tp, tProps);
} catch (Exception e) {
    initException = new SchedulerException("ThreadPool class '"
            + tpClass + "' props could not be configured.", e);
    throw initException;
}

Configuring the JobStore:

String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
        RAMJobStore.class.getName());

if (jsClass == null) {
    initException = new SchedulerException(
            "JobStore class not specified.");
    throw initException;
}

try {
    js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
} catch (Exception e) {
    initException = new SchedulerException("JobStore class '" + jsClass
            + "' could not be instantiated.", e);
    throw initException;
}
// Set the scheduler name and instance id on the JobStore; this is just a simple reflective call
SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);

tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
try {
    setBeanProps(js, tProps);
} catch (Exception e) {
    initException = new SchedulerException("JobStore class '" + jsClass
            + "' props could not be configured.", e);
    throw initException;
}
// For a JobStoreSupport, a lock handler may need to be installed;
// it synchronizes scheduling across the cluster
if (js instanceof JobStoreSupport) {
    // Install custom lock handler (Semaphore)
    String lockHandlerClass = cfg.getStringProperty(PROP_JOB_STORE_LOCK_HANDLER_CLASS);
    if (lockHandlerClass != null) {
        try {
            Semaphore lockHandler = (Semaphore) loadHelper.loadClass(lockHandlerClass).newInstance();

            tProps = cfg.getPropertyGroup(PROP_JOB_STORE_LOCK_HANDLER_PREFIX, true);

            // If this lock handler requires the table prefix, add it to its properties.
            if (lockHandler instanceof TablePrefixAware) {
                tProps.setProperty(
                        PROP_TABLE_PREFIX, ((JobStoreSupport) js).getTablePrefix());
                tProps.setProperty(
                        PROP_SCHED_NAME, schedName);
            }

            try {
                setBeanProps(lockHandler, tProps);
            } catch (Exception e) {
                initException = new SchedulerException("JobStore LockHandler class '" + lockHandlerClass
                        + "' props could not be configured.", e);
                throw initException;
            }

            ((JobStoreSupport) js).setLockHandler(lockHandler);
            getLog().info("Using custom data access locking (synchronization): " + lockHandlerClass);
        } catch (Exception e) {
            initException = new SchedulerException("JobStore LockHandler class '" + lockHandlerClass
                    + "' could not be instantiated.", e);
            throw initException;
        }
    }
}

Reading the data sources, of which there may be several:



// Several data sources may be configured. Jobs that span more than one
// data source are the reason JTA distributed transaction support is needed.
String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX);
for (int i = 0; i < dsNames.length; i++) {
    PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup(
            PROP_DATASOURCE_PREFIX + "." + dsNames[i], true));

    // First check for an explicitly configured ConnectionProvider. Every data source
    // ends up wrapped in some provider, which is simply Quartz's wrapper around a data source.
    String cpClass = pp.getStringProperty(PROP_CONNECTION_PROVIDER_CLASS, null);

    // custom connectionProvider...
    if (cpClass != null) {
        ConnectionProvider cp = null;
        try {
            cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance();
        } catch (Exception e) {
            initException = new SchedulerException("ConnectionProvider class '" + cpClass
                    + "' could not be instantiated.", e);
            throw initException;
        }

        try {
            // After removing the provider class entry, apply the remaining properties to the provider
            // remove the class name, so it isn't attempted to be set
            pp.getUnderlyingProperties().remove(
                    PROP_CONNECTION_PROVIDER_CLASS);

            setBeanProps(cp, pp.getUnderlyingProperties());
            cp.initialize();
        } catch (Exception e) {
            initException = new SchedulerException("ConnectionProvider class '" + cpClass
                    + "' props could not be configured.", e);
            throw initException;
        }

        // Register the data source with the manager
        dbMgr = DBConnectionManager.getInstance();
        dbMgr.addConnectionProvider(dsNames[i], cp);
    } else {
        // Otherwise look for a JNDI-based configuration
        String dsJndi = pp.getStringProperty(PROP_DATASOURCE_JNDI_URL, null);

        if (dsJndi != null) {
            boolean dsAlwaysLookup = pp.getBooleanProperty(
                    PROP_DATASOURCE_JNDI_ALWAYS_LOOKUP);
            String dsJndiInitial = pp.getStringProperty(
                    PROP_DATASOURCE_JNDI_INITIAL);
            String dsJndiProvider = pp.getStringProperty(
                    PROP_DATASOURCE_JNDI_PROVDER);
            String dsJndiPrincipal = pp.getStringProperty(
                    PROP_DATASOURCE_JNDI_PRINCIPAL);
            String dsJndiCredentials = pp.getStringProperty(
                    PROP_DATASOURCE_JNDI_CREDENTIALS);
            Properties props = null;
            if (null != dsJndiInitial || null != dsJndiProvider
                    || null != dsJndiPrincipal || null != dsJndiCredentials) {
                props = new Properties();
                if (dsJndiInitial != null) {
                    props.put(PROP_DATASOURCE_JNDI_INITIAL,
                            dsJndiInitial);
                }
                if (dsJndiProvider != null) {
                    props.put(PROP_DATASOURCE_JNDI_PROVDER,
                            dsJndiProvider);
                }
                if (dsJndiPrincipal != null) {
                    props.put(PROP_DATASOURCE_JNDI_PRINCIPAL,
                            dsJndiPrincipal);
                }
                if (dsJndiCredentials != null) {
                    props.put(PROP_DATASOURCE_JNDI_CREDENTIALS,
                            dsJndiCredentials);
                }
            }
            JNDIConnectionProvider cp = new JNDIConnectionProvider(dsJndi,
                    props, dsAlwaysLookup);
            dbMgr = DBConnectionManager.getInstance();
            dbMgr.addConnectionProvider(dsNames[i], cp);
        } else {
            // Otherwise fall back to a local driver configuration
            String dsDriver = pp.getStringProperty(PoolingConnectionProvider.DB_DRIVER);
            String dsURL = pp.getStringProperty(PoolingConnectionProvider.DB_URL);

            if (dsDriver == null) {
                initException = new SchedulerException(
                        "Driver not specified for DataSource: "
                                + dsNames[i]);
                throw initException;
            }
            if (dsURL == null) {
                initException = new SchedulerException(
                        "DB URL not specified for DataSource: "
                                + dsNames[i]);
                throw initException;
            }
            try {
                PoolingConnectionProvider cp = new PoolingConnectionProvider(pp.getUnderlyingProperties());
                dbMgr = DBConnectionManager.getInstance();
                dbMgr.addConnectionProvider(dsNames[i], cp);
            } catch (SQLException sqle) {
                initException = new SchedulerException(
                        "Could not initialize DataSource: " + dsNames[i],
                        sqle);
                throw initException;
            }
        }

    }

}
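
The loop above starts from the list of data source names found under a common prefix. To show how such names could be discovered from flat property keys, here is a small sketch. PropertyGroupsSketch and groupNames are hypothetical names, and the behavior for keys without a second segment (they are skipped) is a choice made for this example and not necessarily what cfg.getPropertyGroups does.

```java
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

final class PropertyGroupsSketch {

    /** Returns the distinct first path segment that follows prefix + "." in the keys. */
    static Set<String> groupNames(Properties props, String prefix) {
        String p = prefix.endsWith(".") ? prefix : prefix + ".";
        Set<String> names = new TreeSet<>();   // sorted, duplicates removed
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(p)) {
                continue;
            }
            String rest = key.substring(p.length());
            int dot = rest.indexOf('.');
            if (dot > 0) {
                names.add(rest.substring(0, dot));
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("org.quartz.dataSource.mainDS.driver", "org.h2.Driver");
        props.setProperty("org.quartz.dataSource.mainDS.URL", "jdbc:h2:mem:demo");
        props.setProperty("org.quartz.dataSource.reportDS.jndiURL", "java:comp/env/jdbc/report");
        props.setProperty("org.quartz.jobStore.dataSource", "mainDS");
        System.out.println(groupNames(props, "org.quartz.dataSource"));
    }
}
```

Each discovered name is then used to build a narrower prefix (prefix + "." + name), which is how the loop obtains the per-data-source properties.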


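Next come the settings for SchedulerPlugins, JobListeners, TriggerListeners, the ThreadExecutor and so on. They all follow much the same pattern, since each one is just more configuration parsing.

The ThreadExecutor is the component that runs threads; it is mainly used for the scheduling and refire work.

After that the JobRunShellFactory is initialized. It is a factory for the shell that wraps Job execution, mainly so that listeners can be invoked and exceptions handled: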

JobRunShellFactory jrsf = null; // Create correct run-shell factory...

if (userTXLocation != null) {
    UserTransactionHelper.setUserTxLocation(userTXLocation);
}
// Choose the RunShellFactory according to the JTA configuration
if (wrapJobInTx) {
    jrsf = new JTAJobRunShellFactory();
} else {
    jrsf = new JTAAnnotationAwareJobRunShellFactory();
}
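
To make the role of a run shell more concrete, here is a minimal sketch of the general idea: wrap the job so that listeners are notified before and after it runs and a failure is captured instead of escaping. All names here (RunShellSketch, Listener, run) are hypothetical, and the sketch leaves out transactions entirely; it is not how JobRunShell is implemented.

```java
import java.util.ArrayList;
import java.util.List;

final class RunShellSketch {

    /** Hypothetical listener contract for this sketch. */
    interface Listener {
        void before(String jobName);
        void after(String jobName, Exception failure); // failure is null on success
    }

    /** Runs the job between listener callbacks; returns true if it completed normally. */
    static boolean run(String jobName, Runnable job, List<Listener> listeners) {
        for (Listener l : listeners) {
            l.before(jobName);
        }
        Exception failure = null;
        try {
            job.run();
        } catch (RuntimeException e) {
            failure = e; // captured so that the calling thread keeps going
        }
        for (Listener l : listeners) {
            l.after(jobName, failure);
        }
        return failure == null;
    }

    public static void main(String[] args) {
        List<Listener> listeners = new ArrayList<>();
        listeners.add(new Listener() {
            @Override public void before(String jobName) {
                System.out.println("about to run " + jobName);
            }
            @Override public void after(String jobName, Exception failure) {
                System.out.println(jobName + (failure == null ? " succeeded" : " failed: " + failure.getMessage()));
            }
        });
        run("report", () -> System.out.println("working"), listeners);
        run("broken", () -> { throw new IllegalStateException("boom"); }, listeners);
    }
}
```

A transactional variant would begin a transaction before job.run() and commit or roll back afterwards, which is roughly the difference the wrapJobInTx flag selects.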

A database-backed JobStore gets some extra handling:

if (js instanceof JobStoreSupport) {
    JobStoreSupport jjs = (JobStoreSupport) js;
    jjs.setDbRetryInterval(dbFailureRetry);
    if (threadsInheritInitalizersClassLoader)
        jjs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);

    // So the ThreadExecutor is also used for database connection handling (for example, periodic retries)
    jjs.setThreadExecutor(threadExecutor);
}


Once the objects and resources above have been initialized, Quartz stores them in a QuartzSchedulerResources instance, which is why you see that object used in so many places in the code:

QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
rsrcs.setName(schedName);
rsrcs.setThreadName(threadName);
rsrcs.setInstanceId(schedInstId);
rsrcs.setJobRunShellFactory(jrsf);
rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
rsrcs.setRunUpdateCheck(!skipUpdateCheck);
rsrcs.setBatchTimeWindow(batchTimeWindow);
rsrcs.setMaxBatchSize(maxBatchSize);
rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown);
rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait);
rsrcs.setJMXExport(jmxExport);
rsrcs.setJMXObjectName(jmxObjectName);

The Scheduler itself is created and set up next:

qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry); // instantiate the scheduler core
qsInited = true;

// Create Scheduler ref...
// Wrap the QuartzScheduler in a StdScheduler. StdScheduler is simply a decorator:
// QuartzScheduler itself does not implement the Scheduler interface.
Scheduler scheduler = instantiate(rsrcs, qs);

// set job factory if specified
if (jobFactory != null) {
    qs.setJobFactory(jobFactory);
}

// Initialize plugins now that we have a Scheduler instance.
for (int i = 0; i < plugins.length; i++) {
    plugins[i].initialize(pluginNames[i], scheduler, loadHelper);
}

// add listeners
for (int i = 0; i < jobListeners.length; i++) {
    qs.getListenerManager().addJobListener(jobListeners[i], EverythingMatcher.allJobs());
}
for (int i = 0; i < triggerListeners.length; i++) {
    qs.getListenerManager().addTriggerListener(triggerListeners[i], EverythingMatcher.allTriggers());
}

// set scheduler context data...
for (Object key : schedCtxtProps.keySet()) {
    String val = schedCtxtProps.getProperty((String) key);
    scheduler.getContext().put((String) key, val);
}

// fire up job store, and runshell factory
// Initialize the job store
js.setInstanceId(schedInstId);
js.setInstanceName(schedName);
js.setThreadPoolSize(tp.getPoolSize());
// SchedulerSignaler is the interface through which the JobStore calls back into the scheduler
js.initialize(loadHelper, qs.getSchedulerSignaler());

// Initialize the JobRunShellFactory. An initialize() method is the standard
// initialization hook; it is a coding convention used throughout Quartz.
jrsf.initialize(scheduler);

// Initialize the scheduler
qs.initialize();

getLog().info(
        "Quartz scheduler '" + scheduler.getSchedulerName()
                + "' initialized from " + propSrc);

getLog().info("Quartz scheduler version: " + qs.getVersion());

// Hold references to these resources so that they are not garbage collected
// prevents the repository from being garbage collected
qs.addNoGCObject(schedRep);
// prevents the db manager from being garbage collected
if (dbMgr != null) {
    qs.addNoGCObject(dbMgr);
}
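
The wrapping step in the code above, where a core object that does not implement the public interface is exposed through a thin object that does, can be sketched as follows. Every type in this sketch (MiniScheduler, Core, Facade) is a hypothetical, heavily reduced stand-in invented for illustration; the real interfaces are far larger.

```java
final class DelegatingSchedulerSketch {

    /** Hypothetical, much-reduced stand-in for the public Scheduler interface. */
    interface MiniScheduler {
        String getSchedulerName();
        void start();
        boolean isStarted();
    }

    /** Stand-in for the core engine; note that it does not implement MiniScheduler. */
    static final class Core {
        private final String name;
        private boolean running;

        Core(String name) { this.name = name; }
        String name() { return name; }
        void startEngine() { running = true; }
        boolean running() { return running; }
    }

    /** Thin wrapper that exposes the core through the public interface. */
    static final class Facade implements MiniScheduler {
        private final Core core;

        Facade(Core core) { this.core = core; }
        @Override public String getSchedulerName() { return core.name(); }
        @Override public void start() { core.startEngine(); }
        @Override public boolean isStarted() { return core.running(); }
    }

    public static void main(String[] args) {
        Core core = new Core("demoScheduler");
        MiniScheduler scheduler = new Facade(core);
        scheduler.start();
        System.out.println(scheduler.getSchedulerName() + " started: " + scheduler.isStarted());
    }
}
```

Keeping the core behind a wrapper lets callers depend only on the interface, which is also what makes it possible to hand out a remote proxy (RMI or JMX) in place of a local scheduler.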

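To sum up, the process is fairly simple: read the configuration file, initialize the various classes and their properties following the prefix convention, wrap them in QuartzSchedulerResources for the Scheduler to use, and then initialize everything in turn: the JobStore, ThreadPool, JobRunShellFactory, ThreadExecutor and JobFactory, the various managers, and finally the Scheduler itself, to which the plugins and listeners are added.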
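
The prefix convention mentioned in the summary has two halves: select the keys under a prefix (stripping the prefix), then push each remaining key into the target object through a matching setter. The sketch below illustrates both with plain reflection. BeanPropsSketch, group, apply and DemoPool are hypothetical names, only String, int and boolean setters are handled, and none of this is taken from the Quartz source.

```java
import java.lang.reflect.Method;
import java.util.Properties;

final class BeanPropsSketch {

    /** Returns the entries under prefix + ".", with that prefix stripped from the keys. */
    static Properties group(Properties all, String prefix) {
        String p = prefix.endsWith(".") ? prefix : prefix + ".";
        Properties out = new Properties();
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith(p) && key.length() > p.length()) {
                out.setProperty(key.substring(p.length()), all.getProperty(key));
            }
        }
        return out;
    }

    /** Calls setXxx(value) on bean for every property xxx; fails if no setter fits. */
    static void apply(Object bean, Properties props) {
        for (String name : props.stringPropertyNames()) {
            String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            if (!invokeSetter(bean, setter, props.getProperty(name))) {
                throw new IllegalArgumentException("No usable setter for property '" + name + "'");
            }
        }
    }

    private static boolean invokeSetter(Object bean, String setter, String value) {
        for (Method m : bean.getClass().getMethods()) {
            if (!m.getName().equals(setter) || m.getParameterCount() != 1) {
                continue;
            }
            Class<?> type = m.getParameterTypes()[0];
            Object arg;
            if (type == String.class) {
                arg = value;
            } else if (type == int.class) {
                arg = Integer.parseInt(value);
            } else if (type == boolean.class) {
                arg = Boolean.parseBoolean(value);
            } else {
                continue; // other parameter types are out of scope for this sketch
            }
            try {
                m.invoke(bean, arg);
                return true;
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Setter " + setter + " failed", e);
            }
        }
        return false;
    }

    /** Hypothetical bean standing in for a configurable component such as a thread pool. */
    public static final class DemoPool {
        private int threadCount;
        private String threadNamePrefix;

        public void setThreadCount(int threadCount) { this.threadCount = threadCount; }
        public void setThreadNamePrefix(String prefix) { this.threadNamePrefix = prefix; }
        public int getThreadCount() { return threadCount; }
        public String getThreadNamePrefix() { return threadNamePrefix; }
    }

    public static void main(String[] args) {
        Properties all = new Properties();
        all.setProperty("org.quartz.threadPool.threadCount", "5");
        all.setProperty("org.quartz.threadPool.threadNamePrefix", "worker");
        DemoPool pool = new DemoPool();
        apply(pool, group(all, "org.quartz.threadPool"));
        System.out.println(pool.getThreadNamePrefix() + " x " + pool.getThreadCount());
    }
}
```

With this convention a new component needs no parsing code of its own: any public setter it exposes automatically becomes a configurable property under its prefix.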