您的位置:首页 > 其它

Phaser运行阶段性并发任务【在指定的文件夹和子文件夹中获得文件扩展名为.log的文件列表】

2016-07-17 22:49 483 查看

运行阶段性并发任务

Java 并发 API 提供的一个非常复杂且强大的功能是,能够使用Phaser类运行阶段性的并发任务。当某些并发任务是分成多个步骤来执行时,那么此机制是非常有用的。Phaser类提供的机制是在每个步骤的结尾同步线程,所以除非全部线程完成第一个步骤,否则线程不能开始进行第二步。

相对于其他同步应用,我们必须初始化Phaser类与这次同步操作有关的任务数,我们可以通过增加或者减少来不断的改变这个数。

在这个指南,你将学习如果使用Phaser类来同步3个并发任务。这3个任务会在3个不同的文件夹和它们的子文件夹中搜索扩展名是.log并在24小时内修改过的文件。这个任务被分成3个步骤:

在指定的文件夹和子文件夹中获得文件扩展名为.log的文件列表。

过滤第一步的列表中修改超过24小时的文件。

在操控台打印结果。

在步骤1和步骤2的结尾我们要检查列表是否为空。如果为空,那么线程直接结束运行并从phaser类中淘汰。

package com.packtpub.java7.concurrency.chapter3.recipe5.task;

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
* This class search for files with an extension in a directory
*/
public class FileSearch implements Runnable {

/**
* Initial path for the search
*/
private String initPath;

/**
* Extension of the file we are searching for
*/
private String end;

/**
* List that stores the full path of the files that have the extension we are searching for
*/
private List<String> results;

/**
* Phaser to control the execution of the FileSearch objects. Their execution will be divided
* in three phases
*  1st: Look in the folder and its subfolders for the files with the extension
*  2nd: Filter the results. We only want the files modified today
*  3rd: Print the results
*/
private Phaser phaser;

/**
* Constructor of the class. Initializes its attributes
* @param initPath Initial path for the search
* @param end Extension of the files we are searching for
* @param phaser Phaser object to control the execution
*/
public FileSearch(String initPath, String end, Phaser phaser) {
this.initPath = initPath;
this.end = end;
this.phaser=phaser;
results=new ArrayList<>();
}

/**
* Main method of the class. See the comments inside to a better description of it
*/
@Override
public void run() {

// Waits for the creation of all the FileSearch objects
phaser.arriveAndAwaitAdvance();

System.out.printf("%s: Starting.\n",Thread.currentThread().getName());

// 1st Phase: Look for the files
File file = new File(initPath);
if (file.isDirectory()) {
directoryProcess(file);
}

// If no results, deregister in the phaser and ends
if (!checkResults()){
return;
}

// 2nd Phase: Filter the results
filterResults();

// If no results after the filter, deregister in the phaser and ends
if (!checkResults()){
return;
}

// 3rd Phase: Show info
showInfo();
phaser.arriveAndDeregister();
System.out.printf("%s: Work completed.\n",Thread.currentThread().getName());

}

/**
* This method prints the final results of the search
*/
private void showInfo() {
for (int i=0; i<results.size(); i++){
File file=new File(results.get(i));
System.out.printf("%s: %s\n",Thread.currentThread().getName(),file.getAbsolutePath());
}
// Waits for the end of all the FileSearch threads that are registered in the phaser
phaser.arriveAndAwaitAdvance();
}

/**
* This method checks if there are results after the execution of a phase. If there aren't
* results, deregister the thread of the phaser.
* @return true if there are results, false if not
*/
private boolean checkResults() {
if (results.isEmpty()) {
System.out.printf("%s: Phase %d: 0 results.\n",Thread.currentThread().getName(),phaser.getPhase());
System.out.printf("%s: Phase %d: End.\n",Thread.currentThread().getName(),phaser.getPhase());
// No results. Phase is completed but no more work to do. Deregister for the phaser
phaser.arriveAndDeregister();
return false;
} else {
// There are results. Phase is completed. Wait to continue with the next phase
System.out.printf("%s: Phase %d: %d results.\n",Thread.currentThread().getName(),phaser.getPhase(),results.size());
phaser.arriveAndAwaitAdvance();
return true;
}
}

/**
* Method that filter the results to delete the files modified more than a day before now
*/
private void filterResults() {
List<String> newResults=new ArrayList<>();
long actualDate=new Date().getTime();
for (int i=0; i<results.size(); i++){
File file=new File(results.get(i));
long fileDate=file.lastModified();

if (actualDate-fileDate<TimeUnit.MILLISECONDS.convert(1,TimeUnit.DAYS)){
newResults.add(results.get(i));
}
}
results=newResults;
}

/**
* Method that process a directory
*
* @param file
*            : Directory to process
*/
private void directoryProcess(File file) {

// Get the content of the directory
File list[] = file.listFiles();
if (list != null) {
for (int i = 0; i < list.length; i++) {
if (list[i].isDirectory()) {
// If is a directory, process it
directoryProcess(list[i]);
} else {
// If is a file, process it
fileProcess(list[i]);
}
}
}
}

/**
* Method that process a File
*
* @param file
*            : File to process
*/
private void fileProcess(File file) {
if (file.getName().endsWith(end)) {
results.add(file.getAbsolutePath());
}
}

public static void main(String[] args) {

// Creates a Phaser with three participants
Phaser phaser=new Phaser(3);

// Creates 3 FileSearch objects. Each of them search in different directory
FileSearch system=new FileSearch("C:\\Windows", "log", phaser);
FileSearch apps=new FileSearch("C:\\Program Files","log",phaser);
FileSearch documents=new FileSearch("C:\\Documents And Settings","log",phaser);

// Creates a thread to run the system FileSearch and starts it
Thread systemThread=new Thread(system,"System");
systemThread.start();

// Creates a thread to run the apps FileSearch and starts it
Thread appsThread=new Thread(apps,"Apps");
appsThread.start();

// Creates a thread to run the documents  FileSearch and starts it
Thread documentsThread=new Thread(documents,"Documents");
documentsThread.start();
try {
systemThread.join();
appsThread.join();
documentsThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.printf("Terminated: %s\n",phaser.isTerminated());

}
}


结果:

Documents: Starting.

Apps: Starting.

System: Starting.

Documents: Phase 1: 0 results.

Documents: Phase 1: End.

Apps: Phase 1: 6 results.

System: Phase 1: 22 results.

Apps: Phase 2: 1 results.

System: Phase 2: 4 results.

System: C:\Windows\inf\setupapi.app.log

Apps: C:\Program Files\Intel\WiFi\UnifiedLogging\ds-debug.log

System: C:\Windows\PFRO.log

System: C:\Windows\setupact.log

System: C:\Windows\System32\catroot2\edb.log

System: Work completed.

Apps: Work completed.

Terminated: true

它是怎么工作的…

这程序开始创建的 Phaser 对象是用来在每个phase的末端控制线程的同步。Phaser的构造函数接收参与者的数量作为参数。在这里,Phaser有3个参与者。这个数向Phaser表示 Phaser改变phase之前执行 arriveAndAwaitAdvance() 方法的线程数,并叫醒正在休眠的线程。

一旦Phaser被创建,我们运行3个线程分别执行3个不同的FileSearch对象。

在例子里,我们使用 Windows operating system 的路径。如果你使用的是其他操作系统,那么修改成适应你的环境的路径。

FileSearch对象的 run() 方法中的第一个指令是调用Phaser对象的 arriveAndAwaitAdvance() 方法。像之前提到的,Phaser知道我们要同步的线程的数量。当某个线程调用此方法,Phaser减少终结actual phase的线程数,并让这个线程进入休眠 直到全部其余线程结束phase。在run() 方法前面调用此方法,没有任何 FileSearch 线程可以开始他们的工作,直到全部线程被创建。

在phase 1 和 phase 2 的末端,我们检查phase 是否生成有元素的结果list,或者它没有生成结果且list为空。在第一个情况,checkResults() 方法之前提的调用 arriveAndAwaitAdvance()。在第二个情况,如果list为空,那就没有必要让线程继续了,就直接返回吧。但是你必须通知phaser,将会少一个参与者。为了这个,我们使用arriveAndDeregister()。它通知phaser线程结束了actual phase, 但是它将不会继续参见后面的phases,所以请phaser不要再等待它了。

在phase3的结尾实现了 showInfo() 方法, 调用了 phaser 的 arriveAndAwaitAdvance() 方法。这个调用,保证了全部线程在同一时间结束。当此方法结束执行,有一个调用phaser的arriveAndDeregister() 方法。这个调用,我们撤销了对phaser线程的注册,所以当全部线程结束时,phaser 有0个参与者。

最后,main() 方法等待3个线程的完成并调用phaser的 isTerminated() 方法。当phaser 有0个参与者时,它进入termination状态,此状态与此调用将会打印true到操控台。

Phaser 对象可能是在这2中状态:

Active: 当 Phaser 接受新的参与者注册,它进入这个状态,并且在每个phase的末端同步。 在此状态,Phaser像在这个指南里解释的那样工作。此状态不在Java 并发 API中。
Termination: 默认状态,当Phaser里全部的参与者都取消注册,它进入这个状态,所以这时 Phaser 有0个参与者。更具体的说,当onAdvance() 方法返回真值时,Phaser 是在这个状态里。如果你覆盖那个方法,你可以改变它的默认行为。当 Phaser 在这个状态,同步方法 arriveAndAwaitAdvance()会 立刻返回,不会做任何同步。


Phaser 类的一个显著特点是你不需要控制任何与phaser相关的方法的异常。不像其他同步应用,线程们在phaser休眠不会响应任何中断也不会抛出 InterruptedException 异常。只有一个异常会在下面的‘更多’里解释。

本文是《 Java 7 Concurrency Cookbook 》的第三章, 作者: Javier Fernández González

转自:http://ifeve.com/thread-synchronization-utilities-6-2/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  并发 api 扩展 线程 Phaser