Java for Web学习笔记(六八):Service和Repository(3)异步Async和调度Schedule
2017-07-08 15:40
531 查看
异步和调度的配置
接上次学习的例子,我们增加两个功能:有人跟贴时,向订购该帖的用户发送邮件 ➤ 需要开启一个后台运行线程来执行
删除过期的老帖 ➤ 需要定时执行任务,且在后台线程执行
后台任务执行
上面两者均需要后台任务执行,前者是直接开启一个后台任务,后者是定时调度开启的后台任务,而定时调度本身就是一个后台任务。后台任务是在异步线程中执行的,对于后台线程,在性能上我们要注意:线程是消耗性能的,总数太多,可能会导致崩溃,因此尽可能限制线程开启的数量
开启和关闭一个异步线程是相当消耗性能的,可通过线程池的方式,重复使用
在web app结束之际,需要正确处理正在运行的后台线程(或者在队列中等待执行的)。
web app不是一个进程,是运行在tomcat这类web容器中,容器对web app的关闭,并不能关闭它后台线程,除非将容器关闭。我们可以做个小例子来证明,启动一个线程,定期在文件中加入内容。通过tomcat的app管理,关闭该app,我们发现仍在定期添加内容。这点很容易忽略,因此开发环境中,调测web app是通过开启或者停止tomcat的。但在生产环境中,web app可不一定独享tomcat。但这是产生严重的内存泄漏问题,我们可以查看tomcat的catalina.out有没有相关的告警。
Spring帮助解决了上述问题(是个强大的工具^_^),通过@Async和@Scheduled来实现异步运行,且自动异步运行。
配置执行器和调度器
执行器executors用于执行任务,调度器schedulers用于按计划启动任务。java.util.concurrent.Executor接口定义了执行器,可以通过Runnable实现异步,Spring将这个接口扩展为org.springframework.core.task.TaskExecutor。Spring还提供了org.springframework.scheduling.TaskScheduler接口,当中的方法可在未来执行一次或多次。在配置中,我们需要给出执行器和调度器的实例,最常用的是org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler:提供了执行器和调度器,同时实现了java.util.concurrent.ThreadFactory接口
提供线程池来避免线程数量过大以及更好的使用效率
在web app关闭时确保关闭其所产生的线程,避免线程泄漏。
一句话,ThreadPoolTaskScheduler解决之前的三大问题。我们在Root上下文进行配置,因为这两个后台运行,应该是Service的一部分,不涉及UI,在root中,可以在所有的SerlvetDispather都能使用。
//【1】允许异步执行@Async的方法,需要在配置中标记@EnableAsync,允许@Scheduled有效,需标记@EnableScheduling // proxyTargetClass表示使用对异步和计划方法使用CGLIB library to proxy classes,而非标准的java接口proxies; // - 设置为true,表示可以在实例上创建非接口指定的异步或调度方法(我们在后面有具体的事例) // - 设置为false则表示必须在接口中指定。 @Configuration @EnableAsync(proxyTargetClass = true) @EnableScheduling @ComponentScan( basePackages = "cn.wei.chapter14.site", excludeFilters = @ComponentScan.Filter(Controller.class)) //【2】@EnableAsync和@EnableScheduling将采用缺省的配置,但我们需要进行定制,包括使用ThreadPoolTaskScheduler,并进行设置,如设置线程池的大小。要定制化,就要实现AsyncConfigurer和SchedulingConfigurer接口 public class RootContextConfiguration implements AsyncConfigurer,SchedulingConfigurer{ private static final Logger logger = LogManager.getLogger(); private static final Logger schedulingLogger = LogManager.getLogger(logger.getName() + ".[scheduling]"); ..... // 【3】通过@Bean创建实例。我们采用ThreadPoolTaskScheduler实例同时作为执行器和调度器。getAsyncExecutor()将给出执行器,configureTasks()将注册调度器。要注意 @Bean只实例化了一次,Spring proxies调用所有的@Bean方法,都不会调用第二次,结果会被缓存。我们通过log可以看出,在getAsyncExecutor()和configureTasks()中调度器和执行器中使用的都是同一个实例. @Bean public ThreadPoolTaskScheduler taskScheduler(){ logger.info("Setting up thread pool task scheduler with 20 threads."); ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(20); scheduler.setThreadNamePrefix("task-"); //处理web app关闭时,不强制中断task的执行,有60秒的缓冲期,完成当前需要完成的任务。缺省为false,强制中断 scheduler.setAwaitTerminationSeconds(60); scheduler.setWaitForTasksToCompleteOnShutdown(true); scheduler.setErrorHandler(t -> schedulingLogger.error( "Unknown error occurred while executing task.", t)); scheduler.setRejectedExecutionHandler((r,e)->schedulingLogger.error( "Execution of task {} was rejected for unknown reasons.",r,e)); return scheduler; } // 【3.1】SchedulingConfigurer接口,注册调度器。 @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { TaskScheduler scheduler = this.taskScheduler(); logger.info("Configuring scheduled method executor {}.", scheduler); taskRegistrar.setTaskScheduler(scheduler); } // 【3.2】是AsyncConfigurer接口,返回执行器实例 @Override public Executor getAsyncExecutor() { Executor executor = this.taskScheduler(); logger.info("Configuring asynchronous method executor {}.", executor); return executor; } // 【3.3】完善接口实现 @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return null; } }
异步方法
我们通过异步线程实现跟帖通知发送。定义相关的Service接口
根据分层编程的方式,新增一个NotificationService来实现通知发送:public interface NotificationService { // 原则上我们可以不在接口中标记@Async,但是强烈建议这样做,告知这是一个后台运行的任务 @Async public void sendNotification(String subject, String message,Collection<String> recipients); }
实现异步任务的方法
编写一个伪邮件发送的服务进行模拟,中间sleep了5秒。@Service public class FakeNotificationService implements NotificationService{ private static final Logger logger = LogManager.getLogger(); // @Async是加载在方法上,注入的不是bean,而是proxy,调用proxy的方法。配置提到执行器是使用proxy,配置为使用CGLIB类。Spring要在Spring的bean中识别@Async,例如本例是@Service实例,不能在非Spring component中使用。 @Override @Async public void sendNotification(String subject, String message, Collection<String> recipients) { logger.info("Started notifying recipients {}.", recipients); try { Thread.sleep(5_000L); } catch (InterruptedException ignore) { } logger.info("Finished notifying recipients."); } }
调用这个异步任务
在ReplyService中具体实现跟帖发送通知@Service public class DefaultReplyService implements ReplyService{ @Inject private NotificationService notificationService; ...... @Override public void saveReply(Reply reply) { Discussion discussion = this.discussionService.getDisscussion(reply.getDiscussionId()); if(reply.getId() < 1){ //new:user first time reply discussion.getSubscribedUsers().add(reply.getUser()); reply.setCreated(Instant.now()); this.replyRepostory.add(reply); //有新的reply需要通知所有的,除了发布者 Set<String> recipients = new HashSet<>(discussion.getSubscribedUsers()); recipients.remove(reply.getUser()); logger.info("Want to send notification."); this.notificationService.sendNotification("Reply posted", "Someone replied to \"" + discussion.getSubject()+ ".\"", recipients); }else{ //old: user reply is not the first,just update his reply this.replyRepostory.update(reply); } this.discussionService.saveDiscussion(discussion); } }
执行结果,可以看出是异步任务执行。
17:12:34.591 [http-nio-8080-exec-8] [INFO ] DefaultReplyService:48 saveReply() - Want to send notification. 17:12:34.626 [task-4] [INFO ] FakeNotificationService:17 sendNotification() - Started notifying recipients [one@189.cn]. 17:12:39.627 [task-4] [INFO ] FakeNotificationService:21 sendNotification() - Finished notifying recipients.
调度器
@Service public class DefaultDiscussionService implements DiscussionService{ ... ... // 标记@Scheduled的方法表示调度执行,方法不允许有任何的参数 // 一般而言,我们不会主动调用这个方法(应避免这样做),如果非要这么做,该方法是不会运行在后台,除非加上@Async的标记。 // deleteStaleDiscussions()没有在Interface中定义,在配置中我们设置了proxyTargetClass = true,说明异步和计划可以用在非接口定义的方法。 // 第一次调度在web启动后15秒开始,然后每隔10秒钟调度一次 @Scheduled(fixedDelay = 10_000L, initialDelay = 15_000L) public void deleteStaleDiscussions(){ Instant oneYearAgo = Instant.now().minus(365L, ChronoUnit.DAYS); logger.info("Deleting discussions stale since {}.", oneYearAgo); List<Discussion> list = this.discussionRepository.getAll(); list.removeIf(d -> d.getLastUpdated().isAfter(oneYearAgo)); for(Discussion old : list){ this.discussionRepository.delete(old.getId()); } } }
运行结果:
14:40:28.860 [task-13] [INFO ] DefaultDiscussionService:69 deleteStaleDiscussions() - Deleting discussions stale since 2016-06-20T06:40:28.860Z. 14:40:38.864 [task-13] [INFO ] DefaultDiscussionService:69 deleteStaleDiscussions() - Deleting discussions stale since 2016-06-20T06:40:38.864Z. 14:40:48.865 [task-13] [INFO ] DefaultDiscussionService:69 deleteStaleDiscussions() - Deleting discussions stale since 2016-06-20T06:40:48.865Z. 14:40:58.870 [task-4] [INFO ] DefaultDiscussionService:69 deleteStaleDiscussions() - Deleting discussions stale since 2016-06-20T06:40:58.870Z. 14:41:08.871 [task-4] [INFO ] DefaultDiscussionService:69 deleteStaleDiscussions() - Deleting discussions stale since 2016-06-20T06:41:08.871Z. 14:41:18.873 [task-4] [INFO ] DefaultDiscussionService:69 deleteStaleDiscussions() - Deleting discussions stale since 2016-06-20T06:41:18.873Z. 14:41:28.874 [task-4] [INFO ] DefaultDiscussionService:69 deleteStaleDiscussions() - Deleting discussions stale since 2016-06-20T06:41:28.874Z. 14:41:38.875 [task-4] [INFO ] DefaultDiscussionService:69 deleteStaleDiscussions() - Deleting discussions stale since 2016-06-20T06:41:38.875Z. 14:41:48.878 [task-4] [INFO ] DefaultDiscussionService:69 deleteStaleDiscussions() - Deleting discussions stale since 2016-06-20T06:41:48.878Z.
相关链接:
我的Professional Java for Web Applications相关文章
相关文章推荐
- Java for Web学习笔记(七十):Service和Repository(5)回调处理Consumer
- Java for Web学习笔记(六七):Service和Repository(2)抽象分层例子
- Java for Web学习笔记(七一):Service和Repository(6)在Spring框架中使用Listener
- Java for Web学习笔记(四十):Filter(2)AsyncContext和Filter
- Java for Web学习笔记(四一):Filter(3)用于Log
- Java WebService学习笔记 - Axis进阶(二)
- Java for Web学习笔记(四二):Filter(4)用于压缩
- .net调用java webservice基于JBOSS服务器 学习笔记(一)
- Java for Web学习笔记(三八):自定义tag(6)一些注意
- Java for Web学习笔记(二八):JSTL(4)Core Tag(下)
- Java for Web学习笔记(二):Web Containers
- Java for Web学习笔记(二六):JSTL(2)Core Tag(上)
- Java for Web学习笔记(六):Servlet(4)HttpServletResponse
- Java Web Service 学习笔记(一)
- Java for Web学习笔记(二一):EL(1)什么是EL
- Java for Web学习笔记(二三):EL(3)EL的视图
- Java for Web学习笔记(十四):JSP(4)JSP Tag
- Java for Web学习笔记(十):Servlet(8)下发文件
- Java for Web学习笔记(十六):JSP(6)jspx
- Java for Web学习笔记(八):Servlet(6)doGet()和doPost()是线程还是队列