您的位置:首页 > 其它

一个简单高效的多线程解决方案

2014-11-06 15:50 274 查看
import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 多线程抓取数据的简单程序
*/
public class MultithreadFetcher {

/** 阻塞队列的最大长度,防止内存溢出。  */
public static final int MAX_QUEUE_SIZE = 100;
/** 最大递归深度,防止递归死循环  */
public static final int RECURSION_LEVEL = Integer.MAX_VALUE;
/** 致命毒药,优雅关闭后续的工作线程  */
private static final File DEADLY_POISON = new File("./deadly.tmp");

/**
* 递归遍历文件夹,将遍历的文件放入队列中。
* @param folder 目标文件夹
* @param fileQueue 文件队列
* @param level 递归深度
*/
private static void visitFolder(File folder, BlockingQueue<File> fileQueue, int level) throws InterruptedException{
if(level<=0){//控制递归深度,防止递归死循环。
return;
}
File[] files = folder.listFiles();
for(File file : files){
if(file.isDirectory()){
visitFolder(file,fileQueue,level-1);
}else if(file.getName().toLowerCase().endsWith(".xml")){
fileQueue.put(file);
}else{
//do nothing ...
}
}
}
/**
* 创建目标文件。通过原文件的名称创建一个新的文件。
* @param file 原始文件
* @param targetFolder 目标文件夹
* @return 新的文件,目标文件
*/
private static File createTargetFile(File file, File targetFolder){
String targetFileName = file.getName();
return new File(targetFolder,targetFileName);
}
/**
* 处理文件的操作,可以在这个里面读取文件数据,解析文件,抓取网页,写入备份。
* @param file 原始文件,待解析的文件
* @param target 目标文件,备份文件
*/
private static void travelFile(File file, File target) throws Throwable{
//详细操作从略
}

/** 递归文件夹的线程。不支持多线程并发递归。 */
static class VisitFolderThread extends Thread{
private File folder;
private BlockingQueue<File> fileQueue;
public VisitFolderThread(File folder, BlockingQueue<File> fileQueue) {
super("visit-folder-thread");
this.folder = folder;
this.fileQueue = fileQueue;
}
@Override
public void run() {
try {
visitFolder(folder, fileQueue, RECURSION_LEVEL);
fileQueue.put(DEADLY_POISON);//放置毒药,优雅关闭
} catch (InterruptedException e) {
// 在这里可以做一些异常处理
e.printStackTrace();
}
}
}

/** 处理文件的线程,可以多线程并发处理,每个线程处理一个文件  */
static class TravelFileThread extends Thread{
private static final AtomicInteger ThreadCount = new AtomicInteger();
private File targetFolder;
private BlockingQueue<File> fileQueue;
public TravelFileThread(File targetFolder, BlockingQueue<File> fileQueue) {
super("travel-file-thread-"+ThreadCount.incrementAndGet());
this.targetFolder = targetFolder;
this.fileQueue = fileQueue;
}
@Override
public void run() {
File file = null;
try {
while((file=fileQueue.take())!=DEADLY_POISON){
File target = createTargetFile(file, targetFolder);
try {
travelFile(file, target);
} catch (Throwable e) {
onException(e,file,target);
}
}
fileQueue.put(DEADLY_POISON);//放置毒药,优雅关闭
} catch (InterruptedException e) {
// 在这里可以做一些异常处理
e.printStackTrace();
}
}
/** 在处理文件的过程中,如果抛出异常,则进入下面的处理程序,从略。 */
private void onException(Throwable e, File file, File target) {
// 如果travelFile抛出异常,则在此处进行处理。
e.printStackTrace();
}
}

private BlockingQueue<File> fileQueue = new LinkedBlockingQueue<File>(MAX_QUEUE_SIZE);
private Thread visitFolderThread;
private Thread[] travelFileThreads;

public MultithreadFetcher(File sourceFolder, File targetFolder, int travelThreads) {
super();
visitFolderThread = new VisitFolderThread(sourceFolder, fileQueue);
travelFileThreads = new TravelFileThread[travelThreads];
for(int i=0;i<travelFileThreads.length;i++){
travelFileThreads[i] = new TravelFileThread(targetFolder, fileQueue);
}
}

/**
* 开始执行
*/
public void start(){
visitFolderThread.start();
for(int i=0;i<travelFileThreads.length;i++){
travelFileThreads[i].start();
}
}
/**
* 强行终止。请慎用。程序会自动关闭
*/
public void terminate(){
visitFolderThread.interrupt();
for(int i=0;i<travelFileThreads.length;i++){
travelFileThreads[i].interrupt();
}
}

/**
* 测试用例
*/
public static void main(String[] args) {
final File sourceFolder = new File("");
final File targetFolder = new File("");
final int travelThreads = 20;
MultithreadFetcher fetcher = new MultithreadFetcher(sourceFolder,targetFolder,travelThreads);
fetcher.start();
}

}
采取了数据分离的形式,消除了线程间互斥。效率不错。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息