您的位置:首页 > Web前端 > BootStrap

Quartzs的job,trriger监听器源码分析

2016-09-08 10:21 274 查看
在job存储之后,之后做了job添加通知,通知调度器下一刻执行时间,并唤醒正在等在执行的job,然后添加trriger通知。

Java代码  

public class QuartzScheduler  

    implements RemotableQuartzScheduler  

{  

 public Date scheduleJob(JobDetail jobDetail, Trigger trigger)  

        throws SchedulerException  

    {  

        validateState();  

        if(jobDetail == null)  

            throw new SchedulerException("JobDetail cannot be null");  

        if(trigger == null)  

            throw new SchedulerException("Trigger cannot be null");  

        if(jobDetail.getKey() == null)  

            throw new SchedulerException("Job's key cannot be null");  

        if(jobDetail.getJobClass() == null)  

            throw new SchedulerException("Job's class cannot be null");  

    //包装触发器  

        OperableTrigger trig = (OperableTrigger)trigger;  

        if(trigger.getJobKey() == null)  

        //设置触发器jobKey  

            trig.setJobKey(jobDetail.getKey());  

        else  

        if(!trigger.getJobKey().equals(jobDetail.getKey()))  

            throw new SchedulerException("Trigger does not reference given job!");  

        trig.validate();  

        Calendar cal = null;  

        if(trigger.getCalendarName() != null)  

            cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());  

        Date ft = trig.computeFirstFireTime(cal);  

        if(ft == null)  

        {  

            throw new SchedulerException((new StringBuilder()).append("Based on configured schedule, the given trigger '").append(trigger.getKey()).append("' will never fire.").toString());  

        } else  

        {  

        //存储job and trriger到jobStrore  

            resources.getJobStore().storeJobAndTrigger(jobDetail, trig);  

        //添加jobDetail到调度监听器  

            notifySchedulerListenersJobAdded(jobDetail);  

            //通知调度器下一刻调度时间  

            notifySchedulerThread(trigger.getNextFireTime().getTime());  

             //添加trigger到调度监听器  

            notifySchedulerListenersSchduled(trigger);  

            return ft;  

        }  

     //添加jobDetail  

     public void notifySchedulerListenersJobAdded(JobDetail jobDetail)  

    {  

        List schedListeners = buildSchedulerListenerList();  

        for(Iterator i$ = schedListeners.iterator(); i$.hasNext();)  

        {  

            SchedulerListener sl = (SchedulerListener)i$.next();  

            try  

            {  

                sl.jobAdded(jobDetail);  

            }  

            catch(Exception e)  

            {  

                getLog().error("Error while notifying SchedulerListener of JobAdded.", e);  

            }  

        }  

  

    }  

    //获取调度监听器  

     private List buildSchedulerListenerList()  

    {  

        List allListeners = new LinkedList();  

        allListeners.addAll(getListenerManager().getSchedulerListeners());  

        allListeners.addAll(getInternalSchedulerListeners());  

        return allListeners;  

    }  

    //获取内部调度器  

      public List getInternalSchedulerListeners()  

    {  

        ArrayList arraylist = internalSchedulerListeners;  

        JVM INSTR monitorenter ;  

        return Collections.unmodifiableList(new ArrayList(internalSchedulerListeners));  

        Exception exception;  

        exception;  

        throw exception;  

    }  

     //通知调度器下一刻调度时间  

    protected void notifySchedulerThread(long candidateNewNextFireTime)  

    {  

        if(isSignalOnSchedulingChange())  

            signaler.signalSchedulingChange(candidateNewNextFireTime);  

    }  

    //添加trigger到调度监听器  

    public void notifySchedulerListenersSchduled(Trigger trigger)  

    {  

        List schedListeners = buildSchedulerListenerList();  

        for(Iterator i$ = schedListeners.iterator(); i$.hasNext();)  

        {  

            SchedulerListener sl = (SchedulerListener)i$.next();  

            try  

            {  

                sl.jobScheduled(trigger);  

            }  

            catch(Exception e)  

            {  

                getLog().error((new StringBuilder()).append("Error while notifying SchedulerListener of scheduled job.  Triger=").append(trigger.getKey()).toString(), e);  

            }  

        }  

  

    }  

    private static String VERSION_MAJOR;  

    private static String VERSION_MINOR;  

    private static String VERSION_ITERATION;  

    private QuartzSchedulerResources resources;  

    private QuartzSchedulerThread schedThread;  

    private ThreadGroup threadGroup;  

    private SchedulerContext context;  

    private ListenerManager listenerManager;//监听管理器  

    private HashMap internalJobListeners;  

    private HashMap internalTriggerListeners;  

    private ArrayList internalSchedulerListeners;  

    private JobFactory jobFactory;  

    ExecutingJobsManager jobMgr;  

    ErrorLogger errLogger;  

    private SchedulerSignaler signaler;  

    private Random random;  

    private ArrayList holdToPreventGC;  

    private boolean signalOnSchedulingChange;  

    private volatile boolean closed;  

    private volatile boolean shuttingDown;  

    private boolean boundRemotely;  

    private QuartzSchedulerMBean jmxBean;  

    private Date initialStart;  

    private final Timer updateTimer;  

}  

