您的位置:首页 > 编程语言 > Java开发

个人学习笔记----基于Spring4.3.1+mybatis+postgresql+maven搭建的个人用调度平台(三)

2016-11-29 09:49 591 查看

九、调度功能

逻辑如下:任务间存在着依赖关系,故每次只取出可执行的任务,当任务已取出后,数据库中状态为等待,当任务完成,状态为已完成,依赖的任务就可以执行,便取出执行。
取出的逻辑由sql实现,只需要调用sql就可以获取能执行的任务。

9.1 任务调度

采用接口回调的方式实现动态加载任务类,使得添加新类型任务时,减少代码修改量,以求通用,类路径保存在数据库的classname中,输入参数在para,以json字符串格式
回调的部分在前面scheduleservice中实现,估计后期会将其提炼出来,但现在就这样吧
接口:
package jobs;

import org.json.JSONObject;

/**
* Created by lu on 2016/11/20.
*/
public interface JobInterface {

public int work(JSONObject param);

}

9.2 并行控制

采用FJP的方式进行并行工作,连接池数量在常量类中设置
连接池采用单例

Schedule_Thread
package service;

import jobs.Job;
import schedule.JobThread;
import schedule.ScheduleTask;
import utils.Constants;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

//监控任务线程,主要用于启动调度模块,为调度模块主入口
public class Schedule_Thread extends Thread {
//单例线程
private static Schedule_Thread thread=new Schedule_Thread();

public static Schedule_Thread getInstance(){
return thread;
}

//启动线程池并开始执行任务
@Override
public void run() {
//新建线程池
ForkJoinPool pool = new ForkJoinPool(Constants.THREADCOUNT);

try {
//循环检查执行任务列表,有可以执行的就进行调度,没有进行休眠
while(true) {
if(!ScheduleService.getInstance().isEmpty()) {
//此处是实现调度算法的
Job runJob = null;
while (!ScheduleService.getInstance().isEmpty()) {
runJob = ScheduleTask.getjob();
if(runJob ==null){
continue;
}
JobThread jobThread = new JobThread(runJob);
pool.execute(jobThread);
}
}else{
sleep(Constants.SLEEPTIME);
}
}
}
catch(Exception e){
e.printStackTrace();
}
}

//测试用任务类
private class MonitorTask extends RecursiveAction {

@Override
protected void compute() {
System.out.println("test");
}
}

}


任务类:
package schedule;

import dao.ScheduleDao;
import jobs.Job;
import jobs.JobInterface;
import model.Task_List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.web.context.ContextLoader;
import org.springframework.web.context.WebApplicationContext;
import utils.Constants;
import utils.LogUtil;

import java.util.Date;
import java.util.concurrent.RecursiveAction;

public class JobThread extends RecursiveAction {

private Job job;

private static Logger logger = LogManager.getLogger(JobThread.class.getName());

private ScheduleDao scheduleDao;

public JobThread (Job job){
this.job=job;
WebApplicationContext context = ContextLoader.getCurrentWebApplicationContext();
String scheduleDaoBean=(context.getBeanNamesForType(ScheduleDao.class))[0];
this.scheduleDao= (ScheduleDao) context.getBean(scheduleDaoBean);
}

@Override
protected void compute() {
try{
//开始时间
Date begin=new Date();
//执行任务
Class jobclass=Class.forName(job.getJobClass());
JobInterface work=(JobInterface)jobclass.newInstance();
//获取结果
int result=work.work(job.getParam());
//结束时间
Date end=new Date();
//更新任务状态
int st=4;
if(result== Constants.SUCCESS){
st=3;
}
job.setTask_st(st);
//更新数据库任务列表状态
Task_List task_list=new Task_List(job.getTask_id(),job.getTask_st());
scheduleDao.updateTaskListByTask_List(task_list);
LogUtil.SuccessLogAdd(logger,
Constants.LOG_INFO,
"JobThread task_id "+job.getTask_id(),"执行",true);
} catch (Exception e) {
e.printStackTrace();
LogUtil.ErrorLogAdd(logger,
Constants.LOG_ERROR,
"JobThread task_id "+job.getTask_id(),"执行",e.getClass().getName(),true);
}

}
}


轮询可执行任务,以便取出执行

package service;

import dao.ScheduleDao;
import jobs.Job;
import model.RunnableList;
import model.Task_List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.springframework.web.context.ContextLoader;
import org.springframework.web.context.WebApplicationContext;
import utils.Constants;
import utils.LogUtil;

import java.util.List;

