您的位置:首页 > 编程语言 > Java开发

Spring boot集成Quartz与RabbitMQ

2018-02-14 13:47 603 查看
完成一个本地demo的小项目。

在Spring boot中集成Quartz与RabbitMQ,完成通过Quartz定时向RabbitMQ发送消息,并且在客户端监听RabbitMQ队列的消息并进行处理。同时针对Quartz使用了集群特性。

数据层采用Spring boot jpa,用Hibernate简易处理了下CRUD。

环境准备

数据库中建立业务存储表

t_account 账户表

t_order 订单表

t_order_process_detail 订单处理详情表

DROP TABLE IF EXISTS t_order;
DROP TABLE IF EXISTS t_order_process_detail;
DROP TABLE IF EXISTS t_account;

CREATE TABLE t_order(
id int(20) NOT NULL AUTO_INCREMENT,
account_id int(20),
order_process_detail_id int(20),
order_no varchar(40) NOT NULL,
order_price decimal(10,2),
pay_status varchar(20),
process_status varchar(20),
primary KEY (id)
)ENGINE=InnoDB;

CREATE TABLE t_order_process_detail(
id int(20) NOT NULL AUTO_INCREMENT,
detail varchar(500),
primary KEY (id)
)ENGINE=InnoDB;

CREATE TABLE t_account(
id int(20) NOT NULL AUTO_INCREMENT,
name varchar(200),
balance decimal(10,2),
primary KEY (id)
)ENGINE=InnoDB;


需要在本地,或者虚拟机中安装RabbitMQ服务器(安装过程略)

为了实现Quartz的集群配置,需要在数据库中添加Quartz的存储表。建表语句可以在Quartz的jar包的docs文件下找到适合所使用数据库的sql文件。这里使用的MySQL的innodb建表语句。

DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;

CREATE TABLE QRTZ_JOB_DETAILS(
SCHED_NAME VARCHAR(120) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
JOB_CLASS_NAME VARCHAR(250) NOT NULL,
IS_DURABLE VARCHAR(1) NOT NULL,
IS_NONCONCURRENT VARCHAR(1) NOT NULL,
IS_UPDATE_DATA VARCHAR(1) NOT NULL,
REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
NEXT_FIRE_TIME BIGINT(13) NULL,
PREV_FIRE_TIME BIGINT(13) NULL,
PRIORITY INTEGER NULL,
TRIGGER_STATE VARCHAR(16) NOT NULL,
TRIGGER_TYPE VARCHAR(8) NOT NULL,
START_TIME BIGINT(13) NOT NULL,
END_TIME BIGINT(13) NULL,
CALENDAR_NAME VARCHAR(200) NULL,
MISFIRE_INSTR SMALLINT(2) NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_SIMPLE_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
REPEAT_COUNT BIGINT(7) NOT NULL,
REPEAT_INTERVAL BIGINT(12) NOT NULL,
TIMES_TRIGGERED BIGINT(10) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_CRON_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
CRON_EXPRESSION VARCHAR(120) NOT NULL,
TIME_ZONE_ID VARCHAR(80),
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_SIMPROP_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
STR_PROP_1 VARCHAR(512) NULL,
STR_PROP_2 VARCHAR(512) NULL,
STR_PROP_3 VARCHAR(512) NULL,
INT_PROP_1 INT NULL,
INT_PROP_2 INT NULL,
LONG_PROP_1 BIGINT NULL,
LONG_PROP_2 BIGINT NULL,
DEC_PROP_1 NUMERIC(13,4) NULL,
DEC_PROP_2 NUMERIC(13,4) NULL,
BOOL_PROP_1 VARCHAR(1) NULL,
BOOL_PROP_2 VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_BLOB_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
BLOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
INDEX (SCHED_NAME,TRIGGER_NAME, TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_CALENDARS (
SCHED_NAME VARCHAR(120) NOT NULL,
CALENDAR_NAME VARCHAR(200) NOT NULL,
CALENDAR BLOB NOT NULL,
PRIMARY KEY (SCHED_NAME,CALENDAR_NAME))
ENGINE=InnoDB;

CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_FIRED_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
ENTRY_ID VARCHAR(95) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
FIRED_TIME BIGINT(13) NOT NULL,
SCHED_TIME BIGINT(13) NOT NULL,
PRIORITY INTEGER NOT NULL,
STATE VARCHAR(16) NOT NULL,
JOB_NAME VARCHAR(200) NULL,
JOB_GROUP VARCHAR(200) NULL,
IS_NONCONCURRENT VARCHAR(1) NULL,
REQUESTS_RECOVERY VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,ENTRY_ID))
ENGINE=InnoDB;

CREATE TABLE QRTZ_SCHEDULER_STATE (
SCHED_NAME VARCHAR(120) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
CHECKIN_INTERVAL BIGINT(13) NOT NULL,
PRIMARY KEY (SCHED_NAME,INSTANCE_NAME))
ENGINE=InnoDB;

CREATE TABLE QRTZ_LOCKS (
SCHED_NAME VARCHAR(120) NOT NULL,
LOCK_NAME VARCHAR(40) NOT NULL,
PRIMARY KEY (SCHED_NAME,LOCK_NAME))
ENGINE=InnoDB;

CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);

CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);

CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);

commit;


项目配置

maven .pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>

<groupId>ecloud.demo</groupId>
<artifactId>rabbitquartz</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>rabbitquartz</name>
<url>http://maven.apache.org</url>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.1.RELEASE</version>
<relativePath />
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.13</version>
</dependency>

<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
</dependency>

<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>2.2.1</version>
</dependency>

</dependencies>
</project>


Spring boot的application.yml配置

server:
port: 80

spring:
application:
name: rabbitquartz
datasource:
name: druid
url: jdbc:mysql://localhost:3306/quartz
username: admin
password: admin
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
initialSize: 5
minIdle: 5
maxActive: 20
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT 'x' FROM DUAL
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
filters: stat,wall,log4j
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
rabbitmq:
host: 192.168.200.214
port: 5672
username: root
password: root
virtual-host: /
publisher-confirms: true
listener:
prefetch: 1
jpa:
database: MYSQL
properties:
hibernate:
dialect: org.hibernate.dialect.MySQL5Dialect


Quartz的配置

quartz.properties

#============================================================================
# Configure JobStore
# Using Spring datasource in SchedulerConfig.java
# Spring uses LocalDataSourceJobStore extension of JobStoreCMT
#============================================================================
org.quartz.jobStore.useProperties=false
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.clusterCheckinInterval = 5000
org.quartz.jobStore.misfireThreshold = 60000
org.quartz.jobStore.txIsolationLevelReadCommitted = true
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate

#============================================================================
# Configure Main Scheduler Properties
# Needed to manage cluster instances
#============================================================================
org.quartz.scheduler.instanceName = ClusterQuartz
org.quartz.scheduler.instanceId= AUTO
org.quartz.scheduler.rmi.export = false
org.quartz.scheduler.rmi.proxy = false
org.quartz.scheduler.wrapJobExecutionInUserTransaction = false

#============================================================================
# Configure ThreadPool
# Can also be configured in spring configuration
#============================================================================
#org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
#org.quartz.threadPool.threadCount = 5
#org.quartz.threadPool.threadPriority = 5
#org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true


QuartzConfig.java

package ecloud.demo.rabbitquartz.config;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Executor;

import javax.sql.DataSource;

import org.quartz.Scheduler;
import org.quartz.spi.JobFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.JobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

import ecloud.demo.rabbitquartz.infra.quartz.AutowiringJobFactory;
import ecloud.demo.rabbitquartz.infra.quartz.QuartzJob;

@Configuration
public class QuartzConfig {

// TODO 多台服务器的时间同步问题

@Autowired
private DataSource dataSource;

@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();

factory.setSchedulerName("Cluster_Scheduler");
factory.setDataSource(dataSource);
factory.setApplicationContextSchedulerContextKey("applicationContext");
factory.setTaskExecutor(schedulerThreadPool());
factory.setTriggers(quartzTrigger().getObject());
factory.setQuartzProperties(quartzProperties());
factory.setJobFactory(autowiringJobFactory());
factory.setOverwriteExistingJobs(true);
factory.setStartupDelay(15);
return factory;
}

@Bean
public JobFactory autowiringJobFactory() {
return new AutowiringJobFactory();
}

@Bean
public Scheduler scheduler() throws Exception {
Scheduler scheduler = schedulerFactoryBean().getScheduler();
scheduler.start();
return scheduler;
}

@Bean
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));

// 在quartz.properties中的属性被读取并注入后再初始化对象
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}

@Bean
public JobDetailFactoryBean quartzJob() {
JobDetailFactoryBean jobDetailFactoryBean = new JobDetailFactoryBean();

jobDetailFactoryBean.setJobClass(QuartzJob.class);
jobDetailFactoryBean.setDurability(true);
jobDetailFactoryBean.setGroup("order");
jobDetailFactoryBean.setName("order_job");
jobDetailFactoryBean.setRequestsRecovery(true);

return jobDetailFactoryBean;
}