//调度监听器
下载

Java代码  

 public class ListenerManagerImpl  

    implements ListenerManager  

{  

public ListenerManagerImpl()  

    {  

        globalJobListeners = new LinkedHashMap(10);  

        globalTriggerListeners = new LinkedHashMap(10);  

        globalJobListenersMatchers = new LinkedHashMap(10);  

        globalTriggerListenersMatchers = new LinkedHashMap(10);  

        schedulerListeners = new ArrayList(10);  

    }  

    //添加调度监听器  

     public void addSchedulerListener(SchedulerListener schedulerListener)  

    {  

        synchronized(schedulerListeners)  

        {  

            schedulerListeners.add(schedulerListener);  

        }  

    }  

    //添加job监听器  

    public void addJobListener(JobListener jobListener, List matchers)  

    {  

        if(jobListener.getName() == null || jobListener.getName().length() == 0)  

            throw new IllegalArgumentException("JobListener name cannot be empty.");  

        synchronized(globalJobListeners)  

        {  

            globalJobListeners.put(jobListener.getName(), jobListener);  

            LinkedList matchersL = new LinkedList();  

            if(matchers != null && matchers.size() > 0)  

                matchersL.addAll(matchers);  

            else  

                matchersL.add(EverythingMatcher.allJobs());  

            globalJobListenersMatchers.put(jobListener.getName(), matchersL);  

        }  

    }  

     //添加Trigger监听器  

      public void addTriggerListener(TriggerListener triggerListener, List matchers)  

    {  

        if(triggerListener.getName() == null || triggerListener.getName().length() == 0)  

            throw new IllegalArgumentException("TriggerListener name cannot be empty.");  

        synchronized(globalTriggerListeners)  

        {  

            globalTriggerListeners.put(triggerListener.getName(), triggerListener);  

            LinkedList matchersL = new LinkedList();  

            if(matchers != null && matchers.size() > 0)  

                matchersL.addAll(matchers);  

            else  

                matchersL.add(EverythingMatcher.allTriggers());  

            globalTriggerListenersMatchers.put(triggerListener.getName(), matchersL);  

        }  

    }  

     public List getSchedulerListeners()  

    {  

        ArrayList arraylist = schedulerListeners;  

        JVM INSTR monitorenter ;  

        return Collections.unmodifiableList(new ArrayList(schedulerListeners));  

        Exception exception;  

        exception;  

        throw exception;  

    }  

  

    private Map globalJobListeners;//LinkedHashMap<String,JobListener>,key为JobListener.getName(),全局job监听器  

    private Map globalTriggerListeners;//LinkedHashMap<String,TriggerListener>,key为TriggerListener.getName(),全局Trriger监听器  

    private Map globalJobListenersMatchers;  

    private Map globalTriggerListenersMatchers;  

    private ArrayList schedulerListeners;//List<SchedulerListener>调度监听器  

}  

//QuartzSchedulerMBean 下载

Java代码  

public class QuartzSchedulerMBeanImpl extends StandardMBean  

    implements NotificationEmitter, QuartzSchedulerMBean, JobListener, SchedulerListener  

{  

  //添加jobDetail通知到JMX  

  public void jobAdded(JobDetail jobDetail)  

    {  

        sendNotification("jobAdded", JobDetailSupport.toCompositeData(jobDetail));  

    }  

 //添加trigger通知到JMX  

  public void jobScheduled(Trigger trigger)  

    {  

        sendNotification("jobScheduled", TriggerSupport.toCompositeData(trigger));  

    }  

  public void sendNotification(String eventType, Object data)  

    {  

        sendNotification(eventType, data, null);  

    }  

  

 public void sendNotification(String eventType, Object data, String msg)  

    {  

        Notification notif = new Notification(eventType, this, sequenceNumber.incrementAndGet(), System.currentTimeMillis(), msg);  

        if(data != null)  

            notif.setUserData(data);  

        emitter.sendNotification(notif);  

    }  

}  