public class Monitor_Thread extends Thread {
//单例线程
private static Monitor_Thread thread=new Monitor_Thread();

public static Monitor_Thread getInstance(){
return thread;
}

private static Logger logger = LogManager.getLogger(Monitor_Thread.class.getName());

private ScheduleDao scheduleDao;

private Monitor_Thread(){
WebApplicationContext context = ContextLoader.getCurrentWebApplicationContext();
String scheduleDaoBean=(context.getBeanNamesForType(ScheduleDao.class))[0];
this.scheduleDao= (ScheduleDao) context.getBean(scheduleDaoBean);
}

@Override
public void run() {
//测试输出
System.out.println("Monitor_Thread begin");
try {
while (true) {

//获取可跑任务
List<RunnableList> runnableLists = scheduleDao.queryRunnableList();
//存在可跑任务
if (!runnableLists.isEmpty()) {
//测试输出
System.out.println("Monitor_Thread add");
for (RunnableList runnableList : runnableLists) {
int result = ScheduleService.checkJob(runnableList.getTask_id()
, runnableList.getPara()
, runnableList.getTaskclassname());
if (result == Constants.FAIL) {
continue;
}
Job job = new Job(runnableList.getTask_id()
, new JSONObject(runnableList.getPara())
, runnableList.getTaskclassname(), Constants.TASK_READY);
ScheduleService.getInstance().add(job);
Task_List task_list = new Task_List(runnableList.getTask_id(), Constants.TASK_WAIT);
scheduleDao.updateTaskListByTask_List(task_list);
LogUtil.SuccessLogAdd(logger
, Constants.LOG_INFO
, "Monitor_Thread task_id " + runnableList.getTask_id()
, Monitor_Thread.class.getName()
, true);
}
} else {
//测试输出
System.out.println("Monitor_Thread skip");
sleep(Constants.MONITORSLEEPTIME);
}

}
}catch (Exception e){
e.printStackTrace();
LogUtil.ErrorLogAdd(logger
, Constants.LOG_INFO
, "Monitor_Thread "
, Monitor_Thread.class.getName()
,e.getClass().getName()
, true);
}
}

}


sql:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!-- 定义命名空间 -->
<mapper namespace="mapperNS.Schedule">

<!-- 根据依赖查询目前可执行任务信息 -->
<select id="queryRunnableList" resultType="RunnableList">
select task_list.task_id,task_list.st,task.para,task.taskclassname
from t_task_list task_list
left join t_dep dep on (task_list.task_id=dep.task_id)
inner join t_task_list parent_st on (dep.parent_id=parent_st.task_id)
left join t_task task on (task.task_id=task_list.task_id)
where (
parent_st.st=3
or parent_st.st=5
)
and parent_st.t_date=task_list.t_date
and task_list.t_date=to_char(now(),'yyyymmdd')
and task_list.st=0
</select>

<!-- 初始化任务列表 -->
<insert id="initTask_List">
insert into t_task_list
select task_id,(case when task_id=0 then 3 else 0 end) as st,to_char(now(),'yyyymmdd') as t_date
from t_task
ON CONFLICT(task_id,t_date) DO NOTHING
</insert>

<!-- 获取之前是否存在错误或者未完成任务-->
<select id="queryBeforeErrorCount" resultType="int">
select count(1) from t_task_list
where st in (0,1,2,4) and t_date < to_char(now(),'yyyymmdd')
</select>

<!-- 获取当前任务是否需要初始化 -->
<select id="queryIsInit" resultType="int">
select count(1) from t_task_list
where t_date = to_char(now(),'yyyymmdd')
</select>

<!-- 更新任务列表任务状态(全表变更) -->
<update id="updateAllTaskList">
UPDATE t_task_list
set st=0
where t_date=to_char(now(),'yyyymmdd')
and st in (1,2,4)
</update>

<!-- 更新任务状态,根据Task_List -->
<update id="updateTaskListByTask_List" parameterType="Task_List">
UPDATE t_task_list
set st = #{st}
where t_date = #{t_date} and task_id = #{task_id}
</update>

<!-- 插入数据
<insert id="" parameterType="">
</insert>-->

<!-- 删除数据
<delete id="" parameterType="">

</delete>-->
</mapper>


调度功能在scheduleservice中初始化函数中初始化并运行

项目至此,差不多都说完了,剩下一些个人收集的工具类
项目代码在https://github.com/269219362ljf/basis_schedule_sys 有需要的请到这里下载源代码,里面有详细的食用说明
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息