您的位置:首页 > 其它

观察者模式实际应用:监听线程,意外退出线程后自动重启

2017-06-21 00:13 375 查看

摘要:  观察者模式,定义对象之间的一种一对多的依赖关系,当对象的状态发生改变时,所有依赖于它的对象都得到通知并且被自动更新。观察者模式在JDK中有现成的实现,java.util.Obserable。


  首先说下需求:通过ftp上传约定格式的文件到服务器指定目录下,应用程序能实时监控该目录下文件变化,如果上传的文件格式符合要求,将将按照每一行读取解析再写入到数据库,解析完之后再将文件改名。(这个是原先已经实现了的功能,请看我的一篇文章java利用WatchService实时监控某个目录下的文件变化并按行解析(注:附源代码)

但项目上线一段时间后,发现再利用FileZilla登陆上传文件,文件不能被解析,而重启tomcat之后再上传,又能解析,于是判定是监控指定目录的那个线程挂掉了,导致上传后的文件不能被检测到,故也不能被解析。之后查看日志也最终验证了我推断。

  所以关键的问题就是:如何监听线程,当意外退出线程后进行自动重启,这也是本文所要利用观察者模式实现的。

下面请看实现过程(尤其见红色注解部分):

  1、web.xml监听器配置文件监控监听器,初始化创建一个监控指定目录的线程  

<?xml version="1.0" encoding="UTF-8"?>
<web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"> 
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:root-context.xml</param-value>
</context-param>

<filter>
<filter-name>CharacterEncodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
<init-param>
<param-name>forceEncoding</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>CharacterEncodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>

<filter>
<filter-name>sitemesh</filter-name>
<filter-class>com.opensymphony.sitemesh.webapp.SiteMeshFilter</filter-class>
</filter>

<filter-mapping>
<filter-name>sitemesh</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>

<servlet>
<servlet-name>appServlet</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:servlet-context.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>

<servlet-mapping>
<servlet-name>appServlet</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>

<!-- 配置spring监听器 -->
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<!-- 配置监控文件变化监听器 -->
<listener>
<listener-class>com.zealer.ad.listener.ThreadStartUpListenser</listener-class>
</listener>
<listener>
<listener-class>com.zealer.ad.listener.SessionLifecycleListener</listener-class>
</listener>

<jsp-config>
<taglib>
<taglib-uri>/tag</taglib-uri>
<taglib-location>/WEB-INF/tag/tag.tld</taglib-location>
</taglib>
</jsp-config>

<welcome-file-list>
<welcome-file>index.jsp</welcome-file>
</welcome-file-list>

<session-config>
<session-timeout>45</session-timeout>
</session-config>
</web-app>


  2、编写一个观察者实现类,用于监听“监控指定目录线程”,当“监控指定目录线程”挂掉后,自动重启该线程

package com.zealer.ad.listener;

import java.util.Observable;
import java.util.Observer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.zealer.ad.task.WatchFilePathTask;
public class ObserverListener implements Observer{
private Log log = LogFactory.getLog(ObserverListener.class);

@Override
public void update(Observable o, Object arg)
{
log.info("WatchFilePathTask挂掉");
WatchFilePathTask run = new WatchFilePathTask();
run.addObserver(this);
new Thread(run).start();
log.info("WatchFilePathTask重启");
}
}


  3、编写一个ThreadStartUpListenser类,实现ServletContextListener,tomcat启动时创建后台线程

package com.zealer.ad.listener;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Component;

import com.zealer.ad.task.WatchFilePathTask;

@Component
public class ThreadStartUpListenser implements ServletContextListener
{
private static WatchFilePathTask r = new WatchFilePathTask();

private Log log = LogFactory.getLog(ThreadStartUpListenser.class);

@Override
public void contextDestroyed(ServletContextEvent paramServletContextEvent)
{
//        r.interrupt();

}

@Override
public void contextInitialized(ServletContextEvent paramServletContextEvent)
{
ObserverListener listen = new ObserverListener();
       //给“监控指定目录下的线程”(被观察者),添加一个观察者
r.addObserver(listen);
new Thread(r).start();
//            r.start();
log.info("ImportUserFromFileTask is started!");
}

}


  4、创建指定目录文件变化监控类[b]WatchFilePathTask[/b]

package com.zealer.ad.task;

import java.io.File;
import java.io.FileFilter;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Observable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;

import com.zealer.ad.util.ConfigUtils;
import com.zealer.ad.util.SpringUtils;

/**
* 指定目录文件变化监控类
* @author cancer
*
*/
public class WatchFilePathTask extends Observable implements Runnable
{
private Log log = LogFactory.getLog(WatchFilePathTask.class);

private static final String filePath = ConfigUtils.getInstance().getValue("userfile_path");

private WatchService watchService;

// 此方法一经调用,立马可以通知观察者,在本例中是监听线程
public void doBusiness()
  {
if(true)
     {
super.setChanged();
}
     //通知观察者,重启线程
notifyObservers();
}

@Override
public void run()
{
try
{
//获取监控服务
watchService = FileSystems.getDefault().newWatchService();
log.debug("获取监控服务"+watchService);
Path path = FileSystems.getDefault().getPath(filePath);
log.debug("@@@:Path:"+path);

final String todayFormat = DateTime.now().toString("yyyyMMdd");

File existFiles = new File(filePath);

//启动时检查是否有未解析的符合要求的文件
if(existFiles.isDirectory())
{
File[] matchFile = existFiles.listFiles(new FileFilter()
{

@Override
public boolean accept(File pathname)
{
if((todayFormat+".txt").equals(pathname.getName()))
{
return true;
}
else
{
return false;
}
}
});

if(null != matchFile)
{
for (File file : matchFile)
{
//找到符合要求的文件,开始解析
ImportUserFromFileTask task = (ImportUserFromFileTask) SpringUtils.getApplicationContext().getBean("importUserFromFileTask");
task.setFileName(file.getAbsolutePath());
task.start();
}
}
}

//注册监控服务,监控新增事件
WatchKey key = path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
while (true)
{
key = watchService.take();
for (WatchEvent<?> event : key.pollEvents()) {

//获取目录下新增的文件名
String fileName = event.context().toString();

//检查文件名是否符合要求
if((todayFormat+".txt").equals(fileName))
{
String filePath = path.toFile().getAbsolutePath()+File.separator+fileName;
log.info("import filePath:"+filePath);

//启动线程导入用户数据
ImportUserFromFileTask task = (ImportUserFromFileTask) SpringUtils.getApplicationContext().getBean("importUserFromFileTask");//new ImportUserFromFileTask(filePath);
task.setFileName(filePath);
task.start();
log.debug("启动线程导入用户数据"+task);
}
}
key.reset();
}
}
     catch (Exception e)
{
e.printStackTrace();
System.out.println("已经到这里来了");
doBusiness();//在抛出异常时调用,通知观察者,让其重启线程
}
}
}


  5、创建解析用户文件及导入数据库线程,由WatchFilePathTask启动

package com.zealer.ad.task;

import com.zealer.ad.entity.AutoPutUser;
import com.zealer.ad.entity.Bmsuser;
import com.zealer.ad.service.AutoPutUserService;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.joda.time.DateTime;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;

import java.util.Date;

import javax.annotation.Resource;

/**
* 解析用户文件及入库线程,由WatchFilePathTask启动
* @author cancer
*
*/
public class ImportUserFromFileTask extends Thread {
private Log log = LogFactory.getLog(ImportUserFromFileTask.class);
private String fileName;
@Resource(name = "autoPutUserService")
private AutoPutUserService autoPutUserService;

@Override
public void run() {
File file = new File(fileName);

if (file.exists() && file.isFile()) {
log.debug(":@@@准备开始休眠10秒钟:" + file);

//休眠十分钟,防止文件过大还没完全拷贝到指定目录下,这里的线程就开始读取文件
try {
sleep(10000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}

InputStreamReader read;

try {
read = new InputStreamReader(new FileInputStream(file), "UTF-8");

BufferedReader bufferedReader = new BufferedReader(read);
String lineTxt = null;
int count = 0;
Boolean f = false;

while ((lineTxt = bufferedReader.readLine()) != null) {
if ((null == lineTxt) || "".equals(lineTxt)) {
continue;
}

if (lineTxt.startsWith("'")) {
lineTxt = lineTxt.substring(1, lineTxt.length());
}

//解析分隔符为', '
String[] lines = lineTxt.split("', '");
int length = lines.length;

if (length < 2) {
continue;
}

Bmsuser bmsuser = new Bmsuser();
bmsuser.setName(lines[0]);if (!"".equals(lines[1])) {
bmsuser.setCity(lines[1]);
}
            //根据唯一索引已经存在的数据则不插入
f = autoPutUserService.insertIgnore(bmsuser);

if (f) {
count++;
}
}

//汇总数据
AutoPutUser autoPutUser = new AutoPutUser();
autoPutUser.setTotalCount(autoPutUserService.getUserCount());
autoPutUser.setCount(count);
autoPutUser.setCountDate(new Date(System.currentTimeMillis()));

String today = DateTime.now().toString("yyyy-MM-dd");
Integer oldCount = autoPutUserService.getOldCount(today);

//如果今天导入过了就更新否则插入
if (!oldCount.equals(0)) {
autoPutUserService.updateUserData(autoPutUser, today,
oldCount);
} else {
autoPutUserService.gatherUserData(autoPutUser);
}

//注意:要关闭流
read.close();
} catch (Exception e) {
log.error(e.getMessage(), e);
}

File newFile = new File(file.getPath() +
System.currentTimeMillis() + ".complate");
file.renameTo(newFile);
} else {
log.error(fileName + " file is not exists");
}
}

public String getFileName() {
return fileName;
}

public void setFileName(String fileName) {
this.fileName = fileName;
}

public AutoPutUserService getAutoPutUserService() {
return autoPutUserService;
}

public void setAutoPutUserService(AutoPutUserService autoPutUserService) {
this.autoPutUserService = autoPutUserService;
}
}


附带:

1、sql脚本

CREATE TABLE `bmsuser` (
`id` int(255) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(32) DEFAULT NULL ,
`city` varchar(32) DEFAULT NULL COMMENT ,
PRIMARY KEY (`bmsid`),
UNIQUE KEY `bbLoginName` (`bbLoginName`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8;


2、文件格式,命名为yyyyMMdd.txt

'张三', '深圳'
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: