Integrating Quartz and RabbitMQ with Spring Boot
2018-02-14 13:47
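This is a small local demo project.
It integrates Quartz and RabbitMQ in Spring Boot: a Quartz job periodically sends messages to RabbitMQ, and a client listens on the RabbitMQ queue and processes the messages it receives. Quartz runs in clustered mode.
The data layer uses Spring Boot JPA, with Hibernate handling basic CRUD.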
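To make the flow concrete before getting into configuration, here is a minimal sketch of the same pipeline using only the JDK. It is a toy model and not the project's code: a `BlockingQueue` stands in for the RabbitMQ queue, `fireJob` for one run of the Quartz job, and `drainListener` for the queue listener. All class and method names are hypothetical, and unlike the real job it simply removes an order from a pending list when it publishes it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model of the demo's flow: a timed job publishes order ids, a listener consumes them. */
class PipelineSketch {
    // Stand-in for the RabbitMQ queue (hypothetical; the real project uses order_queue).
    private final BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
    // Stand-in for the orders waiting in the database.
    private final Queue<Long> pendingOrders = new ArrayDeque<>();
    // Ids the listener has processed, in arrival order.
    final List<Long> processed = new ArrayList<>();

    void createOrder(long id) {
        pendingOrders.add(id);
    }

    /** One firing of the scheduled job: publish at most one pending order id. */
    void fireJob() {
        Long next = pendingOrders.poll();
        if (next != null) {
            queue.offer(next);
        }
    }

    /** The listener: take every message currently queued and handle it. */
    void drainListener() {
        Long id;
        while ((id = queue.poll()) != null) {
            processed.add(id);
        }
    }

    public static void main(String[] args) {
        PipelineSketch pipeline = new PipelineSketch();
        pipeline.createOrder(1L);
        pipeline.createOrder(2L);
        for (int i = 0; i < 3; i++) { // the third firing finds nothing to send
            pipeline.fireJob();
            pipeline.drainListener();
        }
        System.out.println(pipeline.processed);
    }
}
```

The real project replaces each stand-in with the corresponding framework piece, which is what the configuration in the rest of this post sets up.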
Project structure:
![](https://img-blog.csdn.net/20180214125542693?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvd2FuZ2hhb2FsYWlu/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)
Environment setup
Create the business tables in the database:
t_account: account table
t_order: order table
t_order_process_detail: order processing detail table
    DROP TABLE IF EXISTS t_order;
    DROP TABLE IF EXISTS t_order_process_detail;
    DROP TABLE IF EXISTS t_account;

    CREATE TABLE t_order(
        id int(20) NOT NULL AUTO_INCREMENT,
        account_id int(20),
        order_process_detail_id int(20),
        order_no varchar(40) NOT NULL,
        order_price decimal(10,2),
        pay_status varchar(20),
        process_status varchar(20),
        primary KEY (id)
    )ENGINE=InnoDB;

    CREATE TABLE t_order_process_detail(
        id int(20) NOT NULL AUTO_INCREMENT,
        detail varchar(500),
        primary KEY (id)
    )ENGINE=InnoDB;

    CREATE TABLE t_account(
        id int(20) NOT NULL AUTO_INCREMENT,
        name varchar(200),
        balance decimal(10,2),
        primary KEY (id)
    )ENGINE=InnoDB;
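A RabbitMQ server must be installed locally or in a virtual machine (installation steps omitted).
To run Quartz in clustered mode, the Quartz storage tables must be added to the database. The table creation scripts are in the docs folder shipped with Quartz; pick the SQL file that matches your database. The MySQL InnoDB script is used here.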
    DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
    DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
    DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
    DROP TABLE IF EXISTS QRTZ_LOCKS;
    DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
    DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
    DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
    DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
    DROP TABLE IF EXISTS QRTZ_TRIGGERS;
    DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
    DROP TABLE IF EXISTS QRTZ_CALENDARS;

    CREATE TABLE QRTZ_JOB_DETAILS(
        SCHED_NAME VARCHAR(120) NOT NULL,
        JOB_NAME VARCHAR(200) NOT NULL,
        JOB_GROUP VARCHAR(200) NOT NULL,
        DESCRIPTION VARCHAR(250) NULL,
        JOB_CLASS_NAME VARCHAR(250) NOT NULL,
        IS_DURABLE VARCHAR(1) NOT NULL,
        IS_NONCONCURRENT VARCHAR(1) NOT NULL,
        IS_UPDATE_DATA VARCHAR(1) NOT NULL,
        REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
        JOB_DATA BLOB NULL,
        PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP))
    ENGINE=InnoDB;

    CREATE TABLE QRTZ_TRIGGERS (
        SCHED_NAME VARCHAR(120) NOT NULL,
        TRIGGER_NAME VARCHAR(200) NOT NULL,
        TRIGGER_GROUP VARCHAR(200) NOT NULL,
        JOB_NAME VARCHAR(200) NOT NULL,
        JOB_GROUP VARCHAR(200) NOT NULL,
        DESCRIPTION VARCHAR(250) NULL,
        NEXT_FIRE_TIME BIGINT(13) NULL,
        PREV_FIRE_TIME BIGINT(13) NULL,
        PRIORITY INTEGER NULL,
        TRIGGER_STATE VARCHAR(16) NOT NULL,
        TRIGGER_TYPE VARCHAR(8) NOT NULL,
        START_TIME BIGINT(13) NOT NULL,
        END_TIME BIGINT(13) NULL,
        CALENDAR_NAME VARCHAR(200) NULL,
        MISFIRE_INSTR SMALLINT(2) NULL,
        JOB_DATA BLOB NULL,
        PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
        FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
            REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP))
    ENGINE=InnoDB;

    CREATE TABLE QRTZ_SIMPLE_TRIGGERS (
        SCHED_NAME VARCHAR(120) NOT NULL,
        TRIGGER_NAME VARCHAR(200) NOT NULL,
        TRIGGER_GROUP VARCHAR(200) NOT NULL,
        REPEAT_COUNT BIGINT(7) NOT NULL,
        REPEAT_INTERVAL BIGINT(12) NOT NULL,
        TIMES_TRIGGERED BIGINT(10) NOT NULL,
        PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
        FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
            REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
    ENGINE=InnoDB;

    CREATE TABLE QRTZ_CRON_TRIGGERS (
        SCHED_NAME VARCHAR(120) NOT NULL,
        TRIGGER_NAME VARCHAR(200) NOT NULL,
        TRIGGER_GROUP VARCHAR(200) NOT NULL,
        CRON_EXPRESSION VARCHAR(120) NOT NULL,
        TIME_ZONE_ID VARCHAR(80),
        PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
        FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
            REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
    ENGINE=InnoDB;

    CREATE TABLE QRTZ_SIMPROP_TRIGGERS (
        SCHED_NAME VARCHAR(120) NOT NULL,
        TRIGGER_NAME VARCHAR(200) NOT NULL,
        TRIGGER_GROUP VARCHAR(200) NOT NULL,
        STR_PROP_1 VARCHAR(512) NULL,
        STR_PROP_2 VARCHAR(512) NULL,
        STR_PROP_3 VARCHAR(512) NULL,
        INT_PROP_1 INT NULL,
        INT_PROP_2 INT NULL,
        LONG_PROP_1 BIGINT NULL,
        LONG_PROP_2 BIGINT NULL,
        DEC_PROP_1 NUMERIC(13,4) NULL,
        DEC_PROP_2 NUMERIC(13,4) NULL,
        BOOL_PROP_1 VARCHAR(1) NULL,
        BOOL_PROP_2 VARCHAR(1) NULL,
        PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
        FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
            REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
    ENGINE=InnoDB;

    CREATE TABLE QRTZ_BLOB_TRIGGERS (
        SCHED_NAME VARCHAR(120) NOT NULL,
        TRIGGER_NAME VARCHAR(200) NOT NULL,
        TRIGGER_GROUP VARCHAR(200) NOT NULL,
        BLOB_DATA BLOB NULL,
        PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
        INDEX (SCHED_NAME,TRIGGER_NAME, TRIGGER_GROUP),
        FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
            REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
    ENGINE=InnoDB;

    CREATE TABLE QRTZ_CALENDARS (
        SCHED_NAME VARCHAR(120) NOT NULL,
        CALENDAR_NAME VARCHAR(200) NOT NULL,
        CALENDAR BLOB NOT NULL,
        PRIMARY KEY (SCHED_NAME,CALENDAR_NAME))
    ENGINE=InnoDB;

    CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS (
        SCHED_NAME VARCHAR(120) NOT NULL,
        TRIGGER_GROUP VARCHAR(200) NOT NULL,
        PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP))
    ENGINE=InnoDB;

    CREATE TABLE QRTZ_FIRED_TRIGGERS (
        SCHED_NAME VARCHAR(120) NOT NULL,
        ENTRY_ID VARCHAR(95) NOT NULL,
        TRIGGER_NAME VARCHAR(200) NOT NULL,
        TRIGGER_GROUP VARCHAR(200) NOT NULL,
        INSTANCE_NAME VARCHAR(200) NOT NULL,
        FIRED_TIME BIGINT(13) NOT NULL,
        SCHED_TIME BIGINT(13) NOT NULL,
        PRIORITY INTEGER NOT NULL,
        STATE VARCHAR(16) NOT NULL,
        JOB_NAME VARCHAR(200) NULL,
        JOB_GROUP VARCHAR(200) NULL,
        IS_NONCONCURRENT VARCHAR(1) NULL,
        REQUESTS_RECOVERY VARCHAR(1) NULL,
        PRIMARY KEY (SCHED_NAME,ENTRY_ID))
    ENGINE=InnoDB;

    CREATE TABLE QRTZ_SCHEDULER_STATE (
        SCHED_NAME VARCHAR(120) NOT NULL,
        INSTANCE_NAME VARCHAR(200) NOT NULL,
        LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
        CHECKIN_INTERVAL BIGINT(13) NOT NULL,
        PRIMARY KEY (SCHED_NAME,INSTANCE_NAME))
    ENGINE=InnoDB;

    CREATE TABLE QRTZ_LOCKS (
        SCHED_NAME VARCHAR(120) NOT NULL,
        LOCK_NAME VARCHAR(40) NOT NULL,
        PRIMARY KEY (SCHED_NAME,LOCK_NAME))
    ENGINE=InnoDB;

    CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
    CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);

    CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
    CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
    CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
    CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
    CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
    CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
    CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
    CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
    CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
    CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
    CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
    CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);

    CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
    CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
    CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
    CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
    CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
    CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);

    commit;
Project configuration
Maven pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <groupId>ecloud.demo</groupId>
      <artifactId>rabbitquartz</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>

      <name>rabbitquartz</name>
      <url>http://maven.apache.org</url>

      <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.1.RELEASE</version>
        <relativePath />
      </parent>

      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
      </properties>

      <dependencies>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-context-support</artifactId>
        </dependency>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>druid</artifactId>
          <version>1.0.13</version>
        </dependency>
        <dependency>
          <groupId>org.quartz-scheduler</groupId>
          <artifactId>quartz</artifactId>
          <version>2.2.1</version>
        </dependency>
        <dependency>
          <groupId>org.quartz-scheduler</groupId>
          <artifactId>quartz-jobs</artifactId>
          <version>2.2.1</version>
        </dependency>
      </dependencies>
    </project>
Spring Boot application.yml configuration

    server:
      port: 80
    spring:
      application:
        name: rabbitquartz
      datasource:
        name: druid
        url: jdbc:mysql://localhost:3306/quartz
        username: admin
        password: admin
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.jdbc.Driver
        initialSize: 5
        minIdle: 5
        maxActive: 20
        maxWait: 60000
        timeBetweenEvictionRunsMillis: 60000
        minEvictableIdleTimeMillis: 300000
        validationQuery: SELECT 'x' FROM DUAL
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        filters: stat,wall,log4j
        connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
      rabbitmq:
        host: 192.168.200.214
        port: 5672
        username: root
        password: root
        virtual-host: /
        publisher-confirms: true
        listener:
          prefetch: 1
      jpa:
        database: MYSQL
        properties:
          hibernate:
            dialect: org.hibernate.dialect.MySQL5Dialect
Quartz configuration
quartz.properties

    #============================================================================
    # Configure JobStore
    # Using Spring datasource in QuartzConfig.java
    # Spring uses LocalDataSourceJobStore extension of JobStoreCMT
    #============================================================================
    org.quartz.jobStore.useProperties=false
    org.quartz.jobStore.tablePrefix = QRTZ_
    org.quartz.jobStore.isClustered = true
    org.quartz.jobStore.clusterCheckinInterval = 5000
    org.quartz.jobStore.misfireThreshold = 60000
    org.quartz.jobStore.txIsolationLevelReadCommitted = true
    org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
    org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate

    #============================================================================
    # Configure Main Scheduler Properties
    # Needed to manage cluster instances
    #============================================================================
    org.quartz.scheduler.instanceName = ClusterQuartz
    org.quartz.scheduler.instanceId= AUTO
    org.quartz.scheduler.rmi.export = false
    org.quartz.scheduler.rmi.proxy = false
    org.quartz.scheduler.wrapJobExecutionInUserTransaction = false

    #============================================================================
    # Configure ThreadPool
    # Can also be configured in spring configuration
    #============================================================================
    #org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
    #org.quartz.threadPool.threadCount = 5
    #org.quartz.threadPool.threadPriority = 5
    #org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true
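The `clusterCheckinInterval` value is also where clock synchronization starts to matter. Each clustered instance periodically records a check-in time in the shared database, and the other instances use those timestamps to decide whether a peer has failed. The sketch below is a simplified model of that idea, not Quartz's actual implementation: the failure rule and `TOLERANCE_MS` are assumptions chosen for illustration. It shows how an observer whose clock runs 10 seconds ahead would judge a healthy peer as failed.

```java
/** Simplified heartbeat check; a model for illustration, not Quartz's real logic. */
class CheckinSketch {
    // Mirrors org.quartz.jobStore.clusterCheckinInterval = 5000 from quartz.properties.
    static final long CHECKIN_INTERVAL_MS = 5_000;
    // Assumed grace period, picked only for this example.
    static final long TOLERANCE_MS = 2_500;

    /** A peer looks failed when its last check-in is older than interval + tolerance. */
    static boolean looksFailed(long lastCheckinMs, long observerNowMs) {
        return observerNowMs - lastCheckinMs > CHECKIN_INTERVAL_MS + TOLERANCE_MS;
    }

    public static void main(String[] args) {
        long trueNow = 1_000_000L;
        long lastCheckin = trueNow - 1_000; // the peer checked in one second ago

        // Observer with a synchronized clock: the peer is considered alive.
        System.out.println(looksFailed(lastCheckin, trueNow));
        // Observer whose clock is 10 s ahead: the same peer now looks failed.
        System.out.println(looksFailed(lastCheckin, trueNow + 10_000));
    }
}
```

This is why a cluster spread over several machines needs their clocks kept closely in sync, for example with a time-sync service.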
QuartzConfig.java

    package ecloud.demo.rabbitquartz.config;

    import java.io.IOException;
    import java.util.Properties;
    import java.util.concurrent.Executor;

    import javax.sql.DataSource;

    import org.quartz.Scheduler;
    import org.quartz.spi.JobFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.config.PropertiesFactoryBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
    import org.springframework.scheduling.quartz.JobDetailFactoryBean;
    import org.springframework.scheduling.quartz.SchedulerFactoryBean;

    import ecloud.demo.rabbitquartz.infra.quartz.AutowiringJobFactory;
    import ecloud.demo.rabbitquartz.infra.quartz.QuartzJob;

    @Configuration
    public class QuartzConfig {

        // TODO: clock synchronization across multiple servers
        @Autowired
        private DataSource dataSource;

        @Bean
        public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
            SchedulerFactoryBean factory = new SchedulerFactoryBean();
            factory.setSchedulerName("Cluster_Scheduler");
            factory.setDataSource(dataSource);
            factory.setApplicationContextSchedulerContextKey("applicationContext");
            factory.setTaskExecutor(schedulerThreadPool());
            factory.setTriggers(quartzTrigger().getObject());
            factory.setQuartzProperties(quartzProperties());
            factory.setJobFactory(autowiringJobFactory());
            factory.setOverwriteExistingJobs(true);
            // Start the scheduler 15 seconds after initialization
            factory.setStartupDelay(15);
            return factory;
        }

        @Bean
        public JobFactory autowiringJobFactory() {
            return new AutowiringJobFactory();
        }

        @Bean
        public Scheduler scheduler() throws Exception {
            Scheduler scheduler = schedulerFactoryBean().getScheduler();
            scheduler.start();
            return scheduler;
        }

        @Bean
        public Properties quartzProperties() throws IOException {
            PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
            propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
            // Initialize the object only after the properties in quartz.properties
            // have been read and injected
            propertiesFactoryBean.afterPropertiesSet();
            return propertiesFactoryBean.getObject();
        }

        @Bean
        public JobDetailFactoryBean quartzJob() {
            JobDetailFactoryBean jobDetailFactoryBean = new JobDetailFactoryBean();
            jobDetailFactoryBean.setJobClass(QuartzJob.class);
            jobDetailFactoryBean.setDurability(true);
            jobDetailFactoryBean.setGroup("order");
            jobDetailFactoryBean.setName("order_job");
            jobDetailFactoryBean.setRequestsRecovery(true);
            return jobDetailFactoryBean;
        }

        @Bean
        public CronTriggerFactoryBean quartzTrigger() {
            CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
            cronTriggerFactoryBean.setJobDetail(quartzJob().getObject());
            // Fire every 15 seconds
            cronTriggerFactoryBean.setCronExpression("0/15 * * * * ?");
            return cronTriggerFactoryBean;
        }

        @Bean
        public Executor schedulerThreadPool() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(15);
            executor.setMaxPoolSize(25);
            executor.setQueueCapacity(100);
            return executor;
        }
    }
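The cron expression `0/15 * * * * ?` uses Quartz's format, in which the first field is seconds; `0/15` means start at second 0 and repeat every 15 seconds. As a rough illustration (plain JDK, not Quartz's own cron parser), the hypothetical helper below expands such a seconds field and computes the next fire time, assuming every other field is a wildcard as in this configuration.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

/** Handles only a "start/step" seconds field; everything else is assumed to be a wildcard. */
class CronSecondsSketch {

    /** Expands e.g. "0/15" into [0, 15, 30, 45]. The step is assumed to be positive. */
    static List<Integer> expand(String field) {
        String[] parts = field.split("/");
        int start = Integer.parseInt(parts[0]);
        int step = Integer.parseInt(parts[1]);
        List<Integer> seconds = new ArrayList<>();
        for (int s = start; s < 60; s += step) {
            seconds.add(s);
        }
        return seconds;
    }

    /** Returns the first matching second strictly after 'now'. */
    static LocalDateTime nextFire(String field, LocalDateTime now) {
        LocalDateTime base = now.withNano(0);
        List<Integer> seconds = expand(field);
        for (int s : seconds) {
            if (s > base.getSecond()) {
                return base.withSecond(s);
            }
        }
        // Nothing left in this minute: roll over to the first match of the next minute.
        return base.plusMinutes(1).withSecond(seconds.get(0));
    }

    public static void main(String[] args) {
        System.out.println(expand("0/15"));
        System.out.println(nextFire("0/15", LocalDateTime.of(2018, 2, 14, 13, 47, 7)));
    }
}
```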
Two Quartz-related classes
AutowiringJobFactory.java
This class performs Spring IoC injection on the fields of QuartzJob.

    package ecloud.demo.rabbitquartz.infra.quartz;

    import org.quartz.spi.TriggerFiredBundle;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
    import org.springframework.scheduling.quartz.AdaptableJobFactory;

    public class AutowiringJobFactory extends AdaptableJobFactory {

        @Autowired
        private AutowireCapableBeanFactory capableBeanFactory;

        @Override
        protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
            // Let Quartz create the job instance as usual
            Object jobInstance = super.createJobInstance(bundle);
            // IoC injection
            capableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }
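Why is this factory needed? Quartz creates job instances itself, so by default Spring never sees them and their `@Autowired` fields stay null. The factory lets the instance be created first and then asks Spring to fill in its dependencies. The toy version below shows the same two steps with plain reflection; `Inject`, `Sender`, `DemoJob`, and `InjectingFactorySketch` are hypothetical names, and a `Map` stands in for the Spring container.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical marker annotation playing the role of @Autowired. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Inject {}

/** Hypothetical collaborator that the job depends on. */
class Sender {
    String send(long id) {
        return "sent " + id;
    }
}

/** A job with a no-arg constructor and one dependency to be injected. */
class DemoJob {
    @Inject
    Sender sender;

    String run() {
        return sender.send(42L);
    }
}

class InjectingFactorySketch {
    // Stand-in for the container: bean type -> bean instance.
    private final Map<Class<?>, Object> beans = new HashMap<>();

    void register(Object bean) {
        beans.put(bean.getClass(), bean);
    }

    /** Step 1: instantiate reflectively. Step 2: fill every @Inject field from the registry. */
    <T> T create(Class<T> jobClass) {
        try {
            T job = jobClass.getDeclaredConstructor().newInstance();
            for (Field field : jobClass.getDeclaredFields()) {
                if (field.isAnnotationPresent(Inject.class)) {
                    field.setAccessible(true);
                    field.set(job, beans.get(field.getType()));
                }
            }
            return job;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not create " + jobClass, e);
        }
    }

    public static void main(String[] args) {
        InjectingFactorySketch factory = new InjectingFactorySketch();
        factory.register(new Sender());
        System.out.println(factory.create(DemoJob.class).run());
    }
}
```

Skipping step 2 would leave `sender` null and `run()` would fail with a `NullPointerException`, which is the symptom a Quartz job shows when it is created without such a factory.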
The QuartzJob class

    package ecloud.demo.rabbitquartz.infra.quartz;

    import java.util.List;

    import org.quartz.DisallowConcurrentExecution;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;
    import org.quartz.PersistJobDataAfterExecution;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.quartz.QuartzJobBean;
    import org.springframework.stereotype.Component;

    import ecloud.demo.rabbitquartz.business.entity.Order;
    import ecloud.demo.rabbitquartz.business.service.RabbitQuartzService;
    import ecloud.demo.rabbitquartz.infra.rabbitmq.RabbitSender;

    @Component
    @PersistJobDataAfterExecution
    // Disallow concurrent execution of this job
    @DisallowConcurrentExecution
    public class QuartzJob extends QuartzJobBean {

        private final Logger logger = LoggerFactory.getLogger(QuartzJob.class);

        @Autowired
        private RabbitQuartzService rabbitQuartzService;

        @Autowired
        private RabbitSender rabbitSender;

        @Override
        protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
            List<Order> orderList = rabbitQuartzService.getAllOrdersToBeProcessed();
            // Send only the first pending order on each run
            if (orderList.size() > 0) {
                rabbitSender.send(orderList.get(0).getId());
            }
            logger.info("End quartz job \n");
        }
    }
RabbitMQ configuration
Part of the RabbitMQ configuration lives in Spring Boot's application.yml file.
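One of those application.yml settings is the listener prefetch of 1, which limits how many unacknowledged messages the consumer may hold at a time. The following is a minimal sketch of that rule from the broker's point of view, using only the JDK. `PrefetchSketch` and its methods are hypothetical names and this is not how RabbitMQ is implemented; it only models the counting.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy broker-side view of a single consumer with a prefetch limit. */
class PrefetchSketch {
    private final Deque<String> ready = new ArrayDeque<>();   // messages waiting in the queue
    private final Deque<String> unacked = new ArrayDeque<>(); // delivered but not yet acknowledged
    private final int prefetch;

    PrefetchSketch(int prefetch) {
        this.prefetch = prefetch;
    }

    void publish(String message) {
        ready.add(message);
    }

    /** Returns the next message, or null while the consumer already holds 'prefetch' unacked ones. */
    String deliver() {
        if (unacked.size() >= prefetch || ready.isEmpty()) {
            return null;
        }
        String message = ready.poll();
        unacked.add(message);
        return message;
    }

    /** Called once the consumer has processed the message without an exception. */
    void ack(String message) {
        unacked.remove(message);
    }

    public static void main(String[] args) {
        PrefetchSketch broker = new PrefetchSketch(1);
        broker.publish("order-1");
        broker.publish("order-2");
        String first = broker.deliver();
        System.out.println(first + " delivered, next delivery blocked: " + (broker.deliver() == null));
        broker.ack(first);
        System.out.println(broker.deliver() + " delivered after ack");
    }
}
```

With a prefetch of 1, a slow consumer never has more than one order in flight, so the remaining orders stay in the queue for other consumers to pick up.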
RabbitConfig.java

    package ecloud.demo.rabbitquartz.config;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import ecloud.demo.rabbitquartz.business.service.RabbitQuartzService;

    /*
     * In Spring Boot, once a message has been received and processed without
     * an exception, Spring Boot sends the acknowledgement for it.
     * spring.rabbitmq.listener.prefetch controls QoS.
     */
    @Configuration
    @EnableRabbit
    public class RabbitConfig {

        private final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);

        public static final String ORDER_ROUTING_KEY = "order_key";
        public static final String ORDER_QUEUE = "order_queue";
        public static final String ORDER_EXCHANGE = "order_exchange";

        @Bean
        Queue orderQueue() {
            return new Queue(ORDER_QUEUE);
        }

        @Bean
        DirectExchange orderExchange() {
            return new DirectExchange(ORDER_EXCHANGE);
        }

        @Bean
        Binding bindingExangeQueue() {
            return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
        }

        @Bean
        RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, RabbitQuartzService rabbitQuartzService) {
            final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            // Publisher-confirm callback; works together with rabbitmq.publisher-confirms=true
            rabbitTemplate.setConfirmCallback((correlationData, b, s) -> {
                if (b) {
                    logger.info("Sending OK");
                } else {
                    logger.error("Sending failed, cause: " + s);
                }
            });
            rabbitTemplate.setMessageConverter(messageConverter());
            return rabbitTemplate;
        }

        @Bean
        Jackson2JsonMessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }
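The queue, exchange, and binding above form a direct exchange setup: a message reaches `order_queue` only when its routing key equals the binding key `order_key`. As a small illustration of that matching rule, here is a JDK-only sketch in which a `List<String>` stands in for a queue. `DirectExchangeSketch` is a hypothetical name, and in this sketch an unroutable message is simply discarded.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy direct exchange: exact match between routing key and binding key. */
class DirectExchangeSketch {
    // binding key -> queues bound with that key
    private final Map<String, List<List<String>>> bindings = new HashMap<>();

    void bind(List<String> queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, key -> new ArrayList<>()).add(queue);
    }

    /** Delivers to every queue bound with exactly this routing key; returns the delivery count. */
    int publish(String routingKey, String message) {
        List<List<String>> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (List<String> queue : targets) {
            queue.add(message);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        List<String> orderQueue = new ArrayList<>();
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind(orderQueue, "order_key");

        System.out.println(exchange.publish("order_key", "42")); // matches the binding
        System.out.println(exchange.publish("other_key", "43")); // no binding for this key
        System.out.println(orderQueue);
    }
}
```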
Hibernate configuration
HibernateConfig.java

    package ecloud.demo.rabbitquartz.config;

    import org.springframework.boot.autoconfigure.domain.EntityScan;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

    @Configuration
    @EntityScan(basePackages = "ecloud.demo.rabbitquartz.business")
    @EnableJpaRepositories(basePackages = "ecloud.demo.rabbitquartz.business")
    public class HibernateConfig {
    }
The rest is a conventional Spring project: service layer, DAO layer, and so on.
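Running the project
After copying the project, change only server.port in application.yml to start several server instances side by side. Once the Controller layer has more endpoints, orders can be created with a tool such as Postman.
With several servers running as a simulated Quartz cluster, we can observe that the QuartzJob executes on only one server, and that once that server is shut down the QuartzJob is executed on one of the remaining servers.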
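The behavior described above, one execution per fire time plus failover when a node stops, can be modeled roughly as nodes competing for a claim in shared storage. The sketch below is an assumption-laden toy and not Quartz's algorithm: a `ConcurrentHashMap` stands in for the database, `putIfAbsent` for the row lock, and nodes are tried in start order so the result is deterministic. `ClusterSketch` and the node names are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Toy cluster: each fire time is claimed by exactly one live node. */
class ClusterSketch {
    // Stand-in for the shared database: fire time -> node that claimed it.
    private final Map<Long, String> claims = new ConcurrentHashMap<>();
    private final Set<String> liveNodes = new LinkedHashSet<>();

    void start(String node) {
        liveNodes.add(node);
    }

    void stop(String node) {
        liveNodes.remove(node);
    }

    /** Every live node tries to claim the fire time; putIfAbsent lets only the first one succeed. */
    String fire(long fireTime) {
        for (String node : liveNodes) {
            claims.putIfAbsent(fireTime, node);
        }
        return claims.get(fireTime); // null when no node is alive
    }

    public static void main(String[] args) {
        ClusterSketch cluster = new ClusterSketch();
        cluster.start("node-A");
        cluster.start("node-B");
        System.out.println(cluster.fire(0L));      // claimed by node-A
        cluster.stop("node-A");
        System.out.println(cluster.fire(15_000L)); // node-B takes over
    }
}
```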
Watching the RabbitMQ server at the same time shows messages entering the configured queue (order_queue here) and being consumed.
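Open issues
In the current project, the Quartz cluster runs on a single host. If the server instances ran on different machines, clock synchronization between those machines would have to be considered. I have not worked this out yet.
Code download
Full source code: https://github.com/wanghaoalain/rabbitquartz.git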