//调度通知器 下载

Java代码  

public class SchedulerSignalerImpl  

    implements SchedulerSignaler  

{  

  

    public SchedulerSignalerImpl(QuartzScheduler sched, QuartzSchedulerThread schedThread)  

    {  

        log = LoggerFactory.getLogger(org/quartz/core/SchedulerSignalerImpl);  

        this.sched = sched;  

        this.schedThread = schedThread;  

        log.info((new StringBuilder()).append("Initialized Scheduler Signaller of type: ").append(getClass()).toString());  

    }  

  

    public void signalSchedulingChange(long candidateNewNextFireTime)  

    {  

        schedThread.signalSchedulingChange(candidateNewNextFireTime);  

    }  

  

    Logger log;  

    protected QuartzScheduler sched;  

    protected QuartzSchedulerThread schedThread;  

}  

//调度器线程 下载

Java代码  

public class QuartzSchedulerThread extends Thread  

{  

QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs)  

    {  

        this(qs, qsRsrcs, qsRsrcs.getMakeSchedulerThreadDaemon(), 5);  

    }  

    //唤醒所有等待sigLock的调度器  

     public void signalSchedulingChange(long candidateNewNextFireTime)  

    {  

        synchronized(sigLock)  

        {  

            signaled = true;  

            signaledNextFireTime = candidateNewNextFireTime;  

            sigLock.notifyAll();  

        }  

    }  

 private QuartzScheduler qs;  

    private QuartzSchedulerResources qsRsrcs;  

    private final Object sigLock;  

    private boolean signaled;  

    private long signaledNextFireTime;  

    private boolean paused;  

    private AtomicBoolean halted;  

    private Random random;  

    private static long DEFAULT_IDLE_WAIT_TIME = 30000L;  

    private long idleWaitTime;  

    private int idleWaitVariablness;  

    private final Logger log;  

  

}  

//SchedulerListener

下载

Java代码  

public interface SchedulerListener  

{  

    public abstract void jobScheduled(Trigger trigger);  

    public abstract void jobUnscheduled(TriggerKey triggerkey);  

    public abstract void triggerFinalized(Trigger trigger);  

    public abstract void triggerPaused(TriggerKey triggerkey);  

    public abstract void triggersPaused(String s);  

    public abstract void triggerResumed(TriggerKey triggerkey);  

    public abstract void triggersResumed(String s);  

    public abstract void jobAdded(JobDetail jobdetail);  

    public abstract void jobDeleted(JobKey jobkey);  

    public abstract void jobPaused(JobKey jobkey);  

    public abstract void jobsPaused(String s);  

    public abstract void jobResumed(JobKey jobkey);  

    public abstract void jobsResumed(String s);  

    public abstract void schedulerError(String s, SchedulerException schedulerexception);  

    public abstract void schedulerInStandbyMode();  

    public abstract void schedulerStarted();  

    public abstract void schedulerStarting();  

    public abstract void schedulerShutdown();  

    public abstract void schedulerShuttingdown();  

    public abstract void schedulingDataCleared();  

}  

//job监听器 下载

Java代码  

public interface JobListener  

{  

    public abstract String getName();  

    public abstract void jobToBeExecuted(JobExecutionContext jobexecutioncontext);  

    public abstract void jobExecutionVetoed(JobExecutionContext jobexecutioncontext);  

    public abstract void jobWasExecuted(JobExecutionContext jobexecutioncontext, JobExecutionException jobexecutionexception);  

}  

//Trigger监听器 

Java代码  

public interface TriggerListener  

{  

    public abstract String getName();  

    public abstract void triggerFired(Trigger trigger, JobExecutionContext jobexecutioncontext);  

    public abstract boolean vetoJobExecution(Trigger trigger, JobExecutionContext jobexecutioncontext);  

    public abstract void triggerMisfired(Trigger trigger);  

    public abstract void triggerComplete(Trigger trigger, JobExecutionContext jobexecutioncontext, Trigger.CompletedExecutionInstruction completedexecutioninstruction);  

}  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息