个人学习笔记----基于Spring4.3.1+mybatis+postgresql+maven搭建的个人用调度平台(三)
2016-11-29 09:49
591 查看
九、调度功能
逻辑如下:任务间存在着依赖关系,故每次只取出可执行的任务,当任务已取出后,数据库中状态为等待,当任务完成,状态为已完成,依赖的任务就可以执行,便取出执行。取出的逻辑由sql实现,只需要调用sql就可以获取能执行的任务。
9.1 任务调度
采用接口回调的方式实现动态加载任务类,使得添加新类型任务时,减少代码修改量,以求通用,类路径保存在数据库的classname中,输入参数在para,以json字符串格式回调的部分在前面scheduleservice中实现,估计后期会将其提炼出来,但现在就这样吧
接口:
package jobs; import org.json.JSONObject; /** * Created by lu on 2016/11/20. */ public interface JobInterface { public int work(JSONObject param); }
9.2 并行控制
采用FJP的方式进行并行工作,连接池数量在常量类中设置连接池采用单例
Schedule_Thread
package service; import jobs.Job; import schedule.JobThread; import schedule.ScheduleTask; import utils.Constants; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; //监控任务线程,主要用于启动调度模块,为调度模块主入口 public class Schedule_Thread extends Thread { //单例线程 private static Schedule_Thread thread=new Schedule_Thread(); public static Schedule_Thread getInstance(){ return thread; } //启动线程池并开始执行任务 @Override public void run() { //新建线程池 ForkJoinPool pool = new ForkJoinPool(Constants.THREADCOUNT); try { //循环检查执行任务列表,有可以执行的就进行调度,没有进行休眠 while(true) { if(!ScheduleService.getInstance().isEmpty()) { //此处是实现调度算法的 Job runJob = null; while (!ScheduleService.getInstance().isEmpty()) { runJob = ScheduleTask.getjob(); if(runJob ==null){ continue; } JobThread jobThread = new JobThread(runJob); pool.execute(jobThread); } }else{ sleep(Constants.SLEEPTIME); } } } catch(Exception e){ e.printStackTrace(); } } //测试用任务类 private class MonitorTask extends RecursiveAction { @Override protected void compute() { System.out.println("test"); } } }
任务类:
package schedule; import dao.ScheduleDao; import jobs.Job; import jobs.JobInterface; import model.Task_List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.web.context.ContextLoader; import org.springframework.web.context.WebApplicationContext; import utils.Constants; import utils.LogUtil; import java.util.Date; import java.util.concurrent.RecursiveAction; public class JobThread extends RecursiveAction { private Job job; private static Logger logger = LogManager.getLogger(JobThread.class.getName()); private ScheduleDao scheduleDao; public JobThread (Job job){ this.job=job; WebApplicationContext context = ContextLoader.getCurrentWebApplicationContext(); String scheduleDaoBean=(context.getBeanNamesForType(ScheduleDao.class))[0]; this.scheduleDao= (ScheduleDao) context.getBean(scheduleDaoBean); } @Override protected void compute() { try{ //开始时间 Date begin=new Date(); //执行任务 Class jobclass=Class.forName(job.getJobClass()); JobInterface work=(JobInterface)jobclass.newInstance(); //获取结果 int result=work.work(job.getParam()); //结束时间 Date end=new Date(); //更新任务状态 int st=4; if(result== Constants.SUCCESS){ st=3; } job.setTask_st(st); //更新数据库任务列表状态 Task_List task_list=new Task_List(job.getTask_id(),job.getTask_st()); scheduleDao.updateTaskListByTask_List(task_list); LogUtil.SuccessLogAdd(logger, Constants.LOG_INFO, "JobThread task_id "+job.getTask_id(),"执行",true); } catch (Exception e) { e.printStackTrace(); LogUtil.ErrorLogAdd(logger, Constants.LOG_ERROR, "JobThread task_id "+job.getTask_id(),"执行",e.getClass().getName(),true); } } }
轮询可执行任务,以便取出执行
package service; import dao.ScheduleDao; import jobs.Job; import model.RunnableList; import model.Task_List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.json.JSONObject; import org.springframework.web.context.ContextLoader; import org.springframework.web.context.WebApplicationContext; import utils.Constants; import utils.LogUtil; import java.util.List; public class Monitor_Thread extends Thread { //单例线程 private static Monitor_Thread thread=new Monitor_Thread(); public static Monitor_Thread getInstance(){ return thread; } private static Logger logger = LogManager.getLogger(Monitor_Thread.class.getName()); private ScheduleDao scheduleDao; private Monitor_Thread(){ WebApplicationContext context = ContextLoader.getCurrentWebApplicationContext(); String scheduleDaoBean=(context.getBeanNamesForType(ScheduleDao.class))[0]; this.scheduleDao= (ScheduleDao) context.getBean(scheduleDaoBean); } @Override public void run() { //测试输出 System.out.println("Monitor_Thread begin"); try { while (true) { //获取可跑任务 List<RunnableList> runnableLists = scheduleDao.queryRunnableList(); //存在可跑任务 if (!runnableLists.isEmpty()) { //测试输出 System.out.println("Monitor_Thread add"); for (RunnableList runnableList : runnableLists) { int result = ScheduleService.checkJob(runnableList.getTask_id() , runnableList.getPara() , runnableList.getTaskclassname()); if (result == Constants.FAIL) { continue; } Job job = new Job(runnableList.getTask_id() , new JSONObject(runnableList.getPara()) , runnableList.getTaskclassname(), Constants.TASK_READY); ScheduleService.getInstance().add(job); Task_List task_list = new Task_List(runnableList.getTask_id(), Constants.TASK_WAIT); scheduleDao.updateTaskListByTask_List(task_list); LogUtil.SuccessLogAdd(logger , Constants.LOG_INFO , "Monitor_Thread task_id " + runnableList.getTask_id() , Monitor_Thread.class.getName() , true); } } else { //测试输出 System.out.println("Monitor_Thread skip"); sleep(Constants.MONITORSLEEPTIME); } } }catch (Exception e){ e.printStackTrace(); LogUtil.ErrorLogAdd(logger , Constants.LOG_INFO , "Monitor_Thread " , Monitor_Thread.class.getName() ,e.getClass().getName() , true); } } }
sql:
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <!-- 定义命名空间 --> <mapper namespace="mapperNS.Schedule"> <!-- 根据依赖查询目前可执行任务信息 --> <select id="queryRunnableList" resultType="RunnableList"> select task_list.task_id,task_list.st,task.para,task.taskclassname from t_task_list task_list left join t_dep dep on (task_list.task_id=dep.task_id) inner join t_task_list parent_st on (dep.parent_id=parent_st.task_id) left join t_task task on (task.task_id=task_list.task_id) where ( parent_st.st=3 or parent_st.st=5 ) and parent_st.t_date=task_list.t_date and task_list.t_date=to_char(now(),'yyyymmdd') and task_list.st=0 </select> <!-- 初始化任务列表 --> <insert id="initTask_List"> insert into t_task_list select task_id,(case when task_id=0 then 3 else 0 end) as st,to_char(now(),'yyyymmdd') as t_date from t_task ON CONFLICT(task_id,t_date) DO NOTHING </insert> <!-- 获取之前是否存在错误或者未完成任务--> <select id="queryBeforeErrorCount" resultType="int"> select count(1) from t_task_list where st in (0,1,2,4) and t_date < to_char(now(),'yyyymmdd') </select> <!-- 获取当前任务是否需要初始化 --> <select id="queryIsInit" resultType="int"> select count(1) from t_task_list where t_date = to_char(now(),'yyyymmdd') </select> <!-- 更新任务列表任务状态(全表变更) --> <update id="updateAllTaskList"> UPDATE t_task_list set st=0 where t_date=to_char(now(),'yyyymmdd') and st in (1,2,4) </update> <!-- 更新任务状态,根据Task_List --> <update id="updateTaskListByTask_List" parameterType="Task_List"> UPDATE t_task_list set st = #{st} where t_date = #{t_date} and task_id = #{task_id} </update> <!-- 插入数据 <insert id="" parameterType=""> </insert>--> <!-- 删除数据 <delete id="" parameterType=""> </delete>--> </mapper>
调度功能在scheduleservice中初始化函数中初始化并运行
项目至此,差不多都说完了,剩下一些个人收集的工具类
项目代码在https://github.com/269219362ljf/basis_schedule_sys 有需要的请到这里下载源代码,里面有详细的食用说明
相关文章推荐
- 个人学习笔记----基于Spring4.3.1+mybatis+postgresql+maven搭建的个人用调度平台(二)
- 个人学习笔记----基于Spring4.3.1+mybatis+postgresql+maven搭建的个人用调度平台(一)
- 个人学习笔记----基于Spring4.3.1+mybatis+postgresql+maven搭建的个人用调度平台(四)
- Spring 、SpringMVC、Mybatis、MySQL、Maven、Tomcat搭建JavaWeb项目流程---学习笔记(一)工具准备及环境配置
- 基于Maven的Springboot项目搭建学习笔记
- mybatis学习笔记(六) -- maven+spring+mybatis从零开始搭建整合详细过程(下)
- IntelliJ IDEA 搭建基于Maven 的SSM(一)(spring,springMvc,Mybatis)框架整合
- java 搭建基于springboot的ssm(spring + springmvc + mybatis)的maven项目
- 基于maven的mybatis+springmvc环境搭建以及集成bootstraps
- 个人学习笔记--MyBatis-的搭建及第一个程序
- 基于maven的spring+mybatis+springMVC框架搭建
- SpringMVC学习笔记(二)使用IntelliJ IDEA开发Spring MVC HelloWorld(基于Maven)
- 基于Maven的Spring + Spring MVC + Mybatis的环境搭建
- 基于vagrant 搭建Ruby开发平台(Ruby及Rails学习笔记第一篇)
- spring io 平台 模块个人学习笔记
- 快速搭建基于Maven的通用Web项目(Spring-Mybatis-JSF-Jersey-Boostrap)
- 基于Maven的Spring + Spring MVC + Mybatis的环境搭建
- springMVC学习笔记---day02 springMVC+spring+mybatis整合开发框架搭建
- 基于maven搭建ssm开发框架(1) mybatis和spring的整合