@Bean
public CronTriggerFactoryBean quartzTrigger() {
CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();

cronTriggerFactoryBean.setJobDetail(quartzJob().getObject());
cronTriggerFactoryBean.setCronExpression("0/15 * * * * ?");

return cronTriggerFactoryBean;
}

@Bean
public Executor schedulerThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(15);
executor.setMaxPoolSize(25);
executor.setQueueCapacity(100);

return executor;
}
}


Quartz的两个相关类

AutowiringJobFactory.java

这个类是用来为QuartzJob中的属性进行Spring IoC注入

package ecloud.demo.rabbitquartz.infra.quartz;

import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.quartz.AdaptableJobFactory;

public class AutowiringJobFactory extends AdaptableJobFactory {

@Autowired
private AutowireCapableBeanFactory capableBeanFactory;

@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object jobInstance = super.createJobInstance(bundle);
// IoC injection
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}


QuartzJob类

package ecloud.demo.rabbitquartz.infra.quartz;

import java.util.List;

import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.PersistJobDataAfterExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;

import ecloud.demo.rabbitquartz.business.entity.Order;
import ecloud.demo.rabbitquartz.business.service.RabbitQuartzService;
import ecloud.demo.rabbitquartz.infra.rabbitmq.RabbitSender;

@Component
@PersistJobDataAfterExecution
// 禁止任务并发执行
@DisallowConcurrentExecution
public class QuartzJob extends QuartzJobBean {

private final Logger logger = LoggerFactory.getLogger(QuartzJob.class);

@Autowired
private RabbitQuartzService rabbitQuartzService;

@Autowired
private RabbitSender rabbitSender;

@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {

List<Order> orderList = rabbitQuartzService.getAllOrdersToBeProcessed();

if (orderList.size() > 0) {
rabbitSender.send(orderList.get(0).getId());
}

logger.info("End quartz job \n");
}

}


RabbitMQ的配置

RabbitMQ的一部分配置包含在Spring boot的application.yml文件中

RabbitConfig.java

package ecloud.demo.rabbitquartz.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import ecloud.demo.rabbitquartz.business.service.RabbitQuartzService;

/*
* Spring Boot中消息接受处理完后,若无异常,则由Spring Boot发送消息接受确认。
* spring.rabbitmq.listener.prefetch控制QoS。
*/
@Configuration
@EnableRabbit
public class RabbitConfig {

private final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);

public static final String ORDER_ROUTING_KEY = "order_key";

public static final String ORDER_QUEUE = "order_queue";

public static final String ORDER_EXCHANGE = "order_exchange";

@Bean
Queue orderQueue() {
return new Queue(ORDER_QUEUE);
}

@Bean
DirectExchange orderExchange() {
return new DirectExchange(ORDER_EXCHANGE);
}

@Bean
Binding bindingExangeQueue() {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
}

@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, RabbitQuartzService rabbitQuartzService) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

// 发送确认的回调方法,配合rabbitmq.publisher-confirms=true
rabbitTemplate.setConfirmCallback((correlationData, b, s) -> {
if (b) {
logger.info("Sending OK");
} else {
logger.error("Sending failed, cause: " + s);
}
});

rabbitTemplate.setMessageConverter(messageConverter());

return rabbitTemplate;
}

@Bean
Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}


Hibernate配置

HibernateConfig.java

package ecloud.demo.rabbitquartz.config;

import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

@Configuration
@EntityScan(basePackages = "ecloud.demo.rabbitquartz.business")
@EnableJpaRepositories(basePackages = "ecloud.demo.rabbitquartz.business")
public class HibernateConfig {

}


剩余的部分就是传统的Spring项目,业务层,dao层等等。

项目结构:



项目运行

复制项目后,只需要修改application.yml中的server port,就可以模拟同时启动多个服务器实例。

丰富Controller层的接口以后,可以通过postman等工具来完成订单创建等。

我们也可以观察到多个服务器同时运行模拟了一个Quartz集群,只在一个服务器中有QuartzJob的执行,并且一旦该服务器被关闭,QuartzJob可以在剩余可用的服务器中被执行。

同时观察RabbitMQ服务器,可以发现在配置的队列中(这里是order_queue),有消息的进入以及被消费。

待解决问题

在当前项目中,Quartz的集群是在同一个主机上运行的。如果服务器的示例是在不同的服务器中需要考虑服务器的时间同步。关于这一点,我还没想好。

代码下载

全部代码下载链接:

https://github.com/wanghaoalain/rabbitquartz.git
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: